Skip to content

ClickHouse replication improvements (retrying, strip bad unicode chars) #2205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -837,9 +837,13 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("0"),
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
// Retry configuration for insert operations
RUN_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),

// Clickhouse
CLICKHOUSE_URL: z.string().optional(),
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ function initializeRunsReplicationInstance() {
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
waitForAsyncInsert: env.RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT === "1",
tracer: provider.getTracer("runs-replication-service"),
insertMaxRetries: env.RUN_REPLICATION_INSERT_MAX_RETRIES,
insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS,
insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS,
});

if (env.RUN_REPLICATION_ENABLED === "1") {
Expand Down
124 changes: 122 additions & 2 deletions apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import { TaskRun } from "@trigger.dev/database";
import { nanoid } from "nanoid";
import EventEmitter from "node:events";
import pLimit from "p-limit";
import { logger } from "./logger.server";
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";

interface TransactionEvent<T = any> {
tag: "insert" | "update" | "delete";
Expand Down Expand Up @@ -51,6 +53,10 @@ export type RunsReplicationServiceOptions = {
logLevel?: LogLevel;
tracer?: Tracer;
waitForAsyncInsert?: boolean;
// Retry configuration for insert operations
insertMaxRetries?: number;
insertBaseDelayMs?: number;
insertMaxDelayMs?: number;
};

type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
Expand Down Expand Up @@ -80,6 +86,10 @@ export class RunsReplicationService {
private _latestCommitEndLsn: string | null = null;
private _lastAcknowledgedLsn: string | null = null;
private _acknowledgeInterval: NodeJS.Timeout | null = null;
// Retry configuration
private _insertMaxRetries: number;
private _insertBaseDelayMs: number;
private _insertMaxDelayMs: number;

public readonly events: EventEmitter<RunsReplicationServiceEvents>;

Expand Down Expand Up @@ -151,6 +161,11 @@ export class RunsReplicationService {
this._replicationClient.events.on("leaderElection", (isLeader) => {
this.logger.info("Leader election", { isLeader });
});

// Initialize retry configuration
this._insertMaxRetries = options.insertMaxRetries ?? 3;
this._insertBaseDelayMs = options.insertBaseDelayMs ?? 100;
this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
}

public async shutdown() {
Expand Down Expand Up @@ -445,8 +460,37 @@ export class RunsReplicationService {
payloadInserts: payloadInserts.length,
});

await this.#insertTaskRunInserts(taskRunInserts);
await this.#insertPayloadInserts(payloadInserts);
// Insert task runs and payloads with retry logic for connection errors
const [taskRunError, taskRunResult] = await this.#insertWithRetry(
() => this.#insertTaskRunInserts(taskRunInserts),
"task run inserts",
flushId
);

const [payloadError, payloadResult] = await this.#insertWithRetry(
() => this.#insertPayloadInserts(payloadInserts),
"payload inserts",
flushId
);

// Log any errors that occurred
if (taskRunError) {
this.logger.error("Error inserting task run inserts", {
error: taskRunError,
flushId,
runIds: taskRunInserts.map((r) => r.run_id),
});
recordSpanError(span, taskRunError);
}

if (payloadError) {
this.logger.error("Error inserting payload inserts", {
error: payloadError,
flushId,
runIds: payloadInserts.map((r) => r.run_id),
});
recordSpanError(span, payloadError);
}

this.logger.debug("Flushed inserts", {
flushId,
Expand All @@ -456,6 +500,73 @@ export class RunsReplicationService {
});
}

/**
 * Runs `insertFn`, retrying it when it throws a retryable connection error.
 *
 * This method never throws. The outcome is reported as an `[error, result]`
 * tuple: `[null, result]` on success, `[lastError, null]` once attempts are
 * exhausted or a non-retryable error is thrown.
 *
 * `_insertMaxRetries` is the total number of attempts, including the first
 * one. At least one attempt is always made: previously a configured value of
 * `0` (or any value below 1) skipped the loop entirely and returned
 * `[null, null]`, which the caller could not tell apart from a successful
 * insert.
 *
 * @param insertFn - The insert operation to execute.
 * @param operationName - Human-readable name used in the retry warning log.
 * @param flushId - Identifier of the current flush, attached to log entries.
 * @returns A tuple of the last error (or `null`) and the result (or `null`).
 */
async #insertWithRetry<T>(
  insertFn: () => Promise<T>,
  operationName: string,
  flushId: string
): Promise<[Error | null, T | null]> {
  // Written as a comparison rather than Math.max so that NaN also falls back to 1.
  const maxAttempts = this._insertMaxRetries >= 1 ? this._insertMaxRetries : 1;
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = await insertFn();
      return [null, result];
    } catch (error) {
      // Normalize non-Error throwables so callers always receive an Error.
      lastError = error instanceof Error ? error : new Error(String(error));

      const canRetry = attempt < maxAttempts && this.#isRetryableConnectionError(lastError);
      if (!canRetry) {
        break;
      }

      const delay = this.#calculateConnectionRetryDelay(attempt);

      this.logger.warn(`Retrying ${operationName} due to connection error`, {
        flushId,
        attempt,
        maxRetries: maxAttempts,
        error: lastError.message,
        delay,
      });

      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }

  return [lastError, null];
}

