Skip to content

[Feature] getNextCrudTransactionBatch #593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/new-days-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/common': minor
'@powersync/web': minor
'@powersync/node': minor
'@powersync/react-native': minor
---

Added `getNextCrudTransactionBatch` method which allows batch uploads for multiple transactions
76 changes: 74 additions & 2 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import {
UpdateNotification,
isBatchedUpdateNotification
} from '../db/DBAdapter.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';
import { SyncPriorityStatus, SyncStatus } from '../db/crud/SyncStatus.js';
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js';
import { Schema } from '../db/schema/Schema.js';
import { BaseObserver } from '../utils/BaseObserver.js';
import { ControlledExecutor } from '../utils/ControlledExecutor.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { throttleTrailing } from '../utils/async.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { SQLOpenFactory, SQLOpenOptions, isDBAdapter, isSQLOpenFactory, isSQLOpenOptions } from './SQLOpenFactory.js';
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js';
import { runOnSchemaChange } from './runOnSchemaChange.js';
Expand All @@ -32,7 +33,6 @@ import {
type PowerSyncConnectionOptions,
type RequiredAdditionalConnectionOptions
} from './sync/stream/AbstractStreamingSyncImplementation.js';
import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down Expand Up @@ -98,6 +98,10 @@ export interface WatchOnChangeHandler {
onError?: (error: Error) => void;
}

export interface CrudTransactionBatchOptions {
transactionLimit: number;
}

export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
initialized: () => void;
schemaChanged: (schema: Schema) => void;
Expand All @@ -122,6 +126,11 @@ export const DEFAULT_POWERSYNC_CLOSE_OPTIONS: PowerSyncCloseOptions = {
disconnect: true
};

export const DEFAULT_CRUD_TRANSACTION_BATCH_LIMIT = 10;
export const DEFAULT_CRUD_TRANSACTION_BATCH_OPTIONS: CrudTransactionBatchOptions = {
transactionLimit: DEFAULT_CRUD_TRANSACTION_BATCH_LIMIT
};

export const DEFAULT_WATCH_THROTTLE_MS = 30;

export const DEFAULT_POWERSYNC_DB_OPTIONS = {
Expand Down Expand Up @@ -580,6 +589,69 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
);
}

/**
* Get a batch of CRUD data, grouped by transaction, to upload.
*
* Returns null if there is no data to upload.
*
* This allows for processing and completing multiple transactions at once.
*
* Use this from the {@link PowerSyncBackendConnector.uploadData} callback.
*
* Once the data have been successfully uploaded, call {@link CrudBatch.complete} before
* requesting the next batch.
*
* Use {@link limit} to specify the maximum number of transactions to return in the batch.
*
* @param limit Maximum number of transactions to include in the batch
* @returns A batch of CRUD operations to upload, or null if there are none
*/
async getNextCrudTransactionBatch(
options: CrudTransactionBatchOptions = DEFAULT_CRUD_TRANSACTION_BATCH_OPTIONS
): Promise<CrudBatch | null> {
const { transactionLimit } = options;

// Transaction IDs are always incrementing
// We can fetch the first transaction id and use that to limit the query
// to the next batch of transactions
const first = await this.getOptional<CrudEntryJSON>(`
SELECT id, tx_id, data FROM ${PSInternalTable.CRUD} ORDER BY id ASC LIMIT 1`);

if (!first) {
return null;
}

const items = await this.getAll<CrudEntryJSON>(
`SELECT
id, tx_id, data
FROM ${PSInternalTable.CRUD}
WHERE
tx_id < ?
ORDER BY
id ASC
`,
[(first.tx_id ?? 0) + transactionLimit]
);

if (items.length == 0) {
return null;
}

// check if there are more items for haveMore
const nextItem = await this.getOptional<CrudEntryJSON>(
`
SELECT id FROM ${PSInternalTable.CRUD} WHERE id > ? LIMIT 1`,
[items[items.length - 1].id]
);

const crudEntries: CrudEntry[] = items.map((row) => CrudEntry.fromRow(row)) ?? [];
const last = crudEntries[items.length - 1];

return new CrudBatch(crudEntries, !!nextItem, async (writeCheckpoint?: string) =>
this.handleCrudCheckpoint(last.clientId, writeCheckpoint)
);
}

/**
* Get the next recorded transaction to upload.
*
Expand Down
34 changes: 34 additions & 0 deletions packages/web/tests/uploads.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,39 @@ function describeCrudUploadTests(getDatabaseOptions: () => WebPowerSyncDatabaseO

expect(loggerSpy.mock.calls.find((logArgs) => logArgs[0].includes(PARTIAL_WARNING))).toBeUndefined;
});

it('should returned batched transaction CRUD batches', async () => {
const logger = createLogger('crud-logger');

const options = getDatabaseOptions();

const { powersync } = await generateConnectedDatabase({
powerSyncOptions: {
...options,
logger
}
});

// Create some test transactions
for (let i = 0; i < 10; i++) {
await powersync.execute('INSERT into users (id, name) VALUES (uuid(), ?)', [`user-${i}`]);
}

const firstBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 3 });
expect(firstBatch?.crud.length).eq(3);
expect(firstBatch?.haveMore).true;

// completing the first batch should clear all the relevant crud items from the queue
await firstBatch?.complete();

// This batch should contain all the remaining crud items
const secondBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 10 });
expect(secondBatch?.crud.length).eq(7);
expect(secondBatch?.haveMore).false;
await secondBatch?.complete();

const thirdBatch = await powersync.getNextCrudTransactionBatch({ transactionLimit: 10 });
expect(thirdBatch).toBeUndefined;
});
};
}