diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 6f742faa26..1324d1ad01 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -837,9 +837,13 @@ const EnvironmentSchema = z.object({
   RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
   RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
-  RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
+  RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
   RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
   RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
+  // Retry configuration for insert operations
+  RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
+  RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
+  RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
 
   // Clickhouse
   CLICKHOUSE_URL: z.string().optional(),
diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts
index 8a8b0df650..86b17601a7 100644
--- a/apps/webapp/app/services/runsReplicationInstance.server.ts
+++ b/apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -62,6 +62,9 @@ function initializeRunsReplicationInstance() {
     logLevel: env.RUN_REPLICATION_LOG_LEVEL,
     waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
     tracer: provider.getTracer("runs-replication-service"),
+    insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES,
+    insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS,
+    insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS,
   });
 
   if (env.RUN_REPLICATION_ENABLED === "1") {
diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index eed1d3c9f3..97ceb23ea8 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -15,6 +15,8 @@ import { TaskRun } from "@trigger.dev/database";
 import { nanoid } from "nanoid";
 import EventEmitter from "node:events";
 import pLimit from "p-limit";
+import { logger } from "./logger.server";
+import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
 
 interface TransactionEvent {
   tag: "insert" | "update" | "delete";
@@ -51,6 +53,10 @@ export type RunsReplicationServiceOptions = {
   logLevel?: LogLevel;
   tracer?: Tracer;
   waitForAsyncInsert?: boolean;
+  // Retry configuration for insert operations
+  insertMaxRetries?: number;
+  insertBaseDelayMs?: number;
+  insertMaxDelayMs?: number;
 };
 
 type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
@@ -80,6 +86,10 @@ export class RunsReplicationService {
   private _latestCommitEndLsn: string | null = null;
   private _lastAcknowledgedLsn: string | null = null;
   private _acknowledgeInterval: NodeJS.Timeout | null = null;
+  // Retry configuration
+  private _insertMaxRetries: number;
+  private _insertBaseDelayMs: number;
+  private _insertMaxDelayMs: number;
 
   public readonly events: EventEmitter;
 
@@ -151,6 +161,11 @@ export class RunsReplicationService {
     this._replicationClient.events.on("leaderElection", (isLeader) => {
       this.logger.info("Leader election", { isLeader });
     });
+
+    // Initialize retry configuration
+    this._insertMaxRetries = options.insertMaxRetries ?? 3;
+    this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100;
+    this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
   }
 
   public async shutdown() {
@@ -445,8 +460,37 @@ export class RunsReplicationService {
         payloadInserts: payloadInserts.length,
       });
 
-      await this.#insertTaskRunInserts(taskRunInserts);
-      await this.#insertPayloadInserts(payloadInserts);
+      // Insert task runs and payloads with retry logic for connection errors
+      const [taskRunError, taskRunResult] = await this.#insertWithRetry(
+        () => this.#insertTaskRunInserts(taskRunInserts),
+        "task run inserts",
+        flushId
+      );
+
+      const [payloadError, payloadResult] = await this.#insertWithRetry(
+        () => this.#insertPayloadInserts(payloadInserts),
+        "payload inserts",
+        flushId
+      );
+
+      // Log any errors that occurred
+      if (taskRunError) {
+        this.logger.error("Error inserting task run inserts", {
+          error: taskRunError,
+          flushId,
+          runIds: taskRunInserts.map((r) => r.run_id),
+        });
+        recordSpanError(span, taskRunError);
+      }
+
+      if (payloadError) {
+        this.logger.error("Error inserting payload inserts", {
+          error: payloadError,
+          flushId,
+          runIds: payloadInserts.map((r) => r.run_id),
+        });
+        recordSpanError(span, payloadError);
+      }
 
       this.logger.debug("Flushed inserts", {
         flushId,
@@ -456,6 +500,73 @@ export class RunsReplicationService {
     });
   }
 
+  // New method to handle inserts with retry logic for connection errors
+  async #insertWithRetry<T>(
+    insertFn: () => Promise<T>,
+    operationName: string,
+    flushId: string
+  ): Promise<[Error | null, T | null]> {
+    let lastError: Error | null = null;
+
+    for (let attempt = 1; attempt <= this._insertMaxRetries; attempt++) {
+      try {
+        const result = await insertFn();
+        return [null, result];
+      } catch (error) {
+        lastError = error instanceof Error ? error : new Error(String(error));
+
+        // Check if this is a retryable connection error
+        if (this.#isRetryableConnectionError(lastError) && attempt < this._insertMaxRetries) {
+          const delay = this.#calculateConnectionRetryDelay(attempt);
+
+          this.logger.warn(`Retrying ${operationName} due to connection error`, {
+            flushId,
+            attempt,
+            maxRetries: this._insertMaxRetries,
+            error: lastError.message,
+            delay,
+          });
+
+          await new Promise((resolve) => setTimeout(resolve, delay));
+          continue;
+        }
+        break;
+      }
+    }
+
+    return [lastError, null];
+  }
+
+  // New method to check if an error is a retryable connection error
+  #isRetryableConnectionError(error: Error): boolean {
+    const errorMessage = error.message.toLowerCase();
+    const retryableConnectionPatterns = [
+      "socket hang up",
+      "econnreset",
+      "connection reset",
+      "connection refused",
+      "connection timeout",
+      "network error",
+      "read econnreset",
+      "write econnreset",
+    ];
+
+    return retryableConnectionPatterns.some((pattern) => errorMessage.includes(pattern));
+  }
+
+  // New method to calculate retry delay for connection errors
+  #calculateConnectionRetryDelay(attempt: number): number {
+    // Exponential backoff: baseDelay, baseDelay*2, baseDelay*4, etc.
+    const delay = Math.min(
+      this._insertBaseDelayMs * Math.pow(2, attempt - 1),
+      this._insertMaxDelayMs
+    );
+
+    // Add some jitter to prevent thundering herd
+    const jitter = Math.random() * 100;
+    return delay + jitter;
+  }
+
   async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
     return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
       const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
@@ -604,6 +715,7 @@ export class RunsReplicationService {
       idempotency_key: run.idempotencyKey ?? "",
       expiration_ttl: run.ttl ?? "",
       output,
+      concurrency_key: run.concurrencyKey ?? "",
      _version: _version.toString(),
      _is_deleted: event === "delete" ? 1 : 0,
    };
@@ -631,6 +743,14 @@ export class RunsReplicationService {
       return { data: undefined };
     }
 
+    if (detectBadJsonStrings(data)) {
+      this.logger.warn("Detected bad JSON strings", {
+        data,
+        dataType,
+      });
+      return { data: undefined };
+    }
+
     const packet = {
       data,
       dataType,
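For reviewers, a minimal standalone sketch of the backoff used by `#calculateConnectionRetryDelay` above, with the new defaults plugged in. The constant and function names here are illustrative and not part of the patch:

```ts
// Illustrative constants mirroring the new env var defaults (not part of the patch).
const INSERT_BASE_DELAY_MS = 100;
const INSERT_MAX_DELAY_MS = 2000;

// Same shape as #calculateConnectionRetryDelay: exponential backoff capped at the
// max delay, plus up to 100ms of random jitter to avoid a thundering herd.
function connectionRetryDelay(attempt: number): number {
  const delay = Math.min(INSERT_BASE_DELAY_MS * Math.pow(2, attempt - 1), INSERT_MAX_DELAY_MS);
  return delay + Math.random() * 100;
}

// With the default of 3 attempts, only attempts 1 and 2 are followed by a wait,
// roughly ~100ms and ~200ms; the third failure is surfaced back to the flush.
console.log([1, 2].map((attempt) => Math.round(connectionRetryDelay(attempt))));
```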
"", expiration_ttl: run.ttl ?? "", output, + concurrency_key: run.concurrencyKey ?? "", _version: _version.toString(), _is_deleted: event === "delete" ? 1 : 0, }; @@ -631,6 +743,14 @@ export class RunsReplicationService { return { data: undefined }; } + if (detectBadJsonStrings(data)) { + this.logger.warn("Detected bad JSON strings", { + data, + dataType, + }); + return { data: undefined }; + } + const packet = { data, dataType, diff --git a/apps/webapp/app/utils/detectBadJsonStrings.ts b/apps/webapp/app/utils/detectBadJsonStrings.ts new file mode 100644 index 0000000000..4a000b5429 --- /dev/null +++ b/apps/webapp/app/utils/detectBadJsonStrings.ts @@ -0,0 +1,68 @@ +export function detectBadJsonStrings(jsonString: string): boolean { + // Fast path: skip everything if no \u + let idx = jsonString.indexOf("\\u"); + if (idx === -1) return false; + + // Use a more efficient scanning strategy + const length = jsonString.length; + + while (idx !== -1 && idx < length - 5) { + // Only check if we have enough characters left + if (idx + 6 > length) break; + + if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") { + const third = jsonString[idx + 3]; + + // High surrogate check + if ( + /[89ab]/.test(third) && + /[0-9a-f]/.test(jsonString[idx + 4]) && + /[0-9a-f]/.test(jsonString[idx + 5]) + ) { + // Check for low surrogate after (need at least 6 more chars) + if (idx + 12 > length) { + return true; // Incomplete high surrogate (not enough chars left) + } + + if ( + jsonString[idx + 6] !== "\\" || + jsonString[idx + 7] !== "u" || + jsonString[idx + 8] !== "d" || + !/[cd]/.test(jsonString[idx + 9]) || + !/[0-9a-f]/.test(jsonString[idx + 10]) || + !/[0-9a-f]/.test(jsonString[idx + 11]) + ) { + return true; // Incomplete high surrogate + } + } + + // Low surrogate check + if ( + (third === "c" || third === "d") && + /[0-9a-f]/.test(jsonString[idx + 4]) && + /[0-9a-f]/.test(jsonString[idx + 5]) + ) { + // Check for high surrogate before (need at least 6 chars before) + if (idx < 6) { + return true; // Incomplete low surrogate (not enough chars before) + } + + if ( + jsonString[idx - 6] !== "\\" || + jsonString[idx - 5] !== "u" || + jsonString[idx - 4] !== "d" || + !/[89ab]/.test(jsonString[idx - 3]) || + !/[0-9a-f]/.test(jsonString[idx - 2]) || + !/[0-9a-f]/.test(jsonString[idx - 1]) + ) { + return true; // Incomplete low surrogate + } + } + } + + // More efficient next search - skip ahead by 2 to avoid overlapping matches + idx = jsonString.indexOf("\\u", idx + 2); + } + + return false; +} diff --git a/apps/webapp/test/bad-clickhouse-output.json b/apps/webapp/test/bad-clickhouse-output.json new file mode 100644 index 0000000000..8900b56fe5 --- /dev/null +++ b/apps/webapp/test/bad-clickhouse-output.json @@ -0,0 +1,3 @@ +{ + "title": "❜ 𝐒 𝐏𝗈𝗌𝗍 . . . 
𝐍ð–ū𝗐 𝐂𝗈𝗇𝗍ð–ū𝗇𝗍 ꒰ ⚔ïļ ę’ą 𝐒𝐋 ❜ 𝐔𝐋\n\n꒰ âĪïļ ę’ą 𓃊 𝐋ð—ēð—Ū𝘃ð—ē 𝖚 ð—đð—ķð—ļð—ē 𝖚𝗇\ud835" +} diff --git a/apps/webapp/test/detectbadJsonStrings.test.ts b/apps/webapp/test/detectbadJsonStrings.test.ts new file mode 100644 index 0000000000..7d14bf4aee --- /dev/null +++ b/apps/webapp/test/detectbadJsonStrings.test.ts @@ -0,0 +1,190 @@ +import { describe, expect, it } from "vitest"; +import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; + +describe("detectBadJsonStrings", () => { + it("should not detect valid JSON string", () => { + const goodJson = `{"title": "hello"}`; + const result = detectBadJsonStrings(goodJson); + expect(result).toBe(false); + }); + + it("should detect incomplete Unicode escape sequences", () => { + const badJson = `{"title": "hello\\ud835"}`; + const result = detectBadJsonStrings(badJson); + expect(result).toBe(true); + }); + + it("should not detect complete Unicode escape sequences", () => { + const goodJson = `{"title": "hello\\ud835\\udc00"}`; + const result = detectBadJsonStrings(goodJson); + expect(result).toBe(false); + }); + + it("should detect incomplete low surrogate", () => { + const badJson = `{"title": "hello\\udc00"}`; + const result = detectBadJsonStrings(badJson); + expect(result).toBe(true); + }); + + it("should handle multiple Unicode sequences correctly", () => { + const goodJson = `{"title": "hello\\ud835\\udc00\\ud835\\udc01"}`; + const result = detectBadJsonStrings(goodJson); + expect(result).toBe(false); + }); + + it("should detect mixed complete and incomplete sequences", () => { + const badJson = `{"title": "hello\\ud835\\udc00\\ud835"}`; + const result = detectBadJsonStrings(badJson); + expect(result).toBe(true); + }); + + it("should have acceptable performance overhead", () => { + const longText = `hello world `.repeat(1_000); + const goodJson = `{"title": "hello", "text": "${longText}"}`; + const badJson = `{"title": "hello\\ud835", "text": "${longText}"}`; + + const iterations = 100_000; + + // Warm up + for (let i = 0; i < 1000; i++) { + detectBadJsonStrings(goodJson); + detectBadJsonStrings(badJson); + } + + // Measure good JSON (most common case) + const goodStart = performance.now(); + for (let i = 0; i < iterations; i++) { + detectBadJsonStrings(goodJson); + } + const goodTime = performance.now() - goodStart; + + // Measure bad JSON (edge case) + const badStart = performance.now(); + for (let i = 0; i < iterations; i++) { + detectBadJsonStrings(badJson); + } + const badTime = performance.now() - badStart; + + // Measure baseline (just function call overhead) + const baselineStart = performance.now(); + for (let i = 0; i < iterations; i++) { + // Empty function call to measure baseline + } + const baselineTime = performance.now() - baselineStart; + + const goodOverhead = goodTime - baselineTime; + const badOverhead = badTime - baselineTime; + + console.log(`Baseline (${iterations} iterations): ${baselineTime.toFixed(2)}ms`); + console.log( + `Good JSON (${iterations} iterations): ${goodTime.toFixed( + 2 + )}ms (overhead: ${goodOverhead.toFixed(2)}ms)` + ); + console.log( + `Bad JSON (${iterations} iterations): ${badTime.toFixed( + 2 + )}ms (overhead: ${badOverhead.toFixed(2)}ms)` + ); + console.log( + `Average per call - Good: ${(goodOverhead / iterations).toFixed(4)}ms, Bad: ${( + badOverhead / iterations + ).toFixed(4)}ms` + ); + + // Assertions for performance expectations + // Good JSON should be reasonably fast (most common case) + expect(goodOverhead / iterations).toBeLessThan(0.01); // Less than 10 
+
+    // Bad JSON can be slower due to regex matching, but still reasonable
+    expect(badOverhead / iterations).toBeLessThan(0.01); // Less than 10 microseconds per call
+
+    // Total overhead for 100k calls should be reasonable
+    expect(goodOverhead).toBeLessThan(1000); // Less than 1 second for 100k calls
+  });
+
+  it("should handle various JSON sizes efficiently", () => {
+    const sizes = [100, 1000, 10000, 100000];
+    const iterations = 10_000;
+
+    for (const size of sizes) {
+      const text = `hello world `.repeat(size / 11); // Approximate size
+      const goodJson = `{"title": "hello", "text": "${text}"}`;
+
+      const start = performance.now();
+      for (let i = 0; i < iterations; i++) {
+        detectBadJsonStrings(goodJson);
+      }
+      const time = performance.now() - start;
+
+      console.log(
+        `Size ${size} chars (${iterations} iterations): ${time.toFixed(2)}ms (${(
+          time / iterations
+        ).toFixed(4)}ms per call)`
+      );
+
+      // Performance should scale reasonably with size
+      expect(time / iterations).toBeLessThan(size / 1000); // Roughly linear scaling
+    }
+  });
+
+  it("should show significant performance improvement with quick rejection", () => {
+    const longText = `hello world `.repeat(1_000);
+    const goodJson = `{"title": "hello", "text": "${longText}"}`;
+    const badJson = `{"title": "hello\\ud835", "text": "${longText}"}`;
+    const noUnicodeJson = `{"title": "hello", "text": "${longText}"}`;
+
+    const iterations = 100_000;
+
+    // Warm up
+    for (let i = 0; i < 1000; i++) {
+      detectBadJsonStrings(goodJson);
+      detectBadJsonStrings(badJson);
+      detectBadJsonStrings(noUnicodeJson);
+    }
+
+    // Test strings with no Unicode escapes (99.9% case)
+    const noUnicodeStart = performance.now();
+    for (let i = 0; i < iterations; i++) {
+      detectBadJsonStrings(noUnicodeJson);
+    }
+    const noUnicodeTime = performance.now() - noUnicodeStart;
+
+    // Test strings with Unicode escapes (0.1% case)
+    const withUnicodeStart = performance.now();
+    for (let i = 0; i < iterations; i++) {
+      detectBadJsonStrings(badJson);
+    }
+    const withUnicodeTime = performance.now() - withUnicodeStart;
+
+    console.log(
+      `No Unicode escapes (${iterations} iterations): ${noUnicodeTime.toFixed(2)}ms (${(
+        noUnicodeTime / iterations
+      ).toFixed(4)}ms per call)`
+    );
+    console.log(
+      `With Unicode escapes (${iterations} iterations): ${withUnicodeTime.toFixed(2)}ms (${(
+        withUnicodeTime / iterations
+      ).toFixed(4)}ms per call)`
+    );
+    console.log(
+      `Performance ratio: ${(withUnicodeTime / noUnicodeTime).toFixed(
+        2
+      )}x slower for Unicode strings`
+    );
+
+    // Both cases should be extremely fast (under 1 microsecond per call)
+    expect(noUnicodeTime / iterations).toBeLessThan(0.001); // Less than 1 microsecond
+    expect(withUnicodeTime / iterations).toBeLessThan(0.001); // Less than 1 microsecond
+
+    // The slowdown for strings containing Unicode escapes should be reasonable (not more than 5x)
+    expect(withUnicodeTime / noUnicodeTime).toBeLessThan(5);
+  });
+});
+
+function processPacket(data: string): { data?: string; dataType?: string } {
+  if (detectBadJsonStrings(data)) {
+    return { data: undefined };
+  }
+  return { data, dataType: "application/json" };
+}
diff --git a/apps/webapp/test/runsReplicationService.part2.test.ts b/apps/webapp/test/runsReplicationService.part2.test.ts
index 01627e4ec8..cb04867f93 100644
--- a/apps/webapp/test/runsReplicationService.part2.test.ts
+++ b/apps/webapp/test/runsReplicationService.part2.test.ts
@@ -1,12 +1,11 @@
 import { ClickHouse } from "@internal/clickhouse";
 import { containerTest } from "@internal/testcontainers";
 import { Logger } from "@trigger.dev/core/logger";
+import { readFile } from "node:fs/promises";
 import { setTimeout } from "node:timers/promises";
 import { z } from "zod";
-import { TaskRunStatus } from "~/database-types";
 import { RunsReplicationService } from "~/services/runsReplicationService.server";
-import { createInMemoryTracing } from "./utils/tracing";
-import superjson from "superjson";
+import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
 
 vi.setConfig({ testTimeout: 60_000 });
 
@@ -611,4 +610,177 @@
     },
     { timeout: 60_000 * 5 }
   );
+
+  containerTest(
+    "should insert TaskRuns even if there are incomplete Unicode escape sequences in the JSON",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
+
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication-stress-bulk-insert",
+      });
+
+      const runsReplicationService = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-stress-bulk-insert",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 10,
+        flushIntervalMs: 100,
+        flushBatchSize: 50,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-stress-bulk-insert", "info"),
+      });
+
+      await runsReplicationService.start();
+
+      const organization = await prisma.organization.create({
+        data: {
+          title: "test-stress-bulk-insert",
+          slug: "test-stress-bulk-insert",
+        },
+      });
+
+      const project = await prisma.project.create({
+        data: {
+          name: "test-stress-bulk-insert",
+          slug: "test-stress-bulk-insert",
+          organizationId: organization.id,
+          externalRef: "test-stress-bulk-insert",
+        },
+      });
+
+      const runtimeEnvironment = await prisma.runtimeEnvironment.create({
+        data: {
+          slug: "test-stress-bulk-insert",
+          type: "DEVELOPMENT",
+          projectId: project.id,
+          organizationId: organization.id,
+          apiKey: "test-stress-bulk-insert",
+          pkApiKey: "test-stress-bulk-insert",
+          shortcode: "test-stress-bulk-insert",
+        },
+      });
+
+      // Prepare 9 unique TaskRuns
+      const now = Date.now();
+      const runsData = Array.from({ length: 9 }, (_, i) => ({
+        friendlyId: `run_bulk_${now}_${i}`,
+        taskIdentifier: `my-task-bulk`,
+        payload: `{"title": "hello"}`,
+        payloadType: "application/json",
+        traceId: `bulk-${i}`,
+        spanId: `bulk-${i}`,
+        queue: "test-stress-bulk-insert",
+        runtimeEnvironmentId: runtimeEnvironment.id,
+        projectId: project.id,
+        organizationId: organization.id,
+        environmentType: "DEVELOPMENT" as const,
+        engine: "V2" as const,
+        status: "PENDING" as const,
+        attemptNumber: 1,
+        createdAt: new Date(now + i),
+        updatedAt: new Date(now + i),
+      }));
+
+      // Add a run with incomplete Unicode escape sequences
+      const badPayload = await readFile(`${__dirname}/bad-clickhouse-output.json`, "utf-8");
+      const hasProblems = detectBadJsonStrings(badPayload);
+      expect(hasProblems).toBe(true);
+
+      runsData.push({
+        friendlyId: `run_bulk_${now}_10`,
+        taskIdentifier: `my-task-bulk`,
+        payload: badPayload,
+        payloadType: "application/json",
+        traceId: `bulk-10`,
+        spanId: `bulk-10`,
+        queue: "test-stress-bulk-insert",
+        runtimeEnvironmentId: runtimeEnvironment.id,
+        projectId: project.id,
+        organizationId: organization.id,
+        environmentType: "DEVELOPMENT" as const,
+        engine: "V2" as const,
+        status: "PENDING" as const,
+        attemptNumber: 1,
+        createdAt: new Date(now + 10),
+        updatedAt: new Date(now + 10),
+      });
+
+      // Bulk insert
+      const created = await prisma.taskRun.createMany({ data: runsData });
+      expect(created.count).toBe(10);
+
+      // Update the runs (not the 10th one)
+      await prisma.taskRun.updateMany({
+        where: {
+          spanId: { not: "bulk-10" },
+        },
+        data: {
+          status: "COMPLETED_SUCCESSFULLY",
+          output: `{"foo":"bar"}`,
+          outputType: "application/json",
+        },
+      });
+
+      // Give the 10th one a bad payload
+      await prisma.taskRun.updateMany({
+        where: {
+          spanId: "bulk-10",
+        },
+        data: {
+          status: "COMPLETED_SUCCESSFULLY",
+          output: badPayload,
+          outputType: "application/json",
+        },
+      });
+
+      // Wait for replication
+      await setTimeout(5000);
+
+      // Query ClickHouse for all runs using FINAL
+      const queryRuns = clickhouse.reader.query({
+        name: "runs-replication-stress-bulk-insert",
+        query: `SELECT * FROM trigger_dev.task_runs_v2 FINAL`,
+        schema: z.any(),
+      });
+
+      const [queryError, result] = await queryRuns({});
+      expect(queryError).toBeNull();
+      expect(result?.length).toBe(10);
+
+      console.log("Data", {
+        runsData,
+        result,
+      });
+
+      // Check a few random runs for correctness
+      for (let i = 0; i < 9; i++) {
+        const expected = runsData[i];
+        const found = result?.find((r: any) => r.friendly_id === expected.friendlyId);
+        expect(found).toBeDefined();
+        expect(found).toEqual(
+          expect.objectContaining({
+            friendly_id: expected.friendlyId,
+            trace_id: expected.traceId,
+            task_identifier: expected.taskIdentifier,
+            status: "COMPLETED_SUCCESSFULLY",
+          })
+        );
+        expect(found?.output).toBeDefined();
+      }
+
+      // Check the run with the bad JSON
+      const foundBad = result?.find((r: any) => r.span_id === "bulk-10");
+      expect(foundBad).toBeDefined();
+      expect(foundBad?.output).toStrictEqual({});
+
+      await runsReplicationService.stop();
+    }
+  );
 });
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 02b00de7b4..9338816649 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -111,7 +111,7 @@ services:
     ports:
       - "5521:5521"
     environment:
-      VITE_CLICKHOUSE_URL: "http://clickhouse:8123"
+      VITE_CLICKHOUSE_URL: "http://localhost:8123"
       VITE_CLICKHOUSE_USER: "default"
       VITE_CLICKHOUSE_PASS: "password"
     networks:
diff --git a/internal-packages/clickhouse/package.json b/internal-packages/clickhouse/package.json
index e2767708eb..d85051b506 100644
--- a/internal-packages/clickhouse/package.json
+++ b/internal-packages/clickhouse/package.json
@@ -25,5 +25,15 @@
     "db:migrate:down": "GOOSE_COMMAND=down pnpm run db:migrate",
     "test": "vitest --sequence.concurrent=false --no-file-parallelism",
     "test:coverage": "vitest --sequence.concurrent=false --no-file-parallelism --coverage.enabled"
+  },
+  "exports": {
+    "./package.json": "./package.json",
+    ".": {
+      "import": {
+        "@triggerdotdev/source": "./src/index.ts",
+        "types": "./dist/src/index.d.ts",
+        "default": "./dist/src/index.js"
+      }
+    }
   }
-}
\ No newline at end of file
+}
diff --git a/internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql b/internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql
new file mode 100644
index 0000000000..3dd624746d
--- /dev/null
+++ b/internal-packages/clickhouse/schema/005_add_task_runs_v2_concurrency_bulkactions.sql
@@ -0,0 +1,12 @@
+-- +goose Up
+/*
+Add concurrency_key and bulk_action_group_ids columns with defaults.
+ */
+ALTER TABLE trigger_dev.task_runs_v2
+ADD COLUMN concurrency_key String DEFAULT '',
+ADD COLUMN bulk_action_group_ids Array(String) DEFAULT [];
+
+-- +goose Down
+ALTER TABLE trigger_dev.task_runs_v2
+DROP COLUMN concurrency_key,
+DROP COLUMN bulk_action_group_ids;
\ No newline at end of file
diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts
index 30ea0270ba..b51c9f38c0 100644
--- a/internal-packages/clickhouse/src/taskRuns.test.ts
+++ b/internal-packages/clickhouse/src/taskRuns.test.ts
@@ -61,6 +61,8 @@ describe("Task Runs V2", () => {
         root_run_id: "root_run_1234",
         parent_run_id: "parent_run_1234",
         depth: 1,
+        concurrency_key: "concurrency_key_1234",
+        bulk_action_group_ids: ["bulk_action_group_id_1234", "bulk_action_group_id_1235"],
         _version: "1",
       },
     ]);
@@ -75,6 +77,8 @@ describe("Task Runs V2", () => {
       schema: z.object({
         environment_id: z.string(),
         run_id: z.string(),
+        concurrency_key: z.string(),
+        bulk_action_group_ids: z.array(z.string()),
       }),
       params: z.object({
        run_id: z.string(),
@@ -89,6 +93,8 @@ describe("Task Runs V2", () => {
        expect.objectContaining({
          environment_id: "env_1234",
          run_id: "run_1234",
+          concurrency_key: "concurrency_key_1234",
+          bulk_action_group_ids: ["bulk_action_group_id_1234", "bulk_action_group_id_1235"],
        }),
      ])
    );
diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts
index aff8af829c..86830b5bd7 100644
--- a/internal-packages/clickhouse/src/taskRuns.ts
+++ b/internal-packages/clickhouse/src/taskRuns.ts
@@ -42,6 +42,8 @@ export const TaskRunV2 = z.object({
   idempotency_key: z.string(),
   expiration_ttl: z.string(),
   is_test: z.boolean().default(false),
+  concurrency_key: z.string().default(""),
+  bulk_action_group_ids: z.array(z.string()).default([]),
   _version: z.string(),
   _is_deleted: z.number().int().default(0),
 });