15
15
from kombu .utils .json import dumps , loads
16
16
17
17
from django .conf import settings
18
- from django .db import transaction , close_old_connections
18
+ from django .db import (
19
+ DEFAULT_DB_ALIAS ,
20
+ close_old_connections ,
21
+ router ,
22
+ transaction
23
+ )
19
24
from django .db .utils import DatabaseError , InterfaceError
20
25
from django .core .exceptions import ObjectDoesNotExist
21
26
@@ -258,7 +263,7 @@ def schedule_changed(self):
258
263
# other transactions until the current transaction is
259
264
# committed (Issue #41).
260
265
try :
261
- transaction .commit ()
266
+ transaction .commit (using = self . target_db )
262
267
except transaction .TransactionManagementError :
263
268
pass # not in transaction management.
264
269
@@ -287,7 +292,18 @@ def reserve(self, entry):
287
292
self ._dirty .add (new_entry .name )
288
293
return new_entry
289
294
290
- def sync (self ):
295
+ @property
296
+ def target_db (self ):
297
+ """Determine if there is a django route"""
298
+ if not settings .DATABASE_ROUTERS :
299
+ return DEFAULT_DB_ALIAS
300
+ # If the project does not actually implement this method,
301
+ # DEFAULT_DB_ALIAS will be automatically returned.
302
+ # The exception will be located to the django routing section
303
+ db = router .db_for_write (self .Model )
304
+ return db
305
+
306
+ def _sync (self ):
291
307
if logger .isEnabledFor (logging .DEBUG ):
292
308
debug ('Writing entries...' )
293
309
_tried = set ()
@@ -313,6 +329,10 @@ def sync(self):
313
329
# retry later, only for the failed ones
314
330
self ._dirty |= _failed
315
331
332
+ def sync (self ):
333
+ with transaction .atomic (using = self .target_db ):
334
+ self ._sync ()
335
+
316
336
def update_from_dict (self , mapping ):
317
337
s = {}
318
338
for name , entry_fields in mapping .items ():
0 commit comments