diff --git a/.changeset/new-days-fry.md b/.changeset/new-days-fry.md new file mode 100644 index 00000000..3233130d --- /dev/null +++ b/.changeset/new-days-fry.md @@ -0,0 +1,8 @@ +--- +'@powersync/common': minor +'@powersync/web': minor +'@powersync/node': minor +'@powersync/react-native': minor +--- + +Added `getNextCrudTransactionBatch` method which allows batch uploads for multiple transactions diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index c78912c9..24c7a197 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -9,13 +9,14 @@ import { UpdateNotification, isBatchedUpdateNotification } from '../db/DBAdapter.js'; +import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js'; import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js'; import { Schema } from '../db/schema/Schema.js'; import { BaseObserver } from '../utils/BaseObserver.js'; import { ControlledExecutor } from '../utils/ControlledExecutor.js'; -import { mutexRunExclusive } from '../utils/mutex.js'; import { throttleTrailing } from '../utils/async.js'; +import { mutexRunExclusive } from '../utils/mutex.js'; import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js'; import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js'; import { runOnSchemaChange } from './runOnSchemaChange.js'; @@ -32,7 +33,6 @@ import { type PowerSyncConnectionOptions, type RequiredAdditionalConnectionOptions } from './sync/stream/AbstractStreamingSyncImplementation.js'; -import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js'; export interface DisconnectAndClearOptions { /** When set to false, data in local-only tables is preserved. */ @@ -98,6 +98,10 @@ export interface WatchOnChangeHandler { onError?: (error: Error) => void; } +export interface CrudTransactionBatchOptions { + transactionLimit: number; +} + export interface PowerSyncDBListener extends StreamingSyncImplementationListener { initialized: () => void; schemaChanged: (schema: Schema) => void; @@ -122,6 +126,11 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = { disconnect: true }; +export const DEFAULT_CRUD_TRANSACTION_BATCH_LIMIT = 10; +export const DEFAULT_CRUD_TRANSACTION_BATCH_OPTIONS: CrudTransactionBatchOptions = { + transactionLimit: DEFAULT_CRUD_TRANSACTION_BATCH_LIMIT +}; + export const DEFAULT_WATCH_THROTTLE_MS = 30; export const DEFAULT_POWERSYNC_DB_OPTIONS = { @@ -580,6 +589,69 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { + const { transactionLimit } = options; + + // Transaction IDs are always incrementing + // We can fetch the first transaction id and use that to limit the query + // to the next batch of transactions + const first = await this.getOptional(` + SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`); + + if (!first) { + return null; + } + + const items = await this.getAll( + `SELECT + id, tx_id, data + FROM ${PSInternalTable.CRUD} + WHERE + tx_id < ? + ORDER BY + id ASC + `, + [(first.tx_id ?? 0) + transactionLimit] + ); + + if (items.length == 0) { + return null; + } + + // check if there are more items for haveMore + const nextItem = await this.getOptional( + ` + SELECT id FROM ${PSInternalTable.CRUD} WHERE id > ? LIMIT 1`, + [items[items.length - 1].id] + ); + + const crudEntries: CrudEntry[] = items.map((row) => CrudEntry.fromRow(row)) ?? []; + const last = crudEntries[items.length - 1]; + + return new CrudBatch(crudEntries, !!nextItem, async (writeCheckpoint?: string) => + this.handleCrudCheckpoint(last.clientId, writeCheckpoint) + ); + } + /** * Get the next recorded transaction to upload. * diff --git a/packages/web/tests/uploads.test.ts b/packages/web/tests/uploads.test.ts index 264b0767..bec9656e 100644 --- a/packages/web/tests/uploads.test.ts +++ b/packages/web/tests/uploads.test.ts @@ -142,5 +142,39 @@ function describeCrudUploadTests(getDatabaseOptions: () => WebPowerSyncDatabaseO expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).toBeUndefined; }); + + it('should returned batched transaction CRUD batches', async () => { + const logger = createLogger('crud-logger'); + + const options = getDatabaseOptions(); + + const { powersync } = await generateConnectedDatabase({ + powerSyncOptions: { + ...options, + logger + } + }); + + // Create some test transactions + for (let i = 0; i < 10; i++) { + await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', [`user-${i}`]); + } + + const firstBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 3 }); + expect(firstBatch?.crud.length).eq(3); + expect(firstBatch?.haveMore).true; + + // completing the first batch should clear all the relevant crud items from the queue + await firstBatch?.complete(); + + // This batch should contain all the remaining crud items + const secondBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 10 }); + expect(secondBatch?.crud.length).eq(7); + expect(secondBatch?.haveMore).false; + await secondBatch?.complete(); + + const thirdBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 10 }); + expect(thirdBatch).toBeUndefined; + }); }; }