Skip to content

Commit d130411

Browse files
committed
Merge remote-tracking branch 'origin/main' into granular-sync-rules
2 parents f8f432d + a4ead94 commit d130411

File tree

5 files changed

+129
-83
lines changed

5 files changed

+129
-83
lines changed

.changeset/afraid-weeks-matter.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
---
4+
5+
Fix decoding arrays of enums, fix decoding `box[]` columns during initial replication.

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@ import {
2020
} from '@powersync/service-core';
2121
import * as pgwire from '@powersync/service-jpgwire';
2222
import {
23+
applyRowContext,
2324
applyValueContext,
2425
CompatibilityContext,
2526
DatabaseInputRow,
2627
SqliteInputRow,
2728
SqliteInputValue,
2829
SqliteRow,
30+
SqliteValue,
2931
SqlSyncRules,
3032
HydratedSyncRules,
3133
TablePattern,
3234
ToastableSqliteRow,
33-
toSyncRulesRow
35+
toSyncRulesRow,
36+
toSyncRulesValue
3437
} from '@powersync/service-sync-rules';
3538

3639
import { ReplicationMetric } from '@powersync/service-types';
@@ -45,6 +48,7 @@ import {
4548
SimpleSnapshotQuery,
4649
SnapshotQuery
4750
} from './SnapshotQuery.js';
51+
import { PostgresTypeResolver } from '../types/resolver.js';
4852

4953
export interface WalStreamOptions {
5054
logger?: Logger;
@@ -463,11 +467,25 @@ WHERE oid = $1::regclass`,
463467
}
464468
}
465469

466-
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
467-
for (let row of results) {
468-
yield toSyncRulesRow(row);
469-
}
470+
static decodeRow(row: pgwire.PgRow, types: PostgresTypeResolver): SqliteInputRow {
471+
let result: SqliteInputRow = {};
472+
473+
row.raw.forEach((rawValue, i) => {
474+
const column = row.columns[i];
475+
let mappedValue: SqliteInputValue;
476+
477+
if (typeof rawValue == 'string') {
478+
mappedValue = toSyncRulesValue(types.registry.decodeDatabaseValue(rawValue, column.typeOid), false, true);
479+
} else {
480+
// Binary format, expose as-is.
481+
mappedValue = rawValue;
482+
}
483+
484+
result[column.name] = mappedValue;
485+
});
486+
return result;
470487
}
488+
471489
private async snapshotTableInTx(
472490
batch: storage.BucketStorageBatch,
473491
db: pgwire.PgConnection,
@@ -542,8 +560,6 @@ WHERE oid = $1::regclass`,
542560
}
543561
await q.initialize();
544562

545-
let columns: { i: number; name: string; typeOid: number }[] = [];
546-
let columnMap: Record<string, number> = {};
547563
let hasRemainingData = true;
548564
while (hasRemainingData) {
549565
// Fetch 10k at a time.
@@ -557,31 +573,16 @@ WHERE oid = $1::regclass`,
557573
// There are typically 100-200 rows per chunk.
558574
for await (let chunk of cursor) {
559575
if (chunk.tag == 'RowDescription') {
560-
// We get a RowDescription for each FETCH call, but they should
561-
// all be the same.
562-
let i = 0;
563-
columns = chunk.payload.map((c) => {
564-
return { i: i++, name: c.name, typeOid: c.typeOid };
565-
});
566-
for (let column of chunk.payload) {
567-
columnMap[column.name] = column.typeOid;
568-
}
569576
continue;
570577
}
571578

572-
const rows = chunk.rows.map((row) => {
573-
let q: DatabaseInputRow = {};
574-
for (let c of columns) {
575-
q[c.name] = pgwire.PgType.decode(row.raw[c.i], c.typeOid);
576-
}
577-
return q;
578-
});
579-
if (rows.length > 0) {
579+
if (chunk.rows.length > 0) {
580580
hasRemainingData = true;
581581
}
582582

583-
for (const inputRecord of WalStream.getQueryData(rows)) {
584-
const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord));
583+
for (const rawRow of chunk.rows) {
584+
const record = this.sync_rules.applyRowContext<never>(WalStream.decodeRow(rawRow, this.connections.types));
585+
585586
// This auto-flushes when the batch reaches its size limit
586587
await batch.save({
587588
tag: storage.SaveOperationTag.INSERT,
@@ -593,8 +594,8 @@ WHERE oid = $1::regclass`,
593594
});
594595
}
595596

596-
at += rows.length;
597-
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length);
597+
at += chunk.rows.length;
598+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(chunk.rows.length);
598599

