Skip to content

Commit 3dde911

Browse files
authored
Merge pull request #82 from powersync-ja/serde-oplog-parsing
Use serde to parse data lines and checkpoints
2 parents: 08f59fa + 202ebba; commit: 3dde911

File tree

11 files changed: +442 additions, -310 deletions (lines changed)

crates/core/src/checkpoint.rs

Lines changed: 17 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -5,17 +5,18 @@ use alloc::string::String;
55
use alloc::vec::Vec;
66
use core::ffi::c_int;
77

8-
use serde::{Deserialize, Serialize};
8+
use serde::Serialize;
99
use serde_json as json;
1010
use sqlite::ResultCode;
1111
use sqlite_nostd as sqlite;
1212
use sqlite_nostd::{Connection, Context, Value};
1313

1414
use crate::create_sqlite_text_fn;
1515
use crate::error::SQLiteError;
16-
use crate::sync_types::Checkpoint;
16+
use crate::sync::checkpoint::{validate_checkpoint, OwnedBucketChecksum};
17+
use crate::sync::line::Checkpoint;
1718

18-
#[derive(Serialize, Deserialize)]
19+
#[derive(Serialize)]
1920
struct CheckpointResult {
2021
valid: bool,
2122
failed_buckets: Vec<String>,
@@ -26,53 +27,23 @@ fn powersync_validate_checkpoint_impl(
2627
args: &[*mut sqlite::value],
2728
) -> Result<String, SQLiteError> {
2829
let data = args[0].text();
29-
30-
let _checkpoint: Checkpoint = serde_json::from_str(data)?;
31-
30+
let checkpoint: Checkpoint = serde_json::from_str(data)?;
3231
let db = ctx.db_handle();
33-
34-
// language=SQLite
35-
let statement = db.prepare_v2(
36-
"WITH
37-
bucket_list(bucket, checksum) AS (
38-
SELECT
39-
json_extract(json_each.value, '$.bucket') as bucket,
40-
json_extract(json_each.value, '$.checksum') as checksum
41-
FROM json_each(json_extract(?1, '$.buckets'))
42-
)
43-
SELECT
44-
bucket_list.bucket as bucket,
45-
IFNULL(buckets.add_checksum, 0) as add_checksum,
46-
IFNULL(buckets.op_checksum, 0) as oplog_checksum,
47-
bucket_list.checksum as expected_checksum
48-
FROM bucket_list
49-
LEFT OUTER JOIN ps_buckets AS buckets ON
50-
buckets.name = bucket_list.bucket
51-
GROUP BY bucket_list.bucket",
52-
)?;
53-
54-
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
55-
56-
let mut failures: Vec<String> = alloc::vec![];
57-
58-
while statement.step()? == ResultCode::ROW {
59-
let name = statement.column_text(0)?;
60-
// checksums with column_int are wrapped to i32 by SQLite
61-
let add_checksum = statement.column_int(1);
62-
let oplog_checksum = statement.column_int(2);
63-
let expected_checksum = statement.column_int(3);
64-
65-
// wrapping add is like +, but safely overflows
66-
let checksum = oplog_checksum.wrapping_add(add_checksum);
67-
68-
if checksum != expected_checksum {
69-
failures.push(String::from(name));
70-
}
32+
let buckets: Vec<OwnedBucketChecksum> = checkpoint
33+
.buckets
34+
.iter()
35+
.map(OwnedBucketChecksum::from)
36+
.collect();
37+
38+
let failures = validate_checkpoint(buckets.iter(), None, db)?;
39+
let mut failed_buckets = Vec::<String>::with_capacity(failures.len());
40+
for failure in failures {
41+
failed_buckets.push(failure.bucket_name);
7142
}
7243

7344
let result = CheckpointResult {
74-
valid: failures.is_empty(),
75-
failed_buckets: failures,
45+
valid: failed_buckets.is_empty(),
46+
failed_buckets: failed_buckets,
7647
};
7748

7849
Ok(json::to_string(&result)?)

crates/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@ mod operations_vtab;
2727
mod schema;
2828
mod sync;
2929
mod sync_local;
30-
mod sync_types;
3130
mod util;
3231
mod uuid;
3332
mod version;

crates/core/src/operations.rs

Lines changed: 11 additions & 238 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,12 @@
11
use alloc::format;
22
use alloc::string::String;
3+
use alloc::vec::Vec;
34
use num_traits::Zero;
5+
use serde::Deserialize;
46

57
use crate::error::{PSResult, SQLiteError};
8+
use crate::sync::line::DataLine;
9+
use crate::sync::operations::insert_bucket_operations;
610
use crate::sync::Checksum;
711
use sqlite_nostd as sqlite;
812
use sqlite_nostd::{Connection, ResultCode};
@@ -11,246 +15,15 @@ use crate::ext::SafeManagedStmt;
1115

1216
// Run inside a transaction
1317
pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
14-
// language=SQLite
15-
let statement = db.prepare_v2(
16-
"\
17-
SELECT
18-
json_extract(e.value, '$.bucket') as bucket,
19-
json_extract(e.value, '$.data') as data,
20-
json_extract(e.value, '$.has_more') as has_more,
21-
json_extract(e.value, '$.after') as after,
22-
json_extract(e.value, '$.next_after') as next_after
23-
FROM json_each(json_extract(?1, '$.buckets')) e",
24-
)?;
25-
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
26-
27-
while statement.step()? == ResultCode::ROW {
28-
let bucket = statement.column_text(0)?;
29-
let data = statement.column_text(1)?;
30-
// let _has_more = statement.column_int(2)? != 0;
31-
// let _after = statement.column_text(3)?;
32-
// let _next_after = statement.column_text(4)?;
33-
34-
insert_bucket_operations(db, bucket, data)?;
35-
}
36-
37-
Ok(())
38-
}
39-
40-
pub fn insert_bucket_operations(
41-
db: *mut sqlite::sqlite3,
42-
bucket: &str,
43-
data: &str,
44-
) -> Result<(), SQLiteError> {
45-
// Statement to insert new operations (only for PUT and REMOVE).
46-
// language=SQLite
47-
let iterate_statement = db.prepare_v2(
48-
"\
49-
SELECT
50-
json_extract(e.value, '$.op_id') as op_id,
51-
json_extract(e.value, '$.op') as op,
52-
json_extract(e.value, '$.object_type') as object_type,
53-
json_extract(e.value, '$.object_id') as object_id,
54-
json_extract(e.value, '$.checksum') as checksum,
55-
json_extract(e.value, '$.data') as data,
56-
json_extract(e.value, '$.subkey') as subkey
57-
FROM json_each(?) e",
58-
)?;
59-
iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
60-
61-
// We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
62-
// We can consider splitting this into separate SELECT and INSERT statements.
63-
// language=SQLite
64-
let bucket_statement = db.prepare_v2(
65-
"INSERT INTO ps_buckets(name)
66-
VALUES(?)
67-
ON CONFLICT DO UPDATE
68-
SET last_applied_op = last_applied_op
69-
RETURNING id, last_applied_op",
70-
)?;
71-
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
72-
bucket_statement.step()?;
73-
74-
let bucket_id = bucket_statement.column_int64(0);
75-
76-
// This is an optimization for initial sync - we can avoid persisting individual REMOVE
77-
// operations when last_applied_op = 0.
78-
// We do still need to do the "supersede_statement" step for this case, since a REMOVE
79-
// operation can supersede another PUT operation we're syncing at the same time.
80-
let mut is_empty = bucket_statement.column_int64(1) == 0;
81-
82-
// Statement to supersede (replace) operations with the same key.
83-
// language=SQLite
84-
let supersede_statement = db.prepare_v2(
85-
"\
86-
DELETE FROM ps_oplog
87-
WHERE unlikely(ps_oplog.bucket = ?1)
88-
AND ps_oplog.key = ?2
89-
RETURNING op_id, hash",
90-
)?;
91-
supersede_statement.bind_int64(1, bucket_id)?;
92-
93-
// language=SQLite
94-
let insert_statement = db.prepare_v2("\
95-
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
96-
insert_statement.bind_int64(1, bucket_id)?;
97-
98-
let updated_row_statement = db.prepare_v2(
99-
"\
100-
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
101-
)?;
102-
103-
bucket_statement.reset()?;
104-
105-
let mut last_op: Option<i64> = None;
106-
let mut add_checksum = Checksum::zero();
107-
let mut op_checksum = Checksum::zero();
108-
let mut added_ops: i32 = 0;
109-
110-
while iterate_statement.step()? == ResultCode::ROW {
111-
let op_id = iterate_statement.column_int64(0);
112-
let op = iterate_statement.column_text(1)?;
113-
let object_type = iterate_statement.column_text(2);
114-
let object_id = iterate_statement.column_text(3);
115-
let checksum = Checksum::from_i32(iterate_statement.column_int(4));
116-
let op_data = iterate_statement.column_text(5);
117-
118-
last_op = Some(op_id);
119-
added_ops += 1;
120-
121-
if op == "PUT" || op == "REMOVE" {
122-
let key: String;
123-
if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) {
124-
let subkey = iterate_statement.column_text(6).unwrap_or("null");
125-
key = format!("{}/{}/{}", &object_type, &object_id, subkey);
126-
} else {
127-
key = String::from("");
128-
}
129-
130-
supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?;
131-
132-
let mut superseded = false;
133-
134-
while supersede_statement.step()? == ResultCode::ROW {
135-
// Superseded (deleted) a previous operation, add the checksum
136-
let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1));
137-
add_checksum += supersede_checksum;
138-
op_checksum -= supersede_checksum;
139-
140-
// Superseded an operation, only skip if the bucket was empty
141-
// Previously this checked "superseded_op <= last_applied_op".
142-
// However, that would not account for a case where a previous
143-
// PUT operation superseded the original PUT operation in this
144-
// same batch, in which case superseded_op is not accurate for this.
145-
if !is_empty {
146-
superseded = true;
147-
}
148-
}
149-
supersede_statement.reset()?;
150-
151-
if op == "REMOVE" {
152-
let should_skip_remove = !superseded;
153-
154-
add_checksum += checksum;
155-
156-
if !should_skip_remove {
157-
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
158-
updated_row_statement.bind_text(
159-
1,
160-
object_type,
161-
sqlite::Destructor::STATIC,
162-
)?;
163-
updated_row_statement.bind_text(
164-
2,
165-
object_id,
166-
sqlite::Destructor::STATIC,
167-
)?;
168-
updated_row_statement.exec()?;
169-
}
170-
}
171-
172-
continue;
173-
}
174-
175-
insert_statement.bind_int64(2, op_id)?;
176-
if key != "" {
177-
insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?;
178-
} else {
179-
insert_statement.bind_null(3)?;
180-
}
181-
182-
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
183-
insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?;
184-
insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?;
185-
} else {
186-
insert_statement.bind_null(4)?;
187-
insert_statement.bind_null(5)?;
188-
}
189-
if let Ok(data) = op_data {
190-
insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?;
191-
} else {
192-
insert_statement.bind_null(6)?;
193-
}
194-
195-
insert_statement.bind_int(7, checksum.bitcast_i32())?;
196-
insert_statement.exec()?;
197-
198-
op_checksum += checksum;
199-
} else if op == "MOVE" {
200-
add_checksum += checksum;
201-
} else if op == "CLEAR" {
202-
// Any remaining PUT operations should get an implicit REMOVE
203-
// language=SQLite
204-
let clear_statement1 = db
205-
.prepare_v2(
206-
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
207-
SELECT row_type, row_id
208-
FROM ps_oplog
209-
WHERE bucket = ?1",
210-
)
211-
.into_db_result(db)?;
212-
clear_statement1.bind_int64(1, bucket_id)?;
213-
clear_statement1.exec()?;
214-
215-
let clear_statement2 = db
216-
.prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1")
217-
.into_db_result(db)?;
218-
clear_statement2.bind_int64(1, bucket_id)?;
219-
clear_statement2.exec()?;
220-
221-
// And we need to re-apply all of those.
222-
// We also replace the checksum with the checksum of the CLEAR op.
223-
// language=SQLite
224-
let clear_statement2 = db.prepare_v2(
225-
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
226-
)?;
227-
clear_statement2.bind_int64(2, bucket_id)?;
228-
clear_statement2.bind_int(1, checksum.bitcast_i32())?;
229-
clear_statement2.exec()?;
230-
231-
add_checksum = Checksum::zero();
232-
is_empty = true;
233-
op_checksum = Checksum::zero();
234-
}
18+
#[derive(Deserialize)]
19+
struct BucketBatch<'a> {
20+
#[serde(borrow)]
21+
buckets: Vec<DataLine<'a>>,
23522
}
23623

