diff --git a/.changeset/cool-yaks-allow.md b/.changeset/cool-yaks-allow.md new file mode 100644 index 00000000..9fd8db55 --- /dev/null +++ b/.changeset/cool-yaks-allow.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': minor +--- + +To support the upstream credentials management changes from `@powersync/common`, the sync worker now communicates credentials invalidation to tabs. diff --git a/.changeset/shiny-rules-invent.md b/.changeset/shiny-rules-invent.md new file mode 100644 index 00000000..cb454ed7 --- /dev/null +++ b/.changeset/shiny-rules-invent.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': minor +--- + +Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid based on responses from the PowerSync service. The frequency of credential fetching has been reduced as a result of this work. diff --git a/packages/common/src/client/sync/stream/AbstractRemote.ts b/packages/common/src/client/sync/stream/AbstractRemote.ts index 79907443..a55e1214 100644 --- a/packages/common/src/client/sync/stream/AbstractRemote.ts +++ b/packages/common/src/client/sync/stream/AbstractRemote.ts @@ -15,13 +15,12 @@ export type BSONImplementation = typeof BSON; export type RemoteConnector = { fetchCredentials: () => Promise; + invalidateCredentials?: () => void; }; const POWERSYNC_TRAILING_SLASH_MATCH = /\/+$/; const POWERSYNC_JS_VERSION = PACKAGE.version; -// Refresh at least 30 sec before it expires -const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000; const SYNC_QUEUE_REQUEST_LOW_WATER = 5; // Keep alive message is sent every period @@ -130,18 +129,59 @@ export abstract class AbstractRemote { : fetchImplementation; } + /** + * Get credentials currently cached, or fetch new credentials if none are + * available. + * + * These credentials may have expired already. + */ async getCredentials(): Promise { - const { expiresAt } = this.credentials ?? {}; - if (expiresAt && expiresAt > new Date(new Date().valueOf() + REFRESH_CREDENTIALS_SAFETY_PERIOD_MS)) { - return this.credentials!; + if (this.credentials) { + return this.credentials; } - this.credentials = await this.connector.fetchCredentials(); - if (this.credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) { + + return this.prefetchCredentials(); + } + + /** + * Fetch a new set of credentials and cache it. + * + * Until this call succeeds, `getCredentials` will still return the + * old credentials. + * + * This may be called before the current credentials have expired. + */ + async prefetchCredentials() { + this.credentials = await this.fetchCredentials(); + + return this.credentials; + } + + /** + * Get credentials for PowerSync. + * + * This should always fetch a fresh set of credentials - don't use cached + * values. + */ + async fetchCredentials() { + const credentials = await this.connector.fetchCredentials(); + if (credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) { throw new Error( - `A trailing forward slash "/" was found in the fetchCredentials endpoint: "${this.credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.` + `A trailing forward slash "/" was found in the fetchCredentials endpoint: "${credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.` ); } - return this.credentials; + + return credentials; + } + + /*** + * Immediately invalidate credentials. + * + * This may be called when the current credentials have expired. + */ + invalidateCredentials() { + this.credentials = null; + this.connector.invalidateCredentials?.(); } getUserAgent() { @@ -181,6 +221,10 @@ export abstract class AbstractRemote { body: JSON.stringify(data) }); + if (res.status === 401) { + this.invalidateCredentials(); + } + if (!res.ok) { throw new Error(`Received ${res.status} - ${res.statusText} when posting to ${path}: ${await res.text()}}`); } @@ -198,6 +242,10 @@ export abstract class AbstractRemote { } }); + if (res.status === 401) { + this.invalidateCredentials(); + } + if (!res.ok) { throw new Error(`Received ${res.status} - ${res.statusText} when getting from ${path}: ${await res.text()}}`); } @@ -224,6 +272,10 @@ export abstract class AbstractRemote { throw ex; }); + if (res.status === 401) { + this.invalidateCredentials(); + } + if (!res.ok) { const text = await res.text(); this.logger.error(`Could not POST streaming to ${path} - ${res.status} - ${res.statusText}: ${text}`); @@ -260,10 +312,19 @@ export abstract class AbstractRemote { // automatically as a header. const userAgent = this.getUserAgent(); + let socketCreationError: Error | undefined; + const connector = new RSocketConnector({ transport: new WebsocketClientTransport({ url: this.options.socketUrlTransformer(request.url), - wsCreator: (url) => this.createSocket(url) + wsCreator: (url) => { + const s = this.createSocket(url); + s.addEventListener('error', (e: Event) => { + socketCreationError = new Error('Failed to create connection to websocket: ', (e.target as any).url ?? ''); + this.logger.warn('Socket error', e); + }); + return s; + } }), setup: { keepAlive: KEEP_ALIVE_MS, @@ -290,7 +351,7 @@ export abstract class AbstractRemote { * On React native the connection exception can be `undefined` this causes issues * with detecting the exception inside async-mutex */ - throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex)}`); + throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex ?? socketCreationError)}`); } const stream = new DataStream({ @@ -335,6 +396,9 @@ export abstract class AbstractRemote { syncQueueRequestSize, // The initial N amount { onError: (e) => { + if (e.message.includes('Authorization failed') || e.message.includes('401')) { + this.invalidateCredentials(); + } // Don't log closed as an error if (e.message !== 'Closed. ') { this.logger.error(e); diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index a6179a03..46f04bd7 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -683,12 +683,19 @@ The next upload iteration will be delayed.`); if (remaining_seconds == 0) { // Connection would be closed automatically right after this this.logger.debug('Token expiring; reconnect'); + this.options.remote.invalidateCredentials(); + /** * For a rare case where the backend connector does not update the token * (uses the same one), this should have some delay. */ await this.delayRetry(); return; + } else if (remaining_seconds < 30) { + this.logger.debug('Token will expire soon; reconnect'); + // Pre-emptively refresh the token + this.options.remote.invalidateCredentials(); + return; } this.triggerCrudUpload(); } else { diff --git a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts index dadb1276..3dad3192 100644 --- a/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts +++ b/packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts @@ -31,6 +31,10 @@ class SharedSyncClientProvider extends AbstractSharedSyncClientProvider { return Comlink.transfer(port, [port]); } + invalidateCredentials() { + this.options.remote.invalidateCredentials(); + } + async fetchCredentials(): Promise { const credentials = await this.options.remote.getCredentials(); if (credentials == null) { diff --git a/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts b/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts index 2713f13f..c33c5ffc 100644 --- a/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts +++ b/packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts @@ -5,6 +5,7 @@ import type { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common' */ export abstract class AbstractSharedSyncClientProvider { abstract fetchCredentials(): Promise; + abstract invalidateCredentials(): void; abstract uploadCrud(): Promise; abstract statusChanged(status: SyncStatusOptions): void; abstract getDBWorkerPort(): Promise; diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index af2d96d7..60e2e1c0 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -308,6 +308,15 @@ export class SharedSyncImplementation adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger), remote: new WebRemote( { + invalidateCredentials: async () => { + const lastPort = this.ports[this.ports.length - 1]; + try { + this.logger.log('calling the last port client provider to invalidate credentials'); + lastPort.clientProvider.invalidateCredentials(); + } catch (ex) { + this.logger.error('error invalidating credentials', ex); + } + }, fetchCredentials: async () => { const lastPort = this.ports[this.ports.length - 1]; return new Promise(async (resolve, reject) => {