8 changes: 8 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -951,6 +951,14 @@ const EnvironmentSchema = z
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
// Number of master queue shards for horizontal scaling
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
// Maximum number of queues to fetch from the master queue per iteration
BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
// Enable worker queue for two-stage processing (claim messages, push to worker queue, process from worker queue)
BATCH_QUEUE_WORKER_QUEUE_ENABLED: BoolEnv.default(true),
// Worker queue blocking timeout in seconds (for two-stage processing, only used when BATCH_QUEUE_WORKER_QUEUE_ENABLED is true)
BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
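Not part of the diff, but for context: because `process.env` values are strings, the `z.coerce` pattern above is what turns them into validated integers, with defaults applied when a variable is unset. A self-contained sketch of the same pattern:

```ts
import { z } from "zod";

// Standalone sketch of the coercion pattern used in EnvironmentSchema above:
// raw env strings are coerced to validated integers, defaults fill the gaps.
const BatchQueueEnv = z.object({
  BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
  BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
  BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
  BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
});

const env = BatchQueueEnv.parse({ BATCH_QUEUE_SHARD_COUNT: "4" });
// env.BATCH_QUEUE_SHARD_COUNT === 4; the others fall back to their defaults,
// and BATCH_QUEUE_GLOBAL_RATE_LIMIT stays undefined (no global rate limiting).
```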
5 changes: 5 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
@@ -171,7 +171,12 @@ function createRunEngine() {
drr: {
quantum: env.BATCH_QUEUE_DRR_QUANTUM,
maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT,
},
shardCount: env.BATCH_QUEUE_SHARD_COUNT,
workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_ENABLED
? env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS
: undefined,
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
// Default processing concurrency when no specific limit is set
216 changes: 185 additions & 31 deletions internal-packages/run-engine/src/batch-queue/index.ts
@@ -11,7 +11,11 @@ import {
FairQueue,
DRRScheduler,
CallbackFairQueueKeyProducer,
WorkerQueueManager,
BatchedSpanManager,
isAbortError,
type FairQueueOptions,
type StoredMessage,
} from "@trigger.dev/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import type {
@@ -48,8 +52,14 @@ export { BatchCompletionTracker } from "./completionTracker.js";
// Redis key for environment concurrency limits
const ENV_CONCURRENCY_KEY_PREFIX = "batch:env_concurrency";

// Single worker queue ID for all batch items
// BatchQueue uses a single shared worker queue - FairQueue handles fair scheduling,
// then all messages are routed to this queue for BatchQueue's own consumer loop.
const BATCH_WORKER_QUEUE_ID = "batch-worker-queue";
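A toy illustration of the two-stage flow this enables, assuming the worker queue is backed by a Redis list (an implementation guess; the diff only shows the `WorkerQueueManager` wrapper, not its storage):

```ts
import { Redis } from "ioredis";

async function demoTwoStageFlow() {
  const redis = new Redis();

  // Stage 1 (FairQueue side): after claiming a message fairly, push its key
  // onto the shared worker queue. The "messageId:queueId" key format matches
  // what #runWorkerQueueConsumerLoop parses below.
  await redis.lpush("batch-worker-queue", "msg_123:queue_abc");

  // Stage 2 (BatchQueue side): a consumer blocks for up to 10 seconds waiting
  // for work, mirroring workerQueueBlockingTimeoutSeconds.
  const popped = await redis.brpop("batch-worker-queue", 10);
  // popped => ["batch-worker-queue", "msg_123:queue_abc"], or null on timeout

  await redis.quit();
}
```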

export class BatchQueue {
private fairQueue: FairQueue<typeof BatchItemPayloadSchema>;
private workerQueueManager: WorkerQueueManager;
private completionTracker: BatchCompletionTracker;
private logger: Logger;
private tracer?: Tracer;
@@ -59,6 +69,13 @@ export class BatchQueue {
private processItemCallback?: ProcessBatchItemCallback;
private completionCallback?: BatchCompletionCallback;

// Consumer loop state
private isRunning = false;
private abortController: AbortController;
private workerQueueConsumerLoops: Promise<void>[] = [];
private workerQueueBlockingTimeoutSeconds: number;
private batchedSpanManager: BatchedSpanManager;

// Metrics
private batchesEnqueuedCounter?: Counter;
private itemsEnqueuedCounter?: Counter;
@@ -72,6 +89,8 @@
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
this.tracer = options.tracer;
this.defaultConcurrency = options.defaultConcurrency ?? 10;
this.abortController = new AbortController();
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;

// Initialize metrics if meter is provided
if (options.meter) {
@@ -108,20 +127,23 @@
keys: keyProducer,
quantum: options.drr.quantum,
maxDeficit: options.drr.maxDeficit,
masterQueueLimit: options.drr.masterQueueLimit,
logger: {
debug: (msg, ctx) => this.logger.debug(msg, ctx),
error: (msg, ctx) => this.logger.error(msg, ctx),
},
});

// Create FairQueue with telemetry and environment-based concurrency limiting
// FairQueue handles fair scheduling and routes messages to the batch worker queue
// BatchQueue runs its own consumer loop to process messages from the worker queue
const fairQueueOptions: FairQueueOptions<typeof BatchItemPayloadSchema> = {
redis: options.redis,
keys: keyProducer,
scheduler,
payloadSchema: BatchItemPayloadSchema,
validateOnEnqueue: false, // We control the payload
shardCount: 1, // Batches don't need sharding
shardCount: options.shardCount ?? 1,
consumerCount: options.consumerCount,
consumerIntervalMs: options.consumerIntervalMs,
visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
@@ -131,6 +153,11 @@
threshold: 5,
periodMs: 5_000,
},
// Worker queue configuration - FairQueue routes all messages to our single worker queue
workerQueue: {
// All batch items go to the same worker queue - BatchQueue handles consumption
resolveWorkerQueue: () => BATCH_WORKER_QUEUE_ID,
},
// Concurrency group based on tenant (environment)
// This limits how many batch items can be processed concurrently per environment
// Items wait in queue until capacity frees up
@@ -157,6 +184,24 @@

this.fairQueue = new FairQueue(fairQueueOptions);

// Create worker queue manager for consuming from the batch worker queue
this.workerQueueManager = new WorkerQueueManager({
redis: options.redis,
keys: keyProducer,
logger: {
debug: (msg, ctx) => this.logger.debug(msg, ctx),
error: (msg, ctx) => this.logger.error(msg, ctx),
},
});

// Initialize batched span manager for worker queue consumer tracing
this.batchedSpanManager = new BatchedSpanManager({
tracer: options.tracer,
name: "batch-queue-worker",
maxIterations: options.consumerTraceMaxIterations ?? 1000,
timeoutSeconds: options.consumerTraceTimeoutSeconds ?? 60,
});
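The max-iterations/timeout pair above controls how often the batched span rotates. A toy model of that rotation policy (not the real `BatchedSpanManager`, just the idea it encapsulates):

```ts
// Toy rotation policy: rotate after maxIterations iterations or after
// timeoutSeconds seconds, whichever comes first. This keeps hot consumer
// loops from emitting one trace span per 50 ms iteration while still
// bounding how much work a single span accumulates.
class SpanRotationPolicy {
  private iterations = 0;
  private startedAt = Date.now();

  constructor(
    private readonly maxIterations: number,
    private readonly timeoutSeconds: number
  ) {}

  /** Returns true when the caller should end the current span and start a new one. */
  recordIteration(): boolean {
    this.iterations += 1;
    const expired = Date.now() - this.startedAt >= this.timeoutSeconds * 1000;
    if (this.iterations >= this.maxIterations || expired) {
      this.iterations = 0;
      this.startedAt = Date.now();
      return true;
    }
    return false;
  }
}
```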

// Create completion tracker
this.completionTracker = new BatchCompletionTracker({
redis: options.redis,
@@ -167,11 +212,6 @@
},
});

// Set up message handler
this.fairQueue.onMessage(async (ctx) => {
await this.#handleMessage(ctx);
});

// Register telemetry gauge callbacks for observable metrics
// Note: observedTenants is not provided since tenant list is dynamic
this.fairQueue.registerTelemetryGauges();
@@ -410,29 +450,66 @@

/**
* Start the consumer loops.
* FairQueue runs the master queue consumer loop (claim and push to worker queue).
* BatchQueue runs its own worker queue consumer loops to process messages.
*/
start(): void {
if (this.isRunning) {
return;
}

this.isRunning = true;
this.abortController = new AbortController();

// Start FairQueue's master queue consumers (routes messages to worker queue)
this.fairQueue.start();

// Start worker queue consumer loops
for (let consumerId = 0; consumerId < this.options.consumerCount; consumerId++) {
const loop = this.#runWorkerQueueConsumerLoop(consumerId);
this.workerQueueConsumerLoops.push(loop);
}

this.logger.info("BatchQueue consumers started", {
consumerCount: this.options.consumerCount,
intervalMs: this.options.consumerIntervalMs,
drrQuantum: this.options.drr.quantum,
workerQueueId: BATCH_WORKER_QUEUE_ID,
});
}

/**
* Stop the consumer loops gracefully.
*/
async stop(): Promise<void> {
if (!this.isRunning) {
return;
}

this.isRunning = false;
this.abortController.abort();

// Stop FairQueue's master queue consumers
await this.fairQueue.stop();

// Wait for worker queue consumer loops to finish
await Promise.allSettled(this.workerQueueConsumerLoops);
this.workerQueueConsumerLoops = [];

this.logger.info("BatchQueue consumers stopped");
}

/**
* Close the BatchQueue and all Redis connections.
*/
async close(): Promise<void> {
await this.stop();

// Clean up any remaining batched spans (safety net for spans not cleaned up by consumer loops)
this.batchedSpanManager.cleanupAll();

await this.fairQueue.close();
await this.workerQueueManager.close();
await this.completionTracker.close();
await this.concurrencyRedis.quit();
}
@@ -516,56 +593,133 @@
});
}

// ============================================================================
// Private - Worker Queue Consumer Loop
// ============================================================================

/**
* Run a worker queue consumer loop.
* This pops messages from the batch worker queue and processes them.
*/
async #runWorkerQueueConsumerLoop(consumerId: number): Promise<void> {
const loopId = `batch-worker-${consumerId}`;

// Initialize batched span tracking for this loop
this.batchedSpanManager.initializeLoop(loopId);

try {
while (this.isRunning) {
if (!this.processItemCallback) {
await new Promise((resolve) => setTimeout(resolve, 100));
continue;
}

try {
await this.batchedSpanManager.withBatchedSpan(
loopId,
async (span) => {
span.setAttribute("consumer_id", consumerId);

// Blocking pop from worker queue
const messageKey = await this.workerQueueManager.blockingPop(
BATCH_WORKER_QUEUE_ID,
this.workerQueueBlockingTimeoutSeconds,
this.abortController.signal
);

if (!messageKey) {
this.batchedSpanManager.incrementStat(loopId, "empty_iterations");
return false; // Timeout, no work
}

// Parse message key (format: "messageId:queueId")
const colonIndex = messageKey.indexOf(":");
if (colonIndex === -1) {
this.logger.error("Invalid message key format", { messageKey });
this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
return false;
}

const messageId = messageKey.substring(0, colonIndex);
const queueId = messageKey.substring(colonIndex + 1);

await this.#handleMessage(loopId, messageId, queueId);
this.batchedSpanManager.incrementStat(loopId, "messages_processed");
return true; // Had work
},
{
iterationSpanName: "processWorkerQueueMessage",
attributes: { consumer_id: consumerId },
}
);
} catch (error) {
if (this.abortController.signal.aborted) {
break;
}
this.logger.error("Worker queue consumer error", {
loopId,
error: error instanceof Error ? error.message : String(error),
});
this.batchedSpanManager.markForRotation(loopId);
}
}
} catch (error) {
if (isAbortError(error)) {
this.logger.debug("Worker queue consumer aborted", { loopId });
this.batchedSpanManager.cleanup(loopId);
return;
}
throw error;
} finally {
this.batchedSpanManager.cleanup(loopId);
}
}
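The key parsing above splits on the first colon only, which matters because queue IDs can themselves contain colons (an assumption about the ID format, but consistent with Redis-style keys). The same logic extracted as a standalone sketch:

```ts
// Equivalent of the inline parsing in #runWorkerQueueConsumerLoop: only the
// first ":" separates messageId from queueId, so colons inside the queue ID
// survive intact.
function parseMessageKey(messageKey: string): { messageId: string; queueId: string } | null {
  const colonIndex = messageKey.indexOf(":");
  if (colonIndex === -1) return null;
  return {
    messageId: messageKey.substring(0, colonIndex),
    queueId: messageKey.substring(colonIndex + 1),
  };
}

parseMessageKey("msg_123:env:prod:queue_abc");
// => { messageId: "msg_123", queueId: "env:prod:queue_abc" }
```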

// ============================================================================
// Private - Message Handling
// ============================================================================

async #handleMessage(ctx: {
message: {
id: string;
queueId: string;
payload: BatchItemPayload;
timestamp: number;
attempt: number;
};
queue: { id: string; tenantId: string };
consumerId: string;
heartbeat: () => Promise<boolean>;
complete: () => Promise<void>;
release: () => Promise<void>;
fail: (error?: Error) => Promise<void>;
}): Promise<void> {
const { batchId, friendlyId, itemIndex, item } = ctx.message.payload;
async #handleMessage(consumerId: string, messageId: string, queueId: string): Promise<void> {
// Get message data from FairQueue's in-flight storage
const storedMessage = await this.fairQueue.getMessageData(messageId, queueId);

if (!storedMessage) {
this.logger.error("Message not found in in-flight data", { messageId, queueId });
await this.fairQueue.completeMessage(messageId, queueId);
return;
}

const { batchId, friendlyId, itemIndex, item } = storedMessage.payload;

return this.#startSpan("BatchQueue.handleMessage", async (span) => {
span?.setAttributes({
"batch.id": batchId,
"batch.friendlyId": friendlyId,
"batch.itemIndex": itemIndex,
"batch.task": item.task,
"batch.consumerId": ctx.consumerId,
"batch.attempt": ctx.message.attempt,
"batch.consumerId": consumerId,
"batch.attempt": storedMessage.attempt,
});

// Record queue time metric (time from enqueue to processing)
const queueTimeMs = Date.now() - ctx.message.timestamp;
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
const queueTimeMs = Date.now() - storedMessage.timestamp;
this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: storedMessage.tenantId });
span?.setAttribute("batch.queueTimeMs", queueTimeMs);

this.logger.debug("Processing batch item", {
batchId,
friendlyId,
itemIndex,
task: item.task,
consumerId: ctx.consumerId,
attempt: ctx.message.attempt,
consumerId,
attempt: storedMessage.attempt,
queueTimeMs,
});

if (!this.processItemCallback) {
this.logger.error("No process item callback set", { batchId, itemIndex });
// Still complete the message to avoid blocking
await ctx.complete();
await this.fairQueue.completeMessage(messageId, queueId);
return;
}

@@ -576,7 +730,7 @@

if (!meta) {
this.logger.error("Batch metadata not found", { batchId, itemIndex });
await ctx.complete();
await this.fairQueue.completeMessage(messageId, queueId);
return;
}

@@ -712,7 +866,7 @@ // This must happen after recording success/failure to ensure the counter
// This must happen after recording success/failure to ensure the counter
// is updated before the message is considered done
await this.#startSpan("BatchQueue.completeMessage", async () => {
return ctx.complete();
return this.fairQueue.completeMessage(messageId, queueId);
});

// Check if all items have been processed using atomic counter
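For context on the atomic counter referenced above: it lives in `BatchCompletionTracker` (./completionTracker.js), which this diff does not show. A hedged sketch of the underlying idea, with assumed key names:

```ts
import { Redis } from "ioredis";

// Sketch only: each processed item INCRs a per-batch counter. INCR is atomic,
// so exactly one concurrent consumer observes the final count and can fire
// the completion callback exactly once. Key names here are assumptions.
async function recordItemDone(
  redis: Redis,
  batchId: string,
  expectedCount: number,
  onComplete: () => Promise<void>
): Promise<void> {
  const done = await redis.incr(`batch:${batchId}:done`);
  if (done === expectedCount) {
    await onComplete();
  }
}
```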