Skip to content

Commit e578245

Browse files
authored
Granular sync rule parsing (#432)
* Initial split of data vs parameter sources.
* Split out sync rule definitions from compiled ones.
* Update APIs to use hydrated sync rules.
* Refactor parameter queries.
* Fix tests.
* Remove redundant functions.
* Split out parameter lookup sources from parameter querier sources.
* Bring back BucketSource.
* Cleanup.
* Cleanup imports.
* Split out SyncStream into its components.
* Support multiple data sources.
* Use separate sources per sync stream variant.
* Move debugRepresentation() back to BucketSource.
* Split out dataSource on SqlBucketDescriptor.
* Move SubqueryParameterLookupSource deeper to be specific to variants.
* Use HydrationState for bucket names.
* Remove some unused descriptorName references.
* Refactor parameter lookups to make the names configurable.
* More minor refactoring.
* Some comments.
* Import cleanup.
* Remove BucketIdTransformer.
* Add a stream test with custom hydrationState.
* Add changeset.
* Remove need for hydration on BucketDataSource.
* Rename defaultBucketPrefix -> uniqueName.
* Remove hydration for ParameterLookupSource.
* Rename BucketParameterLookupSource -> ParameterIndexLookupCreator.
* Merge BucketParameterQuerierSourceDefinition into HydratedBucketSource.
* Remove HydrationState generics. We can re-add it if we actually need it later.
* Add some hydration tests.
* Add "end-to-end" sync rules test.
1 parent 267fef2 commit e578245

File tree

59 files changed

+2764
-1490
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

59 files changed

+2764
-1490
lines changed

.changeset/fresh-geckos-develop.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-module-postgres': minor
6+
'@powersync/service-module-mongodb': minor
7+
'@powersync/service-core': minor
8+
'@powersync/service-module-mssql': minor
9+
'@powersync/service-module-mysql': minor
10+
'@powersync/service-sync-rules': minor
11+
---
12+
13+
[Internal] Refactor sync rule representation to split out the parsed definitions from the hydrated state.

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
2-
import { SqlEventDescriptor, SqliteRow, SqliteValue, SqlSyncRules } from '@powersync/service-sync-rules';
2+
import { SqlEventDescriptor, SqliteRow, SqliteValue, HydratedSyncRules } from '@powersync/service-sync-rules';
33
import * as bson from 'bson';
44

55
import {
66
BaseObserver,
77
container,
8+
logger as defaultLogger,
89
ErrorCode,
910
errors,
1011
Logger,
11-
logger as defaultLogger,
1212
ReplicationAssertionError,
1313
ServiceError
1414
} from '@powersync/lib-services-framework';
@@ -22,13 +22,13 @@ import {
2222
utils
2323
} from '@powersync/service-core';
2424
import * as timers from 'node:timers/promises';
25+
import { idPrefixFilter } from '../../utils/util.js';
2526
import { PowerSyncMongo } from './db.js';
2627
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
2728
import { MongoIdSequence } from './MongoIdSequence.js';
2829
import { batchCreateCustomWriteCheckpoints } from './MongoWriteCheckpointAPI.js';
2930
import { cacheKey, OperationBatch, RecordOperation } from './OperationBatch.js';
3031
import { PersistedBatch } from './PersistedBatch.js';
31-
import { idPrefixFilter } from '../../utils/util.js';
3232

3333
/**
3434
* 15MB
@@ -44,7 +44,7 @@ const replicationMutex = new utils.Mutex();
4444

4545
export interface MongoBucketBatchOptions {
4646
db: PowerSyncMongo;
47-
syncRules: SqlSyncRules;
47+
syncRules: HydratedSyncRules;
4848
groupId: number;
4949
slotName: string;
5050
lastCheckpointLsn: string | null;
@@ -71,7 +71,7 @@ export class MongoBucketBatch
7171
private readonly client: mongo.MongoClient;
7272
public readonly db: PowerSyncMongo;
7373
public readonly session: mongo.ClientSession;
74-
private readonly sync_rules: SqlSyncRules;
74+
private readonly sync_rules: HydratedSyncRules;
7575

7676
private readonly group_id: number;
7777

@@ -474,8 +474,7 @@ export class MongoBucketBatch
474474
if (sourceTable.syncData) {
475475
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
476476
record: after,
477-
sourceTable,
478-
bucketIdTransformer: SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`)
477+
sourceTable
479478
});
480479

481480
for (let error of syncErrors) {

modules/module-mongodb-storage/src/storage/implementation/MongoPersistedSyncRules.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import { SqlSyncRules } from '@powersync/service-sync-rules';
1+
import { SqlSyncRules, HydratedSyncRules } from '@powersync/service-sync-rules';
22

33
import { storage } from '@powersync/service-core';
4+
import { versionedHydrationState } from '@powersync/service-sync-rules/src/HydrationState.js';
45

56
export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
67
public readonly slot_name: string;
@@ -13,4 +14,8 @@ export class MongoPersistedSyncRules implements storage.PersistedSyncRules {
1314
) {
1415
this.slot_name = slot_name ?? `powersync_${id}`;
1516
}
17+
18+
hydratedSyncRules(): HydratedSyncRules {
19+
return this.sync_rules.hydrate({ hydrationState: versionedHydrationState(this.id) });
20+
}
1621
}

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
BroadcastIterable,
1111
CHECKPOINT_INVALIDATE_ALL,
1212
CheckpointChanges,
13-
CompactOptions,
1413
deserializeParameterLookup,
1514
GetCheckpointChangesOptions,
1615
InternalOpId,
@@ -25,10 +24,11 @@ import {
2524
WatchWriteCheckpointOptions
2625
} from '@powersync/service-core';
2726
import { JSONBig } from '@powersync/service-jsonbig';
28-
import { ParameterLookup, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
27+
import { HydratedSyncRules, ScopedParameterLookup, SqliteJsonRow } from '@powersync/service-sync-rules';
2928
import * as bson from 'bson';
3029
import { LRUCache } from 'lru-cache';
3130
import * as timers from 'timers/promises';
31+
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
3232
import { MongoBucketStorage } from '../MongoBucketStorage.js';
3333
import { PowerSyncMongo } from './db.js';
3434
import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js';
@@ -37,7 +37,6 @@ import { MongoChecksumOptions, MongoChecksums } from './MongoChecksums.js';
3737
import { MongoCompactor } from './MongoCompactor.js';
3838
import { MongoParameterCompactor } from './MongoParameterCompactor.js';
3939
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
40-
import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from '../../utils/util.js';
4140

4241
export interface MongoSyncBucketStorageOptions {
4342
checksumOptions?: MongoChecksumOptions;
@@ -61,7 +60,7 @@ export class MongoSyncBucketStorage
6160
private readonly db: PowerSyncMongo;
6261
readonly checksums: MongoChecksums;
6362

64-
private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
63+
private parsedSyncRulesCache: { parsed: HydratedSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
6564
private writeCheckpointAPI: MongoWriteCheckpointAPI;
6665

6766
constructor(
@@ -101,14 +100,14 @@ export class MongoSyncBucketStorage
101100
});
102101
}
103102

104-
getParsedSyncRules(options: storage.ParseSyncRulesOptions): SqlSyncRules {
103+
getParsedSyncRules(options: storage.ParseSyncRulesOptions): HydratedSyncRules {
105104
const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {};
106105
/**
107106
* Check if the cached sync rules, if present, had the same options.
108107
* Parse sync rules if the options are different or if there is no cached value.
109108
*/
110109
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
111-
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options };
110+
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).hydratedSyncRules(), options };
112111
}
113112

