Skip to content

Commit a4ead94

Browse files
authored
Postgres custom type edge cases (#428)
1 parent c2da2be commit a4ead94

File tree

5 files changed

+129
-83
lines changed

5 files changed

+129
-83
lines changed

.changeset/afraid-weeks-matter.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
---
4+
5+
Fix decoding arrays of enums, fix decoding `box[]` columns during initial replication.

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ import {
2020
} from '@powersync/service-core';
2121
import * as pgwire from '@powersync/service-jpgwire';
2222
import {
23+
applyRowContext,
2324
applyValueContext,
2425
CompatibilityContext,
2526
DatabaseInputRow,
2627
SqliteInputRow,
2728
SqliteInputValue,
2829
SqliteRow,
30+
SqliteValue,
2931
SqlSyncRules,
3032
TablePattern,
3133
ToastableSqliteRow,
32-
toSyncRulesRow
34+
toSyncRulesRow,
35+
toSyncRulesValue
3336
} from '@powersync/service-sync-rules';
3437

3538
import { ReplicationMetric } from '@powersync/service-types';
@@ -44,6 +47,7 @@ import {
4447
SimpleSnapshotQuery,
4548
SnapshotQuery
4649
} from './SnapshotQuery.js';
50+
import { PostgresTypeResolver } from '../types/resolver.js';
4751

4852
export interface WalStreamOptions {
4953
logger?: Logger;
@@ -462,11 +466,25 @@ WHERE oid = $1::regclass`,
462466
}
463467
}
464468

465-
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
466-
for (let row of results) {
467-
yield toSyncRulesRow(row);
468-
}
469+
static decodeRow(row: pgwire.PgRow, types: PostgresTypeResolver): SqliteInputRow {
470+
let result: SqliteInputRow = {};
471+
472+
row.raw.forEach((rawValue, i) => {
473+
const column = row.columns[i];
474+
let mappedValue: SqliteInputValue;
475+
476+
if (typeof rawValue == 'string') {
477+
mappedValue = toSyncRulesValue(types.registry.decodeDatabaseValue(rawValue, column.typeOid), false, true);
478+
} else {
479+
// Binary format, expose as-is.
480+
mappedValue = rawValue;
481+
}
482+
483+
result[column.name] = mappedValue;
484+
});
485+
return result;
469486
}
487+
470488
private async snapshotTableInTx(
471489
batch: storage.BucketStorageBatch,
472490
db: pgwire.PgConnection,
@@ -541,8 +559,6 @@ WHERE oid = $1::regclass`,
541559
}
542560
await q.initialize();
543561

544-
let columns: { i: number; name: string; typeOid: number }[] = [];
545-
let columnMap: Record<string, number> = {};
546562
let hasRemainingData = true;
547563
while (hasRemainingData) {
548564
// Fetch 10k at a time.
@@ -556,31 +572,16 @@ WHERE oid = $1::regclass`,
556572
// There are typically 100-200 rows per chunk.
557573
for await (let chunk of cursor) {
558574
if (chunk.tag == 'RowDescription') {
559-
// We get a RowDescription for each FETCH call, but they should
560-
// all be the same.
561-
let i = 0;
562-
columns = chunk.payload.map((c) => {
563-
return { i: i++, name: c.name, typeOid: c.typeOid };
564-
});
565-
for (let column of chunk.payload) {
566-
columnMap[column.name] = column.typeOid;
567-
}
568575
continue;
569576
}
570577

571-
const rows = chunk.rows.map((row) => {
572-
let q: DatabaseInputRow = {};
573-
for (let c of columns) {
574-
q[c.name] = pgwire.PgType.decode(row.raw[c.i], c.typeOid);
575-
}
576-
return q;
577-
});
578-
if (rows.length > 0) {
578+
if (chunk.rows.length > 0) {
579579
hasRemainingData = true;
580580
}
581581

582-
for (const inputRecord of WalStream.getQueryData(rows)) {
583-
const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord));
582+
for (const rawRow of chunk.rows) {
583+
const record = this.sync_rules.applyRowContext<never>(WalStream.decodeRow(rawRow, this.connections.types));
584+
584585
// This auto-flushes when the batch reaches its size limit
585586
await batch.save({
586587
tag: storage.SaveOperationTag.INSERT,
@@ -592,8 +593,8 @@ WHERE oid = $1::regclass`,
592593
});
593594
}
594595

595-
at += rows.length;
596-
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length);
596+
at += chunk.rows.length;
597+
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(chunk.rows.length);
597598

598599
this.touch();
599600
}

modules/module-postgres/src/types/registry.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,7 @@ export class CustomTypeRegistry {
256256
case 'unknown':
257257
return true;
258258
case 'array':
259-
return (
260-
type.separatorCharCode == pgwire.CHAR_CODE_COMMA &&
261-
this.isParsedWithoutCustomTypesSupport(this.lookupType(type.innerId))
262-
);
259+
return type.separatorCharCode == pgwire.CHAR_CODE_COMMA && pgwire.ARRAY_TO_ELEM_OID.has(type.innerId);
263260
default:
264261
return false;
265262
}

