Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/smart-chunking-rewrite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@chkit/plugin-backfill": patch
"@chkit/clickhouse": patch
"chkit": patch
---

Rewrite backfill chunk planning with multi-strategy smart chunking. The planner now introspects partition layout, sort key distribution, and row estimates to produce better-sized chunks using strategies like equal-width splitting, quantile ranges, temporal bucketing, string prefix splitting, and group-by-key splitting. Adds a dedicated `sdk` entry point for programmatic access to chunking internals.
22 changes: 13 additions & 9 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/cli/src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async function waitForParts(
database: string,
table: string,
expectedPartitions: number,
timeoutMs = 15_000,
timeoutMs = 60_000,
): Promise<void> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
Expand Down
1 change: 1 addition & 0 deletions packages/clickhouse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"dependencies": {
"@chkit/core": "workspace:*",
"@clickhouse/client": "^1.11.0",
"@logtape/logtape": "^2.0.5",
"p-retry": "^7.1.1"
}
}
71 changes: 62 additions & 9 deletions packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { createClient } from '@clickhouse/client'
import { createClient, type ClickHouseSettings } from '@clickhouse/client'
import {
normalizeSQLFragment,
type ChxConfig,
type ColumnDefinition,
type ProjectionDefinition,
type SkipIndexDefinition,
} from '@chkit/core'
import { getLogger } from '@logtape/logtape'
import {
parseEngineFromCreateTableQuery,
parseOrderByFromCreateTableQuery,
Expand All @@ -28,9 +29,11 @@ export interface QueryStatus {
error?: string
}

export type { ClickHouseSettings }

export interface ClickHouseExecutor {
command(sql: string): Promise<void>
query<T>(sql: string): Promise<T[]>
query<T>(sql: string, settings?: ClickHouseSettings): Promise<T[]>
insert<T extends Record<string, unknown>>(params: { table: string; values: T[] }): Promise<void>
listSchemaObjects(): Promise<SchemaObjectRef[]>
listTableDetails(databases: string[]): Promise<IntrospectedTable[]>
Expand Down Expand Up @@ -249,7 +252,54 @@ export {
waitForTableAbsent,
} from './ddl-propagation.js'

function parseSummaryFromHeaders(headers: Record<string, string | string[] | undefined>): {
read_rows: string
read_bytes: string
written_rows: string
written_bytes: string
result_rows: string
result_bytes: string
elapsed_ns: string
} | undefined {
const raw = headers['x-clickhouse-summary']
if (!raw || typeof raw !== 'string') return undefined
try {
return JSON.parse(raw)
} catch {
return undefined
}
}

function logProfiling(
logger: ReturnType<typeof getLogger>,
query: string,
queryId: string,
summary?: {
read_rows: string
read_bytes: string
written_rows: string
written_bytes: string
result_rows?: string
result_bytes?: string
elapsed_ns: string
},
): void {
logger.trace('Query completed: {query}', {
query,
queryId,
readRows: Number(summary?.read_rows ?? 0),
readBytes: Number(summary?.read_bytes ?? 0),
writtenRows: Number(summary?.written_rows ?? 0),
writtenBytes: Number(summary?.written_bytes ?? 0),
elapsedMs: Number(summary?.elapsed_ns ?? 0) / 1_000_000,
resultRows: Number(summary?.result_rows ?? 0),
resultBytes: Number(summary?.result_bytes ?? 0),
})
}

export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhouse']>): ClickHouseExecutor {
const profiler = getLogger(['chkit', 'profiling'])

const client = createClient({
url: config.url,
username: config.username,
Expand All @@ -259,6 +309,7 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
clickhouse_settings: {
wait_end_of_query: 1,
async_insert: 0,
send_progress_in_http_headers: 1,
},
})

Expand All @@ -275,11 +326,10 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
return {
async command(sql: string): Promise<void> {
try {
await client.command({ query: sql, http_headers: { 'X-DDL': '1' } })
const result = await client.command({ query: sql, http_headers: { 'X-DDL': '1' } })
logProfiling(profiler, sql, result.query_id, result.summary)
} catch (error) {
if (isUnknownDatabaseError(error)) {
// The configured database doesn't exist yet. Retry without the
// session database so that CREATE DATABASE can succeed.
const fallback = createClient({
url: config.url,
username: config.username,
Expand All @@ -296,21 +346,24 @@ export function createClickHouseExecutor(config: NonNullable<ChxConfig['clickhou
wrapConnectionError(error, config.url)
}
},
async query<T>(sql: string): Promise<T[]> {
async query<T>(sql: string, settings?: ClickHouseSettings): Promise<T[]> {
try {
const result = await client.query({ query: sql, format: 'JSONEachRow', http_headers: { 'X-DDL': '1' } })
return result.json<T>()
const result = await client.query({ query: sql, format: 'JSONEachRow', http_headers: { 'X-DDL': '1' }, ...(settings ? { clickhouse_settings: settings } : {}) })
const rows = await result.json<T>()
logProfiling(profiler, sql, result.query_id, parseSummaryFromHeaders(result.response_headers))
return rows
} catch (error) {
wrapConnectionError(error, config.url)
}
},
async insert<T extends Record<string, unknown>>(params: { table: string; values: T[] }): Promise<void> {
try {
await client.insert({
const result = await client.insert({
table: params.table,
values: params.values,
format: 'JSONEachRow',
})
logProfiling(profiler, `INSERT INTO ${params.table}`, result.query_id, result.summary)
} catch (error) {
wrapConnectionError(error, config.url)
}
Expand Down
17 changes: 17 additions & 0 deletions packages/plugin-backfill/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ export default defineConfig({

See the [chkit documentation](https://chkit.obsessiondb.com).

## SDK Internals

The package root is limited to the plugin registration API.

Chunk-planning and async execution internals are exposed from the SDK subpath:

```ts
import {
analyzeAndChunk,
buildWhereClauseFromChunk,
decodeChunkPlanFromPersistence,
encodeChunkPlanForPersistence,
executeBackfill,
generateIdempotencyToken,
} from '@chkit/plugin-backfill/sdk'
```

## License

[MIT](../../LICENSE)
7 changes: 7 additions & 0 deletions packages/plugin-backfill/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
"source": "./src/index.ts",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./sdk": {
"source": "./src/sdk.ts",
"types": "./dist/sdk.d.ts",
"default": "./dist/sdk.js"
}
},
"files": [
Expand All @@ -38,6 +43,8 @@
"typecheck": "tsc -p tsconfig.json --noEmit",
"lint": "biome lint src",
"test": "bun test src",
"test:env": "doppler run --project chkit --config ci -- bun test src",
"seed:env": "doppler run --project chkit --config ci -- bun run src/chunking/e2e/seed-datasets.script.ts",
"clean": "rm -rf dist"
},
"dependencies": {
Expand Down
4 changes: 2 additions & 2 deletions packages/plugin-backfill/src/async-backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ export interface BackfillOptions {
/** Plan ID used as a namespace in deterministic query IDs */
planId: string
/** The chunks to process (from buildChunks) */
chunks: Array<{ id: string; from: string; to: string; [key: string]: unknown }>
chunks: Array<{ id: string; from?: string; to?: string; [key: string]: unknown }>
/** Build the SQL for a given chunk. Called once per chunk at submit time. */
buildQuery: (chunk: { id: string; from: string; to: string }) => string
buildQuery: (chunk: { id: string; from?: string; to?: string }) => string
/** Max concurrent queries running on the server. Default: 3 */
concurrency?: number
/** Polling interval in ms. Default: 5000 */
Expand Down
Loading
Loading