Skip to content

Commit 3fcb6cb

Browse files
add getNextCrudTransactionBatch
1 parent 9884e7b commit 3fcb6cb

File tree

3 files changed

+116
-2
lines changed

3 files changed

+116
-2
lines changed

.changeset/new-days-fry.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
'@powersync/common': minor
3+
'@powersync/web': minor
4+
'@powersync/node': minor
5+
'@powersync/react-native': minor
6+
---
7+
8+
Added `getNextCrudTransactionBatch` method which allows batch uploads for multiple transactions

packages/common/src/client/AbstractPowerSyncDatabase.ts

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import {
99
UpdateNotification,
1010
isBatchedUpdateNotification
1111
} from '../db/DBAdapter.js';
12+
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
1213
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
1314
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
1415
import { Schema } from '../db/schema/Schema.js';
1516
import { BaseObserver } from '../utils/BaseObserver.js';
1617
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
17-
import { mutexRunExclusive } from '../utils/mutex.js';
1818
import { throttleTrailing } from '../utils/async.js';
19+
import { mutexRunExclusive } from '../utils/mutex.js';
1920
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
2021
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
2122
import { runOnSchemaChange } from './runOnSchemaChange.js';
@@ -32,7 +33,6 @@ import {
3233
type PowerSyncConnectionOptions,
3334
type RequiredAdditionalConnectionOptions
3435
} from './sync/stream/AbstractStreamingSyncImplementation.js';
35-
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
3636

3737
export interface DisconnectAndClearOptions {
3838
/** When set to false, data in local-only tables is preserved. */
@@ -98,6 +98,10 @@ export interface WatchOnChangeHandler {
9898
onError?: (error: Error) => void;
9999
}
100100

101+
export interface CrudTransactionBatchOptions {
102+
transactionLimit: number;
103+
}
104+
101105
export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
102106
initialized: () => void;
103107
schemaChanged: (schema: Schema) => void;
@@ -122,6 +126,11 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
122126
disconnect: true
123127
};
124128

129+
export const DEFAULT_CRUD_TRANSACTION_BATCH_LIMIT = 10;
130+
export const DEFAULT_CRUD_TRANSACTION_BATCH_OPTIONS: CrudTransactionBatchOptions = {
131+
transactionLimit: DEFAULT_CRUD_TRANSACTION_BATCH_LIMIT
132+
};
133+
125134
export const DEFAULT_WATCH_THROTTLE_MS = 30;
126135

127136
export const DEFAULT_POWERSYNC_DB_OPTIONS = {
@@ -580,6 +589,69 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
580589
);
581590
}
582591

592+
/**
593+
* Get a batch of CRUD data, grouped by transaction, to upload.
594+
*
595+
* Returns null if there is no data to upload.
596+
*
597+
* This allows for processing and completing multiple transactions at once.
598+
*
599+
* Use this from the {@link PowerSyncBackendConnector.uploadData} callback.
600+
*
601+
* Once the data have been successfully uploaded, call {@link CrudBatch.complete} before
602+
* requesting the next batch.
603+
*
604+
* Use {@link limit} to specify the maximum number of transactions to return in the batch.
605+
*
606+
* @param limit Maximum number of transactions to include in the batch
607+
* @returns A batch of CRUD operations to upload, or null if there are none
608+
*/
609+
async getNextCrudTransactionBatch(
610+
options: CrudTransactionBatchOptions = DEFAULT_CRUD_TRANSACTION_BATCH_OPTIONS
611+
): Promise<CrudBatch | null> {
612+
const { transactionLimit } = options;
613+
614+
// Transaction IDs are always incrementing
615+
// We can fetch the first transaction id and use that to limit the query
616+
// to the next batch of transactions
617+
const first = await this.getOptional<CrudEntryJSON>(`
618+
SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`);
619+
620+
if (!first) {
621+
return null;
622+
}
623+
624+
const items = await this.getAll<CrudEntryJSON>(
625+
`SELECT
626+
id, tx_id, data
627+
FROM ${PSInternalTable.CRUD}
628+
WHERE
629+
tx_id < ?
630+
ORDER BY
631+
id ASC
632+
`,
633+
[(first.tx_id ?? 0) + transactionLimit]
634+
);
635+
636+
if (items.length == 0) {
637+
return null;
638+
}
639+
640+
// check if there are more items for haveMore
641+
const nextItem = await this.getOptional<CrudEntryJSON>(
642+
`
643+
SELECT id FROM ${PSInternalTable.CRUD} WHERE id > ? LIMIT 1`,
644+
[items[items.length - 1].id]
645+
);
646+
647+
const crudEntries: CrudEntry[] = items.map((row) => CrudEntry.fromRow(row)) ?? [];
648+
const last = crudEntries[items.length - 1];
649+
650+
return new CrudBatch(crudEntries, !!nextItem, async (writeCheckpoint?: string) =>
651+
this.handleCrudCheckpoint(last.clientId, writeCheckpoint)
652+
);
653+
}
654+
583655
/**
584656
* Get the next recorded transaction to upload.
585657
*

packages/web/tests/uploads.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,39 @@ function describeCrudUploadTests(getDatabaseOptions: () => WebPowerSyncDatabaseO
142142

143143
expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).toBeUndefined;
144144
});
145+
146+
it('should returned batched transaction CRUD batches', async () => {
147+
const logger = createLogger('crud-logger');
148+
149+
const options = getDatabaseOptions();
150+
151+
const { powersync } = await generateConnectedDatabase({
152+
powerSyncOptions: {
153+
...options,
154+
logger
155+
}
156+
});
157+
158+
// Create some test transactions
159+
for (let i = 0; i < 10; i++) {
160+
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', [`user-${i}`]);
161+
}
162+
163+
const firstBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 3 });
164+
expect(firstBatch?.crud.length).eq(3);
165+
expect(firstBatch?.haveMore).true;
166+
167+
// completing the first batch should clear all the relevant crud items from the queue
168+
await firstBatch?.complete();
169+
170+
// This batch should contain all the remaining crud items
171+
const secondBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 10 });
172+
expect(secondBatch?.crud.length).eq(7);
173+
expect(secondBatch?.haveMore).false;
174+
await secondBatch?.complete();
175+
176+
const thirdBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 10 });
177+
expect(thirdBatch).toBeUndefined;
178+
});
145179
};
146180
}

0 commit comments

Comments
 (0)