modules/module-postgres/src/types/resolver.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ WHERE a.attnum > 0
153153
AND cn.nspname not in ('information_schema', 'pg_catalog', 'pg_toast')
154154
`;
155155

156-
const query = await this.pool.query({ statement: sql });
156+
const query = await this.pool.query(sql);
157157
let ids: number[] = [];
158158
for (const row of pgwire.pgwireRows(query)) {
159159
ids.push(Number(row.type_oid));
@@ -186,11 +186,6 @@ WHERE a.attnum > 0
186186
return toSyncRulesRow(record);
187187
}
188188

189-
constructRowRecord(columnMap: Record<string, number>, tupleRaw: Record<string, any>): SqliteInputRow {
190-
const record = this.decodeTupleForTable(columnMap, tupleRaw);
191-
return toSyncRulesRow(record);
192-
}
193-
194189
/**
195190
* We need a high level of control over how values are decoded, to make sure there is no loss
196191
* of precision in the process.

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 93 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -327,10 +327,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
327327

328328
await insert(db);
329329

330-
const transformed = [
331-
...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`)))
332-
];
333-
330+
const transformed = await queryAll(db, `SELECT * FROM test_data ORDER BY id`);
334331
checkResults(transformed);
335332
} finally {
336333
await db.end();
@@ -346,17 +343,11 @@ VALUES(10, ARRAY['null']::TEXT[]);
346343

347344
await insert(db);
348345

349-
const transformed = [
350-
...WalStream.getQueryData(
351-
pgwire.pgwireRows(
352-
await db.query({
353-
statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
354-
params: [{ type: 'bool', value: true }]
355-
})
356-
)
357-
)
358-
];
359-
346+
const raw = await db.query({
347+
statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`,
348+
params: [{ type: 'bool', value: true }]
349+
});
350+
const transformed = await interpretResults(db, raw);
360351
checkResults(transformed);
361352
} finally {
362353
await db.end();
@@ -370,9 +361,9 @@ VALUES(10, ARRAY['null']::TEXT[]);
370361

371362
await insertArrays(db);
372363

373-
const transformed = [
374-
...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`)))
375-
].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
364+
const transformed = (await queryAll(db, `SELECT * FROM test_data_arrays ORDER BY id`)).map((e) =>
365+
applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)
366+
);
376367

377368
checkResultArrays(transformed);
378369
} finally {
@@ -465,19 +456,15 @@ VALUES(10, ARRAY['null']::TEXT[]);
465456
});
466457