114113
return this.parsedSyncRulesCache!.parsed;
@@ -170,7 +169,7 @@ export class MongoSyncBucketStorage
170169
await using batch = new MongoBucketBatch({
171170
logger: options.logger,
172171
db: this.db,
173-
syncRules: this.sync_rules.parsed(options).sync_rules,
172+
syncRules: this.sync_rules.parsed(options).hydratedSyncRules(),
174173
groupId: this.group_id,
175174
slotName: this.slot_name,
176175
lastCheckpointLsn: checkpoint_lsn,
@@ -293,7 +292,10 @@ export class MongoSyncBucketStorage
293292
return result!;
294293
}
295294

296-
async getParameterSets(checkpoint: MongoReplicationCheckpoint, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
295+
async getParameterSets(
296+
checkpoint: MongoReplicationCheckpoint,
297+
lookups: ScopedParameterLookup[]
298+
): Promise<SqliteJsonRow[]> {
297299
return this.db.client.withSession({ snapshot: true }, async (session) => {
298300
// Set the session's snapshot time to the checkpoint's snapshot time.
299301
// An alternative would be to create the session when the checkpoint is created, but managing
@@ -1025,7 +1027,7 @@ class MongoReplicationCheckpoint implements ReplicationCheckpoint {
10251027
public snapshotTime: mongo.Timestamp
10261028
) {}
10271029

1028-
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
1030+
async getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]> {
10291031
return this.storage.getParameterSets(this, lookups);
10301032
}
10311033
}
@@ -1034,7 +1036,7 @@ class EmptyReplicationCheckpoint implements ReplicationCheckpoint {
10341036
readonly checkpoint: InternalOpId = 0n;
10351037
readonly lsn: string | null = null;
10361038

1037-
async getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
1039+
async getParameterSets(lookups: ScopedParameterLookup[]): Promise<SqliteJsonRow[]> {
10381040
return [];
10391041
}
10401042
}

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@ import {
1010
ServiceError
1111
} from '@powersync/lib-services-framework';
1212
import {
13-
InternalOpId,
1413
MetricsEngine,
1514
RelationCache,
1615
SaveOperationTag,
1716
SourceEntityDescriptor,
1817
SourceTable,
1918
storage
2019
} from '@powersync/service-core';
21-
import { DatabaseInputRow, SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
20+
import {
21+
DatabaseInputRow,
22+
SqliteInputRow,
23+
SqliteRow,
24+
HydratedSyncRules,
25+
TablePattern
26+
} from '@powersync/service-sync-rules';
2227
import { ReplicationMetric } from '@powersync/service-types';
2328
import { MongoLSN } from '../common/MongoLSN.js';
2429
import { PostImagesOption } from '../types/types.js';
@@ -75,7 +80,7 @@ export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
7580
}
7681

7782
export class ChangeStream {
78-
sync_rules: SqlSyncRules;
83+
sync_rules: HydratedSyncRules;
7984
group_id: number;
8085

8186
connection_id = 1;

modules/module-mssql/src/replication/CDCStream.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@ import {
1010
} from '@powersync/lib-services-framework';
1111
import { getUuidReplicaIdentityBson, MetricsEngine, SourceEntityDescriptor, storage } from '@powersync/service-core';
1212

13-
import { SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
13+
import {
14+
SqliteInputRow,
15+
SqliteRow,
16+
SqlSyncRules,
17+
HydratedSyncRules,
18+
TablePattern
19+
} from '@powersync/service-sync-rules';
1420

1521
import { ReplicationMetric } from '@powersync/service-types';
1622
import { BatchedSnapshotQuery, MSSQLSnapshotQuery, SimpleSnapshotQuery } from './MSSQLSnapshotQuery.js';
@@ -82,7 +88,7 @@ export class CDCDataExpiredError extends DatabaseConnectionError {
8288
}
8389

8490
export class CDCStream {
85-
private readonly syncRules: SqlSyncRules;
91+
private readonly syncRules: HydratedSyncRules;
8692
private readonly storage: storage.SyncRulesBucketStorage;
8793
private readonly connections: MSSQLConnectionManager;
8894
private readonly abortSignal: AbortSignal;

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ function createTableId(schema: string, tableName: string): string {
6161
}
6262

6363
export class BinLogStream {
64-
private readonly syncRules: sync_rules.SqlSyncRules;
64+
private readonly syncRules: sync_rules.HydratedSyncRules;
6565
private readonly groupId: number;
6666

6767
private readonly storage: storage.SyncRulesBucketStorage;

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
BucketChecksum,
55
CHECKPOINT_INVALIDATE_ALL,
66
CheckpointChanges,
7-
CompactOptions,
87
GetCheckpointChangesOptions,
98
InternalOpId,
109
internalToExternalOpId,
@@ -60,7 +59,9 @@ export class PostgresSyncRulesStorage
6059
protected writeCheckpointAPI: PostgresWriteCheckpointAPI;
6160

6261
// TODO we might be able to share this in an abstract class
63-
private parsedSyncRulesCache: { parsed: sync_rules.SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined;
62+
private parsedSyncRulesCache:
63+
| { parsed: sync_rules.HydratedSyncRules; options: storage.ParseSyncRulesOptions }
64+
| undefined;
6465
private _checksumCache: storage.ChecksumCache | undefined;
6566

6667
constructor(protected options: PostgresSyncRulesStorageOptions) {
@@ -96,14 +97,14 @@ export class PostgresSyncRulesStorage
9697
}
9798

9899
// TODO we might be able to share this in an abstract class
99-
getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.SqlSyncRules {
100+
getParsedSyncRules(options: storage.ParseSyncRulesOptions): sync_rules.HydratedSyncRules {
100101
const { parsed, options: cachedOptions } = this.parsedSyncRulesCache ?? {};
101102
/**
102103
* Check if the cached sync rules, if present, had the same options.
103104
* Parse sync rules if the options are different or if there is no cached value.
104105
*/
105106
if (!parsed || options.defaultSchema != cachedOptions?.defaultSchema) {
106-
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).sync_rules, options };
107+
this.parsedSyncRulesCache = { parsed: this.sync_rules.parsed(options).hydratedSyncRules(), options };
107108
}
108109

109110
return this.parsedSyncRulesCache!.parsed;
@@ -349,7 +350,7 @@ export class PostgresSyncRulesStorage
349350
const batch = new PostgresBucketBatch({
350351
logger: options.logger ?? framework.logger,
351352
db: this.db,
352-
sync_rules: this.sync_rules.parsed(options).sync_rules,
353+
sync_rules: this.sync_rules.parsed(options).hydratedSyncRules(),
353354
group_id: this.group_id,
354355
slot_name: this.slot_name,
355356
last_checkpoint_lsn: checkpoint_lsn,
@@ -374,7 +375,7 @@ export class PostgresSyncRulesStorage
374375

375376
async getParameterSets(
376377
checkpoint: ReplicationCheckpoint,
377-
lookups: sync_rules.ParameterLookup[]
378+
lookups: sync_rules.ScopedParameterLookup[]
378379
): Promise<sync_rules.SqliteJsonRow[]> {
379380
const rows = await this.db.sql`
380381
SELECT DISTINCT
@@ -879,7 +880,7 @@ class PostgresReplicationCheckpoint implements storage.ReplicationCheckpoint {
879880
public readonly lsn: string | null
880881
) {}
881882

882-
getParameterSets(lookups: sync_rules.ParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
883+
getParameterSets(lookups: sync_rules.ScopedParameterLookup[]): Promise<sync_rules.SqliteJsonRow[]> {
883884
return this.storage.getParameterSets(this, lookups);
884885
}
885886
}

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { PostgresPersistedBatch } from './PostgresPersistedBatch.js';
2424
export interface PostgresBucketBatchOptions {
2525
logger: Logger;
2626
db: lib_postgres.DatabaseClient;
27-
sync_rules: sync_rules.SqlSyncRules;
27+
sync_rules: sync_rules.HydratedSyncRules;
2828
group_id: number;
2929
slot_name: string;
3030
last_checkpoint_lsn: string | null;
@@ -72,7 +72,7 @@ export class PostgresBucketBatch
7272
protected persisted_op: InternalOpId | null;
7373

7474
protected write_checkpoint_batch: storage.CustomWriteCheckpointOptions[];
75-
protected readonly sync_rules: sync_rules.SqlSyncRules;
75+
protected readonly sync_rules: sync_rules.HydratedSyncRules;
7676
protected batch: OperationBatch | null;
7777
private lastWaitingLogThrottled = 0;
7878
private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined;
@@ -840,8 +840,7 @@ export class PostgresBucketBatch
840840
if (sourceTable.syncData) {
841841
const { results: evaluated, errors: syncErrors } = this.sync_rules.evaluateRowWithErrors({
842842
record: after,
843-
sourceTable,
844-
bucketIdTransformer: sync_rules.SqlSyncRules.versionedBucketIdTransformer(`${this.group_id}`)
843+
sourceTable
845844
});
846845

847846
for (const error of syncErrors) {

modules/module-postgres-storage/src/storage/sync-rules/PostgresPersistedSyncRulesContent.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { storage } from '@powersync/service-core';
44
import { SqlSyncRules } from '@powersync/service-sync-rules';
55

66
import { models } from '../../types/types.js';
7+
import { versionedHydrationState } from '@powersync/service-sync-rules/src/HydrationState.js';
78

89
export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncRulesContent {
910
public readonly slot_name: string;
@@ -35,7 +36,12 @@ export class PostgresPersistedSyncRulesContent implements storage.PersistedSyncR
3536
return {
3637
id: this.id,
3738
slot_name: this.slot_name,
38-
sync_rules: SqlSyncRules.fromYaml(this.sync_rules_content, options)
39+
sync_rules: SqlSyncRules.fromYaml(this.sync_rules_content, options),
40+
hydratedSyncRules() {
41+
return this.sync_rules.hydrate({
42+
hydrationState: versionedHydrationState(this.id)
43+
});
44+
}
3945
};
4046
}
4147

0 commit comments

Comments
 (0)