Skip to content

Some minor drive-by fixes and cleanups for the electric collection. #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 23 additions & 38 deletions packages/db-collections/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,8 @@ function isUpToDateMessage<T extends Row<unknown>>(
// Check if a message contains txids in its headers
function hasTxids<T extends Row<unknown>>(
message: Message<T>
): message is Message<T> & { headers: { txids?: Array<number> } } {
return (
`headers` in message &&
`txids` in message.headers &&
Array.isArray(message.headers.txids)
)
): message is Message<T> & { headers: { txids?: Array<string> } } {
return `txids` in message.headers && Array.isArray(message.headers.txids)
}

/**
Expand Down Expand Up @@ -149,7 +145,7 @@ export function electricCollectionOptions<
TSchema extends StandardSchemaV1 = never,
TFallback extends Row<unknown> = Row<unknown>,
>(config: ElectricCollectionConfig<TExplicit, TSchema, TFallback>) {
const seenTxids = new Store<Set<string>>(new Set([`${Math.random()}`]))
const seenTxids = new Store<Set<string>>(new Set([]))
const sync = createElectricSync<ResolveType<TExplicit, TSchema, TFallback>>(
config.shapeOptions,
{
Expand All @@ -165,7 +161,7 @@ export function electricCollectionOptions<
*/
const awaitTxId: AwaitTxIdFn = async (
txId: string,
timeout = 30000
timeout: number = 30000
): Promise<boolean> => {
if (typeof txId !== `string`) {
throw new TypeError(
Expand Down Expand Up @@ -246,18 +242,14 @@ export function electricCollectionOptions<
ResolveType<TExplicit, TSchema, TFallback>
>
) => {
// Runtime check (that doesn't follow type)
// eslint-disable-next-line
const handlerResult = (await config.onDelete!(params)) ?? {}
const txid = (handlerResult as { txid?: string }).txid

if (!txid) {
const handlerResult = await config.onDelete!(params)
if (!handlerResult.txid) {
throw new Error(
`Electric collection onDelete handler must return a txid`
)
}

await awaitTxId(txid)
await awaitTxId(handlerResult.txid)
return handlerResult
}
: undefined
Expand Down Expand Up @@ -333,43 +325,37 @@ function createElectricSync<T extends Row<unknown>>(
signal: abortController.signal,
})
let transactionStarted = false
let newTxids = new Set<string>()
const newTxids = new Set<string>()

unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
let hasUpToDate = false

for (const message of messages) {
// Check for txids in the message and add them to our store
if (hasTxids(message) && message.headers.txids) {
message.headers.txids.forEach((txid) => newTxids.add(String(txid)))
if (hasTxids(message)) {
message.headers.txids?.forEach((txid) => newTxids.add(txid))
}

// Check if the message contains schema information
if (isChangeMessage(message) && message.headers.schema) {
// Store the schema for future use if it's a valid string
if (typeof message.headers.schema === `string`) {
const schema: string = message.headers.schema
if (isChangeMessage(message)) {
// Check if the message contains schema information
const schema = message.headers.schema
if (schema && typeof schema === `string`) {
// Store the schema for future use if it's a valid string
relationSchema.setState(() => schema)
}
}

if (isChangeMessage(message)) {
if (!transactionStarted) {
begin()
transactionStarted = true
}

const value = message.value as unknown as T

// Include the primary key and relation info in the metadata
const enhancedMetadata = {
...message.headers,
}

write({
type: message.headers.operation,
value,
metadata: enhancedMetadata,
value: message.value,
// Include the primary key and relation info in the metadata
metadata: {
...message.headers,
},
})
} else if (isUpToDateMessage(message)) {
hasUpToDate = true
Expand All @@ -390,10 +376,9 @@ function createElectricSync<T extends Row<unknown>>(

// Always commit txids when we receive up-to-date, regardless of transaction state
seenTxids.setState((currentTxids) => {
const clonedSeen = new Set(currentTxids)
newTxids.forEach((txid) => clonedSeen.add(String(txid)))

newTxids = new Set()
const clonedSeen = new Set<string>(currentTxids)
newTxids.forEach((txid) => clonedSeen.add(txid))
newTxids.clear()
return clonedSeen
})
}
Expand Down
6 changes: 3 additions & 3 deletions packages/db-collections/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ describe(`Electric Integration`, () => {
value: value.value as Row,
headers: {
operation: `insert`,
txids: [Number(txid)], // Convert to number as the API expects numbers but our code converts to strings
txids: [txid],
},
})
}
Expand Down Expand Up @@ -662,7 +662,7 @@ describe(`Electric Integration`, () => {
value: { id: 1, name: `Test` },
headers: {
operation: `insert`,
txids: [100, 200],
txids: [`100`, `200`],
},
},
{
Expand Down Expand Up @@ -959,7 +959,7 @@ describe(`Electric Integration`, () => {
{
headers: {
control: `up-to-date`,
txids: [300, 400],
txids: [`300`, `400`],
},
},
])
Expand Down