Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/cli/src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async function waitForParts(
database: string,
table: string,
expectedPartitions: number,
timeoutMs = 15_000,
timeoutMs = 60_000,
): Promise<void> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
Expand Down
17 changes: 17 additions & 0 deletions packages/plugin-backfill/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ export default defineConfig({

See the [chkit documentation](https://chkit.obsessiondb.com).

## SDK Internals

The package root is limited to the plugin registration API.

Chunk-planning and async execution internals are exposed from the SDK subpath:

```ts
import {
analyzeAndChunk,
buildWhereClauseFromChunk,
decodeChunkPlanFromPersistence,
encodeChunkPlanForPersistence,
executeBackfill,
generateIdempotencyToken,
} from '@chkit/plugin-backfill/sdk'
```

## License

[MIT](../../LICENSE)
5 changes: 5 additions & 0 deletions packages/plugin-backfill/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
"source": "./src/index.ts",
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./sdk": {
"source": "./src/sdk.ts",
"types": "./dist/sdk.d.ts",
"default": "./dist/sdk.js"
}
},
"files": [
Expand Down
4 changes: 2 additions & 2 deletions packages/plugin-backfill/src/async-backfill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ export interface BackfillOptions {
/** Plan ID used as a namespace in deterministic query IDs */
planId: string
/** The chunks to process (from buildChunks) */
chunks: Array<{ id: string; from: string; to: string; [key: string]: unknown }>
chunks: Array<{ id: string; from?: string; to?: string; [key: string]: unknown }>
/** Build the SQL for a given chunk. Called once per chunk at submit time. */
buildQuery: (chunk: { id: string; from: string; to: string }) => string
buildQuery: (chunk: { id: string; from?: string; to?: string }) => string
/** Max concurrent queries running on the server. Default: 3 */
concurrency?: number
/** Polling interval in ms. Default: 5000 */
Expand Down
130 changes: 8 additions & 122 deletions packages/plugin-backfill/src/chunking/analyze.ts
Original file line number Diff line number Diff line change
@@ -1,129 +1,15 @@
import { hashId, randomPlanId } from '../state.js'
import { generateChunkPlan } from './planner.js'
import type { ChunkPlan, GenerateChunkPlanInput } from './types.js'

import { buildChunkBoundaries } from './build.js'
import { introspectTable, querySortKeyRanges } from './introspect.js'
import type { ChunkBoundary, PartitionInfo, PlannedChunk, SortKeyInfo } from './types.js'

export interface AnalyzeAndChunkInput {
database: string
table: string
from?: string
to?: string
maxChunkBytes: number
requireIdempotencyToken: boolean
query: <T>(sql: string) => Promise<T[]>
}

export interface AnalyzeAndChunkResult {
planId: string
partitions: PartitionInfo[]
sortKey?: SortKeyInfo
chunks: PlannedChunk[]
}
export type AnalyzeAndChunkInput = GenerateChunkPlanInput
export type AnalyzeAndChunkResult = ChunkPlan
export type AnalyzeTableInput = GenerateChunkPlanInput
export type AnalyzeTableResult = ChunkPlan

export async function analyzeAndChunk(input: AnalyzeAndChunkInput): Promise<AnalyzeAndChunkResult> {
const { partitions, sortKey, boundaries } = await analyzeTable({
database: input.database,
table: input.table,
from: input.from,
to: input.to,
maxChunkBytes: input.maxChunkBytes,
query: input.query,
})

const planId = randomPlanId()

const chunks = buildPlannedChunks({
planId,
partitions,
boundaries,
requireIdempotencyToken: input.requireIdempotencyToken,
})

return { planId, partitions, sortKey, chunks }
}

export interface AnalyzeTableInput {
database: string
table: string
from?: string
to?: string
maxChunkBytes: number
query: <T>(sql: string) => Promise<T[]>
}

export interface AnalyzeTableResult {
partitions: PartitionInfo[]
sortKey?: SortKeyInfo
boundaries: ChunkBoundary[]
return generateChunkPlan(input)
}

export async function analyzeTable(input: AnalyzeTableInput): Promise<AnalyzeTableResult> {
const { partitions, sortKey } = await introspectTable({
database: input.database,
table: input.table,
from: input.from,
to: input.to,
query: input.query,
})

const oversizedPartitionIds = partitions
.filter(p => p.bytesOnDisk > input.maxChunkBytes)
.map(p => p.partitionId)

let sortKeyRanges: Map<string, { min: string; max: string }> | undefined
if (sortKey && oversizedPartitionIds.length > 0) {
sortKeyRanges = await querySortKeyRanges({
database: input.database,
table: input.table,
sortKeyColumn: sortKey.column,
partitionIds: oversizedPartitionIds,
query: input.query,
})
}

const boundaries = buildChunkBoundaries({
partitions,
maxChunkBytes: input.maxChunkBytes,
sortKey,
sortKeyRanges,
})

return { partitions, sortKey, boundaries }
}

export function buildPlannedChunks(input: {
planId: string
partitions: PartitionInfo[]
boundaries: ChunkBoundary[]
requireIdempotencyToken: boolean
}): PlannedChunk[] {
const chunks: PlannedChunk[] = []
const partitionIndex = new Map<string, number>()

for (const boundary of input.boundaries) {
const idx = partitionIndex.get(boundary.partitionId) ?? 0
partitionIndex.set(boundary.partitionId, idx + 1)

const idSeed = `${input.planId}:${boundary.partitionId}:${idx}`
const chunkId = hashId(`chunk:${idSeed}`).slice(0, 16)
const token = input.requireIdempotencyToken ? hashId(`token:${idSeed}`) : ''

const partition = input.partitions.find(p => p.partitionId === boundary.partitionId)
const from = boundary.sortKeyFrom ?? partition?.minTime ?? ''
const to = boundary.sortKeyTo ?? partition?.maxTime ?? ''

chunks.push({
id: chunkId,
partitionId: boundary.partitionId,
sortKeyFrom: boundary.sortKeyFrom,
sortKeyTo: boundary.sortKeyTo,
estimatedBytes: boundary.estimatedBytes,
idempotencyToken: token,
from,
to,
})
}

return chunks
return analyzeAndChunk(input)
}
109 changes: 109 additions & 0 deletions packages/plugin-backfill/src/chunking/boundary-codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import type {
Chunk,
ChunkPlan,
ChunkRange,
FocusedValue,
SortKey,
} from './types.js'

/**
 * Serialize a single chunk-boundary value for persistence.
 *
 * When the sort key declares the `hex-latin1` boundary encoding, the value is
 * stored as the hex representation of its latin1 bytes; any other encoding
 * (or a missing value / sort key) passes the value through unchanged.
 */
export function encodeBoundary(
  value: string | undefined,
  sortKey: SortKey | undefined,
): string | undefined {
  const wantsHex =
    value !== undefined &&
    sortKey !== undefined &&
    sortKey.boundaryEncoding === 'hex-latin1'
  return wantsHex ? Buffer.from(value, 'latin1').toString('hex') : value
}

/**
 * Inverse of `encodeBoundary`: restore a persisted boundary value.
 *
 * For the `hex-latin1` encoding the stored hex string is turned back into the
 * original latin1 text; otherwise (or when the value / sort key is missing)
 * the value is returned untouched.
 */
export function decodeBoundary(
  value: string | undefined,
  sortKey: SortKey | undefined,
): string | undefined {
  const isHexEncoded =
    value !== undefined &&
    sortKey !== undefined &&
    sortKey.boundaryEncoding === 'hex-latin1'
  return isHexEncoded ? Buffer.from(value, 'hex').toString('latin1') : value
}

/**
 * Encode every range's `from`/`to` boundary using the sort key of the range's
 * dimension. Dimensions without a matching sort key pass through unencoded.
 */
export function encodeRangesForPlan(
  ranges: ChunkRange[],
  sortKeys: SortKey[],
): ChunkRange[] {
  const encoded: ChunkRange[] = []
  for (const { dimensionIndex, from, to } of ranges) {
    const sortKey = sortKeys[dimensionIndex]
    encoded.push({
      dimensionIndex,
      from: encodeBoundary(from, sortKey),
      to: encodeBoundary(to, sortKey),
    })
  }
  return encoded
}

/**
 * Decode every range's `from`/`to` boundary using the sort key of the range's
 * dimension. Inverse of `encodeRangesForPlan`.
 */
export function decodeRangesFromPlan(
  ranges: ChunkRange[],
  sortKeys: SortKey[],
): ChunkRange[] {
  const decoded: ChunkRange[] = []
  for (const { dimensionIndex, from, to } of ranges) {
    const sortKey = sortKeys[dimensionIndex]
    decoded.push({
      dimensionIndex,
      from: decodeBoundary(from, sortKey),
      to: decodeBoundary(to, sortKey),
    })
  }
  return decoded
}

/**
 * Encode a chunk's focused value (if any) with the sort key of its dimension.
 * Falls back to the raw value when encoding yields nothing.
 */
function encodeFocusedValue(
  focusedValue: FocusedValue | undefined,
  sortKeys: SortKey[],
): FocusedValue | undefined {
  if (!focusedValue) return undefined
  const { dimensionIndex, value } = focusedValue
  const encoded = encodeBoundary(value, sortKeys[dimensionIndex])
  return { dimensionIndex, value: encoded ?? value }
}

/**
 * Decode a chunk's focused value (if any) with the sort key of its dimension.
 * Inverse of `encodeFocusedValue`; falls back to the stored value when
 * decoding yields nothing.
 */
function decodeFocusedValue(
  focusedValue: FocusedValue | undefined,
  sortKeys: SortKey[],
): FocusedValue | undefined {
  if (!focusedValue) return undefined
  const { dimensionIndex, value } = focusedValue
  const decoded = decodeBoundary(value, sortKeys[dimensionIndex])
  return { dimensionIndex, value: decoded ?? value }
}

/**
 * Produce a persistence-ready copy of a chunk: all range boundaries and the
 * analysis' focused value are encoded per-dimension. The input chunk is not
 * mutated.
 */
export function encodeChunkForPlan(chunk: Chunk, sortKeys: SortKey[]): Chunk {
  const analysis = {
    ...chunk.analysis,
    focusedValue: encodeFocusedValue(chunk.analysis.focusedValue, sortKeys),
  }
  const ranges = encodeRangesForPlan(chunk.ranges, sortKeys)
  return { ...chunk, ranges, analysis }
}

/**
 * Restore a chunk loaded from persistence: all range boundaries and the
 * analysis' focused value are decoded per-dimension. Inverse of
 * `encodeChunkForPlan`; the input chunk is not mutated.
 */
export function decodeChunkFromPlan(chunk: Chunk, sortKeys: SortKey[]): Chunk {
  const analysis = {
    ...chunk.analysis,
    focusedValue: decodeFocusedValue(chunk.analysis.focusedValue, sortKeys),
  }
  const ranges = decodeRangesFromPlan(chunk.ranges, sortKeys)
  return { ...chunk, ranges, analysis }
}

/**
 * Encode an entire chunk plan for persistence by encoding each chunk against
 * the table's sort keys. Returns a new plan object; the input is not mutated.
 */
export function encodeChunkPlanForPersistence(plan: ChunkPlan): ChunkPlan {
  const { sortKeys } = plan.table
  const chunks = plan.chunks.map((chunk) => encodeChunkForPlan(chunk, sortKeys))
  return { ...plan, chunks }
}

/**
 * Decode a chunk plan loaded from persistence by decoding each chunk against
 * the table's sort keys. Inverse of `encodeChunkPlanForPersistence`; returns
 * a new plan object without mutating the input.
 */
export function decodeChunkPlanFromPersistence(plan: ChunkPlan): ChunkPlan {
  const { sortKeys } = plan.table
  const chunks = plan.chunks.map((chunk) => decodeChunkFromPlan(chunk, sortKeys))
  return { ...plan, chunks }
}
Loading
Loading