Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not allow queuing job if job.meta.unique_job_id already queued #302

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,13 @@ def enqueue_job(self, job):
job.meta['repeat'] = int(repeat) - 1

queue = self.get_queue_for_job(job)
queue.enqueue_job(job, at_front=bool(job.enqueue_at_front))
# check to see if a job with meta.unique_job_id set is already in job queue
unique_job_id = job.meta.get('unique_job_id')
if unique_job_id and unique_job_id in [j.meta.get('unique_job_id') for j in queue.jobs]:
self.log.info(f'job {job.id} already queued in {queue.name} queue by meta.unique_job_id {unique_job_id}')
else:
queue.enqueue_job(job, at_front=bool(job.enqueue_at_front))

self.connection.zrem(self.scheduled_jobs_key, job.id)

if interval:
Expand Down
30 changes: 30 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,36 @@ def test_count(self):
self.assertEqual(self.scheduler.count(future_test_time), 1)
self.assertEqual(self.scheduler.count(), 2)

def test_queue_duplicates(self):
"""
Ensure that jobs without meta.allow_duplicates nor meta.unique_id can enque many
"""
time_now_minus10 = datetime.utcnow() - timedelta(minutes=10)
interval = 1
job_id = 'testingmany'
job = self.scheduler.schedule(time_now_minus10, say_hello, id=job_id, interval=interval, queue_name='allow_dupes_many_queue')
job_queue = self.scheduler.get_queue_for_job(job)
self.scheduler.enqueue_job(job)
self.assertEqual([job_id], [j.id for j in job_queue.jobs])
self.scheduler.enqueue_job(job)
self.assertEqual([job_id, job_id], [j.id for j in job_queue.jobs])


def test_job_with_allow_duplicates_false_meta(self):
"""
Ensure that jobs with meta.unique_job_id will NOT be queued if job is already queued with meta.unique_job_id
"""
time_now_minus10 = datetime.utcnow() - timedelta(minutes=10)
interval = 1
job_unique_id = 'baba yaga'
meta = {'unique_job_id': job_unique_id}
job = self.scheduler.schedule(time_now_minus10, say_hello, interval=interval, meta=meta, queue_name='allow_dupes_false_queue')
job_queue = self.scheduler.get_queue_for_job(job)
self.scheduler.enqueue_job(job)
self.assertEqual([job_unique_id], [j.meta.get('unique_job_id', 'none') for j in job_queue.jobs])
self.scheduler.enqueue_job(job)
self.assertEqual([job_unique_id], [j.meta.get('unique_job_id', 'none') for j in job_queue.jobs])

def test_get_jobs(self):
"""
Ensure get_jobs() returns all jobs until the specified time.
Expand Down