599600
this.touch();
600601
}

modules/module-postgres/src/types/registry.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,7 @@ export class CustomTypeRegistry {
256256
case 'unknown':
257257
return true;
258258
case 'array':
259-
return (
260-
type.separatorCharCode == pgwire.CHAR_CODE_COMMA &&
261-
this.isParsedWithoutCustomTypesSupport(this.lookupType(type.innerId))
262-
);
259+
return type.separatorCharCode == pgwire.CHAR_CODE_COMMA && pgwire.ARRAY_TO_ELEM_OID.has(type.innerId);
263260
default:
264261
return false;
265262
}

modules/module-postgres/src/types/resolver.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ WHERE a.attnum > 0
153153
AND cn.nspname not in ('information_schema', 'pg_catalog', 'pg_toast')
154154
`;
155155

156-
const query = await this.pool.query({ statement: sql });
156+
const query = await this.pool.query(sql);
157157
let ids: number[] = [];
158158
for (const row of pgwire.pgwireRows(query)) {
159159
ids.push(Number(row.type_oid));
@@ -186,11 +186,6 @@ WHERE a.attnum > 0
186186
return toSyncRulesRow(record);
187187
}
188188

189-
constructRowRecord(columnMap: Record<string, number>, tupleRaw: Record<string, any>): SqliteInputRow {
190-
const record = this.decodeTupleForTable(columnMap, tupleRaw);
191-
return toSyncRulesRow(record);
192-
}
193-
194189
/**
195190
* We need a high level of control over how values are decoded, to make sure there is no loss
196191
* of precision in the process.

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 93 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -327,10 +327,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
327327

328328
await insert(db);
329329

330-
const transformed = [
331-
...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`)))
332-
];
333-
330+
const transformed = await queryAll(db, `SELECT * FROM test_data ORDER BY id`);
334331
checkResults(transformed);
335332
} finally {
336333
await db.end();
@@ -346,17 +343,11 @@ VALUES(10, ARRAY['null']::TEXT[]);
346343

347344
await insert(db);
348345

349-
const transformed = [
350-
...WalStream.getQueryData(
351-
pgwire.pgwireRows(
352-
await db.query({
353-
statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
354-
params: [{ type: 'bool', value: true }]
355-
})
356-
)
357-
)
358-
];
359-
346+
const raw = await db.query({
347+
statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
348+
params: [{ type: 'bool', value: true }]
349+
});
350+
const transformed = await interpretResults(db, raw);
360351
checkResults(transformed);
361352
} finally {
362353
await db.end();
@@ -370,9 +361,9 @@ VALUES(10, ARRAY['null']::TEXT[]);
370361

371362
await insertArrays(db);
372363

373-
const transformed = [
374-
...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`)))
375-
].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
364+
const transformed = (await queryAll(db, `SELECT * FROM test_data_arrays ORDER BY id`)).map((e) =>
365+
applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)
366+
);
376367

377368
checkResultArrays(transformed);
378369
} finally {
@@ -465,19 +456,15 @@ VALUES(10, ARRAY['null']::TEXT[]);
465456
});
466457

467458
test('date formats', async () => {
468-
const db = await connectPgWire();
459+
const db = await connectPgPool();
469460
try {
470461
await setupTable(db);
471462

472463
await db.query(`
473464
INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12', '2023-03-06 15:47:12.4', '2023-03-06 15:47+02');
474465
`);
475466

476-
const [row] = [
477-
...WalStream.getQueryData(
478-
pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`))
479-
)
480-
];
467+
const [row] = await queryAll(db, `SELECT time, timestamp, timestamptz FROM test_data`);
481468

