Skip to content

Commit ed25caa

Browse files
committed
move the worker queue consumer to the batch queue instead of being inside the fair queue
1 parent 50c462f commit ed25caa

File tree

6 files changed

+831
-694
lines changed

6 files changed

+831
-694
lines changed

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 177 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ import {
1111
FairQueue,
1212
DRRScheduler,
1313
CallbackFairQueueKeyProducer,
14+
WorkerQueueManager,
15+
BatchedSpanManager,
1416
type FairQueueOptions,
17+
type StoredMessage,
1518
} from "@trigger.dev/redis-worker";
1619
import { Logger } from "@trigger.dev/core/logger";
1720
import type {
@@ -48,8 +51,14 @@ export { BatchCompletionTracker } from "./completionTracker.js";
4851
// Redis key for environment concurrency limits
4952
const ENV_CONCURRENCY_KEY_PREFIX = "batch:env_concurrency";
5053

54+
// Single worker queue ID for all batch items
55+
// BatchQueue uses a single shared worker queue - FairQueue handles fair scheduling,
56+
// then all messages are routed to this queue for BatchQueue's own consumer loop.
57+
const BATCH_WORKER_QUEUE_ID = "batch-worker-queue";
58+
5159
export class BatchQueue {
5260
private fairQueue: FairQueue<typeof BatchItemPayloadSchema>;
61+
private workerQueueManager: WorkerQueueManager;
5362
private completionTracker: BatchCompletionTracker;
5463
private logger: Logger;
5564
private tracer?: Tracer;
@@ -59,6 +68,13 @@ export class BatchQueue {
5968
private processItemCallback?: ProcessBatchItemCallback;
6069
private completionCallback?: BatchCompletionCallback;
6170

71+
// Consumer loop state
72+
private isRunning = false;
73+
private abortController: AbortController;
74+
private workerQueueConsumerLoops: Promise<void>[] = [];
75+
private workerQueueBlockingTimeoutSeconds: number;
76+
private batchedSpanManager: BatchedSpanManager;
77+
6278
// Metrics
6379
private batchesEnqueuedCounter?: Counter;
6480
private itemsEnqueuedCounter?: Counter;
@@ -72,6 +88,8 @@ export class BatchQueue {
7288
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
7389
this.tracer = options.tracer;
7490
this.defaultConcurrency = options.defaultConcurrency ?? 10;
91+
this.abortController = new AbortController();
92+
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;
7593

7694
// Initialize metrics if meter is provided
7795
if (options.meter) {
@@ -116,6 +134,8 @@ export class BatchQueue {
116134
});
117135

118136
// Create FairQueue with telemetry and environment-based concurrency limiting
137+
// FairQueue handles fair scheduling and routes messages to the batch worker queue
138+
// BatchQueue runs its own consumer loop to process messages from the worker queue
119139
const fairQueueOptions: FairQueueOptions<typeof BatchItemPayloadSchema> = {
120140
redis: options.redis,
121141
keys: keyProducer,
@@ -132,15 +152,11 @@ export class BatchQueue {
132152
threshold: 5,
133153
periodMs: 5_000,
134154
},
135-
// Enable two-stage processing with worker queues for better parallelism (when configured)
136-
// Worker queues provide better concurrency by separating queue selection from message processing
137-
workerQueue:
138-
options.workerQueueBlockingTimeoutSeconds !== undefined
139-
? {
140-
enabled: true,
141-
blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds,
142-
}
143-
: undefined,
155+
// Worker queue configuration - FairQueue routes all messages to our single worker queue
156+
workerQueue: {
157+
// All batch items go to the same worker queue - BatchQueue handles consumption
158+
resolveWorkerQueue: () => BATCH_WORKER_QUEUE_ID,
159+
},
144160
// Concurrency group based on tenant (environment)
145161
// This limits how many batch items can be processed concurrently per environment
146162
// Items wait in queue until capacity frees up
@@ -167,6 +183,24 @@ export class BatchQueue {
167183

168184
this.fairQueue = new FairQueue(fairQueueOptions);
169185

186+
// Create worker queue manager for consuming from the batch worker queue
187+
this.workerQueueManager = new WorkerQueueManager({
188+
redis: options.redis,
189+
keys: keyProducer,
190+
logger: {
191+
debug: (msg, ctx) => this.logger.debug(msg, ctx),
192+
error: (msg, ctx) => this.logger.error(msg, ctx),
193+
},
194+
});
195+
196+
// Initialize batched span manager for worker queue consumer tracing
197+
this.batchedSpanManager = new BatchedSpanManager({
198+
tracer: options.tracer,
199+
name: "batch-queue-worker",
200+
maxIterations: options.consumerTraceMaxIterations ?? 1000,
201+
timeoutSeconds: options.consumerTraceTimeoutSeconds ?? 60,
202+
});
203+
170204
// Create completion tracker
171205
this.completionTracker = new BatchCompletionTracker({
172206
redis: options.redis,
@@ -177,11 +211,6 @@ export class BatchQueue {
177211
},
178212
});
179213

180-
// Set up message handler
181-
this.fairQueue.onMessage(async (ctx) => {
182-
await this.#handleMessage(ctx);
183-
});
184-
185214
// Register telemetry gauge callbacks for observable metrics
186215
// Note: observedTenants is not provided since tenant list is dynamic
187216
this.fairQueue.registerTelemetryGauges();
@@ -420,29 +449,62 @@ export class BatchQueue {
420449

421450
/**
422451
* Start the consumer loops.
452+
* FairQueue runs the master queue consumer loop (claim and push to worker queue).
453+
* BatchQueue runs its own worker queue consumer loops to process messages.
423454
*/
424455
start(): void {
456+
if (this.isRunning) {
457+
return;
458+
}
459+
460+
this.isRunning = true;
461+
this.abortController = new AbortController();
462+
463+
// Start FairQueue's master queue consumers (routes messages to worker queue)
425464
this.fairQueue.start();
465+
466+
// Start worker queue consumer loops
467+
for (let consumerId = 0; consumerId < this.options.consumerCount; consumerId++) {
468+
const loop = this.#runWorkerQueueConsumerLoop(consumerId);
469+
this.workerQueueConsumerLoops.push(loop);
470+
}
471+
426472
this.logger.info("BatchQueue consumers started", {
427473
consumerCount: this.options.consumerCount,
428474
intervalMs: this.options.consumerIntervalMs,
429475
drrQuantum: this.options.drr.quantum,
476+
workerQueueId: BATCH_WORKER_QUEUE_ID,
430477
});
431478
}
432479

433480
/**
434481
* Stop the consumer loops gracefully.
435482
*/
436483
async stop(): Promise<void> {
484+
if (!this.isRunning) {
485+
return;
486+
}
487+
488+
this.isRunning = false;
489+
this.abortController.abort();
490+
491+
// Stop FairQueue's master queue consumers
437492
await this.fairQueue.stop();
493+
494+
// Wait for worker queue consumer loops to finish
495+
await Promise.allSettled(this.workerQueueConsumerLoops);
496+
this.workerQueueConsumerLoops = [];
497+
438498
this.logger.info("BatchQueue consumers stopped");
439499
}
440500

441501
/**
442502
* Close the BatchQueue and all Redis connections.
443503
*/
444504
async close(): Promise<void> {
505+
await this.stop();
445506
await this.fairQueue.close();
507+
await this.workerQueueManager.close();
446508
await this.completionTracker.close();
447509
await this.concurrencyRedis.quit();
448510
}
@@ -526,56 +588,132 @@ export class BatchQueue {
526588
});
527589
}
528590

591+
// ============================================================================
592+
// Private - Worker Queue Consumer Loop
593+
// ============================================================================
594+
595+
/**
596+
* Run a worker queue consumer loop.
597+
* This pops messages from the batch worker queue and processes them.
598+
*/
599+
async #runWorkerQueueConsumerLoop(consumerId: number): Promise<void> {
600+
const loopId = `batch-worker-${consumerId}`;
601+
602+
// Initialize batched span tracking for this loop
603+
this.batchedSpanManager.initializeLoop(loopId);
604+
605+
try {
606+
while (this.isRunning) {
607+
if (!this.processItemCallback) {
608+
await new Promise((resolve) => setTimeout(resolve, 100));
609+
continue;
610+
}
611+
612+
try {
613+
await this.batchedSpanManager.withBatchedSpan(
614+
loopId,
615+
async (span) => {
616+
span.setAttribute("consumer_id", consumerId);
617+
618+
// Blocking pop from worker queue
619+
const messageKey = await this.workerQueueManager.blockingPop(
620+
BATCH_WORKER_QUEUE_ID,
621+
this.workerQueueBlockingTimeoutSeconds,
622+
this.abortController.signal
623+
);
624+
625+
if (!messageKey) {
626+
this.batchedSpanManager.incrementStat(loopId, "empty_iterations");
627+
return false; // Timeout, no work
628+
}
629+
630+
// Parse message key (format: "messageId:queueId")
631+
const colonIndex = messageKey.indexOf(":");
632+
if (colonIndex === -1) {
633+
this.logger.error("Invalid message key format", { messageKey });
634+
this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
635+
return false;
636+
}
637+
638+
const messageId = messageKey.substring(0, colonIndex);
639+
const queueId = messageKey.substring(colonIndex + 1);
640+
641+
await this.#handleMessage(loopId, messageId, queueId);
642+
this.batchedSpanManager.incrementStat(loopId, "messages_processed");
643+
return true; // Had work
644+
},
645+
{
646+
iterationSpanName: "processWorkerQueueMessage",
647+
attributes: { consumer_id: consumerId },
648+
}
649+
);
650+
} catch (error) {
651+
if (this.abortController.signal.aborted) {
652+
break;
653+
}
654+
this.logger.error("Worker queue consumer error", {
655+
loopId,
656+
error: error instanceof Error ? error.message : String(error),
657+
});
658+
this.batchedSpanManager.markForRotation(loopId);
659+
}
660+
}
661+
} catch (error) {
662+
if (error instanceof Error && error.name === "AbortError") {
663+
this.logger.debug("Worker queue consumer aborted", { loopId });
664+
this.batchedSpanManager.cleanup(loopId);
665+
return;
666+
}
667+
throw error;
668+
} finally {
669+
this.batchedSpanManager.cleanup(loopId);
670+
}
671+
}
672+
529673
// ============================================================================
530674
// Private - Message Handling
531675
// ============================================================================
532676

533-
async #handleMessage(ctx: {
534-
message: {
535-
id: string;
536-
queueId: string;
537-
payload: BatchItemPayload;
538-
timestamp: number;
539-
attempt: number;
540-
};
541-
queue: { id: string; tenantId: string };
542-
consumerId: string;
543-
heartbeat: () => Promise<boolean>;
544-
complete: () => Promise<void>;
545-
release: () => Promise<void>;
546-
fail: (error?: Error) => Promise<void>;
547-
}): Promise<void> {
548-
const { batchId, friendlyId, itemIndex, item } = ctx.message.payload;
677+
async #handleMessage(consumerId: string, messageId: string, queueId: string): Promise<void> {
678+
// Get message data from FairQueue's in-flight storage
679+
const storedMessage = await this.fairQueue.getMessageData(messageId, queueId);
680+
681+
if (!storedMessage) {
682+
this.logger.error("Message not found in in-flight data", { messageId, queueId });
683+
return;
684+
}
685+
686+
const { batchId, friendlyId, itemIndex, item } = storedMessage.payload;
549687

550688
return this.#startSpan("BatchQueue.handleMessage", async (span) => {
551689
span?.setAttributes({
552690
"batch.id": batchId,
553691
"batch.friendlyId": friendlyId,
554692
"batch.itemIndex": itemIndex,
555693
"batch.task": item.task,
556-
"batch.consumerId": ctx.consumerId,
557-
"batch.attempt": ctx.message.attempt,
694+
"batch.consumerId": consumerId,
695+
"batch.attempt": storedMessage.attempt,
558696
});
559697

560698
// Record queue time metric (time from enqueue to processing)
561-
const queueTimeMs = Date.now() - ctx.message.timestamp;
562-
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
699+
const queueTimeMs = Date.now() - storedMessage.timestamp;
700+
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: storedMessage.tenantId });
563701
span?.setAttribute("batch.queueTimeMs", queueTimeMs);
564702

565703
this.logger.debug("Processing batch item", {
566704
batchId,
567705
friendlyId,
568706
itemIndex,
569707
task: item.task,
570-
consumerId: ctx.consumerId,
571-
attempt: ctx.message.attempt,
708+
consumerId,
709+
attempt: storedMessage.attempt,
572710
queueTimeMs,
573711
});
574712

575713
if (!this.processItemCallback) {
576714
this.logger.error("No process item callback set", { batchId, itemIndex });
577715
// Still complete the message to avoid blocking
578-
await ctx.complete();
716+
await this.fairQueue.completeMessage(messageId, queueId);
579717
return;
580718
}
581719

@@ -586,7 +724,7 @@ export class BatchQueue {
586724

587725
if (!meta) {
588726
this.logger.error("Batch metadata not found", { batchId, itemIndex });
589-
await ctx.complete();
727+
await this.fairQueue.completeMessage(messageId, queueId);
590728
return;
591729
}
592730

@@ -722,7 +860,7 @@ export class BatchQueue {
722860
// This must happen after recording success/failure to ensure the counter
723861
// is updated before the message is considered done
724862
await this.#startSpan("BatchQueue.completeMessage", async () => {
725-
return ctx.complete();
863+
return this.fairQueue.completeMessage(messageId, queueId);
726864
});
727865

728866
// Check if all items have been processed using atomic counter

internal-packages/run-engine/src/batch-queue/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ export type BatchQueueOptions = {
222222
tracer?: Tracer;
223223
/** OpenTelemetry meter for metrics */
224224
meter?: Meter;
225+
/** Maximum iterations before rotating consumer loop trace span (default: 1000) */
226+
consumerTraceMaxIterations?: number;
227+
/** Maximum seconds before rotating consumer loop trace span (default: 60) */
228+
consumerTraceTimeoutSeconds?: number;
225229
};
226230

227231
/**

0 commit comments

Comments
 (0)