79 changes: 61 additions & 18 deletions dojo/utils.py
@@ -6,7 +6,9 @@
import mimetypes
import os
import pathlib
import random
import re
import time
from calendar import monthrange
from collections.abc import Callable
from datetime import date, datetime, timedelta
@@ -29,6 +31,7 @@
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.contrib.contenttypes.models import ContentType
from django.core.paginator import Paginator
from django.db import OperationalError
from django.db.models import Case, Count, F, IntegerField, Q, Sum, Value, When
from django.db.models.query import QuerySet
from django.db.models.signals import post_save
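
Note: the retry path added below keys off django.db.OperationalError. A minimal sketch of the detection, assuming a PostgreSQL backend (which reports deadlocks as SQLSTATE 40P01 with the message "deadlock detected"); the is_deadlock helper is hypothetical, not part of this diff:

from django.db import OperationalError

def is_deadlock(exc: OperationalError) -> bool:
    # PostgreSQL reports deadlocks with the message "deadlock detected",
    # and Django surfaces that as OperationalError, so a case-insensitive
    # substring check separates deadlocks from other operational failures
    # such as dropped connections.
    return "deadlock detected" in str(exc).lower()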
@@ -2003,22 +2006,51 @@ def __init__(self, *args, **kwargs):
@dojo_async_task
@app.task
def delete_chunk(self, objects, **kwargs):
# Now delete all objects with retry for deadlocks
max_retries = 3
for obj in objects:
try:
obj.delete()
except AssertionError:
logger.debug("ASYNC_DELETE: object has already been deleted elsewhere. Skipping")
# The id must be None
# The object has already been deleted elsewhere
except LogEntry.MultipleObjectsReturned:
            # Delete the log entries first, then delete
LogEntry.objects.filter(
content_type=ContentType.objects.get_for_model(obj.__class__),
object_pk=str(obj.pk),
action=LogEntry.Action.DELETE,
).delete()
# Now delete the object again
obj.delete()
retry_count = 0
while retry_count < max_retries:
try:
obj.delete()
break # Success, exit retry loop
except OperationalError as e:
error_msg = str(e)
if "deadlock detected" in error_msg.lower():
retry_count += 1
if retry_count < max_retries:
# Exponential backoff with jitter
wait_time = (2 ** retry_count) + random.uniform(0, 1) # noqa: S311
logger.warning(
f"ASYNC_DELETE: Deadlock detected deleting {self.get_object_name(obj)} {obj.pk}, "
f"retrying ({retry_count}/{max_retries}) after {wait_time:.2f}s",
)
time.sleep(wait_time)
# Refresh object from DB before retry
obj.refresh_from_db()
else:
logger.error(
f"ASYNC_DELETE: Deadlock persisted after {max_retries} retries for {self.get_object_name(obj)} {obj.pk}: {e}",
)
raise
else:
# Not a deadlock, re-raise
raise
except AssertionError:
logger.debug("ASYNC_DELETE: object has already been deleted elsewhere. Skipping")
# The id must be None
# The object has already been deleted elsewhere
break
except LogEntry.MultipleObjectsReturned:
                # Delete the log entries first, then delete
LogEntry.objects.filter(
content_type=ContentType.objects.get_for_model(obj.__class__),
object_pk=str(obj.pk),
action=LogEntry.Action.DELETE,
).delete()
# Now delete the object again (no retry needed for this case)
obj.delete()
break

@dojo_async_task
@app.task
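
The retry loop added above follows a standard exponential-backoff-with-jitter pattern. A standalone sketch of that pattern under the same parameters (three attempts, 2**attempt seconds plus up to one second of jitter); the action callable and DeadlockDetected exception are hypothetical stand-ins, not DefectDojo APIs:

import random
import time

class DeadlockDetected(Exception):
    """Hypothetical stand-in for a deadlock OperationalError."""

def retry_with_backoff(action, max_retries=3):
    # Attempt `action` up to max_retries times; on a deadlock, sleep
    # 2**attempt seconds plus up to one second of jitter so concurrent
    # workers do not retry in lockstep and collide on the same rows.
    for attempt in range(1, max_retries + 1):
        try:
            return action()
        except DeadlockDetected:
            if attempt == max_retries:
                raise  # deadlock persisted; let the caller see it
            time.sleep((2 ** attempt) + random.uniform(0, 1))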
@@ -2037,18 +2069,29 @@ def delete(self, obj, **kwargs):
@app.task
def crawl(self, obj, model_list, **kwargs):
logger.debug("ASYNC_DELETE: Crawling " + self.get_object_name(obj) + ": " + str(obj))
task_results = []
for model_info in model_list:
model = model_info[0]
model_query = model_info[1]
filter_dict = {model_query: obj}
# Only fetch the IDs since we will make a list of IDs in the following function call
objects_to_delete = model.objects.only("id").filter(**filter_dict)
objects_to_delete = model.objects.only("id").filter(**filter_dict).distinct().order_by("id")
logger.debug("ASYNC_DELETE: Deleting " + str(len(objects_to_delete)) + " " + self.get_object_name(model) + "s in chunks")
chunks = self.chunk_list(model, objects_to_delete)
for chunk in chunks:
logger.debug(f"deleting {len(chunk)} {self.get_object_name(model)}")
self.delete_chunk(chunk)
self.delete_chunk([obj])
result = self.delete_chunk(chunk)
# Collect async task results to wait for them all at once
if hasattr(result, "get"):
task_results.append(result)
# Wait for all chunk deletions to complete (they run in parallel)
for task_result in task_results:
task_result.get(timeout=300) # 5 minute timeout per chunk
# Now delete the main object after all chunks are done
result = self.delete_chunk([obj])
# Wait for final deletion to complete
if hasattr(result, "get"):
result.get(timeout=300) # 5 minute timeout
logger.debug("ASYNC_DELETE: Successfully deleted " + self.get_object_name(obj) + ": " + str(obj))

def chunk_list(self, model, full_list):
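
The crawl changes switch chunk deletion from fire-and-forget to fan-out/join: every chunk is dispatched in parallel, then each AsyncResult is awaited before the parent object itself is deleted. A minimal sketch of that pattern with a standalone Celery app; the app name, broker/backend URLs, delete_ids task, and fan_out_and_join helper are all hypothetical, not part of dojo/utils.py:

from celery import Celery

app = Celery("sketch", broker="memory://", backend="cache+memory://")

@app.task
def delete_ids(ids):
    # Stand-in for the per-chunk deletion work done by delete_chunk.
    return len(ids)

def fan_out_and_join(chunks, timeout=300):
    # Dispatch every chunk in parallel, then block on each AsyncResult,
    # mirroring the task_results loop above; .get() should only be
    # called from regular code, never from inside another running task.
    results = [delete_ids.delay(chunk) for chunk in chunks]
    return [r.get(timeout=timeout) for r in results]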