Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .changeset/some-wombats-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
"@tanstack/powersync-db-collection": patch
---

Added support for tracking collection operation metadata in PowerSync CrudEntry operations.

```typescript
// Schema config
const APP_SCHEMA = new Schema({
documents: new Table(
{
name: column.text,
author: column.text,
created_at: column.text,
},
{
// Metadata tracking must be enabled on the PowerSync table
trackMetadata: true,
}
),
})

// ... Other config

// Collection operations which specify metadata
await collection.insert(
{
id,
name: `document`,
author: `Foo`,
},
// The string version of this will be present in PowerSync `CrudEntry`s during uploads
{
metadata: {
extraInfo: "Info",
},
}
)
```
103 changes: 103 additions & 0 deletions docs/collections/powersync-collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,109 @@ task.due_date.getTime() // OK - TypeScript knows this is a Date

Updates to the collection are applied optimistically to the local state first, then synchronized with PowerSync and the backend. If an error occurs during sync, the changes are automatically rolled back.

### Metadata Tracking

Metadata tracking allows attaching custom metadata to collection operations (insert, update, delete). This metadata is persisted alongside the operation and available in PowerSync `CrudEntry` records during upload processing. This is useful for passing additional context about mutations to the backend, such as audit information, operation sources, or custom processing hints.

#### Enabling Metadata Tracking

Metadata tracking must be enabled on the PowerSync table:

```typescript
const APP_SCHEMA = new Schema({
documents: new Table(
{
name: column.text,
author: column.text,
},
{
// Enable metadata tracking on this table
trackMetadata: true,
}
),
})
```

#### Using Metadata in Operations

Once enabled, metadata can be passed to any collection operation:

```typescript
const documents = createCollection(
powerSyncCollectionOptions({
database: db,
table: APP_SCHEMA.props.documents,
})
)

// Insert with metadata
await documents.insert(
{
id: crypto.randomUUID(),
name: "Report Q4",
author: "Jane Smith",
},
{
metadata: {
source: "web-app",
userId: "user-123",
timestamp: Date.now(),
},
}
).isPersisted.promise

// Update with metadata
await documents.update(
docId,
{ metadata: { reason: "typo-fix", editor: "user-456" } },
(doc) => {
doc.name = "Report Q4 (Updated)"
}
).isPersisted.promise

// Delete with metadata
await documents.delete(docId, {
metadata: { deletedBy: "user-789", reason: "duplicate" },
}).isPersisted.promise
```

#### Accessing Metadata During Upload

The metadata is available in PowerSync `CrudEntry` records when processing uploads in the connector:

```typescript
import { CrudEntry } from "@powersync/web"

class Connector implements PowerSyncBackendConnector {
// ...

async uploadData(database: AbstractPowerSyncDatabase) {
const batch = await database.getCrudBatch()
if (!batch) return

for (const entry of batch.crud) {
console.log("Operation:", entry.op) // PUT, PATCH, DELETE
console.log("Table:", entry.table)
console.log("Data:", entry.opData)
console.log("Metadata:", entry.metadata) // Custom metadata (stringified)

// Parse metadata if needed
if (entry.metadata) {
const meta = JSON.parse(entry.metadata)
console.log("Source:", meta.source)
console.log("User ID:", meta.userId)
}

// Process the operation with the backend...
}

await batch.complete()
}
}
```

**Note**: If metadata is provided to an operation but the table doesn't have `trackMetadata: true`, a warning will be logged and the metadata will be ignored.

## Configuration Options

The `powerSyncCollectionOptions` function accepts the following options:
Expand Down
106 changes: 85 additions & 21 deletions packages/powersync-db-collection/src/PowerSyncTransactor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { sanitizeSQL } from "@powersync/common"
import DebugModule from "debug"
import { asPowerSyncRecord, mapOperationToPowerSync } from "./helpers"
import { PendingOperationStore } from "./PendingOperationStore"
import { asPowerSyncRecord, mapOperationToPowerSync } from "./helpers"
import type { AbstractPowerSyncDatabase, LockContext } from "@powersync/common"
import type { PendingMutation, Transaction } from "@tanstack/db"
import type { EnhancedPowerSyncCollectionConfig } from "./definitions"
import type { PendingOperation } from "./PendingOperationStore"
import type {
EnhancedPowerSyncCollectionConfig,
PowerSyncCollectionMeta,
} from "./definitions"

const debug = DebugModule.debug(`ts/db:powersync`)