482469
const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
483470
expect(oldFormat).toMatchObject({
@@ -515,17 +502,18 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
515502
try {
516503
await clearTestDb(db);
517504
await db.query(`CREATE DOMAIN rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5);`);
518-
await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT);`);
519-
await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
520505
await db.query(`CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')`);
506+
await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT, mood mood);`);
507+
await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
521508

522509
await db.query(`CREATE TABLE test_custom(
523510
id serial primary key,
524511
rating rating_value,
525512
composite composite,
526513
nested_composite nested_composite,
527514
boxes box[],
528-
mood mood
515+
mood mood,
516+
moods mood[]
529517
);`);
530518

531519
const slotName = 'test_slot';
@@ -542,13 +530,14 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
542530

543531
await db.query(`
544532
INSERT INTO test_custom
545-
(rating, composite, nested_composite, boxes, mood)
533+
(rating, composite, nested_composite, boxes, mood, moods)
546534
VALUES (
547535
1,
548-
(ARRAY[2,3], 'bar'),
549-
(TRUE, (ARRAY[2,3], 'bar')),
536+
(ARRAY[2,3], 'bar', 'sad'::mood),
537+
(TRUE, (ARRAY[2,3], 'bar', 'sad'::mood)),
550538
ARRAY[box(point '(1,2)', point '(3,4)'), box(point '(5, 6)', point '(7,8)')],
551-
'happy'
539+
'happy',
540+
ARRAY['sad'::mood, 'happy'::mood]
552541
);
553542
`);
554543

@@ -562,27 +551,53 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
562551
});
563552

564553
const [transformed] = await getReplicationTx(db, replicationStream);
554+
const [queried] = await queryAll(db, `SELECT * FROM test_custom`);
565555
await pg.end();
566556

567-
const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
568-
expect(oldFormat).toMatchObject({
557+
const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
558+
expect(oldFormatStreamed).toMatchObject({
569559
rating: '1',
570-
composite: '("{2,3}",bar)',
571-
nested_composite: '(t,"(""{2,3}"",bar)")',
560+
composite: '("{2,3}",bar,sad)',
561+
nested_composite: '(t,"(""{2,3}"",bar,sad)")',
562+
boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
563+
mood: 'happy',
564+
moods: '{sad,happy}'
565+
});
566+
567+
const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
568+
expect(oldFormatQueried).toMatchObject({
569+
rating: 1,
570+
composite: '("{2,3}",bar,sad)',
571+
nested_composite: '(t,"(""{2,3}"",bar,sad)")',
572572
boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
573-
mood: 'happy'
573+
mood: 'happy',
574+
moods: '{sad,happy}'
574575
});
575576

576-
const newFormat = applyRowContext(
577+
const newFormatStreamed = applyRowContext(
577578
transformed,
578579
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
579580
);
580-
expect(newFormat).toMatchObject({
581+
expect(newFormatStreamed).toMatchObject({
581582
rating: 1,
582-
composite: '{"foo":[2.0,3.0],"bar":"bar"}',
583-
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar"}}',
583+
composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
584+
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
584585
boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
585-
mood: 'happy'
586+
mood: 'happy',
587+
moods: '["sad","happy"]'
588+
});
589+
590+
const newFormatQueried = applyRowContext(
591+
queried,
592+
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
593+
);
594+
expect(newFormatQueried).toMatchObject({
595+
rating: 1,
596+
composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
597+
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
598+
boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
599+
mood: 'happy',
600+
moods: '["sad","happy"]'
586601
});
587602
} finally {
588603
await db.end();
@@ -635,18 +650,36 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
635650
});
636651

637652
const [transformed] = await getReplicationTx(db, replicationStream);
653+
const [queried] = await queryAll(db, `SELECT ranges FROM test_custom`);
638654
await pg.end();
639655

640-
const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
641-
expect(oldFormat).toMatchObject({
656+
const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
657+
expect(oldFormatStreamed).toMatchObject({
658+
ranges: '{"{[2,4),[6,8)}"}'
659+
});
660+
const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
661+
expect(oldFormatQueried).toMatchObject({
642662
ranges: '{"{[2,4),[6,8)}"}'
643663
});
644664

645-
const newFormat = applyRowContext(
665+
const newFormatStreamed = applyRowContext(
646666
transformed,
647667
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
648668
);
649-
expect(newFormat).toMatchObject({
669+
expect(newFormatStreamed).toMatchObject({
670+
ranges: JSON.stringify([
671+
[
672+
{ lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
673+
{ lower: 6, upper: 8, lower_exclusive: 0, upper_exclusive: 1 }
674+
]
675+
])
676+
});
677+
678+
const newFormatQueried = applyRowContext(
679+
queried,
680+
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
681+
);
682+
expect(newFormatQueried).toMatchObject({
650683
ranges: JSON.stringify([
651684
[
652685
{ lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
@@ -679,3 +712,18 @@ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.R
679712
}
680713
return transformed;
681714
}
715+
716+
/**
717+
* Simulates what WalStream does for initial snapshots.
718+
*/
719+
async function queryAll(db: pgwire.PgClient, sql: string) {
720+
const raw = await db.query(sql);
721+
return await interpretResults(db, raw);
722+
}
723+
724+
async function interpretResults(db: pgwire.PgClient, results: pgwire.PgResult) {
725+
const typeCache = new PostgresTypeResolver(db);
726+
await typeCache.fetchTypesForSchema();
727+
728+
return results.rows.map((row) => WalStream.decodeRow(row, typeCache));
729+
}

0 commit comments

Comments
 (0)