Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ export const AdvancedSettings = ({
name="maxFillMs"
render={({ field }) => (
<FormItemLayout
label="Batch wait time"
layout="horizontal"
description="How long to wait for more changes before sending. Shorter times mean more real-time updates but higher overhead."
label="Batch wait time"
description={
<>
<p>Time to wait for additional changes before sending.</p>
<p>Shorter times imply faster updates, but higher overhead.</p>
</>
}
>
<FormControl_Shadcn_>
<PrePostTab postTab="milliseconds">
Expand All @@ -59,7 +64,65 @@ export const AdvancedSettings = ({
type="number"
value={field.value ?? ''}
onChange={handleNumberChange(field)}
placeholder="Default: 10000 (10 seconds)"
placeholder="Default: 10000"
/>
</PrePostTab>
</FormControl_Shadcn_>
</FormItemLayout>
)}
/>

<FormField_Shadcn_
control={form.control}
name="maxSize"
render={({ field }) => (
<FormItemLayout
label="Batch size"
layout="horizontal"
description={
<>
<p>Number of rows to send in a batch.</p>
<p>Larger batches use more memory, with the risk of running out of memory.</p>
</>
}
>
<FormControl_Shadcn_>
<PrePostTab postTab="rows">
<Input_Shadcn_
{...field}
type="number"
value={field.value ?? ''}
onChange={handleNumberChange(field)}
placeholder="Default: 100000"
/>
</PrePostTab>
</FormControl_Shadcn_>
</FormItemLayout>
)}
/>

<FormField_Shadcn_
control={form.control}
name="maxTableSyncWorkers"
render={({ field }) => (
<FormItemLayout
label="Table sync workers"
layout="horizontal"
description={
<>
<p>Number of tables to copy in parallel during the initial sync.</p>
<p>Uses one replication slot per worker (N + 1 total when fully active).</p>
</>
}
>
<FormControl_Shadcn_>
<PrePostTab postTab="workers">
<Input_Shadcn_
{...field}
type="number"
value={field.value ?? ''}
onChange={handleNumberChange(field)}
placeholder="Default: 4"
/>
</PrePostTab>
</FormControl_Shadcn_>
Expand All @@ -81,7 +144,15 @@ export const AdvancedSettings = ({
</div>
}
layout="horizontal"
description="Maximum age of cached data before BigQuery reads from base tables at query time. Lower values ensure fresher results but may increase query costs. Leave empty for no staleness limit."
description={
<>
<p>
Maximum age of cached data before BigQuery reads from base tables at query
time.
</p>
<p>Lower values return fresher results, but may increase query costs.</p>
</>
}
>
<FormControl_Shadcn_>
<PrePostTab postTab="minutes">
Expand All @@ -90,7 +161,7 @@ export const AdvancedSettings = ({
type="number"
value={field.value ?? ''}
onChange={handleNumberChange(field)}
placeholder="Default: none"
placeholder="Default: None (No staleness limit)"
/>
</PrePostTab>
</FormControl_Shadcn_>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ export const DestinationPanelFormSchema = z.object({
name: z.string().min(1, 'Name is required'),
publicationName: z.string().min(1, 'Publication is required'),
maxFillMs: z.number().min(1, 'Max Fill milliseconds should be greater than 0').int().optional(),
maxSize: z.number().min(1, 'Max batch size should be greater than 0').int().optional(),
maxTableSyncWorkers: z
.number()
.min(1, 'Max table sync workers should be greater than 0')
.int()
.optional(),
// BigQuery fields
projectId: z.string().optional(),
datasetId: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ export const DestinationForm = ({
// Common fields
name: destinationData?.name ?? '',
publicationName: pipelineData?.config.publication_name ?? '',
maxFillMs: pipelineData?.config?.batch?.max_fill_ms ?? 10000, // Default: 10 seconds
maxFillMs: pipelineData?.config?.batch?.max_fill_ms ?? undefined,
maxSize: pipelineData?.config?.batch?.max_size ?? undefined,
maxTableSyncWorkers: pipelineData?.config?.max_table_sync_workers ?? undefined,
// BigQuery fields
projectId: isBigQueryConfig ? config.big_query.project_id : '',
datasetId: isBigQueryConfig ? config.big_query.dataset_id : '',
Expand Down Expand Up @@ -301,6 +303,8 @@ export const DestinationForm = ({
sourceId,
publicationName: data.publicationName,
maxFillMs: data.maxFillMs,
maxSize: data.maxSize,
maxTableSyncWorkers: data.maxTableSyncWorkers,
}),
])

Expand Down Expand Up @@ -398,9 +402,13 @@ export const DestinationForm = ({
destinationConfig = { iceberg: icebergConfig }
}

const batchConfig: BatchConfig | undefined = !!data.maxFillMs
? { maxFillMs: data.maxFillMs }
: undefined
const batchConfig: BatchConfig | undefined =
data.maxFillMs !== undefined || data.maxSize !== undefined
? {
...(data.maxFillMs !== undefined ? { maxFillMs: data.maxFillMs } : {}),
...(data.maxSize !== undefined ? { maxSize: data.maxSize } : {}),
}
: undefined
const hasBatchFields = batchConfig !== undefined

if (!destinationConfig) throw new Error('Destination configuration is missing')
Expand All @@ -413,6 +421,7 @@ export const DestinationForm = ({
destinationConfig,
pipelineConfig: {
publicationName: data.publicationName,
maxTableSyncWorkers: data.maxTableSyncWorkers,
...(hasBatchFields ? { batch: batchConfig } : {}),
},
sourceId,
Expand Down Expand Up @@ -476,9 +485,13 @@ export const DestinationForm = ({
}
destinationConfig = { iceberg: icebergConfig }
}
const batchConfig: BatchConfig | undefined = !!data.maxFillMs
? { maxFillMs: data.maxFillMs }
: undefined
const batchConfig: BatchConfig | undefined =
data.maxFillMs !== undefined || data.maxSize !== undefined
? {
...(data.maxFillMs !== undefined ? { maxFillMs: data.maxFillMs } : {}),
...(data.maxSize !== undefined ? { maxSize: data.maxSize } : {}),
}
: undefined
const hasBatchFields = batchConfig !== undefined

if (!destinationConfig) throw new Error('Destination configuration is missing')
Expand All @@ -490,6 +503,7 @@ export const DestinationForm = ({
sourceId,
pipelineConfig: {
publicationName: data.publicationName,
maxTableSyncWorkers: data.maxTableSyncWorkers,
...(hasBatchFields ? { batch: batchConfig } : {}),
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ export type IcebergDestinationConfig = {
}

export type BatchConfig = {
maxFillMs: number
maxFillMs?: number
maxSize?: number
}

export type CreateDestinationPipelineParams = {
Expand All @@ -43,6 +44,7 @@ export type CreateDestinationPipelineParams = {
pipelineConfig: {
publicationName: string
batch?: BatchConfig
maxTableSyncWorkers?: number
}
}

Expand All @@ -51,7 +53,7 @@ async function createDestinationPipeline(
projectRef,
destinationName: destinationName,
destinationConfig,
pipelineConfig: { publicationName, batch },
pipelineConfig: { publicationName, batch, maxTableSyncWorkers },
sourceId,
}: CreateDestinationPipelineParams,
signal?: AbortSignal
Expand Down Expand Up @@ -108,7 +110,17 @@ async function createDestinationPipeline(
destination_config,
pipeline_config: {
publication_name: publicationName,
...(!!batch ? { batch: { max_fill_ms: batch.maxFillMs } } : {}),
...(maxTableSyncWorkers !== undefined
? { max_table_sync_workers: maxTableSyncWorkers }
: {}),
...(batch
? {
batch: {
...(batch.maxFillMs !== undefined ? { max_fill_ms: batch.maxFillMs } : {}),
...(batch.maxSize !== undefined ? { max_size: batch.maxSize } : {}),
},
}
: {}),
},
},
signal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type UpdateDestinationPipelineParams = {
pipelineConfig: {
publicationName: string
batch?: BatchConfig
maxTableSyncWorkers?: number
}
}

Expand All @@ -27,7 +28,7 @@ async function updateDestinationPipeline(
projectRef,
destinationName: destinationName,
destinationConfig,
pipelineConfig: { publicationName, batch },
pipelineConfig: { publicationName, batch, maxTableSyncWorkers },
sourceId,
}: UpdateDestinationPipelineParams,
signal?: AbortSignal
Expand Down Expand Up @@ -84,7 +85,17 @@ async function updateDestinationPipeline(
destination_name: destinationName,
pipeline_config: {
publication_name: publicationName,
...(!!batch ? { batch: { max_fill_ms: batch.maxFillMs } } : {}),
...(maxTableSyncWorkers !== undefined
? { max_table_sync_workers: maxTableSyncWorkers }
: {}),
...(batch
? {
batch: {
...(batch.maxFillMs !== undefined ? { max_fill_ms: batch.maxFillMs } : {}),
...(batch.maxSize !== undefined ? { max_size: batch.maxSize } : {}),
},
}
: {}),
},
},
signal,
Expand Down
19 changes: 17 additions & 2 deletions apps/studio/data/replication/validate-pipeline-mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,38 @@ type ValidatePipelineParams = {
sourceId: number
publicationName: string
maxFillMs?: number
maxSize?: number
maxTableSyncWorkers?: number
}
type ValidatePipelineResponse = components['schemas']['ValidatePipelineResponse']

async function validatePipeline(
{ projectRef, sourceId, publicationName, maxFillMs }: ValidatePipelineParams,
{
projectRef,
sourceId,
publicationName,
maxFillMs,
maxSize,
maxTableSyncWorkers,
}: ValidatePipelineParams,
signal?: AbortSignal
): Promise<ValidatePipelineResponse> {
if (!projectRef) throw new Error('projectRef is required')
if (!sourceId) throw new Error('sourceId is required')

const batchConfig =
maxFillMs !== undefined || maxSize !== undefined
? { max_fill_ms: maxFillMs, max_size: maxSize }
: undefined

const { data, error } = await post('/platform/replication/{ref}/pipelines/validate', {
params: { path: { ref: projectRef } },
body: {
source_id: sourceId,
config: {
publication_name: publicationName,
...(maxFillMs !== undefined ? { batch: { max_fill_ms: maxFillMs } } : {}),
max_table_sync_workers: maxTableSyncWorkers,
batch: batchConfig,
},
},
signal,
Expand Down
Loading
Loading