467458
test('date formats', async () => {
468-
const db = await connectPgWire();
459+
const db = await connectPgPool();
469460
try {
470461
await setupTable(db);
471462

472463
await db.query(`
473464
INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12', '2023-03-06 15:47:12.4', '2023-03-06 15:47+02');
474465
`);
475466

476-
const [row] = [
477-
...WalStream.getQueryData(
478-
pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`))
479-
)
480-
];
467+
const [row] = await queryAll(db, `SELECT time, timestamp, timestamptz FROM test_data`);
481468

482469
const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
483470
expect(oldFormat).toMatchObject({
@@ -515,17 +502,18 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
515502
try {
516503
await clearTestDb(db);
517504
await db.query(`CREATE DOMAIN rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5);`);
518-
await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT);`);
519-
await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
520505
await db.query(`CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')`);
506+
await db.query(`CREATE TYPE composite AS (foo rating_value[], bar TEXT, mood mood);`);
507+
await db.query(`CREATE TYPE nested_composite AS (a BOOLEAN, b composite);`);
521508

522509
await db.query(`CREATE TABLE test_custom(
523510
id serial primary key,
524511
rating rating_value,
525512
composite composite,
526513
nested_composite nested_composite,
527514
boxes box[],
528-
mood mood
515+
mood mood,
516+
moods mood[]
529517
);`);
530518

531519
const slotName = 'test_slot';
@@ -542,13 +530,14 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
542530

543531
await db.query(`
544532
INSERT INTO test_custom
545-
(rating, composite, nested_composite, boxes, mood)
533+
(rating, composite, nested_composite, boxes, mood, moods)
546534
VALUES (
547535
1,
548-
(ARRAY[2,3], 'bar'),
549-
(TRUE, (ARRAY[2,3], 'bar')),
536+
(ARRAY[2,3], 'bar', 'sad'::mood),
537+
(TRUE, (ARRAY[2,3], 'bar', 'sad'::mood)),
550538
ARRAY[box(point '(1,2)', point '(3,4)'), box(point '(5, 6)', point '(7,8)')],
551-
'happy'
539+
'happy',
540+
ARRAY['sad'::mood, 'happy'::mood]
552541
);
553542
`);
554543

@@ -562,27 +551,53 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
562551
});
563552

564553
const [transformed] = await getReplicationTx(db, replicationStream);
554+
const [queried] = await queryAll(db, `SELECT * FROM test_custom`);
565555
await pg.end();
566556

567-
const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
568-
expect(oldFormat).toMatchObject({
557+
const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
558+
expect(oldFormatStreamed).toMatchObject({
569559
rating: '1',
570-
composite: '("{2,3}",bar)',
571-
nested_composite: '(t,"(""{2,3}"",bar)")',
560+
composite: '("{2,3}",bar,sad)',
561+
nested_composite: '(t,"(""{2,3}"",bar,sad)")',
562+
boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
563+
mood: 'happy',
564+
moods: '{sad,happy}'
565+
});
566+
567+
const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
568+
expect(oldFormatQueried).toMatchObject({
569+
rating: 1,
570+
composite: '("{2,3}",bar,sad)',
571+
nested_composite: '(t,"(""{2,3}"",bar,sad)")',
572572
boxes: '["(3","4)","(1","2);(7","8)","(5","6)"]',
573-
mood: 'happy'
573+
mood: 'happy',
574+
moods: '{sad,happy}'
574575
});
575576

576-
const newFormat = applyRowContext(
577+
const newFormatStreamed = applyRowContext(
577578
transformed,
578579
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
579580
);
580-
expect(newFormat).toMatchObject({
581+
expect(newFormatStreamed).toMatchObject({
581582
rating: 1,
582-
composite: '{"foo":[2.0,3.0],"bar":"bar"}',
583-
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar"}}',
583+
composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
584+
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
584585
boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
585-
mood: 'happy'
586+
mood: 'happy',
587+
moods: '["sad","happy"]'
588+
});
589+
590+
const newFormatQueried = applyRowContext(
591+
queried,
592+
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
593+
);
594+
expect(newFormatQueried).toMatchObject({
595+
rating: 1,
596+
composite: '{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}',
597+
nested_composite: '{"a":1,"b":{"foo":[2.0,3.0],"bar":"bar","mood":"sad"}}',
598+
boxes: JSON.stringify(['(3,4),(1,2)', '(7,8),(5,6)']),
599+
mood: 'happy',
600+
moods: '["sad","happy"]'
586601
});
587602
} finally {
588603
await db.end();
@@ -635,18 +650,36 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12'
635650
});
636651

637652
const [transformed] = await getReplicationTx(db, replicationStream);
653+
const [queried] = await queryAll(db, `SELECT ranges FROM test_custom`);
638654
await pg.end();
639655

640-
const oldFormat = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
641-
expect(oldFormat).toMatchObject({
656+
const oldFormatStreamed = applyRowContext(transformed, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
657+
expect(oldFormatStreamed).toMatchObject({
658+
ranges: '{"{[2,4),[6,8)}"}'
659+
});
660+
const oldFormatQueried = applyRowContext(queried, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
661+
expect(oldFormatQueried).toMatchObject({
642662
ranges: '{"{[2,4),[6,8)}"}'
643663
});
644664

645-
const newFormat = applyRowContext(
665+
const newFormatStreamed = applyRowContext(
646666
transformed,
647667
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
648668
);
649-
expect(newFormat).toMatchObject({
669+
expect(newFormatStreamed).toMatchObject({
670+
ranges: JSON.stringify([
671+
[
672+
{ lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
673+
{ lower: 6, upper: 8, lower_exclusive: 0, upper_exclusive: 1 }
674+
]
675+
])
676+
});
677+
678+
const newFormatQueried = applyRowContext(
679+
queried,
680+
new CompatibilityContext({ edition: CompatibilityEdition.SYNC_STREAMS })
681+
);
682+
expect(newFormatQueried).toMatchObject({
650683
ranges: JSON.stringify([
651684
[
652685
{ lower: 2, upper: 4, lower_exclusive: 0, upper_exclusive: 1 },
@@ -679,3 +712,18 @@ async function getReplicationTx(db: pgwire.PgClient, replicationStream: pgwire.R
679712
}
680713
return transformed;
681714
}
715+
716+
/**
717+
* Simulates what WalStream does for initial snapshots.
718+
*/
719+
async function queryAll(db: pgwire.PgClient, sql: string) {
720+
const raw = await db.query(sql);
721+
return await interpretResults(db, raw);
722+
}
723+
724+
async function interpretResults(db: pgwire.PgClient, results: pgwire.PgResult) {
725+
const typeCache = new PostgresTypeResolver(db);
726+
await typeCache.fetchTypesForSchema();
727+
728+
return results.rows.map((row) => WalStream.decodeRow(row, typeCache));
729+
}

0 commit comments

Comments
 (0)