+ Number of tables to copy in parallel during the initial sync.
+ Uses one replication slot per worker (N + 1 total when fully active).
+ >
+ }
+ >
+
+
+
@@ -81,7 +144,15 @@ export const AdvancedSettings = ({
}
layout="horizontal"
- description="Maximum age of cached data before BigQuery reads from base tables at query time. Lower values ensure fresher results but may increase query costs. Leave empty for no staleness limit."
+ description={
+ <>
+
+ Maximum age of cached data before BigQuery reads from base tables at query
+ time.
+
+ Lower values return fresher results, but may increase query costs.
+ >
+ }
>
@@ -90,7 +161,7 @@ export const AdvancedSettings = ({
type="number"
value={field.value ?? ''}
onChange={handleNumberChange(field)}
- placeholder="Default: none"
+ placeholder="Default: None (No staleness limit)"
/>
diff --git a/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/DestinationForm.schema.ts b/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/DestinationForm.schema.ts
index 97e09b56a953f..312d3798a5375 100644
--- a/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/DestinationForm.schema.ts
+++ b/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/DestinationForm.schema.ts
@@ -7,6 +7,12 @@ export const DestinationPanelFormSchema = z.object({
name: z.string().min(1, 'Name is required'),
publicationName: z.string().min(1, 'Publication is required'),
maxFillMs: z.number().min(1, 'Max Fill milliseconds should be greater than 0').int().optional(),
+ maxSize: z.number().min(1, 'Max batch size should be greater than 0').int().optional(),
+ maxTableSyncWorkers: z
+ .number()
+ .min(1, 'Max table sync workers should be greater than 0')
+ .int()
+ .optional(),
// BigQuery fields
projectId: z.string().optional(),
datasetId: z.string().optional(),
diff --git a/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/index.tsx b/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/index.tsx
index 9143453ec5ab4..25eb91dc02f9c 100644
--- a/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/index.tsx
+++ b/apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/index.tsx
@@ -173,7 +173,9 @@ export const DestinationForm = ({
// Common fields
name: destinationData?.name ?? '',
publicationName: pipelineData?.config.publication_name ?? '',
- maxFillMs: pipelineData?.config?.batch?.max_fill_ms ?? 10000, // Default: 10 seconds
+ maxFillMs: pipelineData?.config?.batch?.max_fill_ms ?? undefined,
+ maxSize: pipelineData?.config?.batch?.max_size ?? undefined,
+ maxTableSyncWorkers: pipelineData?.config?.max_table_sync_workers ?? undefined,
// BigQuery fields
projectId: isBigQueryConfig ? config.big_query.project_id : '',
datasetId: isBigQueryConfig ? config.big_query.dataset_id : '',
@@ -301,6 +303,8 @@ export const DestinationForm = ({
sourceId,
publicationName: data.publicationName,
maxFillMs: data.maxFillMs,
+ maxSize: data.maxSize,
+ maxTableSyncWorkers: data.maxTableSyncWorkers,
}),
])
@@ -398,9 +402,13 @@ export const DestinationForm = ({
destinationConfig = { iceberg: icebergConfig }
}
- const batchConfig: BatchConfig | undefined = !!data.maxFillMs
- ? { maxFillMs: data.maxFillMs }
- : undefined
+ const batchConfig: BatchConfig | undefined =
+ data.maxFillMs !== undefined || data.maxSize !== undefined
+ ? {
+ ...(data.maxFillMs !== undefined ? { maxFillMs: data.maxFillMs } : {}),
+ ...(data.maxSize !== undefined ? { maxSize: data.maxSize } : {}),
+ }
+ : undefined
const hasBatchFields = batchConfig !== undefined
if (!destinationConfig) throw new Error('Destination configuration is missing')
@@ -413,6 +421,7 @@ export const DestinationForm = ({
destinationConfig,
pipelineConfig: {
publicationName: data.publicationName,
+ maxTableSyncWorkers: data.maxTableSyncWorkers,
...(hasBatchFields ? { batch: batchConfig } : {}),
},
sourceId,
@@ -476,9 +485,13 @@ export const DestinationForm = ({
}
destinationConfig = { iceberg: icebergConfig }
}
- const batchConfig: BatchConfig | undefined = !!data.maxFillMs
- ? { maxFillMs: data.maxFillMs }
- : undefined
+ const batchConfig: BatchConfig | undefined =
+ data.maxFillMs !== undefined || data.maxSize !== undefined
+ ? {
+ ...(data.maxFillMs !== undefined ? { maxFillMs: data.maxFillMs } : {}),
+ ...(data.maxSize !== undefined ? { maxSize: data.maxSize } : {}),
+ }
+ : undefined
const hasBatchFields = batchConfig !== undefined
if (!destinationConfig) throw new Error('Destination configuration is missing')
@@ -490,6 +503,7 @@ export const DestinationForm = ({
sourceId,
pipelineConfig: {
publicationName: data.publicationName,
+ maxTableSyncWorkers: data.maxTableSyncWorkers,
...(hasBatchFields ? { batch: batchConfig } : {}),
},
})
diff --git a/apps/studio/data/replication/create-destination-pipeline-mutation.ts b/apps/studio/data/replication/create-destination-pipeline-mutation.ts
index 8141dac26e6d8..4d9006222847b 100644
--- a/apps/studio/data/replication/create-destination-pipeline-mutation.ts
+++ b/apps/studio/data/replication/create-destination-pipeline-mutation.ts
@@ -32,7 +32,8 @@ export type IcebergDestinationConfig = {
}
export type BatchConfig = {
- maxFillMs: number
+ maxFillMs?: number
+ maxSize?: number
}
export type CreateDestinationPipelineParams = {
@@ -43,6 +44,7 @@ export type CreateDestinationPipelineParams = {
pipelineConfig: {
publicationName: string
batch?: BatchConfig
+ maxTableSyncWorkers?: number
}
}
@@ -51,7 +53,7 @@ async function createDestinationPipeline(
projectRef,
destinationName: destinationName,
destinationConfig,
- pipelineConfig: { publicationName, batch },
+ pipelineConfig: { publicationName, batch, maxTableSyncWorkers },
sourceId,
}: CreateDestinationPipelineParams,
signal?: AbortSignal
@@ -108,7 +110,17 @@ async function createDestinationPipeline(
destination_config,
pipeline_config: {
publication_name: publicationName,
- ...(!!batch ? { batch: { max_fill_ms: batch.maxFillMs } } : {}),
+ ...(maxTableSyncWorkers !== undefined
+ ? { max_table_sync_workers: maxTableSyncWorkers }
+ : {}),
+ ...(batch
+ ? {
+ batch: {
+ ...(batch.maxFillMs !== undefined ? { max_fill_ms: batch.maxFillMs } : {}),
+ ...(batch.maxSize !== undefined ? { max_size: batch.maxSize } : {}),
+ },
+ }
+ : {}),
},
},
signal,
diff --git a/apps/studio/data/replication/update-destination-pipeline-mutation.ts b/apps/studio/data/replication/update-destination-pipeline-mutation.ts
index 12da1fcd1db21..8bdb50f0acc2c 100644
--- a/apps/studio/data/replication/update-destination-pipeline-mutation.ts
+++ b/apps/studio/data/replication/update-destination-pipeline-mutation.ts
@@ -17,6 +17,7 @@ export type UpdateDestinationPipelineParams = {
pipelineConfig: {
publicationName: string
batch?: BatchConfig
+ maxTableSyncWorkers?: number
}
}
@@ -27,7 +28,7 @@ async function updateDestinationPipeline(
projectRef,
destinationName: destinationName,
destinationConfig,
- pipelineConfig: { publicationName, batch },
+ pipelineConfig: { publicationName, batch, maxTableSyncWorkers },
sourceId,
}: UpdateDestinationPipelineParams,
signal?: AbortSignal
@@ -84,7 +85,17 @@ async function updateDestinationPipeline(
destination_name: destinationName,
pipeline_config: {
publication_name: publicationName,
- ...(!!batch ? { batch: { max_fill_ms: batch.maxFillMs } } : {}),
+ ...(maxTableSyncWorkers !== undefined
+ ? { max_table_sync_workers: maxTableSyncWorkers }
+ : {}),
+ ...(batch
+ ? {
+ batch: {
+ ...(batch.maxFillMs !== undefined ? { max_fill_ms: batch.maxFillMs } : {}),
+ ...(batch.maxSize !== undefined ? { max_size: batch.maxSize } : {}),
+ },
+ }
+ : {}),
},
},
signal,
diff --git a/apps/studio/data/replication/validate-pipeline-mutation.ts b/apps/studio/data/replication/validate-pipeline-mutation.ts
index 67d3be6acd295..9c85c9b2f1295 100644
--- a/apps/studio/data/replication/validate-pipeline-mutation.ts
+++ b/apps/studio/data/replication/validate-pipeline-mutation.ts
@@ -9,23 +9,38 @@ type ValidatePipelineParams = {
sourceId: number
publicationName: string
maxFillMs?: number
+ maxSize?: number
+ maxTableSyncWorkers?: number
}
type ValidatePipelineResponse = components['schemas']['ValidatePipelineResponse']
async function validatePipeline(
- { projectRef, sourceId, publicationName, maxFillMs }: ValidatePipelineParams,
+ {
+ projectRef,
+ sourceId,
+ publicationName,
+ maxFillMs,
+ maxSize,
+ maxTableSyncWorkers,
+ }: ValidatePipelineParams,
signal?: AbortSignal
): Promise {
if (!projectRef) throw new Error('projectRef is required')
if (!sourceId) throw new Error('sourceId is required')
+ const batchConfig =
+ maxFillMs !== undefined || maxSize !== undefined
+ ? { max_fill_ms: maxFillMs, max_size: maxSize }
+ : undefined
+
const { data, error } = await post('/platform/replication/{ref}/pipelines/validate', {
params: { path: { ref: projectRef } },
body: {
source_id: sourceId,
config: {
publication_name: publicationName,
- ...(maxFillMs !== undefined ? { batch: { max_fill_ms: maxFillMs } } : {}),
+ max_table_sync_workers: maxTableSyncWorkers,
+ batch: batchConfig,
},
},
signal,
diff --git a/packages/api-types/types/platform.d.ts b/packages/api-types/types/platform.d.ts
index 54861b411af6b..5831308e0896f 100644
--- a/packages/api-types/types/platform.d.ts
+++ b/packages/api-types/types/platform.d.ts
@@ -5383,11 +5383,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders
@@ -5411,11 +5413,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders
@@ -8600,11 +8604,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders
@@ -8660,11 +8666,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders
@@ -10257,11 +10265,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders
@@ -10285,11 +10295,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders
@@ -10581,12 +10593,12 @@ export interface components {
failure_type: 'critical' | 'warning'
/**
* @description Validation failure name
- * @example BigQuery Dataset Not Found
+ * @example Validation Failed
*/
name: string
/**
* @description Validation failure reason
- * @example Dataset 'my_dataset' does not exist in project 'my_project'
+ * @example The configuration is invalid
*/
reason: string
}[]
@@ -10602,12 +10614,12 @@ export interface components {
failure_type: 'critical' | 'warning'
/**
* @description Validation failure name
- * @example Publication Not Found
+ * @example Validation Failed
*/
name: string
/**
* @description Validation failure reason
- * @example Publication 'my_publication' does not exist in the source database
+ * @example The configuration is invalid
*/
reason: string
}[]
@@ -10694,11 +10706,13 @@ export interface components {
*/
max_fill_ms?: number
/**
- * @description Maximum batch size
+ * @description Maximum size of the batch
* @example 200
*/
max_size?: number
}
+ /** @description Maximum number of table sync workers */
+ max_table_sync_workers?: number
/**
* @description Publication name
* @example pub_orders