diff --git a/apps/api/src/__tests__/clickhouse.test.ts b/apps/api/src/__tests__/clickhouse.test.ts index 2a2433e4..2bbcecf2 100644 --- a/apps/api/src/__tests__/clickhouse.test.ts +++ b/apps/api/src/__tests__/clickhouse.test.ts @@ -64,18 +64,33 @@ async function waitForRow( return undefined; } +async function deleteRow( + executor: ReturnType, + sessionId: string, + maxRetries = 3, +): Promise { + for (let attempt = 0; attempt < maxRetries; attempt++) { + try { + await executor.execute({ + query: `DELETE FROM ${liveTable} WHERE session_id = {sessionId:String}`, + query_params: { sessionId }, + }); + insertedSessionIds.delete(sessionId); + return true; + } catch { + if (attempt === maxRetries - 1) return false; + await Bun.sleep(1_000 * (attempt + 1)); + } + } + + return false; +} + afterAll(() => { if (insertedSessionIds.size === 0) return; const exec = getExecutor(); void Promise.all( - [...insertedSessionIds].map((sessionId) => - exec - .execute({ - query: `DELETE FROM ${liveTable} WHERE session_id = {sessionId:String}`, - query_params: { sessionId }, - }) - .catch(() => {}), - ), + [...insertedSessionIds].map((sessionId) => deleteRow(exec, sessionId)), ); }); @@ -193,13 +208,7 @@ describe("clickhouse executor", () => { expect(row?.project_path).toBe(projectPath); expect(row?.content).toBe(content); - await executor.execute({ - query: `DELETE FROM ${liveTable} WHERE session_id = {sessionId:String}`, - query_params: { - sessionId, - }, - }); - insertedSessionIds.delete(sessionId); + await deleteRow(executor, sessionId); }, 120_000); }); diff --git a/apps/api/src/__tests__/session-visibility.integration.ts b/apps/api/src/__tests__/session-visibility.integration.ts new file mode 100644 index 00000000..d7622de5 --- /dev/null +++ b/apps/api/src/__tests__/session-visibility.integration.ts @@ -0,0 +1,286 @@ +import { afterAll, describe, expect, test } from "bun:test"; +import { getAdapter } from "@rudel/agent-adapters"; +import type { IngestSessionInput } from 
"@rudel/api-routes"; +import { createClickHouseExecutor } from "../clickhouse.js"; +import { getSessionAnalytics } from "../services/session-analytics.service.js"; + +const baseExecutor = createClickHouseExecutor({ + url: process.env.CLICKHOUSE_URL || "http://localhost:8123", + username: + process.env.CLICKHOUSE_USERNAME || process.env.CLICKHOUSE_USER || "default", + password: process.env.CLICKHOUSE_PASSWORD || "", + database: "default", +}); + +const executor: typeof baseExecutor = { + ...baseExecutor, + async insert(params) { + for (let attempt = 0; attempt < 5; attempt++) { + try { + await baseExecutor.insert(params); + return; + } catch (error) { + const isRaceCondition = + error instanceof Error && + error.message.includes("INSERT race condition"); + if (!isRaceCondition || attempt === 4) throw error; + await Bun.sleep(1_000 * 2 ** attempt); + } + } + }, +}; + +const insertedClaudeIds = new Set(); +const insertedAnalyticsIds = new Set(); + +function isoMinutesAgo(minutes: number): string { + return new Date(Date.now() - minutes * 60_000).toISOString(); +} + +async function waitForAnalytics( + orgId: string, + days: number, + predicate: ( + sessions: Awaited>, + ) => boolean, + timeoutMs = 30_000, + intervalMs = 2_000, +) { + const deadline = Date.now() + timeoutMs; + + while (Date.now() < deadline) { + try { + const sessions = await getSessionAnalytics(orgId, { + days, + limit: 100, + }); + if (predicate(sessions)) return sessions; + } catch { + // Retry while MV propagation settles. 
+ } + await Bun.sleep(intervalMs); + } + + return [] as Awaited>; +} + +async function waitForBaseCount(orgId: string, expected: number) { + const deadline = Date.now() + 30_000; + + while (Date.now() < deadline) { + try { + const rows = await executor.query<{ count: number }>({ + query: `SELECT count() AS count + FROM rudel.claude_sessions FINAL + WHERE organization_id = {orgId:String}`, + query_params: { orgId }, + }); + if (rows[0]?.count === expected) return rows[0].count; + } catch { + // Retry transient ClickHouse errors. + } + await Bun.sleep(2_000); + } + + return 0; +} + +afterAll(() => { + if (insertedAnalyticsIds.size > 0) { + void executor + .execute({ + query: `DELETE FROM rudel.session_analytics WHERE session_id IN (${[ + ...insertedAnalyticsIds, + ] + .map((id) => `'${id}'`) + .join(", ")})`, + }) + .catch(() => {}); + } + + if (insertedClaudeIds.size > 0) { + void executor + .execute({ + query: `DELETE FROM rudel.claude_sessions WHERE session_id IN (${[ + ...insertedClaudeIds, + ] + .map((id) => `'${id}'`) + .join(", ")})`, + }) + .catch(() => {}); + } +}); + +describe("session visibility regression", () => { + test("service-layer fix: a mixed upload batch stays fully visible", async () => { + const orgId = `org_service_visibility_${Date.now()}`; + const adapter = getAdapter("claude_code"); + const visibleId = `service_visible_${Date.now()}`; + const hiddenIds = Array.from( + { length: 9 }, + (_, index) => `service_hidden_${Date.now()}_${index}`, + ); + const allIds = [visibleId, ...hiddenIds]; + + for (const sessionId of allIds) { + insertedClaudeIds.add(sessionId); + insertedAnalyticsIds.add(sessionId); + } + + const requests: IngestSessionInput[] = [ + { + source: "claude_code", + sessionId: visibleId, + projectPath: "/tests/service-visible", + content: [ + JSON.stringify({ + type: "user", + timestamp: isoMinutesAgo(4), + message: { content: "hello" }, + }), + JSON.stringify({ + type: "assistant", + timestamp: isoMinutesAgo(3), + message: { + 
model: "claude-3-7-sonnet", + usage: { input_tokens: 10, output_tokens: 20 }, + }, + }), + ].join("\n"), + }, + ...hiddenIds.map( + (sessionId, index): IngestSessionInput => ({ + source: "claude_code", + sessionId, + projectPath: `/tests/service-hidden-${index}`, + content: + index % 2 === 0 + ? [ + JSON.stringify({ + type: "user", + message: { content: "hello" }, + }), + JSON.stringify({ + type: "assistant", + message: { + model: "claude-3-7-sonnet", + usage: { input_tokens: 5, output_tokens: 10 }, + }, + }), + ].join("\n") + : [ + JSON.stringify({ type: "summary", sessionId }), + JSON.stringify({ + toolUseResult: { agentId: "sub-agent-001", result: "done" }, + }), + ].join("\n"), + }), + ), + ]; + + for (const request of requests) { + await adapter.ingest(executor, request, { + userId: "user_service_visibility", + organizationId: orgId, + }); + } + + const baseCount = await waitForBaseCount(orgId, requests.length); + expect(baseCount).toBe(requests.length); + + const visibleSessions = await waitForAnalytics( + orgId, + 3650, + (sessions) => sessions.length === requests.length, + ); + + expect( + new Set(visibleSessions.map((session) => session.session_id)), + ).toEqual(new Set(allIds)); + }, 90_000); + + test("control: old but valid sessions are hidden by a 7-day query window, not by ingest failure", async () => { + const orgId = `org_service_date_range_${Date.now()}`; + const adapter = getAdapter("claude_code"); + const oldId = `service_old_${Date.now()}`; + const recentId = `service_recent_${Date.now()}`; + const oldStart = new Date(Date.now() - 40 * 24 * 60 * 60 * 1000); + const oldEnd = new Date(oldStart.getTime() + 5 * 60 * 1000); + + for (const sessionId of [oldId, recentId]) { + insertedClaudeIds.add(sessionId); + insertedAnalyticsIds.add(sessionId); + } + + const requests: IngestSessionInput[] = [ + { + source: "claude_code", + sessionId: oldId, + projectPath: "/tests/service-old-session", + content: [ + JSON.stringify({ + type: "user", + timestamp: 
oldStart.toISOString(), + message: { content: "hello from the past" }, + }), + JSON.stringify({ + type: "assistant", + timestamp: oldEnd.toISOString(), + message: { + model: "claude-3-7-sonnet", + usage: { input_tokens: 10, output_tokens: 20 }, + }, + }), + ].join("\n"), + }, + { + source: "claude_code", + sessionId: recentId, + projectPath: "/tests/service-recent-session", + content: [ + JSON.stringify({ + type: "user", + timestamp: isoMinutesAgo(15), + message: { content: "recent hello" }, + }), + JSON.stringify({ + type: "assistant", + timestamp: isoMinutesAgo(14), + message: { + model: "claude-3-7-sonnet", + usage: { input_tokens: 10, output_tokens: 20 }, + }, + }), + ].join("\n"), + }, + ]; + + for (const request of requests) { + await adapter.ingest(executor, request, { + userId: "user_service_visibility", + organizationId: orgId, + }); + } + + const allSessions = await waitForAnalytics( + orgId, + 90, + (sessions) => + sessions.some((session) => session.session_id === oldId) && + sessions.some((session) => session.session_id === recentId), + ); + + expect(new Set(allSessions.map((session) => session.session_id))).toEqual( + new Set([oldId, recentId]), + ); + + const lastSevenDays = await getSessionAnalytics(orgId, { + days: 7, + limit: 100, + }); + + expect(lastSevenDays.map((session) => session.session_id)).toEqual([ + recentId, + ]); + }, 90_000); +}); diff --git a/apps/cli/src/__tests__/timestamp-extraction.test.ts b/apps/cli/src/__tests__/timestamp-extraction.test.ts new file mode 100644 index 00000000..8214f580 --- /dev/null +++ b/apps/cli/src/__tests__/timestamp-extraction.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, test } from "bun:test"; +import { claudeCodeAdapter, codexAdapter } from "@rudel/agent-adapters"; + +describe("timestamp extraction helpers", () => { + test("Claude extracts the min/max timestamp from user and assistant lines only", () => { + const content = [ + "not-json", + JSON.stringify({ + type: "summary", + timestamp: 
"2026-03-13T08:59:00.000Z", + }), + JSON.stringify({ + type: "user", + timestamp: "2026-03-13T09:00:00.000Z", + message: { content: "hello" }, + }), + JSON.stringify({ + type: "assistant", + timestamp: "2026-03-13T09:05:00.000Z", + message: { model: "claude-3-7-sonnet" }, + }), + ].join("\n"); + + expect(claudeCodeAdapter.extractTimestamps(content)).toEqual({ + sessionDate: "2026-03-13T09:00:00.000Z", + lastInteractionDate: "2026-03-13T09:05:00.000Z", + }); + }); + + test("Claude returns null when user/assistant lines are missing timestamps", () => { + const content = [ + JSON.stringify({ type: "user", message: { content: "hello" } }), + JSON.stringify({ + type: "assistant", + message: { model: "claude-3-7-sonnet" }, + }), + ].join("\n"); + + expect(claudeCodeAdapter.extractTimestamps(content)).toBeNull(); + }); + + test("Codex returns null when no JSONL line has a timestamp", () => { + const content = [ + JSON.stringify({ + type: "session_meta", + payload: { id: "sess-1", cwd: "/tmp/project" }, + }), + JSON.stringify({ type: "response_item", payload: { id: "resp-1" } }), + ].join("\n"); + + expect(codexAdapter.extractTimestamps(content)).toBeNull(); + }); + + test("Codex extracts the min/max timestamp across timestamped lines", () => { + const content = [ + JSON.stringify({ + type: "session_meta", + timestamp: "2026-03-13T09:00:00.000Z", + payload: { id: "sess-1", cwd: "/tmp/project" }, + }), + JSON.stringify({ + type: "response_item", + timestamp: "2026-03-13T09:02:00.000Z", + payload: { id: "resp-1" }, + }), + JSON.stringify({ + type: "event_msg", + timestamp: "2026-03-13T09:04:00.000Z", + payload: { text: "done" }, + }), + ].join("\n"); + + expect(codexAdapter.extractTimestamps(content)).toEqual({ + sessionDate: "2026-03-13T09:00:00.000Z", + lastInteractionDate: "2026-03-13T09:04:00.000Z", + }); + }); +}); diff --git a/packages/ch-schema/chx/meta/snapshot.json b/packages/ch-schema/chx/meta/snapshot.json index 6fa8d1bc..0ba52fcc 100644 --- 
a/packages/ch-schema/chx/meta/snapshot.json +++ b/packages/ch-schema/chx/meta/snapshot.json @@ -1,6 +1,6 @@ { "version": 1, - "generatedAt": "2026-03-11T08:46:57.492Z", + "generatedAt": "2026-03-13T14:01:57.498Z", "definitions": [ { "database": "rudel", @@ -83,7 +83,8 @@ "partitionBy": "toYYYYMM(toDate(session_date))", "ttl": "toDate(session_date) + toIntervalDay(365)", "settings": { - "index_granularity": "8192" + "index_granularity": "8192", + "storage_policy": "'s3'" }, "kind": "table" }, @@ -163,7 +164,8 @@ "partitionBy": "toYYYYMM(toDate(session_date))", "ttl": "toDate(session_date) + toIntervalDay(365)", "settings": { - "index_granularity": "8192" + "index_granularity": "8192", + "storage_policy": "'s3'" }, "kind": "table" }, @@ -415,7 +417,7 @@ "database": "rudel", "name": "session_analytics" }, - "as": "WITH arrayFilter( x -> x != '', splitByChar('\\n', cs.content) ) AS _all_lines, arrayFilter(x -> JSONHas(x, 'timestamp'), _all_lines) AS _ts_lines, arrayMap( x -> parseDateTime64BestEffort(JSONExtractString(x, 'timestamp')), _ts_lines ) AS _timestamps, if(length(_timestamps) > 1, arrayMap(i -> dateDiff('second', _timestamps[i], _timestamps[i+1]), range(1, length(_timestamps))), [] ) AS _prompt_periods_sec, if(length(_timestamps) > 1, arrayMap(i -> if(i < length(_timestamps), dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _inference_gaps, arrayFilter( x -> JSONExtractString(x, 'type') = 'response_item' OR JSONExtractString(x, 'type') = 'event_msg', _all_lines ) AS _interaction_lines, arrayFilter( x -> position(x, '\"turn.completed\"') > 0 OR position(x, '\"response.completed\"') > 0, _all_lines ) AS _completion_lines, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'input_tokens')), _completion_lines)) AS _input_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'output_tokens')), _completion_lines)) AS 
_output_tokens, arrayMin(_timestamps) AS _session_date, arrayMax(_timestamps) AS _last_interaction_date, dateDiff('minute', _session_date, _last_interaction_date) AS _duration_min, JSONExtractString( JSONExtractRaw( arrayElement( arrayFilter(x -> JSONExtractString(x, 'type') = 'session_meta', _all_lines), 1 ), 'payload' ), 'model_provider' ) AS _model_provider SELECT * EXCEPT (session_date, last_interaction_date), _session_date as session_date, _last_interaction_date as last_interaction_date, 'codex' as source, _input_tokens as input_tokens, _output_tokens as output_tokens, toUInt64(0) as cache_read_input_tokens, toUInt64(0) as cache_creation_input_tokens, _input_tokens + _output_tokens as total_tokens, [] :: Array(String) as skills, [] :: Array(String) as slash_commands, [] :: Array(String) as subagent_types, map() as subagents, toUInt32(length(_interaction_lines)) as total_interactions, toUInt32(_duration_min) as actual_duration_min, if(length(_prompt_periods_sec) > 0, round(arrayAvg(_prompt_periods_sec), 2), 0) as avg_period_sec, if( length(_prompt_periods_sec) > 0, toFloat64(arrayElement( arraySort(_prompt_periods_sec), toUInt64(ceil(length(_prompt_periods_sec) / 2)) )), 0 ) as median_period_sec, toUInt32(arrayCount(x -> x < 5, _prompt_periods_sec)) as quick_responses, toUInt32(arrayCount(x -> x >= 5 AND x <= 60, _prompt_periods_sec)) as normal_responses, toUInt32(arrayCount(x -> x > 300, _prompt_periods_sec)) as long_pauses, toUInt32( length(extractAll(cs.content, '\"status\":\"failed\"')) + length(extractAll(cs.content, '\"error\"')) ) as error_count, if(_model_provider != '', _model_provider, 'unknown') as model_used, toUInt8(if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 1, 0)) as has_commit, toUInt8(0) as used_plan_mode, toUInt32(arraySum(_inference_gaps)) as inference_duration_sec, toUInt32(0) as human_duration_sec, CASE WHEN _duration_min <= 10 AND (_input_tokens + _output_tokens) < 500000 AND _output_tokens > 1000 THEN 'quick_win' WHEN _duration_min > 
30 AND _output_tokens > 50000 AND cs.git_sha IS NOT NULL AND cs.git_sha != '' THEN 'deep_work' WHEN (_input_tokens + _output_tokens) > 1000000 AND (_output_tokens / nullif(_input_tokens, 0)) < 0.3 AND _duration_min > 20 THEN 'struggle' WHEN _duration_min < 3 AND _output_tokens < 500 THEN 'abandoned' ELSE 'standard' END as session_archetype, toUInt8(round( 50 + (if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 20, 0)) + (if((_output_tokens / nullif(_input_tokens, 0)) > 0.5, 15, 0)) - (if((_input_tokens + _output_tokens) > 1500000 AND (cs.git_sha IS NULL OR cs.git_sha = ''), 20, 0)) - (if(_duration_min < 2 AND _output_tokens < 200, 30, 0)) - (least(toUInt32( length(extractAll(cs.content, '\"status\":\"failed\"')) + length(extractAll(cs.content, '\"error\"')) ), 10) * 2) )) as success_score FROM rudel.codex_sessions AS cs WHERE length(_timestamps) > 0 QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1", + "as": "WITH arrayFilter( x -> x != '', splitByChar('\\n', cs.content) ) AS _all_lines, arrayFilter(x -> JSONHas(x, 'timestamp'), _all_lines) AS _ts_lines, arrayMap( x -> parseDateTime64BestEffort(JSONExtractString(x, 'timestamp')), _ts_lines ) AS _timestamps, if(length(_timestamps) > 1, arrayMap(i -> dateDiff('second', _timestamps[i], _timestamps[i+1]), range(1, length(_timestamps))), [] ) AS _prompt_periods_sec, if(length(_timestamps) > 1, arrayMap(i -> if(i < length(_timestamps), dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _inference_gaps, arrayFilter( x -> JSONExtractString(x, 'type') = 'response_item' OR JSONExtractString(x, 'type') = 'event_msg', _all_lines ) AS _interaction_lines, arrayFilter( x -> position(x, '\"turn.completed\"') > 0 OR position(x, '\"response.completed\"') > 0, _all_lines ) AS _completion_lines, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'input_tokens')), _completion_lines)) AS _input_tokens, 
arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'output_tokens')), _completion_lines)) AS _output_tokens, if(length(_timestamps) > 0, arrayMin(_timestamps), cs.session_date) AS _session_date, if(length(_timestamps) > 0, arrayMax(_timestamps), cs.last_interaction_date) AS _last_interaction_date, if(length(_timestamps) > 1, dateDiff('minute', arrayMin(_timestamps), arrayMax(_timestamps)), 0) AS _duration_min, JSONExtractString( JSONExtractRaw( arrayElement( arrayFilter(x -> JSONExtractString(x, 'type') = 'session_meta', _all_lines), 1 ), 'payload' ), 'model_provider' ) AS _model_provider SELECT * EXCEPT (session_date, last_interaction_date), _session_date as session_date, _last_interaction_date as last_interaction_date, 'codex' as source, _input_tokens as input_tokens, _output_tokens as output_tokens, toUInt64(0) as cache_read_input_tokens, toUInt64(0) as cache_creation_input_tokens, _input_tokens + _output_tokens as total_tokens, [] :: Array(String) as skills, [] :: Array(String) as slash_commands, [] :: Array(String) as subagent_types, map() as subagents, toUInt32(length(_interaction_lines)) as total_interactions, toUInt32(_duration_min) as actual_duration_min, if(length(_prompt_periods_sec) > 0, round(arrayAvg(_prompt_periods_sec), 2), 0) as avg_period_sec, if( length(_prompt_periods_sec) > 0, toFloat64(arrayElement( arraySort(_prompt_periods_sec), toUInt64(ceil(length(_prompt_periods_sec) / 2)) )), 0 ) as median_period_sec, toUInt32(arrayCount(x -> x < 5, _prompt_periods_sec)) as quick_responses, toUInt32(arrayCount(x -> x >= 5 AND x <= 60, _prompt_periods_sec)) as normal_responses, toUInt32(arrayCount(x -> x > 300, _prompt_periods_sec)) as long_pauses, toUInt32( length(extractAll(cs.content, '\"status\":\"failed\"')) + length(extractAll(cs.content, '\"error\"')) ) as error_count, if(_model_provider != '', _model_provider, 'unknown') as model_used, toUInt8(if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 1, 0)) as 
has_commit, toUInt8(0) as used_plan_mode, toUInt32(arraySum(_inference_gaps)) as inference_duration_sec, toUInt32(0) as human_duration_sec, CASE WHEN _duration_min <= 10 AND (_input_tokens + _output_tokens) < 500000 AND _output_tokens > 1000 THEN 'quick_win' WHEN _duration_min > 30 AND _output_tokens > 50000 AND cs.git_sha IS NOT NULL AND cs.git_sha != '' THEN 'deep_work' WHEN (_input_tokens + _output_tokens) > 1000000 AND (_output_tokens / nullif(_input_tokens, 0)) < 0.3 AND _duration_min > 20 THEN 'struggle' WHEN _duration_min < 3 AND _output_tokens < 500 THEN 'abandoned' ELSE 'standard' END as session_archetype, toUInt8(round( 50 + (if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 20, 0)) + (if((_output_tokens / nullif(_input_tokens, 0)) > 0.5, 15, 0)) - (if((_input_tokens + _output_tokens) > 1500000 AND (cs.git_sha IS NULL OR cs.git_sha = ''), 20, 0)) - (if(_duration_min < 2 AND _output_tokens < 200, 30, 0)) - (least(toUInt32( length(extractAll(cs.content, '\"status\":\"failed\"')) + length(extractAll(cs.content, '\"error\"')) ), 10) * 2) )) as success_score FROM rudel.codex_sessions AS cs QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1", "kind": "materialized_view" }, { @@ -425,7 +427,7 @@ "database": "rudel", "name": "session_analytics" }, - "as": "WITH arrayFilter( x -> JSONExtractString(x, 'type') IN ('user', 'assistant'), splitByChar('\\n', cs.content) ) AS _interaction_lines, arrayFilter(x -> JSONHas(x, 'timestamp'), _interaction_lines) AS _ts_lines, arrayMap( x -> parseDateTime64BestEffort(JSONExtractString(x, 'timestamp')), _ts_lines ) AS _timestamps, arrayMap( x -> JSONExtractString(x, 'type'), _ts_lines ) AS _msg_types, if(length(_timestamps) > 1, arrayMap(i -> dateDiff('second', _timestamps[i], _timestamps[i+1]), range(1, length(_timestamps))), [] ) AS _prompt_periods_sec, if(length(_timestamps) > 1, arrayMap(i -> if(_msg_types[i] = 'user' AND _msg_types[i+1] = 'assistant', dateDiff('second', _timestamps[i], 
_timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _inference_gaps, if(length(_timestamps) > 1, arrayMap(i -> if(_msg_types[i] = 'assistant' AND _msg_types[i+1] = 'user', dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _human_gaps, arrayFilter( x -> JSONExtractString(x, 'type') = 'assistant' AND JSONHas(x, 'message'), splitByChar('\\n', cs.content) ) AS _assistant_lines, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'input_tokens')), _assistant_lines)) AS _input_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'output_tokens')), _assistant_lines)) AS _output_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'cache_read_input_tokens')), _assistant_lines)) AS _cache_read, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'cache_creation_input_tokens')), _assistant_lines)) AS _cache_creation, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '\"name\":\"Skill\"[^}]*\"skill\":\"([^\"]+)\"'))) AS _skills, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '\"name\":\"Task\"[^}]*\"subagent_type\":\"([^\"]+)\"'))) AS _subagent_types, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '/([^<]+)'))) AS _slash_commands, arrayMin(_timestamps) AS _session_date, arrayMax(_timestamps) AS _last_interaction_date, dateDiff('minute', _session_date, _last_interaction_date) AS _duration_min SELECT * EXCEPT (session_date, last_interaction_date), _session_date as session_date, _last_interaction_date as last_interaction_date, 'claude_code' as source, _input_tokens as input_tokens, _output_tokens as output_tokens, _cache_read as cache_read_input_tokens, _cache_creation as cache_creation_input_tokens, _input_tokens + _output_tokens as total_tokens, _skills as skills, _slash_commands as slash_commands, 
_subagent_types as subagent_types, toUInt32(length(_timestamps)) as total_interactions, toUInt32(_duration_min) as actual_duration_min, if(length(_prompt_periods_sec) > 0, round(arrayAvg(_prompt_periods_sec), 2), 0) as avg_period_sec, if( length(_prompt_periods_sec) > 0, toFloat64(arrayElement( arraySort(_prompt_periods_sec), toUInt64(ceil(length(_prompt_periods_sec) / 2)) )), 0 ) as median_period_sec, toUInt32(arrayCount(x -> x < 5, _prompt_periods_sec)) as quick_responses, toUInt32(arrayCount(x -> x >= 5 AND x <= 60, _prompt_periods_sec)) as normal_responses, toUInt32(arrayCount(x -> x > 300, _prompt_periods_sec)) as long_pauses, toUInt32( length(extractAll(cs.content, '\"isApiErrorMessage\":true')) + length(extractAll(cs.content, '\"is_error\":true')) ) as error_count, JSONExtractString( JSONExtractRaw( arrayElement( arrayFilter( x -> JSONExtractString(x, 'type') = 'assistant', splitByChar('\\n', cs.content) ), -1 ), 'message' ), 'model' ) as model_used, toUInt8(if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 1, 0)) as has_commit, toUInt8(if(position(cs.content, '\"name\":\"EnterPlanMode\"') > 0, 1, 0)) as used_plan_mode, toUInt32(arraySum(_inference_gaps)) as inference_duration_sec, toUInt32(arraySum(_human_gaps)) as human_duration_sec, CASE WHEN _duration_min <= 10 AND (_input_tokens + _output_tokens) < 500000 AND _output_tokens > 1000 THEN 'quick_win' WHEN _duration_min > 30 AND _output_tokens > 50000 AND cs.git_sha IS NOT NULL AND cs.git_sha != '' THEN 'deep_work' WHEN (_input_tokens + _output_tokens) > 1000000 AND (_output_tokens / nullif(_input_tokens, 0)) < 0.3 AND _duration_min > 20 THEN 'struggle' WHEN length(_skills) >= 3 AND (cs.git_sha IS NULL OR cs.git_sha = '') AND (_input_tokens + _output_tokens) > 200000 THEN 'exploration' WHEN _duration_min < 3 AND _output_tokens < 500 THEN 'abandoned' ELSE 'standard' END as session_archetype, toUInt8(round( 50 + (if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 20, 0)) + (if((_output_tokens / 
nullif(_input_tokens, 0)) > 0.5, 15, 0)) + (least(toUInt32(length(_skills)), 3) * 5) - (if((_input_tokens + _output_tokens) > 1500000 AND (cs.git_sha IS NULL OR cs.git_sha = ''), 20, 0)) - (if(_duration_min < 2 AND _output_tokens < 200, 30, 0)) - (least(toUInt32( length(extractAll(cs.content, '\"isApiErrorMessage\":true')) + length(extractAll(cs.content, '\"is_error\":true')) ), 10) * 2) )) as success_score FROM rudel.claude_sessions AS cs WHERE length(_timestamps) > 0 QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1", + "as": "WITH arrayFilter( x -> JSONExtractString(x, 'type') IN ('user', 'assistant'), splitByChar('\\n', cs.content) ) AS _interaction_lines, arrayFilter(x -> JSONHas(x, 'timestamp'), _interaction_lines) AS _ts_lines, arrayMap( x -> parseDateTime64BestEffort(JSONExtractString(x, 'timestamp')), _ts_lines ) AS _timestamps, arrayMap( x -> JSONExtractString(x, 'type'), _ts_lines ) AS _msg_types, if(length(_timestamps) > 1, arrayMap(i -> dateDiff('second', _timestamps[i], _timestamps[i+1]), range(1, length(_timestamps))), [] ) AS _prompt_periods_sec, if(length(_timestamps) > 1, arrayMap(i -> if(_msg_types[i] = 'user' AND _msg_types[i+1] = 'assistant', dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _inference_gaps, if(length(_timestamps) > 1, arrayMap(i -> if(_msg_types[i] = 'assistant' AND _msg_types[i+1] = 'user', dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _human_gaps, arrayFilter( x -> JSONExtractString(x, 'type') = 'assistant' AND JSONHas(x, 'message'), splitByChar('\\n', cs.content) ) AS _assistant_lines, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'input_tokens')), _assistant_lines)) AS _input_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'output_tokens')), _assistant_lines)) AS _output_tokens, 
arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'cache_read_input_tokens')), _assistant_lines)) AS _cache_read, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'cache_creation_input_tokens')), _assistant_lines)) AS _cache_creation, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '\"name\":\"Skill\"[^}]*\"skill\":\"([^\"]+)\"'))) AS _skills, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '\"name\":\"Task\"[^}]*\"subagent_type\":\"([^\"]+)\"'))) AS _subagent_types, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '/([^<]+)'))) AS _slash_commands, if(length(_timestamps) > 0, arrayMin(_timestamps), cs.session_date) AS _session_date, if(length(_timestamps) > 0, arrayMax(_timestamps), cs.last_interaction_date) AS _last_interaction_date, if(length(_timestamps) > 1, dateDiff('minute', arrayMin(_timestamps), arrayMax(_timestamps)), 0) AS _duration_min SELECT * EXCEPT (session_date, last_interaction_date), _session_date as session_date, _last_interaction_date as last_interaction_date, 'claude_code' as source, _input_tokens as input_tokens, _output_tokens as output_tokens, _cache_read as cache_read_input_tokens, _cache_creation as cache_creation_input_tokens, _input_tokens + _output_tokens as total_tokens, _skills as skills, _slash_commands as slash_commands, _subagent_types as subagent_types, toUInt32(length(_timestamps)) as total_interactions, toUInt32(_duration_min) as actual_duration_min, if(length(_prompt_periods_sec) > 0, round(arrayAvg(_prompt_periods_sec), 2), 0) as avg_period_sec, if( length(_prompt_periods_sec) > 0, toFloat64(arrayElement( arraySort(_prompt_periods_sec), toUInt64(ceil(length(_prompt_periods_sec) / 2)) )), 0 ) as median_period_sec, toUInt32(arrayCount(x -> x < 5, _prompt_periods_sec)) as quick_responses, toUInt32(arrayCount(x -> x >= 5 AND x <= 60, _prompt_periods_sec)) as normal_responses, toUInt32(arrayCount(x -> x > 
300, _prompt_periods_sec)) as long_pauses, toUInt32( length(extractAll(cs.content, '\"isApiErrorMessage\":true')) + length(extractAll(cs.content, '\"is_error\":true')) ) as error_count, JSONExtractString( JSONExtractRaw( arrayElement( arrayFilter( x -> JSONExtractString(x, 'type') = 'assistant', splitByChar('\\n', cs.content) ), -1 ), 'message' ), 'model' ) as model_used, toUInt8(if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 1, 0)) as has_commit, toUInt8(if(position(cs.content, '\"name\":\"EnterPlanMode\"') > 0, 1, 0)) as used_plan_mode, toUInt32(arraySum(_inference_gaps)) as inference_duration_sec, toUInt32(arraySum(_human_gaps)) as human_duration_sec, CASE WHEN _duration_min <= 10 AND (_input_tokens + _output_tokens) < 500000 AND _output_tokens > 1000 THEN 'quick_win' WHEN _duration_min > 30 AND _output_tokens > 50000 AND cs.git_sha IS NOT NULL AND cs.git_sha != '' THEN 'deep_work' WHEN (_input_tokens + _output_tokens) > 1000000 AND (_output_tokens / nullif(_input_tokens, 0)) < 0.3 AND _duration_min > 20 THEN 'struggle' WHEN length(_skills) >= 3 AND (cs.git_sha IS NULL OR cs.git_sha = '') AND (_input_tokens + _output_tokens) > 200000 THEN 'exploration' WHEN _duration_min < 3 AND _output_tokens < 500 THEN 'abandoned' ELSE 'standard' END as session_archetype, toUInt8(round( 50 + (if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 20, 0)) + (if((_output_tokens / nullif(_input_tokens, 0)) > 0.5, 15, 0)) + (least(toUInt32(length(_skills)), 3) * 5) - (if((_input_tokens + _output_tokens) > 1500000 AND (cs.git_sha IS NULL OR cs.git_sha = ''), 20, 0)) - (if(_duration_min < 2 AND _output_tokens < 200, 30, 0)) - (least(toUInt32( length(extractAll(cs.content, '\"isApiErrorMessage\":true')) + length(extractAll(cs.content, '\"is_error\":true')) ), 10) * 2) )) as success_score FROM rudel.claude_sessions AS cs QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1", "kind": "materialized_view" } ] diff --git 
a/packages/ch-schema/chx/migrations/20260313140157_auto.sql b/packages/ch-schema/chx/migrations/20260313140157_auto.sql new file mode 100644 index 00000000..3e7a242d --- /dev/null +++ b/packages/ch-schema/chx/migrations/20260313140157_auto.sql @@ -0,0 +1,27 @@ +-- chkit-migration-format: v1 +-- generated-at: 2026-03-13T14:01:57.497Z +-- cli-version: 0.1.0-beta.16 +-- definition-count: 5 +-- operation-count: 6 +-- rename-suggestion-count: 0 +-- risk-summary: safe=0, caution=6, danger=0 + +-- operation: drop_materialized_view key=materialized_view:rudel.codex_session_analytics_mv risk=caution +DROP TABLE IF EXISTS rudel.codex_session_analytics_mv SYNC; + +-- operation: drop_materialized_view key=materialized_view:rudel.session_analytics_mv risk=caution +DROP TABLE IF EXISTS rudel.session_analytics_mv SYNC; + +-- operation: alter_table_modify_setting key=table:rudel.claude_sessions:setting:storage_policy risk=caution +ALTER TABLE rudel.claude_sessions MODIFY SETTING storage_policy = 's3'; + +-- operation: alter_table_modify_setting key=table:rudel.codex_sessions:setting:storage_policy risk=caution +ALTER TABLE rudel.codex_sessions MODIFY SETTING storage_policy = 's3'; + +-- operation: create_materialized_view key=materialized_view:rudel.codex_session_analytics_mv risk=caution +CREATE MATERIALIZED VIEW IF NOT EXISTS rudel.codex_session_analytics_mv TO rudel.session_analytics AS +WITH arrayFilter( x -> x != '', splitByChar('\n', cs.content) ) AS _all_lines, arrayFilter(x -> JSONHas(x, 'timestamp'), _all_lines) AS _ts_lines, arrayMap( x -> parseDateTime64BestEffort(JSONExtractString(x, 'timestamp')), _ts_lines ) AS _timestamps, if(length(_timestamps) > 1, arrayMap(i -> dateDiff('second', _timestamps[i], _timestamps[i+1]), range(1, length(_timestamps))), [] ) AS _prompt_periods_sec, if(length(_timestamps) > 1, arrayMap(i -> if(i < length(_timestamps), dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _inference_gaps, 
arrayFilter( x -> JSONExtractString(x, 'type') = 'response_item' OR JSONExtractString(x, 'type') = 'event_msg', _all_lines ) AS _interaction_lines, arrayFilter( x -> position(x, '"turn.completed"') > 0 OR position(x, '"response.completed"') > 0, _all_lines ) AS _completion_lines, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'input_tokens')), _completion_lines)) AS _input_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'output_tokens')), _completion_lines)) AS _output_tokens, if(length(_timestamps) > 0, arrayMin(_timestamps), cs.session_date) AS _session_date, if(length(_timestamps) > 0, arrayMax(_timestamps), cs.last_interaction_date) AS _last_interaction_date, if(length(_timestamps) > 1, dateDiff('minute', arrayMin(_timestamps), arrayMax(_timestamps)), 0) AS _duration_min, JSONExtractString( JSONExtractRaw( arrayElement( arrayFilter(x -> JSONExtractString(x, 'type') = 'session_meta', _all_lines), 1 ), 'payload' ), 'model_provider' ) AS _model_provider SELECT * EXCEPT (session_date, last_interaction_date), _session_date as session_date, _last_interaction_date as last_interaction_date, 'codex' as source, _input_tokens as input_tokens, _output_tokens as output_tokens, toUInt64(0) as cache_read_input_tokens, toUInt64(0) as cache_creation_input_tokens, _input_tokens + _output_tokens as total_tokens, [] :: Array(String) as skills, [] :: Array(String) as slash_commands, [] :: Array(String) as subagent_types, map() as subagents, toUInt32(length(_interaction_lines)) as total_interactions, toUInt32(_duration_min) as actual_duration_min, if(length(_prompt_periods_sec) > 0, round(arrayAvg(_prompt_periods_sec), 2), 0) as avg_period_sec, if( length(_prompt_periods_sec) > 0, toFloat64(arrayElement( arraySort(_prompt_periods_sec), toUInt64(ceil(length(_prompt_periods_sec) / 2)) )), 0 ) as median_period_sec, toUInt32(arrayCount(x -> x < 5, _prompt_periods_sec)) as quick_responses, 
toUInt32(arrayCount(x -> x >= 5 AND x <= 60, _prompt_periods_sec)) as normal_responses, toUInt32(arrayCount(x -> x > 300, _prompt_periods_sec)) as long_pauses, toUInt32( length(extractAll(cs.content, '"status":"failed"')) + length(extractAll(cs.content, '"error"')) ) as error_count, if(_model_provider != '', _model_provider, 'unknown') as model_used, toUInt8(if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 1, 0)) as has_commit, toUInt8(0) as used_plan_mode, toUInt32(arraySum(_inference_gaps)) as inference_duration_sec, toUInt32(0) as human_duration_sec, CASE WHEN _duration_min <= 10 AND (_input_tokens + _output_tokens) < 500000 AND _output_tokens > 1000 THEN 'quick_win' WHEN _duration_min > 30 AND _output_tokens > 50000 AND cs.git_sha IS NOT NULL AND cs.git_sha != '' THEN 'deep_work' WHEN (_input_tokens + _output_tokens) > 1000000 AND (_output_tokens / nullif(_input_tokens, 0)) < 0.3 AND _duration_min > 20 THEN 'struggle' WHEN _duration_min < 3 AND _output_tokens < 500 THEN 'abandoned' ELSE 'standard' END as session_archetype, toUInt8(round( 50 + (if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 20, 0)) + (if((_output_tokens / nullif(_input_tokens, 0)) > 0.5, 15, 0)) - (if((_input_tokens + _output_tokens) > 1500000 AND (cs.git_sha IS NULL OR cs.git_sha = ''), 20, 0)) - (if(_duration_min < 2 AND _output_tokens < 200, 30, 0)) - (least(toUInt32( length(extractAll(cs.content, '"status":"failed"')) + length(extractAll(cs.content, '"error"')) ), 10) * 2) )) as success_score FROM rudel.codex_sessions AS cs QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1; + +-- operation: create_materialized_view key=materialized_view:rudel.session_analytics_mv risk=caution +CREATE MATERIALIZED VIEW IF NOT EXISTS rudel.session_analytics_mv TO rudel.session_analytics AS +WITH arrayFilter( x -> JSONExtractString(x, 'type') IN ('user', 'assistant'), splitByChar('\n', cs.content) ) AS _interaction_lines, arrayFilter(x -> JSONHas(x, 'timestamp'), 
_interaction_lines) AS _ts_lines, arrayMap( x -> parseDateTime64BestEffort(JSONExtractString(x, 'timestamp')), _ts_lines ) AS _timestamps, arrayMap( x -> JSONExtractString(x, 'type'), _ts_lines ) AS _msg_types, if(length(_timestamps) > 1, arrayMap(i -> dateDiff('second', _timestamps[i], _timestamps[i+1]), range(1, length(_timestamps))), [] ) AS _prompt_periods_sec, if(length(_timestamps) > 1, arrayMap(i -> if(_msg_types[i] = 'user' AND _msg_types[i+1] = 'assistant', dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _inference_gaps, if(length(_timestamps) > 1, arrayMap(i -> if(_msg_types[i] = 'assistant' AND _msg_types[i+1] = 'user', dateDiff('second', _timestamps[i], _timestamps[i+1]), 0), range(1, length(_timestamps))), [] ) AS _human_gaps, arrayFilter( x -> JSONExtractString(x, 'type') = 'assistant' AND JSONHas(x, 'message'), splitByChar('\n', cs.content) ) AS _assistant_lines, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'input_tokens')), _assistant_lines)) AS _input_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'output_tokens')), _assistant_lines)) AS _output_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'cache_read_input_tokens')), _assistant_lines)) AS _cache_read, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'message'), 'usage', 'cache_creation_input_tokens')), _assistant_lines)) AS _cache_creation, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '"name":"Skill"[^}]*"skill":"([^"]+)"'))) AS _skills, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '"name":"Task"[^}]*"subagent_type":"([^"]+)"'))) AS _subagent_types, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '/([^<]+)'))) AS _slash_commands, if(length(_timestamps) > 0, arrayMin(_timestamps), cs.session_date) AS _session_date, 
if(length(_timestamps) > 0, arrayMax(_timestamps), cs.last_interaction_date) AS _last_interaction_date, if(length(_timestamps) > 1, dateDiff('minute', arrayMin(_timestamps), arrayMax(_timestamps)), 0) AS _duration_min SELECT * EXCEPT (session_date, last_interaction_date), _session_date as session_date, _last_interaction_date as last_interaction_date, 'claude_code' as source, _input_tokens as input_tokens, _output_tokens as output_tokens, _cache_read as cache_read_input_tokens, _cache_creation as cache_creation_input_tokens, _input_tokens + _output_tokens as total_tokens, _skills as skills, _slash_commands as slash_commands, _subagent_types as subagent_types, toUInt32(length(_timestamps)) as total_interactions, toUInt32(_duration_min) as actual_duration_min, if(length(_prompt_periods_sec) > 0, round(arrayAvg(_prompt_periods_sec), 2), 0) as avg_period_sec, if( length(_prompt_periods_sec) > 0, toFloat64(arrayElement( arraySort(_prompt_periods_sec), toUInt64(ceil(length(_prompt_periods_sec) / 2)) )), 0 ) as median_period_sec, toUInt32(arrayCount(x -> x < 5, _prompt_periods_sec)) as quick_responses, toUInt32(arrayCount(x -> x >= 5 AND x <= 60, _prompt_periods_sec)) as normal_responses, toUInt32(arrayCount(x -> x > 300, _prompt_periods_sec)) as long_pauses, toUInt32( length(extractAll(cs.content, '"isApiErrorMessage":true')) + length(extractAll(cs.content, '"is_error":true')) ) as error_count, JSONExtractString( JSONExtractRaw( arrayElement( arrayFilter( x -> JSONExtractString(x, 'type') = 'assistant', splitByChar('\n', cs.content) ), -1 ), 'message' ), 'model' ) as model_used, toUInt8(if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 1, 0)) as has_commit, toUInt8(if(position(cs.content, '"name":"EnterPlanMode"') > 0, 1, 0)) as used_plan_mode, toUInt32(arraySum(_inference_gaps)) as inference_duration_sec, toUInt32(arraySum(_human_gaps)) as human_duration_sec, CASE WHEN _duration_min <= 10 AND (_input_tokens + _output_tokens) < 500000 AND _output_tokens > 1000 THEN 
'quick_win' WHEN _duration_min > 30 AND _output_tokens > 50000 AND cs.git_sha IS NOT NULL AND cs.git_sha != '' THEN 'deep_work' WHEN (_input_tokens + _output_tokens) > 1000000 AND (_output_tokens / nullif(_input_tokens, 0)) < 0.3 AND _duration_min > 20 THEN 'struggle' WHEN length(_skills) >= 3 AND (cs.git_sha IS NULL OR cs.git_sha = '') AND (_input_tokens + _output_tokens) > 200000 THEN 'exploration' WHEN _duration_min < 3 AND _output_tokens < 500 THEN 'abandoned' ELSE 'standard' END as session_archetype, toUInt8(round( 50 + (if(cs.git_sha IS NOT NULL AND cs.git_sha != '', 20, 0)) + (if((_output_tokens / nullif(_input_tokens, 0)) > 0.5, 15, 0)) + (least(toUInt32(length(_skills)), 3) * 5) - (if((_input_tokens + _output_tokens) > 1500000 AND (cs.git_sha IS NULL OR cs.git_sha = ''), 20, 0)) - (if(_duration_min < 2 AND _output_tokens < 200, 30, 0)) - (least(toUInt32( length(extractAll(cs.content, '"isApiErrorMessage":true')) + length(extractAll(cs.content, '"is_error":true')) ), 10) * 2) )) as success_score FROM rudel.claude_sessions AS cs QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1; diff --git a/packages/ch-schema/src/__tests__/session-analytics-visibility.integration.ts b/packages/ch-schema/src/__tests__/session-analytics-visibility.integration.ts new file mode 100644 index 00000000..1cbac94d --- /dev/null +++ b/packages/ch-schema/src/__tests__/session-analytics-visibility.integration.ts @@ -0,0 +1,420 @@ +import { afterAll, describe, expect, test } from "bun:test"; +import { createClickHouseExecutor } from "@chkit/clickhouse"; +import { + ingestRudelClaudeSessions, + ingestRudelCodexSessions, +} from "../generated/chkit-ingest.js"; +import type { + RudelClaudeSessionsRow, + RudelCodexSessionsRow, +} from "../generated/chkit-types.js"; + +const baseExecutor = createClickHouseExecutor({ + url: process.env.CLICKHOUSE_URL || "http://localhost:8123", + username: + process.env.CLICKHOUSE_USERNAME || process.env.CLICKHOUSE_USER 
|| "default",
+  password: process.env.CLICKHOUSE_PASSWORD || "",
+  database: "default",
+});
+
+// ClickHouse Cloud's insert() can silently drop rows.
+// Use execute() with async_insert=0 so inserts are immediately queryable.
+//
+// The override serialises the rows as JSONEachRow into a single INSERT
+// statement and retries (up to 5 attempts, exponential backoff starting at
+// 1s) only when the error message contains "INSERT race condition"; any
+// other error, or the final failed attempt, is rethrown.
+const executor: typeof baseExecutor = {
+  ...baseExecutor,
+  async insert(params) {
+    const rows = params.values
+      .map((row: Record<string, unknown>) => JSON.stringify(row))
+      .join("\n");
+    const sql = `INSERT INTO ${params.table} SETTINGS async_insert=0 FORMAT JSONEachRow ${rows}`;
+
+    for (let attempt = 0; attempt < 5; attempt++) {
+      try {
+        await baseExecutor.execute(sql);
+        return;
+      } catch (error) {
+        const isRaceCondition =
+          error instanceof Error &&
+          error.message.includes("INSERT race condition");
+        if (!isRaceCondition || attempt === 4) throw error;
+        await Bun.sleep(1_000 * 2 ** attempt);
+      }
+    }
+  },
+};
+
+// Session ids inserted by the tests, tracked per table so afterAll can
+// delete them again.
+const insertedClaudeIds = new Set<string>();
+const insertedCodexIds = new Set<string>();
+const insertedAnalyticsIds = new Set<string>();
+
+/** ISO-8601 timestamp `offsetMinutes` away from now (negative = past). */
+function isoMinutesFromNow(offsetMinutes: number): string {
+  return new Date(Date.now() + offsetMinutes * 60_000).toISOString();
+}
+
+/** Current time as an ISO string with the trailing "Z" removed. */
+function clickhouseNow(): string {
+  return new Date().toISOString().replace("Z", "");
+}
+
+/**
+ * Polls `query` every `intervalMs` until `predicate` accepts the result or
+ * `timeoutMs` elapses. Query errors are swallowed and retried.
+ *
+ * @returns the first accepted row set, or an empty array on timeout.
+ */
+async function waitForQuery<T>(
+  query: string,
+  predicate: (rows: T[]) => boolean,
+  timeoutMs = 30_000,
+  intervalMs = 2_000,
+): Promise<T[]> {
+  const deadline = Date.now() + timeoutMs;
+
+  while (Date.now() < deadline) {
+    try {
+      const rows = await executor.query<T>(query);
+      if (predicate(rows)) return rows;
+    } catch {
+      // Retry transient ClickHouse errors while cloud storage settles.
+    }
+    await Bun.sleep(intervalMs);
+  }
+
+  return [];
+}
+
+/**
+ * Renders ids as a comma-separated list of single-quoted SQL literals.
+ * Values are interpolated without escaping, so only pass ids generated by
+ * this test file. An empty set yields an empty string; callers guard on size.
+ */
+function quoteIds(values: Set<string>): string {
+  return [...values].map((value) => `'${value}'`).join(", ");
+}
+
+// Best-effort cleanup of every row this file inserted. The hook is async and
+// awaits the deletes so they are issued and settled before the runner tears
+// down; individual failures are still ignored.
+afterAll(async () => {
+  const cleanups: Array<[string, Set<string>]> = [
+    ["rudel.session_analytics", insertedAnalyticsIds],
+    ["rudel.claude_sessions", insertedClaudeIds],
+    ["rudel.codex_sessions", insertedCodexIds],
+  ];
+
+  await Promise.all(
+    cleanups
+      .filter(([, ids]) => ids.size > 0)
+      .map(([table, ids]) =>
+        executor
+          .execute(
+            `DELETE FROM ${table} WHERE session_id IN (${quoteIds(ids)})`,
+          )
+          .catch(() => {}),
+      ),
+  );
+});
+
+describe("session analytics visibility regression", () => {
+  test("keeps Claude sessions visible when user/assistant timestamps are missing", async () => {
+    const orgId = `org_visibility_claude_${Date.now()}`;
+    const missingId = `claude_missing_ts_${Date.now()}`;
+    const validId = `claude_valid_ts_${Date.now()}`;
+    const now = clickhouseNow();
+
+    const rows: RudelClaudeSessionsRow[] = [
+      {
+        session_date: now,
+        last_interaction_date: now,
+        session_id: missingId,
+        organization_id: orgId,
+        project_path: "/tests/claude-missing-timestamps",
+        git_remote: "",
+        package_name: "",
+        package_type: "",
+        content: [
+          JSON.stringify({ type: "user", message: { content: "hello" } }),
+          JSON.stringify({
+            type: "assistant",
+            message: {
+              model: "claude-3-7-sonnet",
+              usage: { input_tokens: 10, output_tokens: 20 },
+            },
+          }),
+        ].join("\n"),
+        subagents: {},
+        ingested_at: now,
+        user_id: "user_visibility",
+        git_branch: "main",
+        git_sha: null,
+        tag: "bug-repro",
+      },
+      {
+        session_date: now,
+        last_interaction_date: now,
+        session_id: validId,
+        organization_id: orgId,
+        project_path: "/tests/claude-valid-timestamps",
+        git_remote: "",
+        package_name: "",
+        package_type: "",
+        content: [
+          JSON.stringify({
+            type: "user",
timestamp: isoMinutesFromNow(-5), + message: { content: "hello" }, + }), + JSON.stringify({ + type: "assistant", + timestamp: isoMinutesFromNow(-4), + message: { + model: "claude-3-7-sonnet", + usage: { input_tokens: 10, output_tokens: 20 }, + }, + }), + ].join("\n"), + subagents: {}, + ingested_at: now, + user_id: "user_visibility", + git_branch: "main", + git_sha: null, + tag: "bug-repro", + }, + ]; + + insertedClaudeIds.add(missingId); + insertedClaudeIds.add(validId); + insertedAnalyticsIds.add(missingId); + insertedAnalyticsIds.add(validId); + + await ingestRudelClaudeSessions(executor, rows); + + const analyticsRows = await waitForQuery<{ + session_id: string; + total_interactions: number; + actual_duration_min: number; + }>( + `SELECT session_id, total_interactions, actual_duration_min + FROM rudel.session_analytics FINAL + WHERE organization_id = '${orgId}' + ORDER BY session_id ASC`, + (result) => result.length === 2, + ); + + expect(analyticsRows.map((row) => row.session_id)).toContain(validId); + expect(analyticsRows.map((row) => row.session_id)).toContain(missingId); + expect( + analyticsRows.find((row) => row.session_id === missingId), + ).toMatchObject({ + session_id: missingId, + total_interactions: 0, + actual_duration_min: 0, + }); + + const baseRows = await waitForQuery<{ session_id: string }>( + `SELECT session_id + FROM rudel.claude_sessions FINAL + WHERE organization_id = '${orgId}' + ORDER BY session_id ASC`, + (result) => result.length === 2, + ); + + expect(baseRows.map((row) => row.session_id)).toEqual( + [missingId, validId].sort(), + ); + }, 60_000); + + test("keeps Claude sessions visible when transcripts have no interaction lines", async () => { + const orgId = `org_visibility_claude_no_interactions_${Date.now()}`; + const missingId = `claude_no_interactions_${Date.now()}`; + const validId = `claude_valid_control_${Date.now()}`; + const now = clickhouseNow(); + + const rows: RudelClaudeSessionsRow[] = [ + { + session_date: now, + 
last_interaction_date: now, + session_id: missingId, + organization_id: orgId, + project_path: "/tests/claude-no-interactions", + git_remote: "", + package_name: "", + package_type: "", + content: [ + JSON.stringify({ type: "summary", sessionId: missingId }), + JSON.stringify({ + toolUseResult: { agentId: "sub-agent-001", result: "done" }, + }), + ].join("\n"), + subagents: {}, + ingested_at: now, + user_id: "user_visibility", + git_branch: "main", + git_sha: null, + tag: "bug-repro", + }, + { + session_date: now, + last_interaction_date: now, + session_id: validId, + organization_id: orgId, + project_path: "/tests/claude-valid-control", + git_remote: "", + package_name: "", + package_type: "", + content: [ + JSON.stringify({ + type: "user", + timestamp: isoMinutesFromNow(-8), + message: { content: "hello" }, + }), + JSON.stringify({ + type: "assistant", + timestamp: isoMinutesFromNow(-7), + message: { + model: "claude-3-7-sonnet", + usage: { input_tokens: 20, output_tokens: 40 }, + }, + }), + ].join("\n"), + subagents: {}, + ingested_at: now, + user_id: "user_visibility", + git_branch: "main", + git_sha: null, + tag: "bug-repro", + }, + ]; + + insertedClaudeIds.add(missingId); + insertedClaudeIds.add(validId); + insertedAnalyticsIds.add(missingId); + insertedAnalyticsIds.add(validId); + + await ingestRudelClaudeSessions(executor, rows); + + const analyticsRows = await waitForQuery<{ + session_id: string; + total_interactions: number; + actual_duration_min: number; + }>( + `SELECT session_id, total_interactions, actual_duration_min + FROM rudel.session_analytics FINAL + WHERE organization_id = '${orgId}' + ORDER BY session_id ASC`, + (result) => result.length === 2, + ); + + expect(analyticsRows.map((row) => row.session_id)).toContain(validId); + expect(analyticsRows.map((row) => row.session_id)).toContain(missingId); + expect( + analyticsRows.find((row) => row.session_id === missingId), + ).toMatchObject({ + session_id: missingId, + total_interactions: 0, + 
actual_duration_min: 0, + }); + }, 60_000); + + test("keeps Codex sessions visible when timestamps are missing", async () => { + const orgId = `org_visibility_codex_${Date.now()}`; + const missingId = `codex_missing_ts_${Date.now()}`; + const validId = `codex_valid_ts_${Date.now()}`; + const now = clickhouseNow(); + + const rows: RudelCodexSessionsRow[] = [ + { + session_date: now, + last_interaction_date: now, + session_id: missingId, + organization_id: orgId, + project_path: "/tests/codex-missing-timestamps", + git_remote: "", + package_name: "", + package_type: "", + content: [ + JSON.stringify({ + type: "session_meta", + payload: { id: missingId, cwd: "/tests/codex-missing-timestamps" }, + }), + JSON.stringify({ type: "response_item", payload: { id: "resp-1" } }), + JSON.stringify({ type: "event_msg", payload: { text: "done" } }), + ].join("\n"), + ingested_at: now, + user_id: "user_visibility", + git_branch: "main", + git_sha: null, + tag: "bug-repro", + }, + { + session_date: now, + last_interaction_date: now, + session_id: validId, + organization_id: orgId, + project_path: "/tests/codex-valid-timestamps", + git_remote: "", + package_name: "", + package_type: "", + content: [ + JSON.stringify({ + type: "session_meta", + timestamp: isoMinutesFromNow(-12), + payload: { id: validId, cwd: "/tests/codex-valid-timestamps" }, + }), + JSON.stringify({ + type: "response_item", + timestamp: isoMinutesFromNow(-11), + payload: { id: "resp-1" }, + }), + JSON.stringify({ + type: "event_msg", + timestamp: isoMinutesFromNow(-10), + payload: { text: "done" }, + }), + JSON.stringify({ + type: "event", + name: "turn.completed", + timestamp: isoMinutesFromNow(-10), + payload: { usage: { input_tokens: 15, output_tokens: 25 } }, + }), + ].join("\n"), + ingested_at: now, + user_id: "user_visibility", + git_branch: "main", + git_sha: null, + tag: "bug-repro", + }, + ]; + + insertedCodexIds.add(missingId); + insertedCodexIds.add(validId); + insertedAnalyticsIds.add(missingId); + 
+    insertedAnalyticsIds.add(validId);
+
+    await ingestRudelCodexSessions(executor, rows);
+
+    const analyticsRows = await waitForQuery<{
+      session_id: string;
+      total_interactions: number;
+      actual_duration_min: number;
+    }>(
+      `SELECT session_id, total_interactions, actual_duration_min
+       FROM rudel.session_analytics FINAL
+       WHERE organization_id = '${orgId}'
+       ORDER BY session_id ASC`,
+      (result) => result.length === 2,
+    );
+
+    expect(analyticsRows.map((row) => row.session_id)).toContain(validId);
+    expect(analyticsRows.map((row) => row.session_id)).toContain(missingId);
+    // The Codex MV derives total_interactions from the number of
+    // `response_item` / `event_msg` lines, not from timestamps, so the
+    // timestamp-less transcript still reports its two interaction lines.
+    // Only the duration falls back to 0 when no timestamps are present.
+    expect(
+      analyticsRows.find((row) => row.session_id === missingId),
+    ).toMatchObject({
+      session_id: missingId,
+      total_interactions: 2,
+      actual_duration_min: 0,
+    });
+
+    const baseRows = await waitForQuery<{ session_id: string }>(
+      `SELECT session_id
+       FROM rudel.codex_sessions FINAL
+       WHERE organization_id = '${orgId}'
+       ORDER BY session_id ASC`,
+      (result) => result.length === 2,
+    );
+
+    expect(baseRows.map((row) => row.session_id)).toEqual(
+      [missingId, validId].sort(),
+    );
+  }, 60_000);
+});
diff --git a/packages/ch-schema/src/db/schema/codex-sessions.ts b/packages/ch-schema/src/db/schema/codex-sessions.ts index 0855dbcd..a353ea59 100644 --- a/packages/ch-schema/src/db/schema/codex-sessions.ts +++ b/packages/ch-schema/src/db/schema/codex-sessions.ts @@ -51,9 +51,9 @@ const codex_session_analytics_mv = materializedView({ arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'input_tokens')), _completion_lines)) AS _input_tokens, arraySum(arrayMap(x -> toUInt64OrZero(JSONExtractRaw(JSONExtractRaw(x, 'payload'), 'usage', 'output_tokens')), _completion_lines)) AS _output_tokens, - arrayMin(_timestamps) AS _session_date, - arrayMax(_timestamps) AS _last_interaction_date, - dateDiff('minute', _session_date, _last_interaction_date) AS _duration_min, + if(length(_timestamps) > 0, arrayMin(_timestamps), cs.session_date) AS _session_date, + 
if(length(_timestamps) > 0, arrayMax(_timestamps), cs.last_interaction_date) AS _last_interaction_date, + if(length(_timestamps) > 1, dateDiff('minute', arrayMin(_timestamps), arrayMax(_timestamps)), 0) AS _duration_min, JSONExtractString( JSONExtractRaw( @@ -134,7 +134,6 @@ const codex_session_analytics_mv = materializedView({ )) as success_score FROM rudel.codex_sessions AS cs - WHERE length(_timestamps) > 0 QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1`, }); diff --git a/packages/ch-schema/src/db/schema/session-analytics.ts b/packages/ch-schema/src/db/schema/session-analytics.ts index f646f725..f81c6a9f 100644 --- a/packages/ch-schema/src/db/schema/session-analytics.ts +++ b/packages/ch-schema/src/db/schema/session-analytics.ts @@ -163,9 +163,9 @@ const rudel_session_analytics_mv = materializedView({ arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '"name":"Task"[^}]*"subagent_type":"([^"]+)"'))) AS _subagent_types, arrayDistinct(arrayFilter(x -> x != '', extractAll(cs.content, '/([^<]+)'))) AS _slash_commands, - arrayMin(_timestamps) AS _session_date, - arrayMax(_timestamps) AS _last_interaction_date, - dateDiff('minute', _session_date, _last_interaction_date) AS _duration_min + if(length(_timestamps) > 0, arrayMin(_timestamps), cs.session_date) AS _session_date, + if(length(_timestamps) > 0, arrayMax(_timestamps), cs.last_interaction_date) AS _last_interaction_date, + if(length(_timestamps) > 1, dateDiff('minute', arrayMin(_timestamps), arrayMax(_timestamps)), 0) AS _duration_min SELECT * EXCEPT (session_date, last_interaction_date), @@ -251,7 +251,6 @@ const rudel_session_analytics_mv = materializedView({ )) as success_score FROM rudel.claude_sessions AS cs - WHERE length(_timestamps) > 0 QUALIFY ROW_NUMBER() OVER (PARTITION BY cs.session_id ORDER BY cs.ingested_at DESC) = 1`, });