Skip to content

Commit ce601bc

Browse files
authored
feat: add support for task locks (#60)
* feat: add support for task locks * make phpcpd happy
1 parent 835ba88 commit ce601bc

File tree

2 files changed

+229
-9
lines changed

2 files changed

+229
-9
lines changed

src/Commands/QueueWork.php

+47-6
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,15 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
239239
timer()->start('work');
240240
$payload = $work->payload;
241241

242+
$payloadMetadata = null;
243+
242244
try {
245+
// Load payload metadata
246+
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);
247+
248+
// Renew lock if needed
249+
$this->renewLock($payloadMetadata);
250+
243251
$class = $config->resolveJobClass($payload['job']);
244252
$job = new $class($payload['data']);
245253
$job->process();
@@ -250,9 +258,7 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
250258
CLI::write('The processing of this job was successful', 'green');
251259

252260
// Check chained jobs
253-
if (isset($payload['metadata']) && $payload['metadata'] !== []) {
254-
$this->processNextJobInChain($payload['metadata']);
255-
}
261+
$this->processNextJobInChain($payloadMetadata);
256262
} catch (Throwable $err) {
257263
if (isset($job) && ++$work->attempts < ($tries ?? $job->getTries())) {
258264
// Schedule for later
@@ -263,6 +269,9 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
263269
}
264270
CLI::write('The processing of this job failed', 'red');
265271
} finally {
272+
// Remove lock if needed
273+
$this->clearLock($payloadMetadata);
274+
266275
timer()->stop('work');
267276
CLI::write(sprintf('It took: %s sec', timer()->getElapsedTime('work')) . PHP_EOL, 'cyan');
268277
}
@@ -271,10 +280,8 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
271280
/**
272281
* Process the next job in the chain
273282
*/
274-
private function processNextJobInChain(array $payloadMetadata): void
283+
private function processNextJobInChain(PayloadMetadata $payloadMetadata): void
275284
{
276-
$payloadMetadata = PayloadMetadata::fromArray($payloadMetadata);
277-
278285
if (! $payloadMetadata->hasChainedJobs()) {
279286
return;
280287
}
@@ -305,6 +312,40 @@ private function processNextJobInChain(array $payloadMetadata): void
305312
CLI::write(sprintf('Chained job: %s has been placed in the queue: %s', $nextPayload->getJob(), $nextPayload->getQueue()), 'green');
306313
}
307314

