Skip to content

Commit e9d4403

Browse files
committed
rotor: metrics performance improvements
1 parent bc3b2aa commit e9d4403

File tree

14 files changed

+252
-240
lines changed

14 files changed

+252
-240
lines changed

libs/core-functions/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@
4444
"juava": "workspace:*",
4545
"lodash": "^4.17.21",
4646
"mongodb": "^6.10.0",
47-
"@clickhouse/client": "^1.9.1",
47+
"@clickhouse/client": "^1.10.1",
4848
"node-cache": "^5.1.2",
4949
"parse-duration": "^1.1.0",
50-
"axios": "^1.7.7",
50+
"axios": "1.8.2",
5151
"posthog-node": "^4.2.1",
5252
"tslib": "^2.6.3",
5353
"zod": "^3.23.8"

libs/core-functions/src/functions/lib/clickhouse-logger.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const log = getLog("clickhouseLogger");
44

55
import { createClient } from "@clickhouse/client";
66
import { EventsStore } from "./index";
7+
import { Readable } from "stream";
78

89
type LogEntry = {
910
actorId: string;
@@ -34,6 +35,7 @@ export function createClickhouseLogger(): EventsStore {
3435
async_insert: 1,
3536
wait_for_async_insert: 0,
3637
async_insert_busy_timeout_ms: 10000,
38+
async_insert_busy_timeout_max_ms: 10000,
3739
date_time_input_format: "best_effort",
3840
},
3941
});
@@ -42,18 +44,32 @@ export function createClickhouseLogger(): EventsStore {
4244
if (buffer.length === 0) {
4345
return;
4446
}
45-
const copy = [...buffer];
47+
const copy = buffer.slice();
4648
buffer.length = 0;
47-
const res = await clickhouse.insert<LogEntry>({
49+
const eventsStream = new Readable({ objectMode: true });
50+
const res = clickhouse.insert<LogEntry>({
4851
table: metricsSchema + ".events_log",
4952
format: "JSONEachRow",
50-
values: copy,
53+
values: eventsStream,
5154
});
52-
if (res.executed) {
53-
log.atDebug().log(`Inserted ${copy.length} records.`);
54-
} else {
55-
log.atError().log(`Failed to insert ${copy.length} records: ${JSON.stringify(res)}`);
56-
}
55+
const asyncWrite = async () => {
56+
for (let i = 0; i < copy.length; i++) {
57+
eventsStream.push(copy[i]);
58+
}
59+
eventsStream.push(null);
60+
return res;
61+
};
62+
return asyncWrite()
63+
.then(res => {
64+
if (res.executed) {
65+
log.atDebug().log(`Inserted ${copy.length} records.`);
66+
} else {
67+
log.atError().log(`Failed to insert ${copy.length} records: ${JSON.stringify(res)}`);
68+
}
69+
})
70+
.catch(e => {
71+
log.atError().withCause(e).log(`Failed to insert ${copy.length} records`);
72+
});
5773
};
5874

5975
const interval = setInterval(async () => {

libs/core-functions/src/functions/lib/udf_wrapper.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ export async function UDFTestRun(
365365
let realStore = false;
366366
try {
367367
const eventContext: EventContext = {
368+
receivedAt: new Date(),
368369
geo: {
369370
country: {
370371
code: "US",

0 commit comments

Comments
 (0)