From d8453123027c3b722525cacf5cc31ec4ed2098bb Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Fri, 27 Jun 2025 12:16:16 +0100
Subject: [PATCH 01/15] Add retry logic for insert operations

Add a generic retry mechanism for task run and payload inserts to handle
transient connection errors. The new #insertWithRetry method makes up to
three attempts per insert, backing off exponentially with jitter, and only
retries connection errors it recognises as retryable, such as connection
resets or timeouts. Errors are logged and recorded in tracing spans,
improving the observability and robustness of the replication service.
---
 .../services/runsReplicationService.server.ts | 98 ++++++++++++++++++-
 1 file changed, 96 insertions(+), 2 deletions(-)

diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index eed1d3c9f3..d1fc907e23 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -445,8 +445,35 @@ export class RunsReplicationService {
         payloadInserts: payloadInserts.length,
       });

-      await this.#insertTaskRunInserts(taskRunInserts);
-      await this.#insertPayloadInserts(payloadInserts);
+      // Insert task runs and payloads with retry logic for connection errors
+      const [taskRunError, taskRunResult] = await this.#insertWithRetry(
+        () => this.#insertTaskRunInserts(taskRunInserts),
+        "task run inserts",
+        flushId
+      );
+
+      const [payloadError, payloadResult] = await this.#insertWithRetry(
+        () => this.#insertPayloadInserts(payloadInserts),
+        "payload inserts",
+        flushId
+      );
+
+      // Log any errors that occurred
+      if (taskRunError) {
+        this.logger.error("Error inserting task run inserts", {
+          error: taskRunError,
+          flushId,
+        });
+        recordSpanError(span, taskRunError);
+      }
+
+      if (payloadError) {
+        this.logger.error("Error inserting payload inserts", {
+          error: payloadError,
+          flushId,
+        });
+        recordSpanError(span, payloadError);
+      }

       this.logger.debug("Flushed inserts", {
         flushId,
@@ -456,6 +483,73 @@ export class RunsReplicationService {
     });
   }

+  // New method to handle inserts with retry logic for connection errors
+  async #insertWithRetry<T>(
+    insertFn: () => Promise<T>,
+    operationName: string,
+    flushId: string,
+    maxRetries: number = 3
+  ): Promise<[Error | null, T | null]> {
+    let lastError: Error | null = null;
+
+    for (let attempt = 1; attempt <= maxRetries; attempt++) {
+      try {
+        const result = await insertFn();
+        return [null, result];
+      } catch (error) {
+        lastError = error instanceof Error ?
error : new Error(String(error)); + + // Check if this is a retryable connection error + if (this.#isRetryableConnectionError(lastError) && attempt < maxRetries) { + const delay = this.#calculateConnectionRetryDelay(attempt); + + this.logger.warn(`Retrying ${operationName} due to connection error`, { + flushId, + attempt, + maxRetries, + error: lastError.message, + delay, + }); + + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + break; + } + } + + return [lastError, null]; + } + + // New method to check if an error is a retryable connection error + #isRetryableConnectionError(error: Error): boolean { + const errorMessage = error.message.toLowerCase(); + const retryableConnectionPatterns = [ + "socket hang up", + "econnreset", + "connection reset", + "connection refused", + "connection timeout", + "network error", + "read econnreset", + "write econnreset", + ]; + + return retryableConnectionPatterns.some((pattern) => errorMessage.includes(pattern)); + } + + // New method to calculate retry delay for connection errors + #calculateConnectionRetryDelay(attempt: number): number { + // Exponential backoff: 100ms, 200ms, 400ms + const baseDelay = 100; + const maxDelay = 2000; + const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay); + + // Add some jitter to prevent thundering herd + const jitter = Math.random() * 100; + return delay + jitter; + } + async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) { return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => { const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert( From 9225be5882ea7f5585be231e61cb8a30660de4be Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Jun 2025 16:01:14 +0100 Subject: [PATCH 02/15] Replication settings are configurable --- apps/webapp/app/env.server.ts | 4 +++ .../runsReplicationInstance.server.ts | 3 ++ .../services/runsReplicationService.server.ts | 31 +++++++++++++------ 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 6f742faa26..18abe94291 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -840,6 +840,10 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"), RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(), RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10), + // Retry configuration for insert operations + RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3), + RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100), + RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), // Clickhouse CLICKHOUSE_URL: z.string().optional(), diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 8a8b0df650..86b17601a7 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -62,6 +62,9 @@ function initializeRunsReplicationInstance() { logLevel: env.RUN_REPLICATION_LOG_LEVEL, waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1", tracer: provider.getTracer("runs-replication-service"), + insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES, + insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS, + insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS, }); if 
(env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index d1fc907e23..9d435089c2 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -51,6 +51,10 @@ export type RunsReplicationServiceOptions = { logLevel?: LogLevel; tracer?: Tracer; waitForAsyncInsert?: boolean; + // Retry configuration for insert operations + insertMaxRetries?: number; + insertBaseDelayMs?: number; + insertMaxDelayMs?: number; }; type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" }; @@ -80,6 +84,10 @@ export class RunsReplicationService { private _latestCommitEndLsn: string | null = null; private _lastAcknowledgedLsn: string | null = null; private _acknowledgeInterval: NodeJS.Timeout | null = null; + // Retry configuration + private _insertMaxRetries: number; + private _insertBaseDelayMs: number; + private _insertMaxDelayMs: number; public readonly events: EventEmitter; @@ -151,6 +159,11 @@ export class RunsReplicationService { this._replicationClient.events.on("leaderElection", (isLeader) => { this.logger.info("Leader election", { isLeader }); }); + + // Initialize retry configuration + this._insertMaxRetries = options.insertMaxRetries ?? 3; + this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100; + this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000; } public async shutdown() { @@ -487,12 +500,11 @@ export class RunsReplicationService { async #insertWithRetry( insertFn: () => Promise, operationName: string, - flushId: string, - maxRetries: number = 3 + flushId: string ): Promise<[Error | null, T | null]> { let lastError: Error | null = null; - for (let attempt = 1; attempt <= maxRetries; attempt++) { + for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) { try { const result = await insertFn(); return [null, result]; @@ -500,13 +512,13 @@ export class RunsReplicationService { lastError = error instanceof Error ? error : new Error(String(error)); // Check if this is a retryable connection error - if (this.#isRetryableConnectionError(lastError) && attempt < maxRetries) { + if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) { const delay = this.#calculateConnectionRetryDelay(attempt); this.logger.warn(`Retrying ${operationName} due to connection error`, { flushId, attempt, - maxRetries, + maxRetries: this._insertMaxRetries, error: lastError.message, delay, }); @@ -540,10 +552,11 @@ export class RunsReplicationService { // New method to calculate retry delay for connection errors #calculateConnectionRetryDelay(attempt: number): number { - // Exponential backoff: 100ms, 200ms, 400ms - const baseDelay = 100; - const maxDelay = 2000; - const delay = Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay); + // Exponential backoff: baseDelay, baseDelay*2, baseDelay*4, etc. 
+ const delay = Math.min( + this._insertBaseDelayMs * Math.pow(2, attempt - 1), + this._insertMaxDelayMs + ); // Add some jitter to prevent thundering herd const jitter = Math.random() * 100; From a80316f7122ebc99dfdd25fc1c4f0ce064c7c47d Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Jun 2025 15:51:41 +0100 Subject: [PATCH 03/15] Log out the runIds for failed batches --- apps/webapp/app/services/runsReplicationService.server.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 9d435089c2..44460442ed 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -476,6 +476,7 @@ export class RunsReplicationService { this.logger.error("Error inserting task run inserts", { error: taskRunError, flushId, + runIds: taskRunInserts.map((r) => r.run_id), }); recordSpanError(span, taskRunError); } @@ -484,6 +485,7 @@ export class RunsReplicationService { this.logger.error("Error inserting payload inserts", { error: payloadError, flushId, + runIds: payloadInserts.map((r) => r.run_id), }); recordSpanError(span, payloadError); } From 060c4f71726678cd793953ac17bf85977809e969 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Jun 2025 18:57:07 +0100 Subject: [PATCH 04/15] Detecting bad JSON in run replication and ignoring it --- .../services/runsReplicationService.server.ts | 10 + apps/webapp/app/utils/detectBadJsonStrings.ts | 48 +++++ apps/webapp/test/detectbadJsonStrings.test.ts | 137 +++++++++++++ .../test/runsReplicationService.part2.test.ts | 183 +++++++++++++++++- 4 files changed, 374 insertions(+), 4 deletions(-) create mode 100644 apps/webapp/app/utils/detectBadJsonStrings.ts create mode 100644 apps/webapp/test/detectbadJsonStrings.test.ts diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 44460442ed..0175f83aef 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -15,6 +15,8 @@ import { TaskRun } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import EventEmitter from "node:events"; import pLimit from "p-limit"; +import { logger } from "./logger.server"; +import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; interface TransactionEvent { tag: "insert" | "update" | "delete"; @@ -740,6 +742,14 @@ export class RunsReplicationService { return { data: undefined }; } + if (detectBadJsonStrings(data)) { + this.logger.warn("Detected bad JSON strings", { + data, + dataType, + }); + return { data: undefined }; + } + const packet = { data, dataType, diff --git a/apps/webapp/app/utils/detectBadJsonStrings.ts b/apps/webapp/app/utils/detectBadJsonStrings.ts new file mode 100644 index 0000000000..b58cb1e03a --- /dev/null +++ b/apps/webapp/app/utils/detectBadJsonStrings.ts @@ -0,0 +1,48 @@ +export function detectBadJsonStrings(jsonString: string): boolean { + // Single regex with global flag to find all matches with their positions + const regex = /\\ud[89ab][0-9a-f]{2}|\\ud[cd][0-9a-f]{2}/g; + const matches: Array<{ index: number; isHigh: boolean }> = []; + + let match; + while ((match = regex.exec(jsonString)) !== null) { + const isHigh = + match[0].startsWith("\\ud8") || + match[0].startsWith("\\ud9") || + match[0].startsWith("\\uda") || + match[0].startsWith("\\udb"); + matches.push({ index: 
match.index, isHigh }); + } + + if (matches.length === 0) { + return false; // No Unicode escapes found + } + + // Check for incomplete pairs + const highSurrogates = new Set(); + const lowSurrogates = new Set(); + + for (const { index, isHigh } of matches) { + if (isHigh) { + highSurrogates.add(index); + } else { + lowSurrogates.add(index); + } + } + + // Check for unmatched surrogates + for (const highIndex of highSurrogates) { + const expectedLowIndex = highIndex + 6; // Length of high surrogate + if (!lowSurrogates.has(expectedLowIndex)) { + return true; // Incomplete high surrogate + } + } + + for (const lowIndex of lowSurrogates) { + const expectedHighIndex = lowIndex - 6; // Length of low surrogate + if (!highSurrogates.has(expectedHighIndex)) { + return true; // Incomplete low surrogate + } + } + + return false; +} diff --git a/apps/webapp/test/detectbadJsonStrings.test.ts b/apps/webapp/test/detectbadJsonStrings.test.ts new file mode 100644 index 0000000000..55828cf26a --- /dev/null +++ b/apps/webapp/test/detectbadJsonStrings.test.ts @@ -0,0 +1,137 @@ +import { describe, expect, it } from "vitest"; +import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; + +describe("detectBadJsonStrings", () => { + it("should not detect valid JSON string", () => { + const goodJson = `{"title": "hello"}`; + const result = detectBadJsonStrings(goodJson); + expect(result).toBe(false); + }); + + it("should detect incomplete Unicode escape sequences", () => { + const badJson = `{"title": "hello\\ud835"}`; + const result = detectBadJsonStrings(badJson); + expect(result).toBe(true); + }); + + it("should not detect complete Unicode escape sequences", () => { + const goodJson = `{"title": "hello\\ud835\\udc00"}`; + const result = detectBadJsonStrings(goodJson); + expect(result).toBe(false); + }); + + it("should detect incomplete low surrogate", () => { + const badJson = `{"title": "hello\\udc00"}`; + const result = detectBadJsonStrings(badJson); + expect(result).toBe(true); + }); + + it("should handle multiple Unicode sequences correctly", () => { + const goodJson = `{"title": "hello\\ud835\\udc00\\ud835\\udc01"}`; + const result = detectBadJsonStrings(goodJson); + expect(result).toBe(false); + }); + + it("should detect mixed complete and incomplete sequences", () => { + const badJson = `{"title": "hello\\ud835\\udc00\\ud835"}`; + const result = detectBadJsonStrings(badJson); + expect(result).toBe(true); + }); + + it("should have acceptable performance overhead", () => { + const longText = `hello world `.repeat(1_000); + const goodJson = `{"title": "hello", "text": "${longText}"}`; + const badJson = `{"title": "hello\\ud835", "text": "${longText}"}`; + + const iterations = 100_000; + + // Warm up + for (let i = 0; i < 1000; i++) { + detectBadJsonStrings(goodJson); + detectBadJsonStrings(badJson); + } + + // Measure good JSON (most common case) + const goodStart = performance.now(); + for (let i = 0; i < iterations; i++) { + detectBadJsonStrings(goodJson); + } + const goodTime = performance.now() - goodStart; + + // Measure bad JSON (edge case) + const badStart = performance.now(); + for (let i = 0; i < iterations; i++) { + detectBadJsonStrings(badJson); + } + const badTime = performance.now() - badStart; + + // Measure baseline (just function call overhead) + const baselineStart = performance.now(); + for (let i = 0; i < iterations; i++) { + // Empty function call to measure baseline + } + const baselineTime = performance.now() - baselineStart; + + const goodOverhead = goodTime - baselineTime; 
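+    // Note: the baseline loop body is empty, so the JIT may optimise it away; treat the overhead figures as rough estimates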
+ const badOverhead = badTime - baselineTime; + + console.log(`Baseline (${iterations} iterations): ${baselineTime.toFixed(2)}ms`); + console.log( + `Good JSON (${iterations} iterations): ${goodTime.toFixed( + 2 + )}ms (overhead: ${goodOverhead.toFixed(2)}ms)` + ); + console.log( + `Bad JSON (${iterations} iterations): ${badTime.toFixed( + 2 + )}ms (overhead: ${badOverhead.toFixed(2)}ms)` + ); + console.log( + `Average per call - Good: ${(goodOverhead / iterations).toFixed(4)}ms, Bad: ${( + badOverhead / iterations + ).toFixed(4)}ms` + ); + + // Assertions for performance expectations + // Good JSON should be reasonably fast (most common case) + expect(goodOverhead / iterations).toBeLessThan(0.01); // Less than 10 microseconds per call + + // Bad JSON can be slower due to regex matching, but still reasonable + expect(badOverhead / iterations).toBeLessThan(0.02); // Less than 20 microseconds per call + + // Total overhead for 100k calls should be reasonable + expect(goodOverhead).toBeLessThan(1000); // Less than 1 second for 100k calls + }); + + it("should handle various JSON sizes efficiently", () => { + const sizes = [100, 1000, 10000, 100000]; + const iterations = 10_000; + + for (const size of sizes) { + const text = `hello world `.repeat(size / 11); // Approximate size + const goodJson = `{"title": "hello", "text": "${text}"}`; + + const start = performance.now(); + for (let i = 0; i < iterations; i++) { + detectBadJsonStrings(goodJson); + } + const time = performance.now() - start; + + console.log( + `Size ${size} chars (${iterations} iterations): ${time.toFixed(2)}ms (${( + time / iterations + ).toFixed(4)}ms per call)` + ); + + // Performance should scale reasonably with size + expect(time / iterations).toBeLessThan(size / 1000); // Roughly linear scaling + } + }); +}); + +function processPacket(data: string): { data?: string; dataType?: string } { + if (detectBadJsonStrings(data)) { + return { data: undefined }; + } + return { data, dataType: "application/json" }; +} diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts index 01627e4ec8..9736d99176 100644 --- a/apps/webapp/test/runsReplicationService.part2.test.ts +++ b/apps/webapp/test/runsReplicationService.part2.test.ts @@ -7,11 +7,13 @@ import { TaskRunStatus } from "~/database-types"; import { RunsReplicationService } from "~/services/runsReplicationService.server"; import { createInMemoryTracing } from "./utils/tracing"; import superjson from "superjson"; +import { readFile } from "node:fs/promises"; +import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; vi.setConfig({ testTimeout: 60_000 }); describe("RunsReplicationService (part 2/2)", () => { - containerTest( + containerTest.skip( "should handover leadership to a second service, and the second service should be able to extend the leader lock", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -140,7 +142,7 @@ describe("RunsReplicationService (part 2/2)", () => { } ); - containerTest( + containerTest.skip( "should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -254,7 +256,7 @@ describe("RunsReplicationService (part 2/2)", () => { } ); - containerTest( + containerTest.skip( "should replicate all 
1,000 TaskRuns inserted in bulk to ClickHouse with updates", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -374,7 +376,7 @@ describe("RunsReplicationService (part 2/2)", () => { } ); - containerTest( + containerTest.skip( "should replicate all events in a single transaction (insert, update)", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -611,4 +613,177 @@ describe("RunsReplicationService (part 2/2)", () => { }, { timeout: 60_000 * 5 } ); + + containerTest( + "should insert TaskRuns even if there are incomplete Unicode escape sequences in the JSON", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); + + const clickhouse = new ClickHouse({ + url: clickhouseContainer.getConnectionUrl(), + name: "runs-replication-stress-bulk-insert", + }); + + const runsReplicationService = new RunsReplicationService({ + clickhouse, + pgConnectionUrl: postgresContainer.getConnectionUri(), + serviceName: "runs-replication-stress-bulk-insert", + slotName: "task_runs_to_clickhouse_v1", + publicationName: "task_runs_to_clickhouse_v1_publication", + redisOptions, + maxFlushConcurrency: 10, + flushIntervalMs: 100, + flushBatchSize: 50, + leaderLockTimeoutMs: 5000, + leaderLockExtendIntervalMs: 1000, + ackIntervalSeconds: 5, + logger: new Logger("runs-replication-stress-bulk-insert", "info"), + }); + + await runsReplicationService.start(); + + const organization = await prisma.organization.create({ + data: { + title: "test-stress-bulk-insert", + slug: "test-stress-bulk-insert", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test-stress-bulk-insert", + slug: "test-stress-bulk-insert", + organizationId: organization.id, + externalRef: "test-stress-bulk-insert", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test-stress-bulk-insert", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test-stress-bulk-insert", + pkApiKey: "test-stress-bulk-insert", + shortcode: "test-stress-bulk-insert", + }, + }); + + // Prepare 9 unique TaskRuns + const now = Date.now(); + const runsData = Array.from({ length: 9 }, (_, i) => ({ + friendlyId: `run_bulk_${now}_${i}`, + taskIdentifier: `my-task-bulk`, + payload: `{"title": "hello"}`, + payloadType: "application/json", + traceId: `bulk-${i}`, + spanId: `bulk-${i}`, + queue: "test-stress-bulk-insert", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT" as const, + engine: "V2" as const, + status: "PENDING" as const, + attemptNumber: 1, + createdAt: new Date(now + i), + updatedAt: new Date(now + i), + })); + + //add a run with incomplete Unicode escape sequences + const badPayload = await readFile(`${__dirname}/bad-clickhouse-output.json`, "utf-8"); + const hasProblems = detectBadJsonStrings(badPayload); + expect(hasProblems).toBe(true); + + runsData.push({ + friendlyId: `run_bulk_${now}_10`, + taskIdentifier: `my-task-bulk`, + payload: badPayload, + payloadType: "application/json", + traceId: `bulk-10`, + spanId: `bulk-10`, + queue: "test-stress-bulk-insert", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: 
project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT" as const, + engine: "V2" as const, + status: "PENDING" as const, + attemptNumber: 1, + createdAt: new Date(now + 10), + updatedAt: new Date(now + 10), + }); + + // Bulk insert + const created = await prisma.taskRun.createMany({ data: runsData }); + expect(created.count).toBe(10); + + // Update the runs (not the 10th one) + await prisma.taskRun.updateMany({ + where: { + spanId: { not: "bulk-10" }, + }, + data: { + status: "COMPLETED_SUCCESSFULLY", + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + + // Give the 10th one a bad payload + await prisma.taskRun.updateMany({ + where: { + spanId: "bulk-10", + }, + data: { + status: "COMPLETED_SUCCESSFULLY", + output: badPayload, + outputType: "application/json", + }, + }); + + // Wait for replication + await setTimeout(5000); + + // Query ClickHouse for all runs using FINAL + const queryRuns = clickhouse.reader.query({ + name: "runs-replication-stress-bulk-insert", + query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`, + schema: z.any(), + }); + + const [queryError, result] = await queryRuns({}); + expect(queryError).toBeNull(); + expect(result?.length).toBe(10); + + console.log("Data", { + runsData, + result, + }); + + // Check a few random runs for correctness + for (let i = 0; i < 9; i++) { + const expected = runsData[i]; + const found = result?.find((r: any) => r.friendly_id === expected.friendlyId); + expect(found).toBeDefined(); + expect(found).toEqual( + expect.objectContaining({ + friendly_id: expected.friendlyId, + trace_id: expected.traceId, + task_identifier: expected.taskIdentifier, + status: "COMPLETED_SUCCESSFULLY", + }) + ); + expect(found?.output).toBeDefined(); + } + + // Check the run with the bad JSON + const foundBad = result?.find((r: any) => r.span_id === "bulk-10"); + expect(foundBad).toBeDefined(); + expect(foundBad?.output).toStrictEqual({}); + + await runsReplicationService.stop(); + } + ); }); From 71d3662fc493513c1f535606266c0cb4cd3600fd Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Jun 2025 16:30:18 +0100 Subject: [PATCH 05/15] Reproduced split unicode error --- .../src/fixtures/bad-clickhouse-output.json | 3 + .../clickhouse/src/taskRuns.test.ts | 90 +++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 internal-packages/clickhouse/src/fixtures/bad-clickhouse-output.json diff --git a/internal-packages/clickhouse/src/fixtures/bad-clickhouse-output.json b/internal-packages/clickhouse/src/fixtures/bad-clickhouse-output.json new file mode 100644 index 0000000000..8900b56fe5 --- /dev/null +++ b/internal-packages/clickhouse/src/fixtures/bad-clickhouse-output.json @@ -0,0 +1,3 @@ +{ + "title": "❜ 𝐒 𝐏𝗈𝗌𝗍 . . . 
𝐍ð–ū𝗐 𝐂𝗈𝗇𝗍ð–ū𝗇𝗍 ꒰ ⚔ïļ ę’ą 𝐒𝐋 ❜ 𝐔𝐋\n\n꒰ âĪïļ ę’ą 𓃊 𝐋ð—ēð—Ū𝘃ð—ē 𝖚 ð—đð—ķð—ļð—ē 𝖚𝗇\ud835" +} diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index 30ea0270ba..d506252b7b 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -2,6 +2,7 @@ import { clickhouseTest } from "@internal/testcontainers"; import { z } from "zod"; import { ClickhouseClient } from "./client/client.js"; import { getTaskRunsQueryBuilder, insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js"; +import { readFile } from "node:fs/promises"; describe("Task Runs V2", () => { clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => { @@ -395,4 +396,93 @@ describe("Task Runs V2", () => { ); } ); + + clickhouseTest( + "should be able to insert task runs with invalid output", + async ({ clickhouseContainer }) => { + const client = new ClickhouseClient({ + name: "test", + url: clickhouseContainer.getConnectionUrl(), + logLevel: "debug", + }); + + const insert = insertTaskRuns(client, { + async_insert: 0, // turn off async insert for this test + }); + + const output = await readFile(`${__dirname}/fixtures/bad-clickhouse-output.json`, "utf-8"); + + const [insertError, insertResult] = await insert([ + { + environment_id: "env_1234", + environment_type: "DEVELOPMENT", + organization_id: "org_1234", + project_id: "project_1234", + run_id: "run_1234", + friendly_id: "friendly_1234", + attempt: 1, + engine: "V2", + status: "PENDING", + task_identifier: "my-task", + queue: "my-queue", + schedule_id: "schedule_1234", + batch_id: "batch_1234", + created_at: Date.now(), + updated_at: Date.now(), + completed_at: undefined, + tags: ["tag1", "tag2"], + output: JSON.parse(output), + error: { + type: "BUILT_IN_ERROR", + name: "Error", + message: "error", + stackTrace: "stack trace", + }, + usage_duration_ms: 1000, + cost_in_cents: 100, + task_version: "1.0.0", + sdk_version: "1.0.0", + cli_version: "1.0.0", + machine_preset: "small-1x", + is_test: true, + span_id: "span_1234", + trace_id: "trace_1234", + idempotency_key: "idempotency_key_1234", + expiration_ttl: "1h", + root_run_id: "root_run_1234", + parent_run_id: "parent_run_1234", + depth: 1, + _version: "1", + }, + ]); + + expect(insertError).toBeNull(); + expect(insertResult).toEqual(expect.objectContaining({ executed: true })); + expect(insertResult?.summary?.written_rows).toEqual("1"); + + const query = client.query({ + name: "query-task-runs", + query: "SELECT * FROM trigger_dev.task_runs_v2", + schema: z.object({ + environment_id: z.string(), + run_id: z.string(), + }), + params: z.object({ + run_id: z.string(), + }), + }); + + const [queryError, result] = await query({ run_id: "run_1234" }); + + expect(queryError).toBeNull(); + expect(result).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + environment_id: "env_1234", + run_id: "run_1234", + }), + ]) + ); + } + ); }); From c2483e47228e58315b105f8db72127eeb15666e2 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Jun 2025 18:57:24 +0100 Subject: [PATCH 06/15] Move output file --- .../src/fixtures => apps/webapp/test}/bad-clickhouse-output.json | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {internal-packages/clickhouse/src/fixtures => apps/webapp/test}/bad-clickhouse-output.json (100%) diff --git a/internal-packages/clickhouse/src/fixtures/bad-clickhouse-output.json b/apps/webapp/test/bad-clickhouse-output.json 
similarity index 100% rename from internal-packages/clickhouse/src/fixtures/bad-clickhouse-output.json rename to apps/webapp/test/bad-clickhouse-output.json From b7617d93e9f0d387d8e261a6d9a31ef13d477b7b Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Fri, 27 Jun 2025 19:48:22 +0100 Subject: [PATCH 07/15] Massively improved the performance --- apps/webapp/app/utils/detectBadJsonStrings.ts | 85 ++++++++++--------- apps/webapp/test/detectbadJsonStrings.test.ts | 48 ++++++++++- 2 files changed, 90 insertions(+), 43 deletions(-) diff --git a/apps/webapp/app/utils/detectBadJsonStrings.ts b/apps/webapp/app/utils/detectBadJsonStrings.ts index b58cb1e03a..000eb58601 100644 --- a/apps/webapp/app/utils/detectBadJsonStrings.ts +++ b/apps/webapp/app/utils/detectBadJsonStrings.ts @@ -1,48 +1,49 @@ export function detectBadJsonStrings(jsonString: string): boolean { - // Single regex with global flag to find all matches with their positions - const regex = /\\ud[89ab][0-9a-f]{2}|\\ud[cd][0-9a-f]{2}/g; - const matches: Array<{ index: number; isHigh: boolean }> = []; + // Fast path: skip everything if no \u + let idx = jsonString.indexOf("\\u"); + if (idx === -1) return false; - let match; - while ((match = regex.exec(jsonString)) !== null) { - const isHigh = - match[0].startsWith("\\ud8") || - match[0].startsWith("\\ud9") || - match[0].startsWith("\\uda") || - match[0].startsWith("\\udb"); - matches.push({ index: match.index, isHigh }); - } - - if (matches.length === 0) { - return false; // No Unicode escapes found - } - - // Check for incomplete pairs - const highSurrogates = new Set(); - const lowSurrogates = new Set(); - - for (const { index, isHigh } of matches) { - if (isHigh) { - highSurrogates.add(index); - } else { - lowSurrogates.add(index); - } - } - - // Check for unmatched surrogates - for (const highIndex of highSurrogates) { - const expectedLowIndex = highIndex + 6; // Length of high surrogate - if (!lowSurrogates.has(expectedLowIndex)) { - return true; // Incomplete high surrogate - } - } - - for (const lowIndex of lowSurrogates) { - const expectedHighIndex = lowIndex - 6; // Length of low surrogate - if (!highSurrogates.has(expectedHighIndex)) { - return true; // Incomplete low surrogate + // Only check the area around each \u + while (idx !== -1 && idx < jsonString.length - 5) { + if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") { + const third = jsonString[idx + 3]; + // High surrogate + if ( + /[89ab]/.test(third) && + /[0-9a-f]/.test(jsonString[idx + 4]) && + /[0-9a-f]/.test(jsonString[idx + 5]) + ) { + // Check for low surrogate after + if ( + jsonString.substr(idx + 6, 2) !== "\\u" || + jsonString[idx + 8] !== "d" || + !/[cd]/.test(jsonString[idx + 9]) || + !/[0-9a-f]/.test(jsonString[idx + 10]) || + !/[0-9a-f]/.test(jsonString[idx + 11]) + ) { + return true; // Incomplete high surrogate + } + } + // Low surrogate + if ( + (third === "c" || third === "d") && + /[0-9a-f]/.test(jsonString[idx + 4]) && + /[0-9a-f]/.test(jsonString[idx + 5]) + ) { + // Check for high surrogate before + if ( + idx < 6 || + jsonString.substr(idx - 6, 2) !== "\\u" || + jsonString[idx - 4] !== "d" || + !/[89ab]/.test(jsonString[idx - 3]) || + !/[0-9a-f]/.test(jsonString[idx - 2]) || + !/[0-9a-f]/.test(jsonString[idx - 1]) + ) { + return true; // Incomplete low surrogate + } + } } + idx = jsonString.indexOf("\\u", idx + 1); } - return false; } diff --git a/apps/webapp/test/detectbadJsonStrings.test.ts b/apps/webapp/test/detectbadJsonStrings.test.ts index 55828cf26a..2e014a3bf7 100644 --- 
a/apps/webapp/test/detectbadJsonStrings.test.ts
+++ b/apps/webapp/test/detectbadJsonStrings.test.ts
@@ -97,7 +97,7 @@ describe("detectBadJsonStrings", () => {
     expect(goodOverhead / iterations).toBeLessThan(0.01); // Less than 10 microseconds per call

     // Bad JSON can be slower due to regex matching, but still reasonable
-    expect(badOverhead / iterations).toBeLessThan(0.02); // Less than 20 microseconds per call
+    expect(badOverhead / iterations).toBeLessThan(0.01); // Less than 10 microseconds per call

     // Total overhead for 100k calls should be reasonable
     expect(goodOverhead).toBeLessThan(1000); // Less than 1 second for 100k calls
@@ -127,6 +127,52 @@ describe("detectBadJsonStrings", () => {
       expect(time / iterations).toBeLessThan(size / 1000); // Roughly linear scaling
     }
   });
+
+  it("should show significant performance improvement with quick rejection", () => {
+    const longText = `hello world `.repeat(1_000);
+    const goodJson = `{"title": "hello", "text": "${longText}"}`;
+    const badJson = `{"title": "hello\\ud835", "text": "${longText}"}`;
+    const noUnicodeJson = `{"title": "hello", "text": "${longText}"}`;
+
+    const iterations = 100_000;
+
+    // Warm up
+    for (let i = 0; i < 1000; i++) {
+      detectBadJsonStrings(goodJson);
+      detectBadJsonStrings(badJson);
+      detectBadJsonStrings(noUnicodeJson);
+    }
+
+    // Test strings with no Unicode escapes (99.9% case)
+    const noUnicodeStart = performance.now();
+    for (let i = 0; i < iterations; i++) {
+      detectBadJsonStrings(noUnicodeJson);
+    }
+    const noUnicodeTime = performance.now() - noUnicodeStart;
+
+    // Test strings with Unicode escapes (0.1% case)
+    const withUnicodeStart = performance.now();
+    for (let i = 0; i < iterations; i++) {
+      detectBadJsonStrings(badJson);
+    }
+    const withUnicodeTime = performance.now() - withUnicodeStart;
+
+    console.log(
+      `No Unicode escapes (${iterations} iterations): ${noUnicodeTime.toFixed(2)}ms (${(
+        noUnicodeTime / iterations
+      ).toFixed(4)}ms per call)`
+    );
+    console.log(
+      `With Unicode escapes (${iterations} iterations): ${withUnicodeTime.toFixed(2)}ms (${(
+        withUnicodeTime / iterations
+      ).toFixed(4)}ms per call)`
+    );
+    console.log(
+      `Performance ratio: ${(withUnicodeTime / noUnicodeTime).toFixed(
+        2
+      )}x slower for Unicode strings`
+    );
+  });
 });

From d8c875b65e5d44819ea231cd0a0ff29a322a6265 Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Fri, 27 Jun 2025 19:54:06 +0100
Subject: [PATCH 08/15] Minor performance improvements

---
 apps/webapp/app/utils/detectBadJsonStrings.ts | 39 ++++++++++++++-----
 apps/webapp/test/detectbadJsonStrings.test.ts |  7 ++++
 2 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/apps/webapp/app/utils/detectBadJsonStrings.ts b/apps/webapp/app/utils/detectBadJsonStrings.ts
index 000eb58601..4a000b5429 100644
--- a/apps/webapp/app/utils/detectBadJsonStrings.ts
+++ b/apps/webapp/app/utils/detectBadJsonStrings.ts
@@ -3,19 +3,30 @@ export function detectBadJsonStrings(jsonString: string): boolean {
   let idx = jsonString.indexOf("\\u");
   if (idx === -1) return false;

-  // Only check the area around each \u
-  while (idx !== -1 && idx < jsonString.length - 5) {
+  // Use a more efficient scanning strategy
+  const length = jsonString.length;
+
+  while (idx !== -1 && idx < length - 5) {
+    // Only check if we have enough characters left
+    if (idx + 6 > length) break;
+
     if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") {
       const third = jsonString[idx + 3];
-      // High surrogate
+
+      // High
surrogate check
       if (
         /[89ab]/.test(third) &&
         /[0-9a-f]/.test(jsonString[idx + 4]) &&
         /[0-9a-f]/.test(jsonString[idx + 5])
       ) {
-        // Check for low surrogate after
+        // Check for low surrogate after (need at least 6 more chars)
+        if (idx + 12 > length) {
+          return true; // Incomplete high surrogate (not enough chars left)
+        }
+
         if (
-          jsonString.substr(idx + 6, 2) !== "\\u" ||
+          jsonString[idx + 6] !== "\\" ||
+          jsonString[idx + 7] !== "u" ||
           jsonString[idx + 8] !== "d" ||
           !/[cd]/.test(jsonString[idx + 9]) ||
           !/[0-9a-f]/.test(jsonString[idx + 10]) ||
           !/[0-9a-f]/.test(jsonString[idx + 11])
         ) {
           return true; // Incomplete high surrogate
         }
       }
-      // Low surrogate
+
+      // Low surrogate check
       if (
         (third === "c" || third === "d") &&
         /[0-9a-f]/.test(jsonString[idx + 4]) &&
         /[0-9a-f]/.test(jsonString[idx + 5])
       ) {
-        // Check for high surrogate before
+        // Check for high surrogate before (need at least 6 chars before)
+        if (idx < 6) {
+          return true; // Incomplete low surrogate (not enough chars before)
+        }
+
         if (
-          idx < 6 ||
-          jsonString.substr(idx - 6, 2) !== "\\u" ||
+          jsonString[idx - 6] !== "\\" ||
+          jsonString[idx - 5] !== "u" ||
           jsonString[idx - 4] !== "d" ||
           !/[89ab]/.test(jsonString[idx - 3]) ||
           !/[0-9a-f]/.test(jsonString[idx - 2]) ||
           !/[0-9a-f]/.test(jsonString[idx - 1])
         ) {
           return true; // Incomplete low surrogate
         }
       }
     }
-    idx = jsonString.indexOf("\\u", idx + 1);
+
+    // More efficient next search - skip ahead by 2 to avoid overlapping matches
+    idx = jsonString.indexOf("\\u", idx + 2);
   }
+
   return false;
 }
diff --git a/apps/webapp/test/detectbadJsonStrings.test.ts b/apps/webapp/test/detectbadJsonStrings.test.ts
index 2e014a3bf7..7d14bf4aee 100644
--- a/apps/webapp/test/detectbadJsonStrings.test.ts
+++ b/apps/webapp/test/detectbadJsonStrings.test.ts
@@ -172,6 +172,13 @@ describe("detectBadJsonStrings", () => {
         2
       )}x slower for Unicode strings`
     );
+
+    // Both cases should be extremely fast (under 1 microsecond per call)
+    expect(noUnicodeTime / iterations).toBeLessThan(0.001); // Less than 1 microsecond
+    expect(withUnicodeTime / iterations).toBeLessThan(0.001); // Less than 1 microsecond
+
+    // The difference should be reasonable (not more than 5x)
+    expect(withUnicodeTime / noUnicodeTime).toBeLessThan(5);
   });
 });

From 46b97ccc5f162be22e9e5f834dbe534082d71e06 Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Mon, 30 Jun 2025 10:00:40 +0100
Subject: [PATCH 09/15] Unskip tests

---
 .../test/runsReplicationService.part2.test.ts | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts
index 9736d99176..cb04867f93 100644
--- a/apps/webapp/test/runsReplicationService.part2.test.ts
+++ b/apps/webapp/test/runsReplicationService.part2.test.ts
@@ -1,19 +1,16 @@
 import { ClickHouse } from "@internal/clickhouse";
 import { containerTest } from "@internal/testcontainers";
 import { Logger } from "@trigger.dev/core/logger";
+import { readFile } from "node:fs/promises";
 import { setTimeout } from "node:timers/promises";
 import { z } from "zod";
-import { TaskRunStatus } from "~/database-types";
 import { RunsReplicationService } from "~/services/runsReplicationService.server";
-import { createInMemoryTracing } from "./utils/tracing";
-import superjson from "superjson";
-import { readFile } from "node:fs/promises";
 import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";

 vi.setConfig({ testTimeout: 60_000 });
describe("RunsReplicationService (part 2/2)", () => { - containerTest.skip( + containerTest( "should handover leadership to a second service, and the second service should be able to extend the leader lock", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -142,7 +139,7 @@ describe("RunsReplicationService (part 2/2)", () => { } ); - containerTest.skip( + containerTest( "should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -256,7 +253,7 @@ describe("RunsReplicationService (part 2/2)", () => { } ); - containerTest.skip( + containerTest( "should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse with updates", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); @@ -376,7 +373,7 @@ describe("RunsReplicationService (part 2/2)", () => { } ); - containerTest.skip( + containerTest( "should replicate all events in a single transaction (insert, update)", async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`); From 1bc0d5e29ede17212e5f67ff412ad48cc444856a Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 30 Jun 2025 10:02:25 +0100 Subject: [PATCH 10/15] Remove unused test in CH package --- .../clickhouse/src/taskRuns.test.ts | 89 ------------------- 1 file changed, 89 deletions(-) diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index d506252b7b..e2fa8be0ab 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -396,93 +396,4 @@ describe("Task Runs V2", () => { ); } ); - - clickhouseTest( - "should be able to insert task runs with invalid output", - async ({ clickhouseContainer }) => { - const client = new ClickhouseClient({ - name: "test", - url: clickhouseContainer.getConnectionUrl(), - logLevel: "debug", - }); - - const insert = insertTaskRuns(client, { - async_insert: 0, // turn off async insert for this test - }); - - const output = await readFile(`${__dirname}/fixtures/bad-clickhouse-output.json`, "utf-8"); - - const [insertError, insertResult] = await insert([ - { - environment_id: "env_1234", - environment_type: "DEVELOPMENT", - organization_id: "org_1234", - project_id: "project_1234", - run_id: "run_1234", - friendly_id: "friendly_1234", - attempt: 1, - engine: "V2", - status: "PENDING", - task_identifier: "my-task", - queue: "my-queue", - schedule_id: "schedule_1234", - batch_id: "batch_1234", - created_at: Date.now(), - updated_at: Date.now(), - completed_at: undefined, - tags: ["tag1", "tag2"], - output: JSON.parse(output), - error: { - type: "BUILT_IN_ERROR", - name: "Error", - message: "error", - stackTrace: "stack trace", - }, - usage_duration_ms: 1000, - cost_in_cents: 100, - task_version: "1.0.0", - sdk_version: "1.0.0", - cli_version: "1.0.0", - machine_preset: "small-1x", - is_test: true, - span_id: "span_1234", - trace_id: "trace_1234", - idempotency_key: "idempotency_key_1234", - expiration_ttl: "1h", - root_run_id: "root_run_1234", - parent_run_id: "parent_run_1234", - depth: 1, - _version: "1", - }, - ]); - - 
expect(insertError).toBeNull(); - expect(insertResult).toEqual(expect.objectContaining({ executed: true })); - expect(insertResult?.summary?.written_rows).toEqual("1"); - - const query = client.query({ - name: "query-task-runs", - query: "SELECT * FROM trigger_dev.task_runs_v2", - schema: z.object({ - environment_id: z.string(), - run_id: z.string(), - }), - params: z.object({ - run_id: z.string(), - }), - }); - - const [queryError, result] = await query({ run_id: "run_1234" }); - - expect(queryError).toBeNull(); - expect(result).toEqual( - expect.arrayContaining([ - expect.objectContaining({ - environment_id: "env_1234", - run_id: "run_1234", - }), - ]) - ); - } - ); }); From 0f77577f513f9cf0bbdbb0e6ef2f96b2061c967b Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 30 Jun 2025 16:05:12 +0100 Subject: [PATCH 11/15] Fix for the ClickHouse UI explorer --- docker/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 02b00de7b4..9338816649 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -111,7 +111,7 @@ services: ports: - "5521:5521" environment: - VITE_CLICKHOUSE_URL: "http://clickhouse:8123" + VITE_CLICKHOUSE_URL: "http://localhost:8123" VITE_CLICKHOUSE_USER: "default" VITE_CLICKHOUSE_PASS: "password" networks: From f26efe2489493116135db427a15c2631b1c13b1d Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 30 Jun 2025 13:35:50 +0100 Subject: [PATCH 12/15] RunReplication keepAlive defaults to false --- apps/webapp/app/env.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 18abe94291..1324d1ad01 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -837,7 +837,7 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000), RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500), RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"), - RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"), + RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"), RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(), RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10), // Retry configuration for insert operations From 9e2bda827e08b429ffda0ff19dbd0eece9d9319c Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 30 Jun 2025 16:06:14 +0100 Subject: [PATCH 13/15] Add concurrency_key and bulk_action_group_ids to ClickHouse task runs --- .../005_add_task_runs_v2_concurrency_bulkactions.sql | 12 ++++++++++++ internal-packages/clickhouse/src/taskRuns.test.ts | 7 ++++++- internal-packages/clickhouse/src/taskRuns.ts | 2 ++ 3 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql diff --git a/internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql b/internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql new file mode 100644 index 0000000000..3dd624746d --- /dev/null +++ b/internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql @@ -0,0 +1,12 @@ +-- +goose Up +/* +Add concurrency_key and bulk_action_group_ids columns with defaults. 
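+Existing rows are not rewritten: ClickHouse fills the new columns from their DEFAULT expressions when reading older parts.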
+ */ +ALTER TABLE trigger_dev.task_runs_v2 +ADD COLUMN concurrency_key String DEFAULT '', +ADD COLUMN bulk_action_group_ids Array(String) DEFAULT []; + +-- +goose Down +ALTER TABLE trigger_dev.task_runs_v2 +DROP COLUMN concurrency_key, +DROP COLUMN bulk_action_group_ids; \ No newline at end of file diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index e2fa8be0ab..b51c9f38c0 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -2,7 +2,6 @@ import { clickhouseTest } from "@internal/testcontainers"; import { z } from "zod"; import { ClickhouseClient } from "./client/client.js"; import { getTaskRunsQueryBuilder, insertRawTaskRunPayloads, insertTaskRuns } from "./taskRuns.js"; -import { readFile } from "node:fs/promises"; describe("Task Runs V2", () => { clickhouseTest("should be able to insert task runs", async ({ clickhouseContainer }) => { @@ -62,6 +61,8 @@ describe("Task Runs V2", () => { root_run_id: "root_run_1234", parent_run_id: "parent_run_1234", depth: 1, + concurrency_key: "concurrency_key_1234", + bulk_action_group_ids: ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], _version: "1", }, ]); @@ -76,6 +77,8 @@ describe("Task Runs V2", () => { schema: z.object({ environment_id: z.string(), run_id: z.string(), + concurrency_key: z.string(), + bulk_action_group_ids: z.array(z.string()), }), params: z.object({ run_id: z.string(), @@ -90,6 +93,8 @@ describe("Task Runs V2", () => { expect.objectContaining({ environment_id: "env_1234", run_id: "run_1234", + concurrency_key: "concurrency_key_1234", + bulk_action_group_ids: ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], }), ]) ); diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index aff8af829c..86830b5bd7 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -42,6 +42,8 @@ export const TaskRunV2 = z.object({ idempotency_key: z.string(), expiration_ttl: z.string(), is_test: z.boolean().default(false), + concurrency_key: z.string().default(""), + bulk_action_group_ids: z.array(z.string()).default([]), _version: z.string(), _is_deleted: z.number().int().default(0), }); From 06f98d94638d179c8049dbdfbea04e84f02a68f0 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 30 Jun 2025 16:18:38 +0100 Subject: [PATCH 14/15] ClickHouse package doesn't need to be built anymore for the webapp --- internal-packages/clickhouse/package.json | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/internal-packages/clickhouse/package.json b/internal-packages/clickhouse/package.json index e2767708eb..d85051b506 100644 --- a/internal-packages/clickhouse/package.json +++ b/internal-packages/clickhouse/package.json @@ -25,5 +25,15 @@ "db:migrate:down": "GOOSE_COMMAND=down pnpm run db:migrate", "test": "vitest --sequence.concurrent=false --no-file-parallelism", "test:coverage": "vitest --sequence.concurrent=false --no-file-parallelism --coverage.enabled" + }, + "exports": { + "./package.json": "./package.json", + ".": { + "import": { + "@triggerdotdev/source": "./src/index.ts", + "types": "./dist/src/index.d.ts", + "default": "./dist/src/index.js" + } + } } -} \ No newline at end of file +} From 21f5f6f7f4b6b3cefcc5f714206448210e098c72 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Mon, 30 Jun 2025 16:18:52 +0100 Subject: [PATCH 15/15] Set the concurrency_key from the run 
replication service --- apps/webapp/app/services/runsReplicationService.server.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 0175f83aef..97ceb23ea8 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -715,6 +715,7 @@ export class RunsReplicationService { idempotency_key: run.idempotencyKey ?? "", expiration_ttl: run.ttl ?? "", output, + concurrency_key: run.concurrencyKey ?? "", _version: _version.toString(), _is_deleted: event === "delete" ? 1 : 0, };
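
--
Note (not part of the patches above): taken together, PATCH 01 and PATCH 02
reduce to the following standalone sketch of the insert retry behaviour. The
helper below is illustrative only; it mirrors #insertWithRetry under the
default settings (RUN_REPLICATION_INSERT_MAX_RETRIES=3,
RUN_REPLICATION_INSERT_BASE_DELAY_MS=100, RUN_REPLICATION_INSERT_MAX_DELAY_MS=2000)
and does not appear in the diffs.

  async function insertWithRetry<T>(
    insertFn: () => Promise<T>,
    isRetryable: (error: Error) => boolean,
    maxRetries = 3, // total attempts, i.e. at most two retries after the first try
    baseDelayMs = 100,
    maxDelayMs = 2000
  ): Promise<[Error | null, T | null]> {
    let lastError: Error | null = null;
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return [null, await insertFn()];
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error));
        // Only connection-style errors are retried; anything else fails fast,
        // as does the final attempt.
        if (!isRetryable(lastError) || attempt === maxRetries) break;
        // Exponential backoff capped at maxDelayMs, plus up to 100ms of jitter
        // to avoid thundering-herd retries across concurrent flushes.
        const delay = Math.min(baseDelayMs * 2 ** (attempt - 1), maxDelayMs) + Math.random() * 100;
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
    return [lastError, null];
  }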