diff --git a/crates/core/src/checkpoint.rs b/crates/core/src/checkpoint.rs index 1e6527d..50130ec 100644 --- a/crates/core/src/checkpoint.rs +++ b/crates/core/src/checkpoint.rs @@ -5,7 +5,7 @@ use alloc::string::String; use alloc::vec::Vec; use core::ffi::c_int; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use serde_json as json; use sqlite::ResultCode; use sqlite_nostd as sqlite; @@ -13,9 +13,10 @@ use sqlite_nostd::{Connection, Context, Value}; use crate::create_sqlite_text_fn; use crate::error::SQLiteError; -use crate::sync_types::Checkpoint; +use crate::sync::checkpoint::{validate_checkpoint, OwnedBucketChecksum}; +use crate::sync::line::Checkpoint; -#[derive(Serialize, Deserialize)] +#[derive(Serialize)] struct CheckpointResult { valid: bool, failed_buckets: Vec<String>, } @@ -26,53 +27,23 @@ fn powersync_validate_checkpoint_impl( args: &[*mut sqlite::value], ) -> Result<String, SQLiteError> { let data = args[0].text(); - - let _checkpoint: Checkpoint = serde_json::from_str(data)?; - + let checkpoint: Checkpoint = serde_json::from_str(data)?; let db = ctx.db_handle(); - - // language=SQLite - let statement = db.prepare_v2( - "WITH -bucket_list(bucket, checksum) AS ( - SELECT - json_extract(json_each.value, '$.bucket') as bucket, - json_extract(json_each.value, '$.checksum') as checksum - FROM json_each(json_extract(?1, '$.buckets')) -) -SELECT - bucket_list.bucket as bucket, - IFNULL(buckets.add_checksum, 0) as add_checksum, - IFNULL(buckets.op_checksum, 0) as oplog_checksum, - bucket_list.checksum as expected_checksum -FROM bucket_list - LEFT OUTER JOIN ps_buckets AS buckets ON - buckets.name = bucket_list.bucket -GROUP BY bucket_list.bucket", - )?; - - statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - let mut failures: Vec<String> = alloc::vec![]; - - while statement.step()?
== ResultCode::ROW { - let name = statement.column_text(0)?; - // checksums with column_int are wrapped to i32 by SQLite - let add_checksum = statement.column_int(1); - let oplog_checksum = statement.column_int(2); - let expected_checksum = statement.column_int(3); - - // wrapping add is like +, but safely overflows - let checksum = oplog_checksum.wrapping_add(add_checksum); - - if checksum != expected_checksum { - failures.push(String::from(name)); - } + let buckets: Vec<OwnedBucketChecksum> = checkpoint + .buckets + .iter() + .map(OwnedBucketChecksum::from) + .collect(); + + let failures = validate_checkpoint(buckets.iter(), None, db)?; + let mut failed_buckets = Vec::<String>::with_capacity(failures.len()); + for failure in failures { + failed_buckets.push(failure.bucket_name); } let result = CheckpointResult { - valid: failures.is_empty(), - failed_buckets: failures, + valid: failed_buckets.is_empty(), + failed_buckets: failed_buckets, }; Ok(json::to_string(&result)?) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 995e5f9..77a43d9 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -27,7 +27,6 @@ mod operations_vtab; mod schema; mod sync; mod sync_local; -mod sync_types; mod util; mod uuid; mod version; diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index e4c471c..74e7cd3 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,8 +1,12 @@ use alloc::format; use alloc::string::String; +use alloc::vec::Vec; use num_traits::Zero; +use serde::Deserialize; use crate::error::{PSResult, SQLiteError}; +use crate::sync::line::DataLine; +use crate::sync::operations::insert_bucket_operations; use crate::sync::Checksum; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; @@ -11,246 +15,15 @@ use crate::ext::SafeManagedStmt; // Run inside a transaction pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> { - // language=SQLite - let statement =
db.prepare_v2( - "\ -SELECT - json_extract(e.value, '$.bucket') as bucket, - json_extract(e.value, '$.data') as data, - json_extract(e.value, '$.has_more') as has_more, - json_extract(e.value, '$.after') as after, - json_extract(e.value, '$.next_after') as next_after -FROM json_each(json_extract(?1, '$.buckets')) e", - )?; - statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - while statement.step()? == ResultCode::ROW { - let bucket = statement.column_text(0)?; - let data = statement.column_text(1)?; - // let _has_more = statement.column_int(2)? != 0; - // let _after = statement.column_text(3)?; - // let _next_after = statement.column_text(4)?; - - insert_bucket_operations(db, bucket, data)?; - } - - Ok(()) -} - -pub fn insert_bucket_operations( - db: *mut sqlite::sqlite3, - bucket: &str, - data: &str, -) -> Result<(), SQLiteError> { - // Statement to insert new operations (only for PUT and REMOVE). - // language=SQLite - let iterate_statement = db.prepare_v2( - "\ -SELECT - json_extract(e.value, '$.op_id') as op_id, - json_extract(e.value, '$.op') as op, - json_extract(e.value, '$.object_type') as object_type, - json_extract(e.value, '$.object_id') as object_id, - json_extract(e.value, '$.checksum') as checksum, - json_extract(e.value, '$.data') as data, - json_extract(e.value, '$.subkey') as subkey -FROM json_each(?) e", - )?; - iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?; - - // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. - // We can consider splitting this into separate SELECT and INSERT statements. - // language=SQLite - let bucket_statement = db.prepare_v2( - "INSERT INTO ps_buckets(name) - VALUES(?) 
- ON CONFLICT DO UPDATE - SET last_applied_op = last_applied_op - RETURNING id, last_applied_op", - )?; - bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; - bucket_statement.step()?; - - let bucket_id = bucket_statement.column_int64(0); - - // This is an optimization for initial sync - we can avoid persisting individual REMOVE - // operations when last_applied_op = 0. - // We do still need to do the "supersede_statement" step for this case, since a REMOVE - // operation can supersede another PUT operation we're syncing at the same time. - let mut is_empty = bucket_statement.column_int64(1) == 0; - - // Statement to supersede (replace) operations with the same key. - // language=SQLite - let supersede_statement = db.prepare_v2( - "\ -DELETE FROM ps_oplog - WHERE unlikely(ps_oplog.bucket = ?1) - AND ps_oplog.key = ?2 -RETURNING op_id, hash", - )?; - supersede_statement.bind_int64(1, bucket_id)?; - - // language=SQLite - let insert_statement = db.prepare_v2("\ -INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; - insert_statement.bind_int64(1, bucket_id)?; - - let updated_row_statement = db.prepare_v2( - "\ -INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", - )?; - - bucket_statement.reset()?; - - let mut last_op: Option = None; - let mut add_checksum = Checksum::zero(); - let mut op_checksum = Checksum::zero(); - let mut added_ops: i32 = 0; - - while iterate_statement.step()? 
== ResultCode::ROW { - let op_id = iterate_statement.column_int64(0); - let op = iterate_statement.column_text(1)?; - let object_type = iterate_statement.column_text(2); - let object_id = iterate_statement.column_text(3); - let checksum = Checksum::from_i32(iterate_statement.column_int(4)); - let op_data = iterate_statement.column_text(5); - - last_op = Some(op_id); - added_ops += 1; - - if op == "PUT" || op == "REMOVE" { - let key: String; - if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) { - let subkey = iterate_statement.column_text(6).unwrap_or("null"); - key = format!("{}/{}/{}", &object_type, &object_id, subkey); - } else { - key = String::from(""); - } - - supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; - - let mut superseded = false; - - while supersede_statement.step()? == ResultCode::ROW { - // Superseded (deleted) a previous operation, add the checksum - let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1)); - add_checksum += supersede_checksum; - op_checksum -= supersede_checksum; - - // Superseded an operation, only skip if the bucket was empty - // Previously this checked "superseded_op <= last_applied_op". - // However, that would not account for a case where a previous - // PUT operation superseded the original PUT operation in this - // same batch, in which case superseded_op is not accurate for this. 
- if !is_empty { - superseded = true; - } - } - supersede_statement.reset()?; - - if op == "REMOVE" { - let should_skip_remove = !superseded; - - add_checksum += checksum; - - if !should_skip_remove { - if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { - updated_row_statement.bind_text( - 1, - object_type, - sqlite::Destructor::STATIC, - )?; - updated_row_statement.bind_text( - 2, - object_id, - sqlite::Destructor::STATIC, - )?; - updated_row_statement.exec()?; - } - } - - continue; - } - - insert_statement.bind_int64(2, op_id)?; - if key != "" { - insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; - } else { - insert_statement.bind_null(3)?; - } - - if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) { - insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?; - } else { - insert_statement.bind_null(4)?; - insert_statement.bind_null(5)?; - } - if let Ok(data) = op_data { - insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?; - } else { - insert_statement.bind_null(6)?; - } - - insert_statement.bind_int(7, checksum.bitcast_i32())?; - insert_statement.exec()?; - - op_checksum += checksum; - } else if op == "MOVE" { - add_checksum += checksum; - } else if op == "CLEAR" { - // Any remaining PUT operations should get an implicit REMOVE - // language=SQLite - let clear_statement1 = db - .prepare_v2( - "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) -SELECT row_type, row_id -FROM ps_oplog -WHERE bucket = ?1", - ) - .into_db_result(db)?; - clear_statement1.bind_int64(1, bucket_id)?; - clear_statement1.exec()?; - - let clear_statement2 = db - .prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1") - .into_db_result(db)?; - clear_statement2.bind_int64(1, bucket_id)?; - clear_statement2.exec()?; - - // And we need to re-apply all of those. - // We also replace the checksum with the checksum of the CLEAR op. 
- // language=SQLite - let clear_statement2 = db.prepare_v2( - "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", - )?; - clear_statement2.bind_int64(2, bucket_id)?; - clear_statement2.bind_int(1, checksum.bitcast_i32())?; - clear_statement2.exec()?; - - add_checksum = Checksum::zero(); - is_empty = true; - op_checksum = Checksum::zero(); - } + #[derive(Deserialize)] + struct BucketBatch<'a> { + #[serde(borrow)] + buckets: Vec<DataLine<'a>>, } - if let Some(last_op) = &last_op { - // language=SQLite - let statement = db.prepare_v2( - "UPDATE ps_buckets - SET last_op = ?2, - add_checksum = (add_checksum + ?3) & 0xffffffff, - op_checksum = (op_checksum + ?4) & 0xffffffff, - count_since_last = count_since_last + ?5 - WHERE id = ?1", - )?; - statement.bind_int64(1, bucket_id)?; - statement.bind_int64(2, *last_op)?; - statement.bind_int(3, add_checksum.bitcast_i32())?; - statement.bind_int(4, op_checksum.bitcast_i32())?; - statement.bind_int(5, added_ops)?; - - statement.exec()?; + let batch: BucketBatch = serde_json::from_str(data)?; + for line in &batch.buckets { + insert_bucket_operations(db, line)?; } Ok(()) diff --git a/crates/core/src/sync/bucket_priority.rs b/crates/core/src/sync/bucket_priority.rs index 454f1fe..bd685f7 100644 --- a/crates/core/src/sync/bucket_priority.rs +++ b/crates/core/src/sync/bucket_priority.rs @@ -4,7 +4,7 @@ use sqlite_nostd::ResultCode; use crate::error::SQLiteError; #[repr(transparent)] -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct BucketPriority { pub number: i32, } @@ -14,6 +14,8 @@ impl BucketPriority { self == BucketPriority::HIGHEST } + /// The priority to use when the sync service doesn't attach priorities in checkpoints. + pub const FALLBACK: BucketPriority = BucketPriority { number: 3 }; pub const HIGHEST: BucketPriority = BucketPriority { number: 0 }; /// A low priority used to represent fully-completed sync operations across all priorities.
diff --git a/crates/core/src/sync/checkpoint.rs b/crates/core/src/sync/checkpoint.rs new file mode 100644 index 0000000..57c9b7c --- /dev/null +++ b/crates/core/src/sync/checkpoint.rs @@ -0,0 +1,91 @@ +use alloc::{string::String, vec::Vec}; +use num_traits::Zero; + +use crate::{ + error::SQLiteError, + sync::{line::BucketChecksum, BucketPriority, Checksum}, +}; +use sqlite_nostd::{self as sqlite, Connection, ResultCode}; + +/// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from +/// a sync line. +#[derive(Debug, Clone)] +pub struct OwnedBucketChecksum { + pub bucket: String, + pub checksum: Checksum, + pub priority: BucketPriority, + pub count: Option<i64>, +} + +impl OwnedBucketChecksum { + pub fn is_in_priority(&self, prio: Option<BucketPriority>) -> bool { + match prio { + None => true, + Some(prio) => self.priority >= prio, + } + } +} + +impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum { + fn from(value: &'_ BucketChecksum<'_>) -> Self { + Self { + bucket: value.bucket.clone().into_owned(), + checksum: value.checksum, + priority: value.priority.unwrap_or(BucketPriority::FALLBACK), + count: value.count, + } + } +} + +pub struct ChecksumMismatch { + pub bucket_name: String, + pub expected_checksum: Checksum, + pub actual_op_checksum: Checksum, + pub actual_add_checksum: Checksum, +} + +pub fn validate_checkpoint<'a>( + buckets: impl Iterator<Item = &'a OwnedBucketChecksum>, + priority: Option<BucketPriority>, + db: *mut sqlite::sqlite3, +) -> Result<Vec<ChecksumMismatch>, SQLiteError> { + // language=SQLite + let statement = db.prepare_v2( + " +SELECT + ps_buckets.add_checksum as add_checksum, + ps_buckets.op_checksum as oplog_checksum +FROM ps_buckets WHERE name = ?;", + )?; + + let mut failures: Vec<ChecksumMismatch> = Vec::new(); + for bucket in buckets { + if bucket.is_in_priority(priority) { + statement.bind_text(1, &bucket.bucket, sqlite_nostd::Destructor::STATIC)?; + + let (add_checksum, oplog_checksum) = match statement.step()?
{ + ResultCode::ROW => { + let add_checksum = Checksum::from_i32(statement.column_int(0)); + let oplog_checksum = Checksum::from_i32(statement.column_int(1)); + (add_checksum, oplog_checksum) + } + _ => (Checksum::zero(), Checksum::zero()), + }; + + let actual = add_checksum + oplog_checksum; + + if actual != bucket.checksum { + failures.push(ChecksumMismatch { + bucket_name: bucket.bucket.clone(), + expected_checksum: bucket.checksum, + actual_add_checksum: add_checksum, + actual_op_checksum: oplog_checksum, + }); + } + + statement.reset()?; + } + } + + Ok(failures) +} diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs new file mode 100644 index 0000000..771b6b9 --- /dev/null +++ b/crates/core/src/sync/line.rs @@ -0,0 +1,97 @@ +use alloc::borrow::Cow; +use alloc::vec::Vec; +use serde::Deserialize; + +use super::BucketPriority; +use super::Checksum; + +use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; + +/// While we would like to always borrow strings for efficiency, that's not consistently possible. +/// With the JSON decoder, borrowing from input data is only possible when the string contains no +/// escape sequences (otherwise, the string is not a direct view of input data and we need an +/// internal copy). 
+type SyncLineStr<'a> = Cow<'a, str>; + +#[derive(Deserialize, Debug)] +pub struct Checkpoint<'a> { + #[serde(deserialize_with = "deserialize_string_to_i64")] + pub last_op_id: i64, + #[serde(default)] + #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + pub write_checkpoint: Option<i64>, + #[serde(borrow)] + pub buckets: Vec<BucketChecksum<'a>>, +} + +#[derive(Deserialize, Debug)] +pub struct BucketChecksum<'a> { + #[serde(borrow)] + pub bucket: SyncLineStr<'a>, + pub checksum: Checksum, + #[serde(default)] + pub priority: Option<BucketPriority>, + #[serde(default)] + pub count: Option<i64>, + // #[serde(default)] + // #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + // pub last_op_id: Option<i64>, +} + +#[derive(Deserialize, Debug)] +pub struct DataLine<'a> { + #[serde(borrow)] + pub bucket: SyncLineStr<'a>, + pub data: Vec<OplogEntry<'a>>, + // #[serde(default)] + // pub has_more: bool, + // #[serde(default, borrow)] + // pub after: Option<SyncLineStr<'a>>, + // #[serde(default, borrow)] + // pub next_after: Option<SyncLineStr<'a>>, +} + +#[derive(Deserialize, Debug)] +pub struct OplogEntry<'a> { + pub checksum: Checksum, + #[serde(deserialize_with = "deserialize_string_to_i64")] + pub op_id: i64, + pub op: OpType, + #[serde(default, borrow)] + pub object_id: Option<SyncLineStr<'a>>, + #[serde(default, borrow)] + pub object_type: Option<SyncLineStr<'a>>, + #[serde(default, borrow)] + pub subkey: Option<SyncLineStr<'a>>, + #[serde(default, borrow)] + pub data: Option<OplogData<'a>>, +} + +#[derive(Debug)] +pub enum OplogData<'a> { + /// A string encoding a well-formed JSON object representing values of the row. + Json { data: Cow<'a, str> }, + // BsonDocument { data: Cow<'a, [u8]> }, +} + +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum OpType { + CLEAR, + MOVE, + PUT, + REMOVE, +} + +impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + // For now, we will always get oplog data as a string.
In the future, there may be the + // option of the sync service sending BSON-encoded data lines too, but that's not relevant + // for now. + return Ok(OplogData::Json { + data: Deserialize::deserialize(deserializer)?, + }); + } +} diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index b2fbd19..afe3393 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -1,5 +1,8 @@ mod bucket_priority; +pub mod checkpoint; mod checksum; +pub mod line; +pub mod operations; pub use bucket_priority::BucketPriority; pub use checksum::Checksum; diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs new file mode 100644 index 0000000..7e7499a --- /dev/null +++ b/crates/core/src/sync/operations.rs @@ -0,0 +1,218 @@ +use alloc::format; +use alloc::string::String; +use num_traits::Zero; +use sqlite_nostd::Connection; +use sqlite_nostd::{self as sqlite, ResultCode}; + +use crate::{ + error::{PSResult, SQLiteError}, + ext::SafeManagedStmt, +}; + +use super::line::OplogData; +use super::line::{DataLine, OpType}; +use super::Checksum; + +pub fn insert_bucket_operations( + db: *mut sqlite::sqlite3, + data: &DataLine, +) -> Result<(), SQLiteError> { + // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. + // We can consider splitting this into separate SELECT and INSERT statements. + // language=SQLite + let bucket_statement = db.prepare_v2( + "INSERT INTO ps_buckets(name) + VALUES(?) + ON CONFLICT DO UPDATE + SET last_applied_op = last_applied_op + RETURNING id, last_applied_op", + )?; + bucket_statement.bind_text(1, &data.bucket, sqlite::Destructor::STATIC)?; + bucket_statement.step()?; + + let bucket_id = bucket_statement.column_int64(0); + + // This is an optimization for initial sync - we can avoid persisting individual REMOVE + // operations when last_applied_op = 0. 
+ // We do still need to do the "supersede_statement" step for this case, since a REMOVE + // operation can supersede another PUT operation we're syncing at the same time. + let mut is_empty = bucket_statement.column_int64(1) == 0; + + // Statement to supersede (replace) operations with the same key. + // language=SQLite + let supersede_statement = db.prepare_v2( + "\ +DELETE FROM ps_oplog + WHERE unlikely(ps_oplog.bucket = ?1) + AND ps_oplog.key = ?2 +RETURNING op_id, hash", + )?; + supersede_statement.bind_int64(1, bucket_id)?; + + // language=SQLite + let insert_statement = db.prepare_v2("\ +INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?; + insert_statement.bind_int64(1, bucket_id)?; + + let updated_row_statement = db.prepare_v2( + "\ +INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", + )?; + + bucket_statement.reset()?; + + let mut last_op: Option<i64> = None; + let mut add_checksum = Checksum::zero(); + let mut op_checksum = Checksum::zero(); + let mut added_ops: i32 = 0; + + for line in &data.data { + let op_id = line.op_id; + let op = line.op; + let object_type = line.object_type.as_ref(); + let object_id = line.object_id.as_ref(); + let checksum = line.checksum; + let op_data = line.data.as_ref(); + + last_op = Some(op_id); + added_ops += 1; + + if op == OpType::PUT || op == OpType::REMOVE { + let key: String; + if let (Some(object_type), Some(object_id)) = (object_type, object_id) { + let subkey = line.subkey.as_ref().map(|i| &**i).unwrap_or("null"); + key = format!("{}/{}/{}", &object_type, &object_id, subkey); + } else { + key = String::from(""); + } + + supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?; + + let mut superseded = false; + + while supersede_statement.step()?
== ResultCode::ROW { + // Superseded (deleted) a previous operation, add the checksum + let supersede_checksum = Checksum::from_i32(supersede_statement.column_int(1)); + add_checksum += supersede_checksum; + op_checksum -= supersede_checksum; + + // Superseded an operation, only skip if the bucket was empty + // Previously this checked "superseded_op <= last_applied_op". + // However, that would not account for a case where a previous + // PUT operation superseded the original PUT operation in this + // same batch, in which case superseded_op is not accurate for this. + if !is_empty { + superseded = true; + } + } + supersede_statement.reset()?; + + if op == OpType::REMOVE { + let should_skip_remove = !superseded; + + add_checksum += checksum; + + if !should_skip_remove { + if let (Some(object_type), Some(object_id)) = (object_type, object_id) { + updated_row_statement.bind_text( + 1, + object_type, + sqlite::Destructor::STATIC, + )?; + updated_row_statement.bind_text( + 2, + object_id, + sqlite::Destructor::STATIC, + )?; + updated_row_statement.exec()?; + } + } + + continue; + } + + insert_statement.bind_int64(2, op_id)?; + if key != "" { + insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(3)?; + } + + if let (Some(object_type), Some(object_id)) = (object_type, object_id) { + insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?; + insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(4)?; + insert_statement.bind_null(5)?; + } + if let Some(data) = op_data { + let OplogData::Json { data } = data; + + insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?; + } else { + insert_statement.bind_null(6)?; + } + + insert_statement.bind_int(7, checksum.bitcast_i32())?; + insert_statement.exec()?; + + op_checksum += checksum; + } else if op == OpType::MOVE { + add_checksum += checksum; + } else if op == OpType::CLEAR { + // Any 
remaining PUT operations should get an implicit REMOVE + // language=SQLite + let clear_statement1 = db + .prepare_v2( + "INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) +SELECT row_type, row_id +FROM ps_oplog +WHERE bucket = ?1", + ) + .into_db_result(db)?; + clear_statement1.bind_int64(1, bucket_id)?; + clear_statement1.exec()?; + + let clear_statement2 = db + .prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1") + .into_db_result(db)?; + clear_statement2.bind_int64(1, bucket_id)?; + clear_statement2.exec()?; + + // And we need to re-apply all of those. + // We also replace the checksum with the checksum of the CLEAR op. + // language=SQLite + let clear_statement2 = db.prepare_v2( + "UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2", + )?; + clear_statement2.bind_int64(2, bucket_id)?; + clear_statement2.bind_int(1, checksum.bitcast_i32())?; + clear_statement2.exec()?; + + add_checksum = Checksum::zero(); + is_empty = true; + op_checksum = Checksum::zero(); + } + } + + if let Some(last_op) = &last_op { + // language=SQLite + let statement = db.prepare_v2( + "UPDATE ps_buckets + SET last_op = ?2, + add_checksum = (add_checksum + ?3) & 0xffffffff, + op_checksum = (op_checksum + ?4) & 0xffffffff, + count_since_last = count_since_last + ?5 + WHERE id = ?1", + )?; + statement.bind_int64(1, bucket_id)?; + statement.bind_int64(2, *last_op)?; + statement.bind_int(3, add_checksum.bitcast_i32())?; + statement.bind_int(4, op_checksum.bitcast_i32())?; + statement.bind_int(5, added_ops)?; + + statement.exec()?; + } + + Ok(()) +} diff --git a/crates/core/src/sync_types.rs b/crates/core/src/sync_types.rs deleted file mode 100644 index 060dd25..0000000 --- a/crates/core/src/sync_types.rs +++ /dev/null @@ -1,22 +0,0 @@ -use alloc::string::String; -use alloc::vec::Vec; -use serde::{Deserialize, Serialize}; - -use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; - -#[derive(Serialize, Deserialize, 
Debug)] -pub struct Checkpoint { - #[serde(deserialize_with = "deserialize_string_to_i64")] - pub last_op_id: i64, - #[serde(default)] - #[serde(deserialize_with = "deserialize_optional_string_to_i64")] - pub write_checkpoint: Option, - pub buckets: Vec, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct BucketChecksum { - pub bucket: String, - pub checksum: i32, - pub priority: Option, -} diff --git a/dart/test/js_key_encoding_test.dart b/dart/test/js_key_encoding_test.dart index 22d3d7e..dd86e06 100644 --- a/dart/test/js_key_encoding_test.dart +++ b/dart/test/js_key_encoding_test.dart @@ -45,7 +45,7 @@ void main() { 'object_id': '1', 'subkey': json.encode('subkey'), 'checksum': 0, - 'data': {'col': 'a'}, + 'data': json.encode({'col': 'a'}), } ], } diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 84bb955..8eb832e 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -52,7 +52,7 @@ void main() { 'object_type': 'items', 'object_id': rowId, 'checksum': 0, - 'data': data, + 'data': json.encode(data), } ], }