Expand Down Expand Up @@ -160,6 +163,13 @@ export class PowerSyncTransactor {
async (tableName, mutation, serializeValue) => {
const values = serializeValue(mutation.modified)
const keys = Object.keys(values).map((key) => sanitizeSQL`${key}`)
const queryParameters = Object.values(values)

const metadataValue = this.processMutationMetadata(mutation)
if (metadataValue != null) {
keys.push(`_metadata`)
queryParameters.push(metadataValue)
}

await context.execute(
`
Expand All @@ -168,7 +178,7 @@ export class PowerSyncTransactor {
VALUES
(${keys.map((_) => `?`).join(`, `)})
`,
Object.values(values)
queryParameters
)
}
)
Expand All @@ -188,14 +198,21 @@ export class PowerSyncTransactor {
async (tableName, mutation, serializeValue) => {
const values = serializeValue(mutation.modified)
const keys = Object.keys(values).map((key) => sanitizeSQL`${key}`)
const queryParameters = Object.values(values)

const metadataValue = this.processMutationMetadata(mutation)
if (metadataValue != null) {
keys.push(`_metadata`)
queryParameters.push(metadataValue)
}

await context.execute(
`
UPDATE ${tableName}
SET ${keys.map((key) => `${key} = ?`).join(`, `)}
WHERE id = ?
`,
[...Object.values(values), asPowerSyncRecord(mutation.modified).id]
[...queryParameters, asPowerSyncRecord(mutation.modified).id]
)
}
)
Expand All @@ -213,12 +230,26 @@ export class PowerSyncTransactor {
context,
waitForCompletion,
async (tableName, mutation) => {
await context.execute(
`
DELETE FROM ${tableName} WHERE id = ?
`,
[asPowerSyncRecord(mutation.original).id]
)
const metadataValue = this.processMutationMetadata(mutation)
if (metadataValue != null) {
/**
* Delete operations with metadata require a different approach to handle metadata.
* This will delete the record.
*/
await context.execute(
`
UPDATE ${tableName} SET _deleted = TRUE, _metadata = ? WHERE id = ?
`,
[metadataValue, asPowerSyncRecord(mutation.original).id]
)
} else {
await context.execute(
`
DELETE FROM ${tableName} WHERE id = ?
`,
[asPowerSyncRecord(mutation.original).id]
)
}
}
)
}
Expand All @@ -239,17 +270,8 @@ export class PowerSyncTransactor {
serializeValue: (value: any) => Record<string, unknown>
) => Promise<void>
): Promise<PendingOperation | null> {
if (
typeof (mutation.collection.config as any).utils?.getMeta != `function`
) {
throw new Error(`Could not get tableName from mutation's collection config.
The provided mutation might not have originated from PowerSync.`)
}

const { tableName, trackedTableName, serializeValue } = (
mutation.collection
.config as unknown as EnhancedPowerSyncCollectionConfig<any>
).utils.getMeta()
const { tableName, trackedTableName, serializeValue } =
this.getMutationCollectionMeta(mutation)

await handler(sanitizeSQL`${tableName}`, mutation, serializeValue)

Expand All @@ -268,4 +290,46 @@ export class PowerSyncTransactor {
timestamp: diffOperation.timestamp,
}
}

protected getMutationCollectionMeta(
mutation: PendingMutation<any>
): PowerSyncCollectionMeta<any> {
if (
typeof (mutation.collection.config as any).utils?.getMeta != `function`
) {
throw new Error(`Collection is not a PowerSync collection.`)
}
return (
mutation.collection
.config as unknown as EnhancedPowerSyncCollectionConfig<any>
).utils.getMeta()
}

/**
* Processes collection mutation metadata for persistence to the database.
* We only support storing string metadata.
* @returns null if no metadata should be stored.
*/
protected processMutationMetadata(
mutation: PendingMutation<any>
): string | null {
const { metadataIsTracked } = this.getMutationCollectionMeta(mutation)
if (!metadataIsTracked) {
// If it's not supported, we don't store metadata.
if (typeof mutation.metadata != `undefined`) {
// Log a warning if metadata is provided but not tracked.
this.database.logger.warn(
`Metadata provided for collection ${mutation.collection.id} but the PowerSync table does not track metadata. The PowerSync table should be configured with trackMetadata: true.`,
mutation.metadata
)
}
return null
} else if (typeof mutation.metadata == `undefined`) {
return null
} else if (typeof mutation.metadata == `string`) {
return mutation.metadata
} else {
return JSON.stringify(mutation.metadata)
}
}
}
5 changes: 5 additions & 0 deletions packages/powersync-db-collection/src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ export type PowerSyncCollectionMeta<TTable extends Table = Table> = {
* Serializes a collection value to the SQLite type
*/
serializeValue: (value: any) => ExtractedTable<TTable>

/**
* Whether the PowerSync table tracks metadata.
*/
metadataIsTracked: boolean
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/powersync-db-collection/src/powersync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ export function powerSyncCollectionOptions<
// The collection output type
type OutputType = InferPowerSyncOutputType<TTable, TSchema>

const { viewName } = table
const { viewName, trackMetadata: metadataIsTracked } = table

/**
* Deserializes data from the incoming sync stream
Expand Down Expand Up @@ -459,6 +459,7 @@ export function powerSyncCollectionOptions<
getMeta: () => ({
tableName: viewName,
trackedTableName,
metadataIsTracked,
serializeValue: (value) =>
serializeForSQLite(
value,
Expand Down
Loading
Loading