From 809618a754b53764b4433d1bfff878810bf73f28 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Tue, 13 May 2025 13:46:11 +0200 Subject: [PATCH 1/7] Expanded credentials API getCredentials, prefetchCredentials, fetchCredentials, invalidateCredentials. --- .../src/client/sync/stream/AbstractRemote.ts | 56 ++++++++++++++++--- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 79907443..a40e4568 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -130,18 +130,60 @@ export abstract class AbstractRemote { : fetchImplementation; } + /** + * Get credentials currently cached, or fetch new credentials if none are + * available. + * + * These credentials may have expired already. + */ async getCredentials(): Promise { - const { expiresAt } = this.credentials ?? {}; - if (expiresAt && expiresAt > new Date(new Date().valueOf() + REFRESH_CREDENTIALS_SAFETY_PERIOD_MS)) { - return this.credentials!; + if (this.credentials) { + return this.credentials; } - this.credentials = await this.connector.fetchCredentials(); - if (this.credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) { + + return this.prefetchCredentials(); + } + + /** + * Fetch a new set of credentials and cache it. + * + * Until this call succeeds, `getCredentials` will still return the + * old credentials. + * + * This may be called before the current credentials have expired. + */ + async prefetchCredentials() { + this.credentials = await this.fetchCredentials(); + + return this.credentials; + } + + /** + * Get credentials for PowerSync. + * + * This should always fetch a fresh set of credentials - don't use cached + * values. + * + * + */ + async fetchCredentials() { + const credentials = await this.connector.fetchCredentials(); + if (credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) { throw new Error( - `A trailing forward slash "/" was found in the fetchCredentials endpoint: "${this.credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.` + `A trailing forward slash "/" was found in the fetchCredentials endpoint: "${credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.` ); } - return this.credentials; + + return credentials; + } + + /*** + * Immediately invalidate credentials. + * + * This may be called when the current credentials have expired. + */ + invalidateCredentials() { + this.credentials = null; } getUserAgent() { From 801306a46bb74a9bc22676e57cd6d5d702e3682e Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 14 May 2025 10:44:06 +0200 Subject: [PATCH 2/7] Invalidate credentials on 401. --- .../src/client/sync/stream/AbstractRemote.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index a40e4568..9e020016 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -163,8 +163,6 @@ export abstract class AbstractRemote { * * This should always fetch a fresh set of credentials - don't use cached * values. - * - * */ async fetchCredentials() { const credentials = await this.connector.fetchCredentials(); @@ -223,6 +221,10 @@ export abstract class AbstractRemote { body: JSON.stringify(data) }); + if (res.status === 401) { + this.invalidateCredentials(); + } + if (!res.ok) { throw new Error(`Received ${res.status} - ${res.statusText} when posting to ${path}: ${await res.text()}}`); } @@ -240,6 +242,10 @@ export abstract class AbstractRemote { } }); + if (res.status === 401) { + this.invalidateCredentials(); + } + if (!res.ok) { throw new Error(`Received ${res.status} - ${res.statusText} when getting from ${path}: ${await res.text()}}`); } @@ -266,6 +272,10 @@ export abstract class AbstractRemote { throw ex; }); + if (res.status === 401) { + this.invalidateCredentials(); + } + if (!res.ok) { const text = await res.text(); this.logger.error(`Could not POST streaming to ${path} - ${res.status} - ${res.statusText}: ${text}`); From 211a37cfc3f68a1135a8f40ca82026877febc6cf Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Wed, 14 May 2025 16:16:18 +0200 Subject: [PATCH 3/7] WIP handling credential errors kicked off from websockets. --- .../src/client/sync/stream/AbstractRemote.ts | 17 +++++++++++++++-- .../AbstractStreamingSyncImplementation.ts | 9 +++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 9e020016..01ee5788 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -312,10 +312,19 @@ export abstract class AbstractRemote { // automatically as a header. const userAgent = this.getUserAgent(); + let socketCreationError: Error | undefined; + const connector = new RSocketConnector({ transport: new WebsocketClientTransport({ url: this.options.socketUrlTransformer(request.url), - wsCreator: (url) => this.createSocket(url) + wsCreator: (url) => { + const s = this.createSocket(url); + s.addEventListener('error', (e: Event) => { + socketCreationError = new Error('Failed to create connection to websocket: ', (e.target as any).url ?? ''); + this.logger.warn('Socket error', e); + }); + return s; + } }), setup: { keepAlive: KEEP_ALIVE_MS, @@ -342,7 +351,7 @@ export abstract class AbstractRemote { * On React native the connection exception can be `undefined` this causes issues * with detecting the exception inside async-mutex */ - throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex)}`); + throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex ?? socketCreationError)}`); } const stream = new DataStream({ @@ -387,6 +396,10 @@ export abstract class AbstractRemote { syncQueueRequestSize, // The initial N amount { onError: (e) => { + if (e.message.includes('PSYNC_S2101')) { + this.logger.error('PSYNC_S2101 - 401 Unauthorized'); + this.invalidateCredentials(); + } // Don't log closed as an error if (e.message !== 'Closed. ') { this.logger.error(e); diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index a6179a03..5ac50b02 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -683,12 +683,13 @@ The next upload iteration will be delayed.`); if (remaining_seconds == 0) { // Connection would be closed automatically right after this this.logger.debug('Token expiring; reconnect'); - /** - * For a rare case where the backend connector does not update the token - * (uses the same one), this should have some delay. - */ + this.options.remote.invalidateCredentials(); + await this.delayRetry(); return; + } else if (remaining_seconds < 30) { + // Pre-emptively refresh the token + await this.options.remote.prefetchCredentials(); } this.triggerCrudUpload(); } else { From 88ee962d5cd6adacb32a3c5849373d53a9a41846 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 16 May 2025 10:16:40 +0200 Subject: [PATCH 4/7] Calling invalidate credentials over comlink, so that worker informs tab's remote connector about invalidation. --- packages/common/src/client/sync/stream/AbstractRemote.ts | 5 +++-- .../sync/stream/AbstractStreamingSyncImplementation.ts | 3 --- .../src/db/sync/SharedWebStreamingSyncImplementation.ts | 4 ++++ .../src/worker/sync/AbstractSharedSyncClientProvider.ts | 1 + packages/web/src/worker/sync/SharedSyncImplementation.ts | 9 +++++++++ 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 01ee5788..54e427a3 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -15,6 +15,7 @@ export type BSONImplementation = typeof BSON; export type RemoteConnector = { fetchCredentials: () => Promise; + invalidateCredentials?: () => void; }; const POWERSYNC_TRAILING_SLASH_MATCH = /\/+$/; @@ -182,6 +183,7 @@ export abstract class AbstractRemote { */ invalidateCredentials() { this.credentials = null; + this.connector.invalidateCredentials?.(); } getUserAgent() { @@ -396,8 +398,7 @@ export abstract class AbstractRemote { syncQueueRequestSize, // The initial N amount { onError: (e) => { - if (e.message.includes('PSYNC_S2101')) { - this.logger.error('PSYNC_S2101 - 401 Unauthorized'); + if (e.message.includes('Authorization failed') || e.message.includes('401')) { this.invalidateCredentials(); } // Don't log closed as an error diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 5ac50b02..6d6cdc5d 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -687,9 +687,6 @@ The next upload iteration will be delayed.`); await this.delayRetry(); return; - } else if (remaining_seconds < 30) { - // Pre-emptively refresh the token - await this.options.remote.prefetchCredentials(); } this.triggerCrudUpload(); } else { diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index dadb1276..3dad3192 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -31,6 +31,10 @@ class SharedSyncClientProvider extends AbstractSharedSyncClientProvider { return Comlink.transfer(port, [port]); } + invalidateCredentials() { + this.options.remote.invalidateCredentials(); + } + async fetchCredentials(): Promise { const credentials = await this.options.remote.getCredentials(); if (credentials == null) { diff --git a/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts b/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts index 2713f13f..c33c5ffc 100644 --- a/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts +++ b/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts @@ -5,6 +5,7 @@ import type { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common' */ export abstract class AbstractSharedSyncClientProvider { abstract fetchCredentials(): Promise; + abstract invalidateCredentials(): void; abstract uploadCrud(): Promise; abstract statusChanged(status: SyncStatusOptions): void; abstract getDBWorkerPort(): Promise; diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index af2d96d7..60e2e1c0 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -308,6 +308,15 @@ export class SharedSyncImplementation adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger), remote: new WebRemote( { + invalidateCredentials: async () => { + const lastPort = this.ports[this.ports.length - 1]; + try { + this.logger.log('calling the last port client provider to invalidate credentials'); + lastPort.clientProvider.invalidateCredentials(); + } catch (ex) { + this.logger.error('error invalidating credentials', ex); + } + }, fetchCredentials: async () => { const lastPort = this.ports[this.ports.length - 1]; return new Promise(async (resolve, reject) => { From 0068fc53f51209064807e8cef7d35fdc6e24c897 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 16 May 2025 14:21:48 +0200 Subject: [PATCH 5/7] Changeset entries. --- .changeset/cool-yaks-allow.md | 5 +++++ .changeset/shiny-rules-invent.md | 5 +++++ .../sync/stream/AbstractStreamingSyncImplementation.ts | 4 ++++ 3 files changed, 14 insertions(+) create mode 100644 .changeset/cool-yaks-allow.md create mode 100644 .changeset/shiny-rules-invent.md diff --git a/.changeset/cool-yaks-allow.md b/.changeset/cool-yaks-allow.md new file mode 100644 index 00000000..9fd8db55 --- /dev/null +++ b/.changeset/cool-yaks-allow.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': minor +--- + +To support the upstream credentials management changes from `@powersync/common`, the sync worker now communicates credentials invalidation to tabs. diff --git a/.changeset/shiny-rules-invent.md b/.changeset/shiny-rules-invent.md new file mode 100644 index 00000000..dc732b43 --- /dev/null +++ b/.changeset/shiny-rules-invent.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': minor +--- + +Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid. The frequency of credential fetching has been reduced as a result of this work. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 6d6cdc5d..b14bab37 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -685,6 +685,10 @@ The next upload iteration will be delayed.`); this.logger.debug('Token expiring; reconnect'); this.options.remote.invalidateCredentials(); + /** + * For a rare case where the backend connector does not update the token + * (uses the same one), this should have some delay. + */ await this.delayRetry(); return; } From ed480cd6a5850254240ef36b949d0b64b9bf99c0 Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 16 May 2025 14:30:31 +0200 Subject: [PATCH 6/7] Update shiny-rules-invent.md --- .changeset/shiny-rules-invent.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/shiny-rules-invent.md b/.changeset/shiny-rules-invent.md index dc732b43..cb454ed7 100644 --- a/.changeset/shiny-rules-invent.md +++ b/.changeset/shiny-rules-invent.md @@ -2,4 +2,4 @@ '@powersync/common': minor --- -Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid. The frequency of credential fetching has been reduced as a result of this work. +Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid based on responses from the PowerSync service. The frequency of credential fetching has been reduced as a result of this work. From 196c9ea0e07853fac9333d7f649a184170ff1f4f Mon Sep 17 00:00:00 2001 From: Christiaan Landman Date: Fri, 16 May 2025 14:48:12 +0200 Subject: [PATCH 7/7] Handling `0 < X < 30` expiration time check. --- packages/common/src/client/sync/stream/AbstractRemote.ts | 2 -- .../sync/stream/AbstractStreamingSyncImplementation.ts | 5 +++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 54e427a3..a55e1214 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -21,8 +21,6 @@ export type RemoteConnector = { const POWERSYNC_TRAILING_SLASH_MATCH = /\/+$/; const POWERSYNC_JS_VERSION = PACKAGE.version; -// Refresh at least 30 sec before it expires -const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000; const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index b14bab37..46f04bd7 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -691,6 +691,11 @@ The next upload iteration will be delayed.`); */ await this.delayRetry(); return; + } else if (remaining_seconds < 30) { + this.logger.debug('Token will expire soon; reconnect'); + // Pre-emptively refresh the token + this.options.remote.invalidateCredentials(); + return; } this.triggerCrudUpload(); } else {