237-
if let Some(last_op) = &last_op {
238-
// language=SQLite
239-
let statement = db.prepare_v2(
240-
"UPDATE ps_buckets
241-
SET last_op = ?2,
242-
add_checksum = (add_checksum + ?3) & 0xffffffff,
243-
op_checksum = (op_checksum + ?4) & 0xffffffff,
244-
count_since_last = count_since_last + ?5
245-
WHERE id = ?1",
246-
)?;
247-
statement.bind_int64(1, bucket_id)?;
248-
statement.bind_int64(2, *last_op)?;
249-
statement.bind_int(3, add_checksum.bitcast_i32())?;
250-
statement.bind_int(4, op_checksum.bitcast_i32())?;
251-
statement.bind_int(5, added_ops)?;
252-
253-
statement.exec()?;
24+
let batch: BucketBatch = serde_json::from_str(data)?;
25+
for line in &batch.buckets {
26+
insert_bucket_operations(db, line)?;
25427
}
25528

25629
Ok(())

crates/core/src/sync/bucket_priority.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ use sqlite_nostd::ResultCode;
44
use crate::error::SQLiteError;
55

66
#[repr(transparent)]
7-
#[derive(Clone, Copy, PartialEq, Eq)]
7+
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
88
pub struct BucketPriority {
99
pub number: i32,
1010
}
@@ -14,6 +14,8 @@ impl BucketPriority {
1414
self == BucketPriority::HIGHEST
1515
}
1616

17+
/// The priority to use when the sync service doesn't attach priorities in checkpoints.
18+
pub const FALLBACK: BucketPriority = BucketPriority { number: 3 };
1719
pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };
1820

1921
/// A low priority used to represent fully-completed sync operations across all priorities.

0 commit comments

Comments
 (0)