WIP: Add sync implementation from core extension #599

Draft · wants to merge 3 commits into base: main
19 changes: 17 additions & 2 deletions demos/example-node/src/main.ts
@@ -1,7 +1,13 @@
import { once } from 'node:events';
import repl_factory from 'node:repl';

import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
import {
createBaseLogger,
createLogger,
PowerSyncDatabase,
SyncClientImplementation,
SyncStreamConnectionMethod
} from '@powersync/node';
import { exit } from 'node:process';
import { AppSchema, DemoConnector } from './powersync.js';

@@ -26,7 +32,16 @@ const main = async () => {
});
console.log(await db.get('SELECT powersync_rs_version();'));

await db.connect(new DemoConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
db.registerListener({
statusChanged(status) {
console.log('status changed', status);
}
});

await db.connect(new DemoConnector(), {
connectionMethod: SyncStreamConnectionMethod.HTTP,
clientImplementation: SyncClientImplementation.RUST
});
await db.waitForFirstSync();
console.log('First sync complete!');

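For reference, the demo change above amounts to the following usage, shown as a standalone sketch. It assumes the @powersync/node exports and the DemoConnector/AppSchema used by the demo; the database options are illustrative and not part of this diff.

import { PowerSyncDatabase, SyncClientImplementation, SyncStreamConnectionMethod } from '@powersync/node';
import { AppSchema, DemoConnector } from './powersync.js';

const db = new PowerSyncDatabase({
  schema: AppSchema,
  database: { dbFilename: 'example.db' } // illustrative options
});

// Log every sync status transition reported by the client.
db.registerListener({
  statusChanged(status) {
    console.log('status changed', status);
  }
});

await db.connect(new DemoConnector(), {
  connectionMethod: SyncStreamConnectionMethod.HTTP,
  // Opt into the sync client implemented in the core (Rust) extension.
  clientImplementation: SyncClientImplementation.RUST
});
await db.waitForFirstSync();
console.log('First sync complete!');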
1 change: 0 additions & 1 deletion packages/common/package.json
@@ -51,7 +51,6 @@
"async-mutex": "^0.4.0",
"bson": "^6.6.0",
"buffer": "^6.0.3",
"can-ndjson-stream": "^1.0.2",
"cross-fetch": "^4.0.0",
"event-iterator": "^2.0.0",
"rollup": "4.14.3",
@@ -101,4 +101,9 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
* Get a unique client id.
*/
getClientId(): Promise<string>;

/**
* Invokes the `powersync_control` function for the sync client.
*/
control(op: string, payload: string | ArrayBuffer | null): Promise<string>;
}
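A rough sketch of how a sync client could drive the new control() entry point on the BucketStorageAdapter interface. The op names and payload shapes ('start', 'line_text', 'stop') follow the core extension's powersync_control contract and are assumptions here rather than part of this diff.

// Assumed driver loop for the core-extension sync client (illustrative only).
async function runSyncIteration(storage: BucketStorageAdapter, lines: AsyncIterable<string>) {
  // Ask the core extension to start a sync iteration; it replies with JSON-encoded instructions.
  let instructions = await storage.control('start', JSON.stringify({ parameters: {} }));
  handleInstructions(JSON.parse(instructions));

  for await (const line of lines) {
    // Forward each received sync line; the extension replies with follow-up instructions.
    instructions = await storage.control('line_text', line);
    handleInstructions(JSON.parse(instructions));
  }

  await storage.control('stop', null);
}

// Placeholder: apply instructions (status updates, data to flush, ...) emitted by the extension.
function handleInstructions(instructions: unknown) {}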
7 changes: 7 additions & 0 deletions packages/common/src/client/sync/bucket/SqliteBucketStorage.ts
@@ -413,6 +413,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
async setTargetCheckpoint(checkpoint: Checkpoint) {
// No-op for now
}

async control(op: string, payload: string | ArrayBuffer | null): Promise<string> {
return await this.writeTransaction(async (tx) => {
const [[raw]] = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
return raw;
});
}
}

function hasMatchingPriority(priority: number, bucket: BucketChecksum) {
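The nested [[raw]] destructuring in SqliteBucketStorage.control() above assumes executeRaw returns result rows as plain value arrays and that this query yields exactly one row with one column. Spelled out, the same method reads roughly as follows (a sketch, not a proposed change):

async control(op: string, payload: string | ArrayBuffer | null): Promise<string> {
  // Run on the write connection, presumably because powersync_control can update local sync state.
  return await this.writeTransaction(async (tx) => {
    const rows = await tx.executeRaw('SELECT powersync_control(?, ?)', [op, payload]);
    const firstRow = rows[0]; // single result row
    return firstRow[0];       // single column: the serialized response from the core extension
  });
}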
217 changes: 112 additions & 105 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
@@ -1,6 +1,5 @@
import type { BSON } from 'bson';
import { Buffer } from 'buffer';
import ndjsonStream from 'can-ndjson-stream';
import { type fetch } from 'cross-fetch';
import Logger, { ILogger } from 'js-logger';
import { RSocket, RSocketConnector, Requestable } from 'rsocket-core';
@@ -22,7 +21,6 @@ const POWERSYNC_JS_VERSION = PACKAGE.version;

// Refresh at least 30 sec before it expires
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;

// Keep alive message is sent every period
const KEEP_ALIVE_MS = 20_000;
@@ -144,6 +142,10 @@ export abstract class AbstractRemote {
return this.credentials;
}

invalidateCredentials() {
this.credentials = null;
}

getUserAgent() {
return `powersync-js/${POWERSYNC_JS_VERSION}`;
}
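// Sketch of the intended use of the new invalidateCredentials() helper (an assumption based on
// the cached-credentials logic above): when a request fails with an auth error, clear the cache
// so the next attempt fetches fresh credentials from the connector. The get() call and the
// error.status check are illustrative.
async function fetchWithAuthRetry(remote: AbstractRemote, path: string): Promise<any> {
  try {
    return await remote.get(path);
  } catch (e: any) {
    if (e?.status === 401) {
      remote.invalidateCredentials(); // drop the cached token
      return await remote.get(path);  // retry with freshly fetched credentials
    }
    throw e;
  }
}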
@@ -205,36 +207,6 @@
return res.json();
}

async postStreaming(
path: string,
data: any,
headers: Record<string, string> = {},
signal?: AbortSignal
): Promise<any> {
const request = await this.buildRequest(path);

const res = await this.fetch(request.url, {
method: 'POST',
headers: { ...headers, ...request.headers },
body: JSON.stringify(data),
signal,
cache: 'no-store'
}).catch((ex) => {
this.logger.error(`Caught ex when POST streaming to ${path}`, ex);
throw ex;
});

if (!res.ok) {
const text = await res.text();
this.logger.error(`Could not POST streaming to ${path} - ${res.status} - ${res.statusText}: ${text}`);
const error: any = new Error(`HTTP ${res.statusText}: ${text}`);
error.status = res.status;
throw error;
}

return res;
}

/**
* Provides a BSON implementation. How it is imported varies depending on the platform
*/
@@ -245,16 +217,45 @@
}

/**
* Connects to the sync/stream websocket endpoint
* Connects to the sync/stream websocket endpoint and delivers sync lines by decoding the BSON events
* sent by the server.
*/
async socketStream(options: SocketSyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const bson = await this.getBSON();
return await this.socketStreamInternal(options, bson);
}

/**
* Connects to the sync/stream websocket endpoint without decoding BSON in JavaScript.
*/
async socketStreamRaw(options: SocketSyncStreamOptions): Promise<DataStream<Buffer<ArrayBuffer>>> {
return this.socketStreamInternal(options);
}
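// Usage sketch contrasting the two entry points; the path and request payload shape are
// assumptions, not part of this diff.
async function openSyncStreams(remote: AbstractRemote, request: StreamingSyncRequest) {
  // JS sync client: BSON frames are decoded in JavaScript into StreamingSyncLine objects.
  const parsedStream = await remote.socketStream({ path: '/sync/stream', data: request });

  // Core-extension sync client: undecoded BSON buffers are passed straight through
  // (e.g. to powersync_control), avoiding a decode/re-encode round trip in JavaScript.
  const rawStream = await remote.socketStreamRaw({ path: '/sync/stream', data: request });

  return { parsedStream, rawStream };
}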

/**
* Returns a data stream of sync line data.
*
* @param bson A BSON encoder and decoder. When set, the data stream will emit parsed instances of
* {@link StreamingSyncLine}. Otherwise, unparsed buffers will be emitted instead.
*/
private async socketStreamInternal(options: SocketSyncStreamOptions, bson?: typeof BSON): Promise<DataStream> {
const { path, fetchStrategy = FetchStrategy.Buffered } = options;
const mimeType = bson == null ? 'application/json' : 'application/bson';

function toBuffer(js: any): Buffer {
let contents: any;
if (bson != null) {
contents = bson.serialize(js);
} else {
contents = JSON.stringify(js);
}

return Buffer.from(contents);
}

const syncQueueRequestSize = fetchStrategy == FetchStrategy.Buffered ? 10 : 1;
const request = await this.buildRequest(path);

const bson = await this.getBSON();

// Add the user agent in the setup payload - we can't set custom
// headers with websockets on web. The browser userAgent is however added
// automatically as a header.
@@ -268,16 +269,14 @@
setup: {
keepAlive: KEEP_ALIVE_MS,
lifetime: KEEP_ALIVE_LIFETIME_MS,
dataMimeType: 'application/bson',
metadataMimeType: 'application/bson',
dataMimeType: mimeType,
metadataMimeType: mimeType,
payload: {
data: null,
metadata: Buffer.from(
bson.serialize({
token: request.headers.Authorization,
user_agent: userAgent
})
)
metadata: toBuffer({
token: request.headers.Authorization,
user_agent: userAgent
})
}
}
});
@@ -296,7 +295,7 @@
const stream = new DataStream({
logger: this.logger,
pressure: {
lowWaterMark: SYNC_QUEUE_REQUEST_LOW_WATER
lowWaterMark: Math.max(1, Math.round(syncQueueRequestSize * 0.7))
}
});
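// Worked example of the lowWaterMark above, which now scales with the request size instead of
// the removed SYNC_QUEUE_REQUEST_LOW_WATER constant (FetchStrategy.Sequential is assumed to be
// the non-buffered strategy):
//   FetchStrategy.Buffered   -> syncQueueRequestSize = 10 -> Math.max(1, Math.round(10 * 0.7)) = 7
//   FetchStrategy.Sequential -> syncQueueRequestSize = 1  -> Math.max(1, Math.round(1 * 0.7))  = 1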

@@ -325,12 +324,10 @@

const res = rsocket.requestStream(
{
data: Buffer.from(bson.serialize(options.data)),
metadata: Buffer.from(
bson.serialize({
path
})
)
data: toBuffer(options.data),
metadata: toBuffer({
path
})
},
syncQueueRequestSize, // The initial N amount
{
@@ -360,8 +357,7 @@
return;
}

const deserializedData = bson.deserialize(data);
stream.enqueueData(deserializedData);
stream.enqueueData(bson != null ? bson.deserialize(data) : data);
},
onComplete: () => {
stream.close();
@@ -404,6 +400,40 @@
* Connects to the sync/stream http endpoint
*/
async postStream(options: SyncStreamOptions): Promise<DataStream<StreamingSyncLine>> {
const jsonS = await this.postStreamRaw(options);

const stream = new DataStream({
logger: this.logger
});

const r = jsonS.getReader();

const l = stream.registerListener({
lowWater: async () => {
try {
const { done, value } = await r.read();
// Exit if we're done
if (done) {
stream.close();
l?.();
return;
}
stream.enqueueData(JSON.parse(value));
} catch (ex) {
stream.close();
throw ex;
}
},
closed: () => {
r.cancel();
l?.();
}
});

return stream;
}
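// postStream() now wraps postStreamRaw(), which exposes the NDJSON response as a stream of
// complete, trimmed lines. A sketch of consuming the raw variant directly, as the core-extension
// client presumably does (the remote, storage and request payload here are assumed):
async function pumpRawLines(remote: AbstractRemote, storage: BucketStorageAdapter, options: SyncStreamOptions) {
  const lineStream = await remote.postStreamRaw(options);
  const reader = lineStream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // Each value is one undecoded JSON sync line; hand it to the core extension as-is.
    await storage.control('line_text', value);
  }
}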

async postStreamRaw(options: SyncStreamOptions): Promise<ReadableStream<string>> {
const { data, path, headers, abortSignal } = options;

const request = await this.buildRequest(path);
@@ -458,11 +488,8 @@
throw error;
}

/**
* The can-ndjson-stream does not handle aborted streams well.
* This will intercept the readable stream and close the stream if
* aborted.
*/
// Create a new stream splitting the response at line endings while also handling cancellations
// by closing the reader.
const reader = res.body.getReader();
// This will close the network request and read stream
const closeReader = async () => {
Expand All @@ -478,63 +505,43 @@ export abstract class AbstractRemote {
closeReader();
});

const outputStream = new ReadableStream({
start: (controller) => {
const processStream = async () => {
while (!abortSignal?.aborted) {
try {
const { done, value } = await reader.read();
// When no more data needs to be consumed, close the stream
if (done) {
break;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
} catch (ex) {
this.logger.error('Caught exception when reading sync stream', ex);
break;
const decoder = new TextDecoder();
let buffer = '';

const outputStream = new ReadableStream<string>({
pull: async (controller) => {
let didCompleteLine = false;

while (!didCompleteLine) {
const { done, value } = await reader.read();
if (done) {
const remaining = buffer.trim();
if (remaining.length != 0) {
controller.enqueue(remaining);
}
}
if (!abortSignal?.aborted) {
// Close the downstream readable stream

controller.close();
await closeReader();
return;
}
controller.close();
};
processStream();
}
});

const jsonS = ndjsonStream(outputStream);

const stream = new DataStream({
logger: this.logger
});

const r = jsonS.getReader();
const data = decoder.decode(value, { stream: true });
buffer += data;

const l = stream.registerListener({
lowWater: async () => {
try {
const { done, value } = await r.read();
// Exit if we're done
if (done) {
stream.close();
l?.();
return;
const lines = buffer.split('\n');
for (let i = 0; i < lines.length - 1; i++) {
const line = lines[i].trim();
if (line.length > 0) {
controller.enqueue(line);
didCompleteLine = true;
}
}
stream.enqueueData(value);
} catch (ex) {
stream.close();
throw ex;

buffer = lines[lines.length - 1];
}
},
closed: () => {
closeReader();
l?.();
}
});

return stream;
return outputStream;
}
}
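With can-ndjson-stream removed, the line splitting above is done by hand. A condensed, self-contained version of the same approach, for reference (it buffers less eagerly than the implementation above, which keeps reading until at least one complete line is available):

// Split an incoming byte stream into trimmed, non-empty NDJSON lines.
function ndjsonLines(source: ReadableStream<Uint8Array>): ReadableStream<string> {
  const reader = source.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  return new ReadableStream<string>({
    pull: async (controller) => {
      const { done, value } = await reader.read();
      if (done) {
        const remaining = buffer.trim();
        if (remaining.length > 0) controller.enqueue(remaining);
        controller.close();
        return;
      }
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop()!; // keep the trailing partial line for the next chunk
      for (const line of lines) {
        const trimmed = line.trim();
        if (trimmed.length > 0) controller.enqueue(trimmed);
      }
    },
    cancel: () => reader.cancel()
  });
}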