Skip to content

Commit 7f60a4b

Browse files
committed
Remove hydration for ParameterLookupSource.
1 parent 448e84c commit 7f60a4b

File tree

24 files changed

+407
-408
lines changed

24 files changed

+407
-408
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
BroadcastIterable,
1111
CHECKPOINT_INVALIDATE_ALL,
1212
CheckpointChanges,
13-
CompactOptions,
1413
deserializeParameterLookup,
1514
GetCheckpointChangesOptions,
1615
InternalOpId,
@@ -25,10 +24,11 @@ import {
2524
WatchWriteCheckpointOptions
2625
} from '@powersync/service-core';
2726
import { JSONBig } from '@powersync/service-jsonbig';
28-
import { ParameterLookup, SqliteJsonRow, SqlSyncRules, HydratedSyncRules } from '@powersync/service-sync-rules';
27+
import { HydratedSyncRules, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules';
2928
import * as bson from 'bson';
3029
import { LRUCache } from 'lru-cache';
3130
import * as timers from 'timers/promises';
31+
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
3232
import { MongoBucketStorage } from '../MongoBucketStorage.js';
3333
import { PowerSyncMongo } from './db.js';
3434
import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js';
@@ -37,7 +37,6 @@ import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js';
3737
import { MongoCompactor } from './MongoCompactor.js';
3838
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
3939
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
40-
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
4140

4241
export interface MongoSyncBucketStorageOptions {
4342
checksumOptions?: MongoChecksumOptions;
@@ -293,7 +292,10 @@ export class MongoSyncBucketStorage
293292
return result!;
294293
}
295294

296-
async getParameterSets(checkpoint: MongoReplicationCheckpoint, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
295+
async getParameterSets(
296+
checkpoint: MongoReplicationCheckpoint,
297+
lookups: ScopedParameterLookup[]
298+
): Promise<SqliteJsonRow[]> {
297299
return this.db.client.withSession({ snapshot: true }, async (session) => {
298300
// Set the session's snapshot time to the checkpoint's snapshot time.
299301
// An alternative would be to create the session when the checkpoint is created, but managing
@@ -1025,7 +1027,7 @@ class MongoReplicationCheckpoint implements ReplicationCheckpoint {
10251027
public snapshotTime: mongo.Timestamp
10261028
) {}
10271029

1028-
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
1030+
async getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]> {
10291031
return this.storage.getParameterSets(this, lookups);
10301032
}
10311033
}
@@ -1034,7 +1036,7 @@ class EmptyReplicationCheckpoint implements ReplicationCheckpoint {
10341036
readonly checkpoint: InternalOpId = 0n;
10351037
readonly lsn: string | null = null;
10361038

1037-
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
1039+
async getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]> {
10381040
return [];
10391041
}
10401042
}

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
BucketChecksum,
55
CHECKPOINT_INVALIDATE_ALL,
66
CheckpointChanges,
7-
CompactOptions,
87
GetCheckpointChangesOptions,
98
InternalOpId,
109
internalToExternalOpId,
@@ -376,7 +375,7 @@ export class PostgresSyncRulesStorage
376375

377376
async getParameterSets(
378377
checkpoint: ReplicationCheckpoint,
379-
lookups: sync_rules.ParameterLookup[]
378+
lookups: sync_rules.ScopedParameterLookup[]
380379
): Promise<sync_rules.SqliteJsonRow[]> {
381380
const rows = await this.db.sql`
382381
SELECT DISTINCT
@@ -881,7 +880,7 @@ class PostgresReplicationCheckpoint implements storage.ReplicationCheckpoint {
881880
public readonly lsn: string | null
882881
) {}
883882

884-
getParameterSets(lookups: sync_rules.ParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
883+
getParameterSets(lookups: sync_rules.ScopedParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
885884
return this.storage.getParameterSets(this, lookups);
886885
}
887886
}

packages/service-core-tests/src/tests/register-data-storage-parameter-tests.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { storage } from '@powersync/service-core';
2-
import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules';
2+
import { RequestParameters, ScopedParameterLookup } from '@powersync/service-sync-rules';
33
import { expect, test } from 'vitest';
44
import * as test_utils from '../test-utils/test-utils-index.js';
55
import { TEST_TABLE } from './util.js';
@@ -60,7 +60,7 @@ bucket_definitions:
6060
});
6161

6262
const checkpoint = await bucketStorage.getCheckpoint();
63-
const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, ['user1'])]);
63+
const parameters = await checkpoint.getParameterSets([ScopedParameterLookup.direct(MYBUCKET_1, ['user1'])]);
6464
expect(parameters).toEqual([
6565
{
6666
group_id: 'group1a'
@@ -108,15 +108,15 @@ bucket_definitions:
108108
});
109109
const checkpoint2 = await bucketStorage.getCheckpoint();
110110

111-
const parameters = await checkpoint2.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, ['user1'])]);
111+
const parameters = await checkpoint2.getParameterSets([ScopedParameterLookup.direct(MYBUCKET_1, ['user1'])]);
112112
expect(parameters).toEqual([
113113
{
114114
group_id: 'group2'
115115
}
116116
]);
117117

118118
// Use the checkpoint to get older data if relevant
119-
const parameters2 = await checkpoint1.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, ['user1'])]);
119+
const parameters2 = await checkpoint1.getParameterSets([ScopedParameterLookup.direct(MYBUCKET_1, ['user1'])]);
120120
expect(parameters2).toEqual([
121121
{
122122
group_id: 'group1'
@@ -185,8 +185,8 @@ bucket_definitions:
185185
// association of `list1`::`todo2`
186186
const checkpoint = await bucketStorage.getCheckpoint();
187187
const parameters = await checkpoint.getParameterSets([
188-
ParameterLookup.normalized(MYBUCKET_1, ['list1']),
189-
ParameterLookup.normalized(MYBUCKET_1, ['list2'])
188+
ScopedParameterLookup.direct(MYBUCKET_1, ['list1']),
189+
ScopedParameterLookup.direct(MYBUCKET_1, ['list2'])
190190
]);
191191

192192
expect(parameters.sort((a, b) => (a.todo_id as string).localeCompare(b.todo_id as string))).toEqual([
@@ -233,11 +233,15 @@ bucket_definitions:
233233

234234
const checkpoint = await bucketStorage.getCheckpoint();
235235

236-
const parameters1 = await checkpoint.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, [314n, 314, 3.14])]);
236+
const parameters1 = await checkpoint.getParameterSets([
237+
ScopedParameterLookup.direct(MYBUCKET_1, [314n, 314, 3.14])
238+
]);
237239
expect(parameters1).toEqual([TEST_PARAMS]);
238-
const parameters2 = await checkpoint.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, [314, 314n, 3.14])]);
240+
const parameters2 = await checkpoint.getParameterSets([
241+
ScopedParameterLookup.direct(MYBUCKET_1, [314, 314n, 3.14])
242+
]);
239243
expect(parameters2).toEqual([TEST_PARAMS]);
240-
const parameters3 = await checkpoint.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, [314n, 314, 3])]);
244+
const parameters3 = await checkpoint.getParameterSets([ScopedParameterLookup.direct(MYBUCKET_1, [314n, 314, 3])]);
241245
expect(parameters3).toEqual([]);
242246
});
243247

@@ -291,7 +295,7 @@ bucket_definitions:
291295
const checkpoint = await bucketStorage.getCheckpoint();
292296

293297
const parameters1 = await checkpoint.getParameterSets([
294-
ParameterLookup.normalized(MYBUCKET_1, [1152921504606846976n])
298+
ScopedParameterLookup.direct(MYBUCKET_1, [1152921504606846976n])
295299
]);
296300
expect(parameters1).toEqual([TEST_PARAMS]);
297301
});
@@ -332,7 +336,7 @@ bucket_definitions:
332336
const querier = sync_rules.getBucketParameterQuerier(test_utils.querierOptions(parameters)).querier;
333337

334338
const lookups = querier.parameterQueryLookups;
335-
expect(lookups).toEqual([ParameterLookup.normalized({ lookupName: 'by_workspace', queryId: '1' }, ['u1'])]);
339+
expect(lookups).toEqual([ScopedParameterLookup.direct({ lookupName: 'by_workspace', queryId: '1' }, ['u1'])]);
336340

337341
const parameter_sets = await checkpoint.getParameterSets(lookups);
338342
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]);
@@ -405,7 +409,7 @@ bucket_definitions:
405409
const querier = sync_rules.getBucketParameterQuerier(test_utils.querierOptions(parameters)).querier;
406410

407411
const lookups = querier.parameterQueryLookups;
408-
expect(lookups).toEqual([ParameterLookup.normalized({ lookupName: 'by_public_workspace', queryId: '1' }, [])]);
412+
expect(lookups).toEqual([ScopedParameterLookup.direct({ lookupName: 'by_public_workspace', queryId: '1' }, [])]);
409413

410414
const parameter_sets = await checkpoint.getParameterSets(lookups);
411415
parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
@@ -507,8 +511,8 @@ bucket_definitions:
507511

508512
const lookups = querier.parameterQueryLookups;
509513
expect(lookups).toEqual([
510-
ParameterLookup.normalized({ lookupName: 'by_workspace', queryId: '1' }, []),
511-
ParameterLookup.normalized({ lookupName: 'by_workspace', queryId: '2' }, ['u1'])
514+
ScopedParameterLookup.direct({ lookupName: 'by_workspace', queryId: '1' }, []),
515+
ScopedParameterLookup.direct({ lookupName: 'by_workspace', queryId: '2' }, ['u1'])
512516
]);
513517

514518
const parameter_sets = await checkpoint.getParameterSets(lookups);
@@ -558,7 +562,7 @@ bucket_definitions:
558562

559563
const checkpoint = await bucketStorage.getCheckpoint();
560564

561-
const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized(MYBUCKET_1, ['user1'])]);
565+
const parameters = await checkpoint.getParameterSets([ScopedParameterLookup.direct(MYBUCKET_1, ['user1'])]);
562566
expect(parameters).toEqual([]);
563567
});
564568

packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { storage } from '@powersync/service-core';
2-
import { ParameterLookup } from '@powersync/service-sync-rules';
2+
import { ScopedParameterLookup } from '@powersync/service-sync-rules';
33
import { expect, test } from 'vitest';
44
import * as test_utils from '../test-utils/test-utils-index.js';
55

@@ -40,7 +40,7 @@ bucket_definitions:
4040
await batch.commit('1/1');
4141
});
4242

43-
const lookup = ParameterLookup.normalized({ lookupName: 'test', queryId: '1' }, ['t1']);
43+
const lookup = ScopedParameterLookup.direct({ lookupName: 'test', queryId: '1' }, ['t1']);
4444

4545
const checkpoint1 = await bucketStorage.getCheckpoint();
4646
const parameters1 = await checkpoint1.getParameterSets([lookup]);
@@ -151,7 +151,7 @@ bucket_definitions:
151151
await batch.commit('3/1');
152152
});
153153

154-
const lookup = ParameterLookup.normalized({ lookupName: 'test', queryId: '1' }, ['u1']);
154+
const lookup = ScopedParameterLookup.direct({ lookupName: 'test', queryId: '1' }, ['u1']);
155155

156156
const checkpoint1 = await bucketStorage.getCheckpoint();
157157
const parameters1 = await checkpoint1.getParameterSets([lookup]);

packages/service-core-tests/tsconfig.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
},
3030
{
3131
"path": "../../libs/lib-services"
32+
},
33+
{
34+
"path": "../sync-rules"
3235
}
3336
]
3437
}

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Logger, ObserverClient } from '@powersync/lib-services-framework';
2-
import { ParameterLookup, SqliteJsonRow, HydratedSyncRules } from '@powersync/service-sync-rules';
2+
import { HydratedSyncRules, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules';
33
import * as util from '../util/util-index.js';
44
import { BucketStorageBatch, FlushedResult, SaveUpdate } from './BucketStorageBatch.js';
55
import { BucketStorageFactory } from './BucketStorageFactory.js';
@@ -284,7 +284,7 @@ export interface ReplicationCheckpoint {
284284
*
285285
* This gets parameter sets specific to this checkpoint.
286286
*/
287-
getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>;
287+
getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]>;
288288
}
289289

290290
export interface WatchWriteCheckpointOptions {

packages/service-core/src/storage/bson.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as bson from 'bson';
22

3-
import { ParameterLookup, SqliteJsonValue } from '@powersync/service-sync-rules';
3+
import { ScopedParameterLookup, SqliteJsonValue } from '@powersync/service-sync-rules';
44
import { ReplicaId } from './BucketStorageBatch.js';
55

66
type NodeBuffer = Buffer<ArrayBuffer>;
@@ -27,11 +27,11 @@ export const BSON_DESERIALIZE_DATA_OPTIONS: bson.DeserializeOptions = {
2727
* Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers.
2828
* @param lookup
2929
*/
30-
export const serializeLookupBuffer = (lookup: ParameterLookup): NodeBuffer => {
30+
export const serializeLookupBuffer = (lookup: ScopedParameterLookup): NodeBuffer => {
3131
return bson.serialize({ l: lookup.values }) as NodeBuffer;
3232
};
3333

34-
export const serializeLookup = (lookup: ParameterLookup) => {
34+
export const serializeLookup = (lookup: ScopedParameterLookup) => {
3535
return new bson.Binary(serializeLookupBuffer(lookup));
3636
};
3737

packages/service-core/test/src/sync/BucketChecksumState.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
WatchFilterEvent
1313
} from '@/index.js';
1414
import { JSONBig } from '@powersync/service-jsonbig';
15-
import { ParameterLookup, RequestJwtPayload, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
15+
import { RequestJwtPayload, ScopedParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
1616
import { versionedHydrationState } from '@powersync/service-sync-rules/src/HydrationState.js';
1717
import { beforeEach, describe, expect, test } from 'vitest';
1818

@@ -505,7 +505,7 @@ bucket_definitions:
505505

506506
const line = (await state.buildNextCheckpointLine({
507507
base: storage.makeCheckpoint(1n, (lookups) => {
508-
expect(lookups).toEqual([ParameterLookup.normalized({ lookupName: 'by_project', queryId: '1' }, ['u1'])]);
508+
expect(lookups).toEqual([ScopedParameterLookup.direct({ lookupName: 'by_project', queryId: '1' }, ['u1'])]);
509509
return [{ id: 1 }, { id: 2 }];
510510
}),
511511
writeCheckpoint: null,
@@ -566,7 +566,7 @@ bucket_definitions:
566566
// Now we get a new line
567567
const line2 = (await state.buildNextCheckpointLine({
568568
base: storage.makeCheckpoint(2n, (lookups) => {
569-
expect(lookups).toEqual([ParameterLookup.normalized({ lookupName: 'by_project', queryId: '1' }, ['u1'])]);
569+
expect(lookups).toEqual([ScopedParameterLookup.direct({ lookupName: 'by_project', queryId: '1' }, ['u1'])]);
570570
return [{ id: 1 }, { id: 2 }, { id: 3 }];
571571
}),
572572
writeCheckpoint: null,
@@ -854,12 +854,12 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage {
854854

855855
makeCheckpoint(
856856
opId: InternalOpId,
857-
parameters?: (lookups: ParameterLookup[]) => SqliteJsonRow[]
857+
parameters?: (lookups: ScopedParameterLookup[]) => SqliteJsonRow[]
858858
): ReplicationCheckpoint {
859859
return {
860860
checkpoint: opId,
861861
lsn: String(opId),
862-
getParameterSets: async (lookups: ParameterLookup[]) => {
862+
getParameterSets: async (lookups: ScopedParameterLookup[]) => {
863863
if (parameters == null) {
864864
throw new Error(`getParametersSets not defined for checkpoint ${opId}`);
865865
}

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import { TablePattern } from './TablePattern.js';
88
import {
99
QueryParameters,
1010
QuerySchema,
11-
SourceEvaluatedRow,
12-
SourceEvaluationResult,
11+
UnscopedEvaluatedRow,
12+
UnscopedEvaluationResult,
1313
SourceSchema,
1414
SourceSchemaTable,
1515
SqliteJsonRow,
@@ -170,7 +170,7 @@ export class BaseSqlDataQuery {
170170
}
171171
}
172172

173-
evaluateRowWithOptions(options: EvaluateRowOptions): SourceEvaluationResult[] {
173+
evaluateRowWithOptions(options: EvaluateRowOptions): UnscopedEvaluationResult[] {
174174
try {
175175
const { table, row, serializedBucketParameters } = options;
176176

@@ -201,7 +201,7 @@ export class BaseSqlDataQuery {
201201
table: outputTable,
202202
id: id,
203203
data
204-
} satisfies SourceEvaluatedRow;
204+
} satisfies UnscopedEvaluatedRow;
205205
});
206206
} catch (e) {
207207
return [{ error: e.message ?? `Evaluating data query failed` }];

packages/sync-rules/src/BucketParameterQuerier.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export interface BucketParameterQuerier {
2626
*/
2727
readonly hasDynamicBuckets: boolean;
2828

29-
readonly parameterQueryLookups: ParameterLookup[];
29+
readonly parameterQueryLookups: ScopedParameterLookup[];
3030

3131
/**
3232
* These buckets depend on parameter storage, and needs to be retrieved dynamically for each checkpoint.
@@ -59,7 +59,7 @@ export interface PendingQueriers {
5959
}
6060

6161
export interface ParameterLookupSource {
62-
getParameterSets: (lookups: ParameterLookup[]) => Promise<SqliteJsonRow[]>;
62+
getParameterSets: (lookups: ScopedParameterLookup[]) => Promise<SqliteJsonRow[]>;
6363
}
6464

6565
export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
@@ -89,12 +89,19 @@ export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[])
8989
*
9090
* Other query types are not supported yet.
9191
*/
92-
export class ParameterLookup {
92+
export class ScopedParameterLookup {
9393
// bucket definition name, parameter query index, ...lookup values
9494
readonly values: SqliteJsonValue[];
9595

96-
static normalized(scope: ParameterLookupScope, values: SqliteJsonValue[]): ParameterLookup {
97-
return new ParameterLookup([scope.lookupName, scope.queryId, ...values.map(normalizeParameterValue)]);
96+
static normalized(scope: ParameterLookupScope, lookup: UnscopedParameterLookup): ScopedParameterLookup {
97+
return new ScopedParameterLookup([scope.lookupName, scope.queryId, ...lookup.lookupValues]);
98+
}
99+
100+
/**
101+
* Primarily for test fixtures.
102+
*/
103+
static direct(scope: ParameterLookupScope, values: SqliteJsonValue[]): ScopedParameterLookup {
104+
return new ScopedParameterLookup([scope.lookupName, scope.queryId, ...values.map(normalizeParameterValue)]);
98105
}
99106

100107
/**
@@ -105,3 +112,15 @@ export class ParameterLookup {
105112
this.values = values;
106113
}
107114
}
115+
116+
export class UnscopedParameterLookup {
117+
readonly lookupValues: SqliteJsonValue[];
118+
119+
static normalized(values: SqliteJsonValue[]): UnscopedParameterLookup {
120+
return new UnscopedParameterLookup(values.map(normalizeParameterValue));
121+
}
122+
123+
constructor(lookupValues: SqliteJsonValue[]) {
124+
this.lookupValues = lookupValues;
125+
}
126+
}

0 commit comments

Comments
 (0)