315+
/**
316+
* Renew task lock
317+
*/
318+
private function renewLock(PayloadMetadata $payloadMetadata): void
319+
{
320+
if (! $payloadMetadata->has('taskLockTTL') || ! $payloadMetadata->has('taskLockKey')) {
321+
return;
322+
}
323+
324+
$ttl = $payloadMetadata->get('taskLockTTL');
325+
$key = $payloadMetadata->get('taskLockKey');
326+
327+
// Permanent lock, no need to renew
328+
if ($ttl === 0) {
329+
return;
330+
}
331+
332+
cache()->save($key, [], $ttl);
333+
}
334+
335+
/**
336+
* Remove task lock
337+
*/
338+
private function clearLock(PayloadMetadata $payloadMetadata): void
339+
{
340+
if (! $payloadMetadata->has('taskLockKey')) {
341+
return;
342+
}
343+
344+
$key = $payloadMetadata->get('taskLockKey');
345+
346+
cache()->delete($key);
347+
}
348+
308349
private function maxJobsCheck(int $maxJobs, int $countJobs): bool
309350
{
310351
if ($maxJobs > 0 && $countJobs >= $maxJobs) {

tests/Commands/QueueWorkTest.php

+182-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
namespace Tests\Commands;
1515

16+
use CodeIgniter\Cache\CacheInterface;
17+
use CodeIgniter\Config\Services;
1618
use CodeIgniter\I18n\Time;
1719
use CodeIgniter\Queue\Models\QueueJobModel;
1820
use CodeIgniter\Test\Filters\CITestStreamFilter;
@@ -123,13 +125,17 @@ public function testRunWithChainedQueueSucceed(): void
123125
'job' => 'success',
124126
'data' => ['key' => 'value'],
125127
'metadata' => [
126-
'queue' => 'queue',
128+
'queue' => 'test',
127129
'chainedJobs' => [
128130
[
129-
'job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => [
131+
'job' => 'success',
132+
'data' => [
133+
'key3' => 'value3',
134+
],
135+
'metadata' => [
130136
'queue' => 'queue',
131137
'priority' => 'high',
132-
'delay' => 10,
138+
'delay' => 30,
133139
],
134140
],
135141
],
@@ -154,5 +160,178 @@ public function testRunWithChainedQueueSucceed(): void
154160
$this->assertSame('The processing of this job was successful', $this->getLine(4));
155161
$this->assertSame('Chained job: success has been placed in the queue: queue', $this->getLine(5));
156162
$this->assertSame('No job available. Stopping.', $this->getLine(8));
163+
164+
$this->seeInDatabase('queue_jobs', [
165+
'queue' => 'queue',
166+
'payload' => json_encode([
167+
'job' => 'success',
168+
'data' => ['key3' => 'value3'],
169+
'metadata' => [
170+
'queue' => 'queue',
171+
'priority' => 'high',
172+
'delay' => 30,
173+
],
174+
]),
175+
]);
176+
}
177+
178+
public function testRunWithTaskLock(): void
179+
{
180+
$lockKey = 'test_lock_key';
181+
$lockTTL = 300; // 5 minutes
182+
183+
Time::setTestNow('2023-12-19 14:15:16');
184+
185+
$cache = $this->createMock(CacheInterface::class);
186+
187+
// Set up expectations
188+
$cache->expects($this->once())
189+
->method('save')
190+
->with($lockKey, $this->anything(), $lockTTL)
191+
->willReturn(true);
192+
193+
$cache->expects($this->once())
194+
->method('delete')
195+
->with($lockKey)
196+
->willReturn(true);
197+
198+
// Replace the cache service
199+
Services::injectMock('cache', $cache);
200+
201+
fake(QueueJobModel::class, [
202+
'connection' => 'database',
203+
'queue' => 'test',
204+
'payload' => [
205+
'job' => 'success',
206+
'data' => ['key' => 'value'],
207+
'metadata' => [
208+
'taskLockKey' => $lockKey,
209+
'taskLockTTL' => $lockTTL,
210+
'queue' => 'test',
211+
],
212+
],
213+
'priority' => 'default',
214+
'status' => 0,
215+
'attempts' => 0,
216+
'available_at' => 1_702_977_074,
217+
]);
218+
219+
CITestStreamFilter::registration();
220+
CITestStreamFilter::addOutputFilter();
221+
222+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
223+
$this->parseOutput(CITestStreamFilter::$buffer);
224+
225+
CITestStreamFilter::removeOutputFilter();
226+
227+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
228+
$this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3));
229+
$this->assertSame('The processing of this job was successful', $this->getLine(4));
230+
}
231+
232+
public function testRunWithPermanentTaskLock(): void
233+
{
234+
$lockKey = 'permanent_lock_key';
235+
$lockTTL = 0; // Permanent lock
236+
237+
Time::setTestNow('2023-12-19 14:15:16');
238+
239+
$cache = $this->createMock(CacheInterface::class);
240+
241+
// For permanent lock (TTL=0), save should NOT be called
242+
$cache->expects($this->never())
243+
->method('save');
244+
245+
$cache->expects($this->once())
246+
->method('delete')
247+
->with($lockKey)
248+
->willReturn(true);
249+
250+
// Replace the cache service
251+
Services::injectMock('cache', $cache);
252+
253+
fake(QueueJobModel::class, [
254+
'connection' => 'database',
255+
'queue' => 'test',
256+
'payload' => [
257+
'job' => 'success',
258+
'data' => ['key4' => 'value4'],
259+
'metadata' => [
260+
'taskLockKey' => $lockKey,
261+
'taskLockTTL' => $lockTTL,
262+
'queue' => 'test',
263+
],
264+
],
265+
'priority' => 'default',
266+
'status' => 0,
267+
'attempts' => 0,
268+
'available_at' => 1_702_977_074,
269+
]);
270+
271+
CITestStreamFilter::registration();
272+
CITestStreamFilter::addOutputFilter();
273+
274+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
275+
$this->parseOutput(CITestStreamFilter::$buffer);
276+
277+
CITestStreamFilter::removeOutputFilter();
278+
279+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
280+
$this->assertSame('Starting a new job: success, with ID: 1', $this->getLine(3));
281+
$this->assertSame('The processing of this job was successful', $this->getLine(4));
282+
}
283+
284+
public function testLockClearedOnFailure(): void
285+
{
286+
$lockKey = 'failure_lock_key';
287+
$lockTTL = 300;
288+
289+
Time::setTestNow('2023-12-19 14:15:16');
290+
291+
$cache = $this->createMock(CacheInterface::class);
292+
293+
// Set up expectations
294+
$cache->expects($this->once())
295+
->method('save')
296+
->with($lockKey, $this->anything(), $lockTTL)
297+
->willReturn(true);
298+
299+
$cache->expects($this->once())
300+
->method('delete')
301+
->with($lockKey)
302+
->willReturn(true);
303+
304+
// Replace the cache service
305+
Services::injectMock('cache', $cache);
306+
307+
fake(QueueJobModel::class, [
308+
'connection' => 'database',
309+
'queue' => 'test',
310+
'payload' => [
311+
'job' => 'failure',
312+
'data' => ['key' => 'value'],
313+
'metadata' => [
314+
'taskLockKey' => $lockKey,
315+
'taskLockTTL' => $lockTTL,
316+
'queue' => 'test',
317+
],
318+
],
319+
'priority' => 'default',
320+
'status' => 0,
321+
'attempts' => 0,
322+
'available_at' => 1_702_977_074,
323+
]);
324+
325+
CITestStreamFilter::registration();
326+
CITestStreamFilter::addOutputFilter();
327+
328+
$this->assertNotFalse(command('queue:work test sleep 1 --stop-when-empty'));
329+
$this->parseOutput(CITestStreamFilter::$buffer);
330+
331+
CITestStreamFilter::removeOutputFilter();
332+
333+
$this->assertSame('Listening for the jobs with the queue: test', $this->getLine(0));
334+
$this->assertSame('Starting a new job: failure, with ID: 1', $this->getLine(3));
335+
$this->assertSame('The processing of this job failed', $this->getLine(4));
157336
}
158337
}

0 commit comments

Comments
 (0)