/**
 * Reports whether an error looks like a transient connection failure.
 *
 * The decision is made purely by case-insensitive substring matching on
 * `error.message`; no other property of the error is inspected.
 *
 * @param error - The error thrown by an insert attempt.
 * @returns `true` when the message contains any known connection-failure phrase.
 */
#isRetryableConnectionError(error: Error): boolean {
  const haystack = error.message.toLowerCase();

  for (const needle of [
    "socket hang up",
    "econnreset",
    "connection reset",
    "connection refused",
    "connection timeout",
    "network error",
    "read econnreset",
    "write econnreset",
  ]) {
    if (haystack.includes(needle)) {
      return true;
    }
  }

  return false;
}

/**
 * Computes how long to wait, in milliseconds, before the next retry.
 *
 * The base delay doubles with every attempt (`base`, `base * 2`, `base * 4`,
 * ...) and is capped at `_insertMaxDelayMs`. A random jitter in `[0, 100)` ms
 * is then added on top of the capped value, so the returned delay can exceed
 * `_insertMaxDelayMs` by just under 100 ms.
 *
 * @param attempt - 1-based number of the attempt that just failed.
 * @returns The delay in milliseconds, including jitter.
 */
#calculateConnectionRetryDelay(attempt: number): number {
  const exponentialDelay = this._insertBaseDelayMs * Math.pow(2, attempt - 1);
  const cappedDelay = Math.min(exponentialDelay, this._insertMaxDelayMs);

  // Jitter spreads out retries that would otherwise fire at the same moment.
  return cappedDelay + Math.random() * 100;
}

async #insertTaskRunInserts(taskRunInserts: TaskRunV2[]) {
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
Expand Down Expand Up @@ -604,6 +715,7 @@ export class RunsReplicationService {
idempotency_key: run.idempotencyKey ?? "",
expiration_ttl: run.ttl ?? "",
output,
concurrency_key: run.concurrencyKey ?? "",
_version: _version.toString(),
_is_deleted: event === "delete" ? 1 : 0,
};
Expand Down Expand Up @@ -631,6 +743,14 @@ export class RunsReplicationService {
return { data: undefined };
}

if (detectBadJsonStrings(data)) {
this.logger.warn("Detected bad JSON strings", {
data,
dataType,
});
return { data: undefined };
}

