Skip to content

feat: Fetch credentials on token expiry/401 #604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions .changeset/cool-yaks-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/web': minor
---

To support the upstream credentials management changes from `@powersync/common`, the sync worker now communicates credentials invalidation to tabs.
5 changes: 5 additions & 0 deletions .changeset/shiny-rules-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': minor
---

Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid based on responses from the PowerSync service. The frequency of credential fetching has been reduced as a result of this work.
86 changes: 75 additions & 11 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ export type BSONImplementation = typeof BSON;

export type RemoteConnector = {
fetchCredentials: () => Promise<PowerSyncCredentials | null>;
invalidateCredentials?: () => void;
};

const POWERSYNC_TRAILING_SLASH_MATCH = /\/+$/;
const POWERSYNC_JS_VERSION = PACKAGE.version;

// Refresh at least 30 sec before it expires
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;

// Keep alive message is sent every period
Expand Down Expand Up @@ -130,18 +129,59 @@ export abstract class AbstractRemote {
: fetchImplementation;
}

/**
* Get credentials currently cached, or fetch new credentials if none are
* available.
*
* These credentials may have expired already.
*/
async getCredentials(): Promise<PowerSyncCredentials | null> {
const { expiresAt } = this.credentials ?? {};
if (expiresAt && expiresAt > new Date(new Date().valueOf() + REFRESH_CREDENTIALS_SAFETY_PERIOD_MS)) {
return this.credentials!;
if (this.credentials) {
return this.credentials;
}
this.credentials = await this.connector.fetchCredentials();
if (this.credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) {

return this.prefetchCredentials();
}

/**
* Fetch a new set of credentials and cache it.
*
* Until this call succeeds, `getCredentials` will still return the
* old credentials.
*
* This may be called before the current credentials have expired.
*/
async prefetchCredentials() {
this.credentials = await this.fetchCredentials();

return this.credentials;
}

/**
* Get credentials for PowerSync.
*
* This should always fetch a fresh set of credentials - don't use cached
* values.
*/
async fetchCredentials() {
const credentials = await this.connector.fetchCredentials();
if (credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) {
throw new Error(
`A trailing forward slash "/" was found in the fetchCredentials endpoint: "${this.credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.`
`A trailing forward slash "/" was found in the fetchCredentials endpoint: "${credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.`
);
}
return this.credentials;

return credentials;
}

/***
* Immediately invalidate credentials.
*
* This may be called when the current credentials have expired.
*/
invalidateCredentials() {
this.credentials = null;
this.connector.invalidateCredentials?.();
}

getUserAgent() {
Expand Down Expand Up @@ -181,6 +221,10 @@ export abstract class AbstractRemote {
body: JSON.stringify(data)
});

if (res.status === 401) {
this.invalidateCredentials();
}

if (!res.ok) {
throw new Error(`Received ${res.status} - ${res.statusText} when posting to ${path}: ${await res.text()}}`);
}
Expand All @@ -198,6 +242,10 @@ export abstract class AbstractRemote {
}
});

if (res.status === 401) {
this.invalidateCredentials();
}

if (!res.ok) {
throw new Error(`Received ${res.status} - ${res.statusText} when getting from ${path}: ${await res.text()}}`);
}
Expand All @@ -224,6 +272,10 @@ export abstract class AbstractRemote {
throw ex;
});

if (res.status === 401) {
this.invalidateCredentials();
}

if (!res.ok) {
const text = await res.text();
this.logger.error(`Could not POST streaming to ${path} - ${res.status} - ${res.statusText}: ${text}`);
Expand Down Expand Up @@ -260,10 +312,19 @@ export abstract class AbstractRemote {
// automatically as a header.
const userAgent = this.getUserAgent();

let socketCreationError: Error | undefined;

const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url: this.options.socketUrlTransformer(request.url),
wsCreator: (url) => this.createSocket(url)
wsCreator: (url) => {
const s = this.createSocket(url);
s.addEventListener('error', (e: Event) => {
socketCreationError = new Error('Failed to create connection to websocket: ', (e.target as any).url ?? '');
this.logger.warn('Socket error', e);
});
return s;
}
}),
setup: {
keepAlive: KEEP_ALIVE_MS,
Expand All @@ -290,7 +351,7 @@ export abstract class AbstractRemote {
* On React native the connection exception can be `undefined` this causes issues
* with detecting the exception inside async-mutex
*/
throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex)}`);
throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex ?? socketCreationError)}`);
}

const stream = new DataStream({
Expand Down Expand Up @@ -335,6 +396,9 @@ export abstract class AbstractRemote {
syncQueueRequestSize, // The initial N amount
{
onError: (e) => {
if (e.message.includes('Authorization failed') || e.message.includes('401')) {
this.invalidateCredentials();
}
// Don't log closed as an error
if (e.message !== 'Closed. ') {
this.logger.error(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,12 +683,19 @@ The next upload iteration will be delayed.`);
if (remaining_seconds == 0) {
// Connection would be closed automatically right after this
this.logger.debug('Token expiring; reconnect');
this.options.remote.invalidateCredentials();

/**
* For a rare case where the backend connector does not update the token
* (uses the same one), this should have some delay.
*/
await this.delayRetry();
return;
} else if (remaining_seconds < 30) {
this.logger.debug('Token will expire soon; reconnect');
// Pre-emptively refresh the token
this.options.remote.invalidateCredentials();
return;
}
this.triggerCrudUpload();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class SharedSyncClientProvider extends AbstractSharedSyncClientProvider {
return Comlink.transfer(port, [port]);
}

invalidateCredentials() {
this.options.remote.invalidateCredentials();
}

async fetchCredentials(): Promise<PowerSyncCredentials | null> {
const credentials = await this.options.remote.getCredentials();
if (credentials == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common'
*/
export abstract class AbstractSharedSyncClientProvider {
abstract fetchCredentials(): Promise<PowerSyncCredentials | null>;
abstract invalidateCredentials(): void;
abstract uploadCrud(): Promise<void>;
abstract statusChanged(status: SyncStatusOptions): void;
abstract getDBWorkerPort(): Promise<MessagePort>;
Expand Down
9 changes: 9 additions & 0 deletions packages/web/src/worker/sync/SharedSyncImplementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,15 @@ export class SharedSyncImplementation
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
remote: new WebRemote(
{
invalidateCredentials: async () => {
const lastPort = this.ports[this.ports.length - 1];
try {
this.logger.log('calling the last port client provider to invalidate credentials');
lastPort.clientProvider.invalidateCredentials();
} catch (ex) {
this.logger.error('error invalidating credentials', ex);
}
},
fetchCredentials: async () => {
const lastPort = this.ports[this.ports.length - 1];
return new Promise(async (resolve, reject) => {
Expand Down