@@ -117,7 +117,6 @@ def clean_registries(self, timestamp: Optional[float] = None) -> None:
117
117
logger .exception (f"Job { self .name } : error while executing failure callback" )
118
118
raise
119
119
120
-
121
120
else :
122
121
logger .warning (
123
122
f"{ self .__class__ .__name__ } cleanup: Moving job to { self .failed_job_registry .key } "
@@ -189,24 +188,24 @@ def get_all_jobs(self) -> List[JobModel]:
189
188
return JobModel .get_many (job_names , connection = self .connection )
190
189
191
190
def create_and_enqueue_job (
192
- self ,
193
- func : FunctionReferenceType ,
194
- args : Union [Tuple , List , None ] = None ,
195
- kwargs : Optional [Dict ] = None ,
196
- timeout : Optional [int ] = None ,
197
- result_ttl : Optional [int ] = None ,
198
- job_info_ttl : Optional [int ] = None ,
199
- description : Optional [str ] = None ,
200
- name : Optional [str ] = None ,
201
- at_front : bool = False ,
202
- meta : Optional [Dict ] = None ,
203
- on_success : Optional [Callback ] = None ,
204
- on_failure : Optional [Callback ] = None ,
205
- on_stopped : Optional [Callback ] = None ,
206
- task_type : Optional [str ] = None ,
207
- scheduled_task_id : Optional [int ] = None ,
208
- when : Optional [datetime ] = None ,
209
- pipeline : Optional [ConnectionType ] = None ,
191
+ self ,
192
+ func : FunctionReferenceType ,
193
+ args : Union [Tuple , List , None ] = None ,
194
+ kwargs : Optional [Dict ] = None ,
195
+ timeout : Optional [int ] = None ,
196
+ result_ttl : Optional [int ] = None ,
197
+ job_info_ttl : Optional [int ] = None ,
198
+ description : Optional [str ] = None ,
199
+ name : Optional [str ] = None ,
200
+ at_front : bool = False ,
201
+ meta : Optional [Dict ] = None ,
202
+ on_success : Optional [Callback ] = None ,
203
+ on_failure : Optional [Callback ] = None ,
204
+ on_stopped : Optional [Callback ] = None ,
205
+ task_type : Optional [str ] = None ,
206
+ scheduled_task_id : Optional [int ] = None ,
207
+ when : Optional [datetime ] = None ,
208
+ pipeline : Optional [ConnectionType ] = None ,
210
209
) -> JobModel :
211
210
"""Creates a job to represent the delayed function call and enqueues it.
212
211
:param when: When to schedule the job (None to enqueue immediately)
@@ -258,23 +257,37 @@ def create_and_enqueue_job(
258
257
def job_handle_success (self , job : JobModel , result : Any , result_ttl : int , connection : ConnectionType ):
259
258
"""Saves and cleanup job after successful execution"""
260
259
job .after_execution (
261
- result_ttl , JobStatus .FINISHED ,
260
+ result_ttl ,
261
+ JobStatus .FINISHED ,
262
262
prev_registry = self .active_job_registry ,
263
- new_registry = self .finished_job_registry , connection = connection )
264
- Result .create (connection , job_name = job .name , worker_name = job .worker_name , _type = ResultType .SUCCESSFUL ,
265
- return_value = result , ttl = result_ttl )
263
+ new_registry = self .finished_job_registry ,
264
+ connection = connection ,
265
+ )
266
+ Result .create (
267
+ connection ,
268
+ job_name = job .name ,
269
+ worker_name = job .worker_name ,
270
+ _type = ResultType .SUCCESSFUL ,
271
+ return_value = result ,
272
+ ttl = result_ttl ,
273
+ )
266
274
267
275
def job_handle_failure (self , status : JobStatus , job : JobModel , exc_string : str , connection : ConnectionType ):
268
276
# Does not set job status since the job might be stopped
269
277
job .after_execution (
270
- SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL , status ,
278
+ SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
279
+ status ,
271
280
prev_registry = self .active_job_registry ,
272
281
new_registry = self .failed_job_registry ,
273
- connection = connection )
282
+ connection = connection ,
283
+ )
274
284
Result .create (
275
- connection , job .name , job .worker_name ,
276
- ResultType .FAILED , SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
277
- exc_string = exc_string
285
+ connection ,
286
+ job .name ,
287
+ job .worker_name ,
288
+ ResultType .FAILED ,
289
+ SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL ,
290
+ exc_string = exc_string ,
278
291
)
279
292
280
293
def run_job (self , job : JobModel ) -> JobModel :
@@ -299,7 +312,7 @@ def run_job(self, job: JobModel) -> JobModel:
299
312
return job
300
313
301
314
def enqueue_job (
302
- self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
315
+ self , job_model : JobModel , connection : Optional [ConnectionType ] = None , at_front : bool = False
303
316
) -> JobModel :
304
317
"""Enqueues a job for delayed execution without checking dependencies.
305
318
@@ -350,10 +363,10 @@ def run_sync(self, job: JobModel) -> JobModel:
350
363
351
364
@classmethod
352
365
def dequeue_any (
353
- cls ,
354
- queues : List [Self ],
355
- timeout : Optional [int ],
356
- connection : Optional [ConnectionType ] = None ,
366
+ cls ,
367
+ queues : List [Self ],
368
+ timeout : Optional [int ],
369
+ connection : Optional [ConnectionType ] = None ,
357
370
) -> Tuple [Optional [JobModel ], Optional [Self ]]:
358
371
"""Class method returning a Job instance at the front of the given set of Queues, where the order of the queues
359
372
is important.
@@ -439,8 +452,9 @@ def cancel_job(self, job_name: str) -> None:
439
452
if new_status == JobStatus .CANCELED :
440
453
self .canceled_job_registry .add (pipe , job_name , 0 )
441
454
else :
442
- self .finished_job_registry .add (pipe , job_name ,
443
- current_timestamp () + SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL )
455
+ self .finished_job_registry .add (
456
+ pipe , job_name , current_timestamp () + SCHEDULER_CONFIG .DEFAULT_FAILURE_TTL
457
+ )
444
458
pipe .execute ()
445
459
break
446
460
except WatchError :
0 commit comments