const packet = {
data,
dataType,
Expand Down
68 changes: 68 additions & 0 deletions apps/webapp/app/utils/detectBadJsonStrings.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
 * Detects unpaired UTF-16 surrogate escapes (`\ud800`-`\udfff`) in JSON text.
 *
 * A high surrogate escape (`\ud800`-`\udbff`) must be immediately followed by
 * a low surrogate escape (`\udc00`-`\udfff`), and a low surrogate escape must
 * be immediately preceded by a high one. Anything else is reported as bad.
 *
 * Notes on matching:
 * - Hex digits are matched case-insensitively (`\uD835` and `\ud835` are the
 *   same escape).
 * - The full low-surrogate range DC00-DFFF is recognised, so valid pairs such
 *   as `\ud83d\ude00` are accepted and lone `\udeXX` / `\udfXX` are rejected.
 * - A `\u` whose backslash is itself escaped (`\\ud835`) is literal text, not
 *   an escape, and is ignored.
 *
 * @param jsonString - Serialized JSON text to scan.
 * @returns `true` when at least one unpaired surrogate escape is present.
 */
export function detectBadJsonStrings(jsonString: string): boolean {
  // Fast path: without any "\u" there can be no surrogate escape at all.
  let idx = jsonString.indexOf("\\u");
  if (idx === -1) return false;

  const length = jsonString.length;

  // A complete escape needs 6 characters: backslash, "u" and 4 hex digits.
  while (idx !== -1 && idx + 6 <= length) {
    let next = idx + 2;

    if (startsEscapeSequence(jsonString, idx)) {
      const kind = surrogateEscapeKindAt(jsonString, idx);

      // Valid pairs are consumed whole below, so any low surrogate that is
      // reached here has no high surrogate directly in front of it.
      if (kind === "low") return true;

      if (kind === "high") {
        if (surrogateEscapeKindAt(jsonString, idx + 6) !== "low") return true;
        next = idx + 12; // Skip the low half of the pair that was just validated.
      }
    }

    idx = jsonString.indexOf("\\u", next);
  }

  return false;
}

/**
 * Returns whether the backslash at `at` begins an escape sequence, i.e. it is
 * preceded by an even number of consecutive backslashes.
 */
function startsEscapeSequence(text: string, at: number): boolean {
  let precedingBackslashes = 0;
  for (let i = at - 1; i >= 0 && text.charCodeAt(i) === 0x5c; i--) {
    precedingBackslashes++;
  }
  return precedingBackslashes % 2 === 0;
}

/**
 * Classifies the six characters starting at `at` as a high surrogate escape, a
 * low surrogate escape, or neither (`null`, also used when out of bounds).
 */
function surrogateEscapeKindAt(text: string, at: number): "high" | "low" | null {
  if (at < 0 || at + 6 > text.length) return null;
  if (text.charCodeAt(at) !== 0x5c || text.charCodeAt(at + 1) !== 0x75) return null; // "\" "u"

  // Every surrogate lies in U+D800..U+DFFF, so the first hex digit is always "d".
  const first = text.charCodeAt(at + 2);
  if (first !== 0x64 && first !== 0x44) return null;

  if (!isHexDigitCode(text.charCodeAt(at + 4)) || !isHexDigitCode(text.charCodeAt(at + 5))) {
    return null;
  }

  // The second hex digit selects the half: 8-B is high, C-F is low.
  switch (text[at + 3]) {
    case "8":
    case "9":
    case "a":
    case "A":
    case "b":
    case "B":
      return "high";
    case "c":
    case "C":
    case "d":
    case "D":
    case "e":
    case "E":
    case "f":
    case "F":
      return "low";
    default:
      return null;
  }
}

/** Returns whether a UTF-16 code unit is an ASCII hexadecimal digit. */
function isHexDigitCode(code: number): boolean {
  return (
    (code >= 0x30 && code <= 0x39) || // 0-9
    (code >= 0x41 && code <= 0x46) || // A-F
    (code >= 0x61 && code <= 0x66) // a-f
  );
}
3 changes: 3 additions & 0 deletions apps/webapp/test/bad-clickhouse-output.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"title": "❜ 𝐒 𝐏𝗈𝗌𝗍 . . . 𝐍𝖾𝗐 𝐂𝗈𝗇𝗍𝖾𝗇𝗍 ꒰ ⚔️ ꒱ 𝐒𝐋 ❜ 𝐔𝐋\n\n꒰ ❤️ ꒱ 𓃊 𝐋𝗲𝗮𝘃𝗲 𝖺 𝗹𝗶𝗸𝗲 𝖺𝗇\ud835"
}
Loading