
Commit 23ec406

Fix and test "has_more" for data batches (#255)
* Fix and test "has_more" for data batches.
* Add changeset.
* More naming and comment improvements.
* More refactoring.
1 parent 64e51d1 commit 23ec406

14 files changed: +326 -104 lines

.changeset/pretty-eagles-fail.md

+9
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+Fix has_more and other data batch metadata
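
For context on what the corrected metadata means to a consumer, here is a minimal sketch (not part of this commit; the chunk shape is simplified and `fetchBatch` is a hypothetical stand-in for `storage.getBucketDataBatch` restricted to one bucket) of how a caller pages through chunks once `has_more` is reported correctly. It mirrors the loop in wal_stream_utils.ts further down.

// Simplified stand-ins; the real types live in @powersync/service-core.
interface ChunkData {
  bucket: string;
  data: unknown[];
  has_more: boolean;
  next_after: string;
}
interface SyncBucketDataChunk {
  chunkData: ChunkData;
}

async function readAll(
  fetchBatch: (after: bigint) => Promise<SyncBucketDataChunk[]>,
  start: bigint
): Promise<unknown[]> {
  let after = start;
  const data: unknown[] = [];
  while (true) {
    const chunks = await fetchBatch(after);
    if (chunks.length == 0) {
      break;
    }
    for (const chunk of chunks) {
      data.push(...chunk.chunkData.data);
    }
    // has_more on the last chunk of the batch signals whether another query is needed.
    const last = chunks[chunks.length - 1].chunkData;
    if (!last.has_more) {
      break;
    }
    after = BigInt(last.next_after);
  }
  return data;
}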

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

+41-28
@@ -288,7 +288,7 @@ export class MongoSyncBucketStorage
     checkpoint: utils.InternalOpId,
     dataBuckets: Map<string, InternalOpId>,
     options?: storage.BucketDataBatchOptions
-  ): AsyncIterable<storage.SyncBucketDataBatch> {
+  ): AsyncIterable<storage.SyncBucketDataChunk> {
     if (dataBuckets.size == 0) {
       return;
     }
@@ -315,8 +315,13 @@ export class MongoSyncBucketStorage
       });
     }
 
-    const limit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
-    const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
+    // Internal naming:
+    // We do a query for one "batch", which may consist of multiple "chunks".
+    // Each chunk is limited to a single bucket, and is limited in length and size.
+    // There are also overall batch length and size limits.
+
+    const batchLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
+    const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
 
     const cursor = this.db.bucket_data.find(
       {
@@ -325,10 +330,10 @@ export class MongoSyncBucketStorage
       {
         session: undefined,
         sort: { _id: 1 },
-        limit: limit,
+        limit: batchLimit,
         // Increase batch size above the default 101, so that we can fill an entire batch in
         // one go.
-        batchSize: limit,
+        batchSize: batchLimit,
         // Raw mode returns an array of Buffer instead of parsed documents.
         // We use it so that:
         // 1. We can calculate the document size accurately without serializing again.
@@ -343,33 +348,38 @@ export class MongoSyncBucketStorage
     // to the lower of the batch count and size limits.
     // This is similar to using `singleBatch: true` in the find options, but allows
     // detecting "hasMore".
-    let { data, hasMore } = await readSingleBatch(cursor);
-    if (data.length == limit) {
+    let { data, hasMore: batchHasMore } = await readSingleBatch(cursor);
+    if (data.length == batchLimit) {
       // Limit reached - could have more data, despite the cursor being drained.
-      hasMore = true;
+      batchHasMore = true;
     }
 
-    let batchSize = 0;
-    let currentBatch: utils.SyncBucketData | null = null;
+    let chunkSizeBytes = 0;
+    let currentChunk: utils.SyncBucketData | null = null;
     let targetOp: InternalOpId | null = null;
 
     // Ordered by _id, meaning buckets are grouped together
     for (let rawData of data) {
      const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument;
      const bucket = row._id.b;
 
-      if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) {
+      if (currentChunk == null || currentChunk.bucket != bucket || chunkSizeBytes >= chunkSizeLimitBytes) {
+        // We need to start a new chunk
         let start: ProtocolOpId | undefined = undefined;
-        if (currentBatch != null) {
-          if (currentBatch.bucket == bucket) {
-            currentBatch.has_more = true;
+        if (currentChunk != null) {
+          // There is an existing chunk we need to yield
+          if (currentChunk.bucket == bucket) {
+            // Current and new chunk have the same bucket, so we need has_more on the current one.
+            // If currentChunk.bucket != bucket, then we reached the end of the previous bucket,
+            // and has_more = false in that case.
+            currentChunk.has_more = true;
+            start = currentChunk.next_after;
           }
 
-          const yieldBatch = currentBatch;
-          start = currentBatch.after;
-          currentBatch = null;
-          batchSize = 0;
-          yield { batch: yieldBatch, targetOp: targetOp };
+          const yieldChunk = currentChunk;
+          currentChunk = null;
+          chunkSizeBytes = 0;
+          yield { chunkData: yieldChunk, targetOp: targetOp };
          targetOp = null;
        }
 
@@ -380,10 +390,10 @@ export class MongoSyncBucketStorage
          }
          start = internalToExternalOpId(startOpId);
        }
-        currentBatch = {
+        currentChunk = {
          bucket,
          after: start,
-          has_more: hasMore,
+          has_more: false,
          data: [],
          next_after: start
        };
@@ -399,16 +409,19 @@ export class MongoSyncBucketStorage
        }
      }
 
-      currentBatch.data.push(entry);
-      currentBatch.next_after = entry.op_id;
+      currentChunk.data.push(entry);
+      currentChunk.next_after = entry.op_id;
 
-      batchSize += rawData.byteLength;
+      chunkSizeBytes += rawData.byteLength;
    }
 
-    if (currentBatch != null) {
-      const yieldBatch = currentBatch;
-      currentBatch = null;
-      yield { batch: yieldBatch, targetOp: targetOp };
+    if (currentChunk != null) {
+      const yieldChunk = currentChunk;
+      currentChunk = null;
+      // This is the final chunk in the batch.
+      // There may be more data if and only if the batch we retrieved isn't complete.
+      yieldChunk.has_more = batchHasMore;
+      yield { chunkData: yieldChunk, targetOp: targetOp };
      targetOp = null;
    }
  }
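
To summarize the chunking rule the implementation above follows, here is a small self-contained sketch (hypothetical names, not code from this repository): a chunk's has_more is set to true when the same bucket continues in the next chunk, stays false when a new bucket starts, and on the batch's final chunk reflects whether the batch query itself was truncated.

interface Row {
  bucket: string;
  opId: string;
  sizeBytes: number;
}
interface Chunk {
  bucket: string;
  opIds: string[];
  has_more: boolean;
}

// Rows are assumed to be sorted by bucket, as the `_id` sort above guarantees.
function chunkBatch(rows: Row[], batchHasMore: boolean, chunkSizeLimitBytes: number): Chunk[] {
  const chunks: Chunk[] = [];
  let current: Chunk | null = null;
  let currentSizeBytes = 0;
  for (const row of rows) {
    if (current == null || current.bucket != row.bucket || currentSizeBytes >= chunkSizeLimitBytes) {
      if (current != null && current.bucket == row.bucket) {
        // Same bucket continues in the next chunk: the chunk we just closed has more data.
        current.has_more = true;
      }
      current = { bucket: row.bucket, opIds: [], has_more: false };
      currentSizeBytes = 0;
      chunks.push(current);
    }
    current.opIds.push(row.opId);
    currentSizeBytes += row.sizeBytes;
  }
  if (current != null) {
    // The final chunk of the batch has more data only if the batch query was truncated.
    current.has_more = batchHasMore;
  }
  return chunks;
}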

modules/module-mongodb-storage/test/src/storage_sync.test.ts

+10-2
@@ -87,7 +87,11 @@ describe('sync - mongodb', () => {
    });
 
    const batch2 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
+        options
+      )
    );
    expect(test_utils.getBatchData(batch2)).toEqual([
      { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -99,7 +103,11 @@ describe('sync - mongodb', () => {
    });
 
    const batch3 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
+        options
+      )
    );
    expect(test_utils.getBatchData(batch3)).toEqual([
      { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }

modules/module-mongodb/test/src/change_stream_utils.ts

+1-1
@@ -138,7 +138,7 @@ export class ChangeStreamTestContext {
      chunkLimitBytes: options?.chunkLimitBytes
    });
    const batches = await test_utils.fromAsync(batch);
-    return batches[0]?.batch.data ?? [];
+    return batches[0]?.chunkData.data ?? [];
  }
 
  async getChecksums(buckets: string[], options?: { timeout?: number }) {

modules/module-mysql/test/src/BinlogStreamUtils.ts

+1-1
@@ -157,7 +157,7 @@ export class BinlogStreamTestContext {
    const map = new Map<string, InternalOpId>([[bucket, start]]);
    const batch = this.storage!.getBucketDataBatch(checkpoint, map);
    const batches = await test_utils.fromAsync(batch);
-    return batches[0]?.batch.data ?? [];
+    return batches[0]?.chunkData.data ?? [];
  }
}

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

+47-34
@@ -387,24 +387,31 @@ export class PostgresSyncRulesStorage
     checkpoint: InternalOpId,
     dataBuckets: Map<string, InternalOpId>,
     options?: storage.BucketDataBatchOptions
-  ): AsyncIterable<storage.SyncBucketDataBatch> {
+  ): AsyncIterable<storage.SyncBucketDataChunk> {
     if (dataBuckets.size == 0) {
       return;
     }
 
+    // Internal naming:
+    // We do a query for one "batch", which may be returned in multiple "chunks".
+    // Each chunk is limited to a single bucket, and is limited in length and size.
+    // There are also overall batch length and size limits.
+    // Rows for each batch query are streamed in separate sets, which may or may
+    // not match up with chunks.
+
     const end = checkpoint ?? BIGINT_MAX;
     const filters = Array.from(dataBuckets.entries()).map(([name, start]) => ({
       bucket_name: name,
       start: start
     }));
 
-    const rowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
-    const sizeLimit = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
+    const batchRowLimit = options?.limit ?? storage.DEFAULT_DOCUMENT_BATCH_LIMIT;
+    const chunkSizeLimitBytes = options?.chunkLimitBytes ?? storage.DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES;
 
-    let batchSize = 0;
-    let currentBatch: utils.SyncBucketData | null = null;
+    let chunkSizeBytes = 0;
+    let currentChunk: utils.SyncBucketData | null = null;
     let targetOp: InternalOpId | null = null;
-    let rowCount = 0;
+    let batchRowCount = 0;
 
     /**
     * It is possible to perform this query with JSONB join. e.g.
@@ -458,7 +465,7 @@ export class PostgresSyncRulesStorage
      params: [
        { type: 'int4', value: this.group_id },
        { type: 'int8', value: end },
-        { type: 'int4', value: rowLimit + 1 },
+        { type: 'int4', value: batchRowLimit },
        ...filters.flatMap((f) => [
          { type: 'varchar' as const, value: f.bucket_name },
          { type: 'int8' as const, value: f.start } satisfies StatementParam
@@ -469,28 +476,27 @@ export class PostgresSyncRulesStorage
 
      for (const row of decodedRows) {
        const { bucket_name } = row;
-        const rowSize = row.data ? row.data.length : 0;
-
-        if (
-          currentBatch == null ||
-          currentBatch.bucket != bucket_name ||
-          batchSize >= sizeLimit ||
-          (currentBatch?.data.length && batchSize + rowSize > sizeLimit) ||
-          currentBatch.data.length >= rowLimit
-        ) {
+        const rowSizeBytes = row.data ? row.data.length : 0;
+
+        const sizeExceeded =
+          chunkSizeBytes >= chunkSizeLimitBytes ||
+          (currentChunk?.data.length && chunkSizeBytes + rowSizeBytes > chunkSizeLimitBytes) ||
+          (currentChunk?.data.length ?? 0) >= batchRowLimit;
+
+        if (currentChunk == null || currentChunk.bucket != bucket_name || sizeExceeded) {
          let start: string | undefined = undefined;
-          if (currentBatch != null) {
-            if (currentBatch.bucket == bucket_name) {
-              currentBatch.has_more = true;
+          if (currentChunk != null) {
+            if (currentChunk.bucket == bucket_name) {
+              currentChunk.has_more = true;
+              start = currentChunk.next_after;
            }
 
-            const yieldBatch = currentBatch;
-            start = currentBatch.after;
-            currentBatch = null;
-            batchSize = 0;
-            yield { batch: yieldBatch, targetOp: targetOp };
+            const yieldChunk = currentChunk;
+            currentChunk = null;
+            chunkSizeBytes = 0;
+            yield { chunkData: yieldChunk, targetOp: targetOp };
            targetOp = null;
-            if (rowCount >= rowLimit) {
+            if (batchRowCount >= batchRowLimit) {
              // We've yielded all the requested rows
              break;
            }
@@ -503,11 +509,13 @@ export class PostgresSyncRulesStorage
            }
            start = internalToExternalOpId(startOpId);
          }
-          currentBatch = {
+          currentChunk = {
            bucket: bucket_name,
            after: start,
+            // this is updated when we yield the batch
            has_more: false,
            data: [],
+            // this is updated incrementally
            next_after: start
          };
          targetOp = null;
@@ -527,20 +535,25 @@ export class PostgresSyncRulesStorage
          }
        }
 
-        currentBatch.data.push(entry);
-        currentBatch.next_after = entry.op_id;
+        currentChunk.data.push(entry);
+        currentChunk.next_after = entry.op_id;
 
-        batchSize += rowSize;
+        chunkSizeBytes += rowSizeBytes;
 
        // Manually track the total rows yielded
-        rowCount++;
+        batchRowCount++;
      }
    }
 
-    if (currentBatch != null) {
-      const yieldBatch = currentBatch;
-      currentBatch = null;
-      yield { batch: yieldBatch, targetOp: targetOp };
+    if (currentChunk != null) {
+      const yieldChunk = currentChunk;
+      currentChunk = null;
+      // This is the final chunk in the batch.
+      // There may be more data if and only if the batch we retrieved isn't complete.
+      // If batchRowCount == batchRowLimit, we don't actually know whether there is more data,
+      // but it is safe to return true in that case.
+      yieldChunk.has_more = batchRowCount >= batchRowLimit;
+      yield { chunkData: yieldChunk, targetOp: targetOp };
      targetOp = null;
    }
  }
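
The key behavioural change in this file: the query previously fetched rowLimit + 1 rows to probe for more data, while it now fetches exactly batchRowLimit rows and infers the final chunk's has_more from whether the batch filled up. A minimal sketch of that rule (illustrative only, not the repository's code):

// If fewer rows than the limit came back, the batch is complete and has_more is false.
// If the batch filled up exactly, there may or may not be more; reporting true is safe,
// because the caller will simply issue one more (possibly empty) batch query.
function finalChunkHasMore(batchRowCount: number, batchRowLimit: number): boolean {
  return batchRowCount >= batchRowLimit;
}

// Example: with a limit of 1000 rows, a 1000-row batch reports has_more = true,
// while a 999-row batch reports has_more = false.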

modules/module-postgres-storage/test/src/storage.test.ts

+15-3
@@ -93,7 +93,11 @@ describe('Postgres Sync Bucket Storage', () => {
    });
 
    const batch2 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch1[0].chunkData.next_after)]]),
+        options
+      )
    );
    expect(test_utils.getBatchData(batch2)).toEqual([
      { op_id: '2', op: 'PUT', object_id: 'large1', checksum: 1178768505 }
@@ -105,7 +109,11 @@ describe('Postgres Sync Bucket Storage', () => {
    });
 
    const batch3 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch2[0].chunkData.next_after)]]),
+        options
+      )
    );
    expect(test_utils.getBatchData(batch3)).toEqual([
      { op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -117,7 +125,11 @@ describe('Postgres Sync Bucket Storage', () => {
    });
 
    const batch4 = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch3[0].batch.next_after)]]), options)
+      bucketStorage.getBucketDataBatch(
+        checkpoint,
+        new Map([['global[]', BigInt(batch3[0].chunkData.next_after)]]),
+        options
+      )
    );
    expect(test_utils.getBatchData(batch4)).toEqual([
      { op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }

modules/module-postgres/test/src/wal_stream_utils.ts

+4-4
@@ -161,11 +161,11 @@ export class WalStreamTestContext implements AsyncDisposable {
      const batch = this.storage!.getBucketDataBatch(checkpoint, map);
 
      const batches = await test_utils.fromAsync(batch);
-      data = data.concat(batches[0]?.batch.data ?? []);
-      if (batches.length == 0 || !batches[0]!.batch.has_more) {
+      data = data.concat(batches[0]?.chunkData.data ?? []);
+      if (batches.length == 0 || !batches[0]!.chunkData.has_more) {
        break;
      }
-      map.set(bucket, BigInt(batches[0]!.batch.next_after));
+      map.set(bucket, BigInt(batches[0]!.chunkData.next_after));
    }
    return data;
  }
@@ -182,6 +182,6 @@ export class WalStreamTestContext implements AsyncDisposable {
    const map = new Map<string, InternalOpId>([[bucket, start]]);
    const batch = this.storage!.getBucketDataBatch(checkpoint, map);
    const batches = await test_utils.fromAsync(batch);
-    return batches[0]?.batch.data ?? [];
+    return batches[0]?.chunkData.data ?? [];
  }
}
