Skip to content

Commit 801fe3e

Browse files
authored
further work on signals.py
This version seems to actually delete all indexes and related docs instances correctly
1 parent 3df0d60 commit 801fe3e

File tree

1 file changed

+79
-32
lines changed

1 file changed

+79
-32
lines changed

django_elasticsearch_dsl/signals.py

Lines changed: 79 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -144,26 +144,59 @@ def handle_delete(self, sender, instance, **kwargs):
144144
"""
145145
self.prepare_registry_delete_task(instance)
146146

147+
148+
@shared_task()
149+
def registry_delete_related_task(doc_module, doc_class, object_ids, action):
150+
"""
151+
A Celery task that fetches the latest data for given object IDs and performs the required indexing action.
152+
This version uses the custom `get_queryset()` method defined in the document class.
153+
154+
Instead of deleting the related objects we update it so that the deleted connection between
155+
the deleted model and the related model is updated into elasticsearch.
156+
"""
157+
doc_instance = getattr(import_module(doc_module), doc_class)()
158+
model = doc_instance.django.model
159+
160+
# Fetch the latest instances from the database
161+
#object_list = model.objects.filter(pk__in=object_ids).all()
162+
# Use the custom queryset method if available
163+
object_list = doc_instance.get_queryset().filter(pk__in=object_ids)
164+
if not object_list:
165+
return
166+
167+
# Generate the bulk update data
168+
bulk_data = list(doc_instance._get_actions(object_list, action))
169+
170+
if bulk_data:
171+
doc_instance._bulk(bulk_data, parallel=True)
172+
173+
147174
def prepare_registry_delete_related_task(self, instance):
148175
"""
149-
Select its related instance before this instance was deleted.
150-
And pass that to celery.
176+
Collect IDs of related instances before the main instance is deleted and queue these IDs
177+
for indexing in Elasticsearch through a registry_delete_related_task.
151178
"""
152-
action = 'index'
153-
for doc in registry._get_related_doc(instance):
154-
doc_instance = doc(related_instance_to_ignore=instance)
179+
related_docs = list(registry._get_related_doc(instance))
180+
if not related_docs:
181+
return
182+
183+
for doc_class in related_docs:
184+
doc_instance = doc_class()
155185
try:
156186
related = doc_instance.get_instances_from_related(instance)
157187
except ObjectDoesNotExist:
158188
related = None
159-
if related is not None:
160-
doc_instance.update(related)
189+
190+
if related:
161191
if isinstance(related, models.Model):
162-
object_list = [related]
192+
object_ids = [related.pk]
163193
else:
164-
object_list = related
165-
bulk_data = list(doc_instance._get_actions(object_list, action))
166-
self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data)
194+
object_ids = [obj.pk for obj in related if hasattr(obj, 'pk')]
195+
196+
action = 'index' # Set the operation as 'index'
197+
# Send only the IDs to the task
198+
self.registry_delete_related_task.delay(doc_class.__module__, doc_class.__name__, object_ids, action)
199+
167200

168201
@shared_task()
169202
def registry_delete_task(doc_module, doc_class, bulk_data):
@@ -177,25 +210,32 @@ def registry_delete_task(doc_module, doc_class, bulk_data):
177210
parallel = True
178211
doc_instance._bulk(bulk_data, parallel=parallel)
179212

213+
180214
def prepare_registry_delete_task(self, instance):
181215
"""
182-
Get the prepare did before database record deleted.
216+
Prepare deletion of the instance itself from Elasticsearch.
183217
"""
184218
action = 'delete'
185-
for doc in registry._get_related_doc(instance):
186-
doc_instance = doc(related_instance_to_ignore=instance)
187-
try:
188-
related = doc_instance.get_instances_from_related(instance)
189-
except ObjectDoesNotExist:
190-
related = None
191-
if related is not None:
192-
doc_instance.update(related)
193-
if isinstance(related, models.Model):
194-
object_list = [related]
195-
else:
196-
object_list = related
197-
bulk_data = list(doc_instance.get_actions(object_list, action))
198-
self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data)
219+
220+
# Find all documents in the registry that are related to the instance's model class
221+
if instance.__class__ not in registry._models:
222+
return
223+
224+
bulk_data = []
225+
for doc_class in registry._models[instance.__class__]:
226+
doc_instance = doc_class() # Create an instance of the document
227+
if isinstance(instance, models.Model):
228+
object_list = [instance]
229+
else:
230+
object_list = instance
231+
232+
# Assuming get_actions method prepares the correct delete actions for Elasticsearch
233+
bulk_data.extend(list(doc_instance._get_actions(object_list, action)))
234+
235+
if bulk_data:
236+
# Ensure registry_delete_task is prepared to handle bulk deletion
237+
self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data)
238+
199239

200240
@shared_task()
201241
def registry_update_task(pk, app_label, model_name):
@@ -205,9 +245,13 @@ def registry_update_task(pk, app_label, model_name):
205245
except LookupError:
206246
pass
207247
else:
208-
registry.update(
209-
model.objects.get(pk=pk)
210-
)
248+
try:
249+
registry.update(
250+
model.objects.get(pk=pk)
251+
)
252+
except ObjectDoesNotExist as e:
253+
print(f'Error registry_update_task: {e}')
254+
211255

212256
@shared_task()
213257
def registry_update_related_task(pk, app_label, model_name):
@@ -217,6 +261,9 @@ def registry_update_related_task(pk, app_label, model_name):
217261
except LookupError:
218262
pass
219263
else:
220-
registry.update_related(
221-
model.objects.get(pk=pk)
222-
)
264+
try:
265+
registry.update_related(
266+
model.objects.get(pk=pk)
267+
)
268+
except ObjectDoesNotExist as e:
269+
print(f'Error registry_update_related_task: {e}')

0 commit comments

Comments
 (0)