diff --git a/Cargo.lock b/Cargo.lock index 6d69fcb..fdacb13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,7 +34,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.100", "which", @@ -113,6 +113,22 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "getrandom" version = "0.2.10" @@ -235,17 +251,26 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + [[package]] name = "powersync_core" version = "0.3.14" dependencies = [ "bytes", "const_format", + "futures-lite", "num-derive 0.3.3", "num-traits", + "rustc-hash 2.1.1", "serde", "serde_json", "sqlite_nostd", + "streaming-iterator", "uuid", ] @@ -329,6 +354,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "ryu" version = "1.0.15" @@ -403,6 +434,12 @@ dependencies = [ "sqlite3_capi", ] +[[package]] +name = "streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2231b7c3057d5e4ad0156fb3dc807d900806020c5ffa3ee6ff2c8c76fb8520" + [[package]] name = "syn" version = "1.0.109" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 419e484..3839d97 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -18,8 +18,11 @@ bytes = { version = "1.4", default-features = false } num-traits = { version = "0.2.15", default-features = false } num-derive = "0.3" serde_json = { version = "1.0", default-features = false, features = ["alloc"] } -serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] } +serde = { version = "1.0", default-features = false, features = ["alloc", "derive", "rc"] } const_format = "0.2.34" +futures-lite = { version = "2.6.0", default-features = false, features = ["alloc"] } +rustc-hash = { version = "2.1", default-features = false } +streaming-iterator = { version = "0.1.9", default-features = false, features = ["alloc"] } [dependencies.uuid] version = "1.4.1" diff --git a/crates/core/src/bson/de.rs b/crates/core/src/bson/de.rs new file mode 100644 index 0000000..d6d3e01 --- /dev/null +++ b/crates/core/src/bson/de.rs @@ -0,0 +1,322 @@ +use core::assert_matches::debug_assert_matches; + +use serde::{ + de::{ + self, DeserializeSeed, EnumAccess, IntoDeserializer, MapAccess, SeqAccess, VariantAccess, + Visitor, + }, + 
forward_to_deserialize_any, +}; + +use super::{ + error::ErrorKind, + parser::{ElementType, Parser}, + BsonError, +}; + +pub struct Deserializer<'de> { + parser: Parser<'de>, + position: DeserializerPosition, +} + +#[derive(Clone, Debug)] +enum DeserializerPosition { + /// The deserializer is outside of the initial document header. + OutsideOfDocument, + /// The deserializer expects the beginning of a key-value pair, or the end of the current + /// document. + BeforeTypeOrAtEndOfDocument, + /// The deserializer has read past the type of a key-value pair, but did not scan the name yet. + BeforeName { pending_type: ElementType }, + /// Read type and name of a key-value pair, position is before the value now. + BeforeValue { pending_type: ElementType }, +} + +impl<'de> Deserializer<'de> { + /// When used as a name hint to [de::Deserialize.deserialize_enum], the BSON deserializer will + /// report documents a byte array view instead of parsing them. + /// + /// This is used as an internal optimization when we want to keep a reference to a BSON sub- + /// document without actually inspecting the structure of that document. + pub const SPECIAL_CASE_EMBEDDED_DOCUMENT: &'static str = "\0SpecialCaseEmbedDoc"; + + fn outside_of_document(parser: Parser<'de>) -> Self { + Self { + parser, + position: DeserializerPosition::OutsideOfDocument, + } + } + + pub fn from_bytes(bytes: &'de [u8]) -> Self { + let parser = Parser::new(bytes); + Self::outside_of_document(parser) + } + + fn prepare_to_read(&mut self, allow_key: bool) -> Result, BsonError> { + match self.position.clone() { + DeserializerPosition::OutsideOfDocument => { + // The next value we're reading is a document + self.position = DeserializerPosition::BeforeValue { + pending_type: ElementType::Document, + }; + Ok(KeyOrValue::PendingValue(ElementType::Document)) + } + DeserializerPosition::BeforeValue { pending_type } => { + Ok(KeyOrValue::PendingValue(pending_type)) + } + DeserializerPosition::BeforeTypeOrAtEndOfDocument { .. } => { + Err(self.parser.error(ErrorKind::InvalidStateExpectedType)) + } + DeserializerPosition::BeforeName { pending_type } => { + if !allow_key { + return Err(self.parser.error(ErrorKind::InvalidStateExpectedName)); + } + + self.position = DeserializerPosition::BeforeValue { + pending_type: pending_type, + }; + Ok(KeyOrValue::Key(self.parser.read_cstr()?)) + } + } + } + + fn prepare_to_read_value(&mut self) -> Result { + let result = self.prepare_to_read(false)?; + match result { + KeyOrValue::Key(_) => unreachable!(), + KeyOrValue::PendingValue(element_type) => Ok(element_type), + } + } + + fn object_reader(&mut self) -> Result, BsonError> { + let parser = self.parser.document_scope()?; + let deserializer = Deserializer { + parser, + position: DeserializerPosition::BeforeTypeOrAtEndOfDocument, + }; + Ok(deserializer) + } + + fn advance_to_next_name(&mut self) -> Result, BsonError> { + if self.parser.end_document()? { + return Ok(None); + } + + self.position = DeserializerPosition::BeforeName { + pending_type: self.parser.read_element_type()?, + }; + Ok(Some(())) + } +} + +impl<'de, 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> { + type Error = BsonError; + + fn is_human_readable(&self) -> bool { + false + } + + fn deserialize_any(self, visitor: V) -> Result + where + V: Visitor<'de>, + { + let element_type = match self.prepare_to_read(true)? 
{ + KeyOrValue::Key(name) => return visitor.visit_borrowed_str(name), + KeyOrValue::PendingValue(element_type) => element_type, + }; + + match element_type { + ElementType::Double => visitor.visit_f64(self.parser.read_double()?), + ElementType::String => visitor.visit_borrowed_str(self.parser.read_string()?), + ElementType::Document => { + let mut object = self.object_reader()?; + visitor.visit_map(&mut object) + } + ElementType::Array => { + let mut object = self.object_reader()?; + visitor.visit_seq(&mut object) + } + ElementType::Binary => { + let (_, bytes) = self.parser.read_binary()?; + visitor.visit_borrowed_bytes(bytes) + } + ElementType::ObjectId => visitor.visit_borrowed_bytes(self.parser.read_object_id()?), + ElementType::Boolean => visitor.visit_bool(self.parser.read_bool()?), + ElementType::DatetimeUtc | ElementType::Timestamp => { + visitor.visit_u64(self.parser.read_uint64()?) + } + ElementType::Null | ElementType::Undefined => visitor.visit_unit(), + ElementType::Int32 => visitor.visit_i32(self.parser.read_int32()?), + ElementType::Int64 => visitor.visit_i64(self.parser.read_int64()?), + } + } + + fn deserialize_enum( + self, + name: &'static str, + _variants: &'static [&'static str], + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + let kind = self.prepare_to_read_value()?; + + // With this special name, the visitor indicates that it doesn't actually want to read an + // enum, it wants to read values regularly. Except that a document appearing at this + // position should not be parsed, it should be forwarded as an embedded byte array. + if name == Deserializer::SPECIAL_CASE_EMBEDDED_DOCUMENT { + return if matches!(kind, ElementType::Document) { + let object = self.parser.skip_document()?; + visitor.visit_borrowed_bytes(object) + } else { + self.deserialize_any(visitor) + }; + } + + match kind { + ElementType::String => { + visitor.visit_enum(self.parser.read_string()?.into_deserializer()) + } + ElementType::Document => { + let mut object = self.object_reader()?; + visitor.visit_enum(&mut object) + } + _ => Err(self.parser.error(ErrorKind::ExpectedEnum { actual: kind })), + } + } + + fn deserialize_option(self, visitor: V) -> Result + where + V: Visitor<'de>, + { + let kind = self.prepare_to_read_value()?; + match kind { + ElementType::Null => visitor.visit_none(), + _ => visitor.visit_some(self), + } + } + + fn deserialize_newtype_struct( + self, + _name: &'static str, + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + self.prepare_to_read_value()?; + visitor.visit_newtype_struct(self) + } + + forward_to_deserialize_any! { + bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string + bytes byte_buf unit unit_struct seq tuple + tuple_struct map struct ignored_any identifier + } +} + +impl<'de> MapAccess<'de> for Deserializer<'de> { + type Error = BsonError; + + fn next_key_seed(&mut self, seed: K) -> Result, Self::Error> + where + K: DeserializeSeed<'de>, + { + if let None = self.advance_to_next_name()? { + return Ok(None); + } + Ok(Some(seed.deserialize(self)?)) + } + + fn next_value_seed(&mut self, seed: V) -> Result + where + V: DeserializeSeed<'de>, + { + seed.deserialize(self) + } +} + +impl<'de> SeqAccess<'de> for Deserializer<'de> { + type Error = BsonError; + + fn next_element_seed(&mut self, seed: T) -> Result, Self::Error> + where + T: DeserializeSeed<'de>, + { + // Array elements are encoded as an object like `{"0": value, "1": another}` + if let None = self.advance_to_next_name()? 
{ + return Ok(None); + } + + // Skip name + debug_assert_matches!(self.position, DeserializerPosition::BeforeName { .. }); + self.prepare_to_read(true)?; + + // And deserialize value! + Ok(Some(seed.deserialize(self)?)) + } +} + +impl<'a, 'de> EnumAccess<'de> for &'a mut Deserializer<'de> { + type Error = BsonError; + type Variant = Self; + + fn variant_seed(self, seed: V) -> Result<(V::Value, Self::Variant), Self::Error> + where + V: DeserializeSeed<'de>, + { + if let None = self.advance_to_next_name()? { + return Err(self + .parser + .error(ErrorKind::UnexpectedEndOfDocumentForEnumVariant)); + } + + let value = seed.deserialize(&mut *self)?; + Ok((value, self)) + } +} + +impl<'a, 'de> VariantAccess<'de> for &'a mut Deserializer<'de> { + type Error = BsonError; + + fn unit_variant(self) -> Result<(), Self::Error> { + // Unit variants are encoded as simple string values, which are handled directly in + // Deserializer::deserialize_enum. + Err(self.parser.error(ErrorKind::ExpectedString)) + } + + fn newtype_variant_seed(self, seed: T) -> Result + where + T: DeserializeSeed<'de>, + { + // Newtype variants are represented as `{ NAME: VALUE }`, so we just have to deserialize the + // value here. + seed.deserialize(self) + } + + fn tuple_variant(self, _len: usize, visitor: V) -> Result + where + V: Visitor<'de>, + { + // Tuple variants are represented as `{ NAME: VALUES[] }`, so we deserialize the array here. + de::Deserializer::deserialize_seq(self, visitor) + } + + fn struct_variant( + self, + _fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: Visitor<'de>, + { + // Struct variants are represented as `{ NAME: { ... } }`, so we deserialize the struct. + de::Deserializer::deserialize_map(self, visitor) + } +} + +enum KeyOrValue<'de> { + Key(&'de str), + PendingValue(ElementType), +} diff --git a/crates/core/src/bson/error.rs b/crates/core/src/bson/error.rs new file mode 100644 index 0000000..dafeff3 --- /dev/null +++ b/crates/core/src/bson/error.rs @@ -0,0 +1,98 @@ +use core::{fmt::Display, str::Utf8Error}; + +use alloc::{ + boxed::Box, + string::{String, ToString}, +}; +use serde::de::{self, StdError}; + +use super::parser::ElementType; + +#[derive(Debug)] +pub struct BsonError { + /// Using a [Box] here keeps the size of this type as small, which makes results of this error + /// type smaller (at the cost of making errors more expensive to report, but that's fine because + /// we expect them to be rare). 
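+    ///
+    /// A sketch of what the indirection buys us (not a guaranteed layout, just the intent):
+    ///
+    /// ```ignore
+    /// // The boxed payload keeps the error itself pointer-sized, so `Result<T, BsonError>`
+    /// // stays small for the values of `T` we typically parse.
+    /// assert_eq!(
+    ///     core::mem::size_of::<BsonError>(),
+    ///     core::mem::size_of::<usize>(),
+    /// );
+    /// ```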
+ err: Box, +} + +#[derive(Debug)] +struct BsonErrorImpl { + offset: Option, + kind: ErrorKind, +} + +#[derive(Debug)] +pub enum ErrorKind { + Custom(String), + UnknownElementType(i8), + UnterminatedCString, + InvalidCString(Utf8Error), + UnexpectedEoF, + InvalidEndOfDocument, + InvalidSize, + InvalidStateExpectedType, + InvalidStateExpectedName, + InvalidStateExpectedValue, + ExpectedEnum { actual: ElementType }, + ExpectedString, + UnexpectedEndOfDocumentForEnumVariant, +} + +impl BsonError { + pub fn new(offset: Option, kind: ErrorKind) -> Self { + Self { + err: Box::new(BsonErrorImpl { offset, kind }), + } + } +} + +impl Display for BsonError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + self.err.fmt(f) + } +} + +impl Display for BsonErrorImpl { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if let Some(offset) = self.offset { + write!(f, "bson error, at {offset}: {}", self.kind) + } else { + write!(f, "bson error at unknown offset: {}", self.kind) + } + } +} + +impl Display for ErrorKind { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + ErrorKind::Custom(msg) => write!(f, "custom {msg}"), + ErrorKind::UnknownElementType(code) => write!(f, "unknown element code: {code}"), + ErrorKind::UnterminatedCString => write!(f, "unterminated cstring"), + ErrorKind::InvalidCString(e) => write!(f, "cstring with non-utf8 content: {e}"), + ErrorKind::UnexpectedEoF => write!(f, "unexpected end of file"), + ErrorKind::InvalidEndOfDocument => write!(f, "unexpected end of document"), + ErrorKind::InvalidSize => write!(f, "invalid document size"), + ErrorKind::InvalidStateExpectedType => write!(f, "internal state error, expected type"), + ErrorKind::InvalidStateExpectedName => write!(f, "internal state error, expected name"), + ErrorKind::InvalidStateExpectedValue => { + write!(f, "internal state error, expected value") + } + ErrorKind::ExpectedEnum { actual } => write!(f, "expected enum, got {}", *actual as u8), + ErrorKind::ExpectedString => write!(f, "expected a string value"), + ErrorKind::UnexpectedEndOfDocumentForEnumVariant => { + write!(f, "unexpected end of document for enum variant") + } + } + } +} + +impl de::Error for BsonError { + fn custom(msg: T) -> Self + where + T: Display, + { + BsonError::new(None, ErrorKind::Custom(msg.to_string())) + } +} +impl StdError for BsonError {} diff --git a/crates/core/src/bson/mod.rs b/crates/core/src/bson/mod.rs new file mode 100644 index 0000000..6cfaa3c --- /dev/null +++ b/crates/core/src/bson/mod.rs @@ -0,0 +1,57 @@ +pub use de::Deserializer; +pub use error::BsonError; +use serde::Deserialize; + +mod de; +mod error; +mod parser; + +/// Deserializes BSON [bytes] into a structure [T]. 
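+///
+/// A rough usage sketch, mirroring the `test_hello_world` test below (the `Greeting` type is
+/// made up for illustration):
+///
+/// ```ignore
+/// #[derive(serde::Deserialize)]
+/// struct Greeting<'a> {
+///     hello: &'a str,
+/// }
+///
+/// // {"hello": "world"} encoded as BSON.
+/// let bson = b"\x16\x00\x00\x00\x02hello\x00\x06\x00\x00\x00world\x00\x00";
+/// let greeting: Greeting = from_bytes(bson.as_slice())?;
+/// assert_eq!(greeting.hello, "world");
+/// ```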
+pub fn from_bytes<'de, T: Deserialize<'de>>(bytes: &'de [u8]) -> Result { + let mut deserializer = Deserializer::from_bytes(bytes); + + T::deserialize(&mut deserializer) +} + +#[cfg(test)] +mod test { + use core::assert_matches::assert_matches; + + use crate::sync::line::{SyncLine, TokenExpiresIn}; + + use super::*; + + #[test] + fn test_hello_world() { + // {"hello": "world"} + let bson = b"\x16\x00\x00\x00\x02hello\x00\x06\x00\x00\x00world\x00\x00"; + + #[derive(Deserialize)] + struct Expected<'a> { + hello: &'a str, + } + + let expected: Expected = from_bytes(bson.as_slice()).expect("should deserialize"); + assert_eq!(expected.hello, "world"); + } + + #[test] + fn test_checkpoint_line() { + let bson = b"\x85\x00\x00\x00\x03checkpoint\x00t\x00\x00\x00\x02last_op_id\x00\x02\x00\x00\x001\x00\x0awrite_checkpoint\x00\x04buckets\x00B\x00\x00\x00\x030\x00:\x00\x00\x00\x02bucket\x00\x02\x00\x00\x00a\x00\x10checksum\x00\x00\x00\x00\x00\x10priority\x00\x03\x00\x00\x00\x10count\x00\x01\x00\x00\x00\x00\x00\x00\x00"; + + let expected: SyncLine = from_bytes(bson.as_slice()).expect("should deserialize"); + let SyncLine::Checkpoint(checkpoint) = expected else { + panic!("Expected to deserialize as checkpoint line") + }; + + assert_eq!(checkpoint.buckets.len(), 1); + } + + #[test] + fn test_newtype_tuple() { + let bson = b"\x1b\x00\x00\x00\x10token_expires_in\x00<\x00\x00\x00\x00"; + + let expected: SyncLine = from_bytes(bson.as_slice()).expect("should deserialize"); + assert_matches!(expected, SyncLine::KeepAlive(TokenExpiresIn(60))); + } +} diff --git a/crates/core/src/bson/parser.rs b/crates/core/src/bson/parser.rs new file mode 100644 index 0000000..1287ade --- /dev/null +++ b/crates/core/src/bson/parser.rs @@ -0,0 +1,224 @@ +use core::ffi::CStr; + +use super::{error::ErrorKind, BsonError}; +use num_traits::{FromBytes, Num}; + +pub struct Parser<'de> { + offset: usize, + remaining_input: &'de [u8], +} + +impl<'de> Parser<'de> { + pub fn new(source: &'de [u8]) -> Self { + Self { + offset: 0, + remaining_input: source, + } + } + + #[cold] + pub fn error(&self, kind: ErrorKind) -> BsonError { + BsonError::new(Some(self.offset), kind) + } + + /// Advances the position of the parser, panicking on bound errors. + fn advance(&mut self, by: usize) { + self.offset = self.offset.strict_add(by); + self.remaining_input = &self.remaining_input[by..]; + } + + /// Reads a sized buffer from the parser and advances the input accordingly. + /// + /// This returns an error if not enough bytes are left in the input. 
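+    ///
+    /// For example (illustrative only), with two bytes left in the input:
+    ///
+    /// ```ignore
+    /// let mut parser = Parser::new(b"\x01\x02");
+    /// assert!(parser.advance_checked(4).is_err()); // ErrorKind::UnexpectedEoF
+    /// assert_eq!(parser.advance_checked(2)?, &[1, 2]);
+    /// ```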
+ fn advance_checked(&mut self, size: usize) -> Result<&'de [u8], BsonError> { + let (taken, rest) = self + .remaining_input + .split_at_checked(size) + .ok_or_else(|| self.error(ErrorKind::UnexpectedEoF))?; + + self.offset += size; + self.remaining_input = rest; + Ok(taken) + } + + fn advance_byte(&mut self) -> Result { + let value = *self + .remaining_input + .split_off_first() + .ok_or_else(|| self.error(ErrorKind::UnexpectedEoF))?; + + Ok(value) + } + + fn advance_bytes(&mut self) -> Result<&'de [u8; N], BsonError> { + let bytes = self.advance_checked(N)?; + Ok(bytes.try_into().expect("should have correct length")) + } + + pub fn read_cstr(&mut self) -> Result<&'de str, BsonError> { + let raw = CStr::from_bytes_until_nul(self.remaining_input) + .map_err(|_| self.error(ErrorKind::UnterminatedCString))?; + let str = raw + .to_str() + .map_err(|e| self.error(ErrorKind::InvalidCString(e)))?; + + self.advance(str.len() + 1); + Ok(str) + } + + fn read_number>( + &mut self, + ) -> Result { + let bytes = self.advance_bytes::()?; + Ok(T::from_le_bytes(&bytes)) + } + + pub fn read_int32(&mut self) -> Result { + self.read_number() + } + + fn read_length(&mut self) -> Result { + let raw = self.read_int32()?; + u32::try_from(raw) + .and_then(usize::try_from) + .map_err(|_| self.error(ErrorKind::InvalidSize)) + } + + pub fn read_int64(&mut self) -> Result { + self.read_number() + } + + pub fn read_uint64(&mut self) -> Result { + self.read_number() + } + + pub fn read_double(&mut self) -> Result { + self.read_number() + } + + pub fn read_bool(&mut self) -> Result { + let byte = self.advance_byte()?; + Ok(byte != 0) + } + + pub fn read_object_id(&mut self) -> Result<&'de [u8], BsonError> { + self.advance_checked(12) + } + + /// Reads a BSON string, `string ::= int32 (byte*) unsigned_byte(0)` + pub fn read_string(&mut self) -> Result<&'de str, BsonError> { + let length_including_null = self.read_length()?; + let bytes = self.advance_checked(length_including_null)?; + + str::from_utf8(&bytes[..length_including_null - 1]) + .map_err(|e| self.error(ErrorKind::InvalidCString(e))) + } + + pub fn read_binary(&mut self) -> Result<(BinarySubtype, &'de [u8]), BsonError> { + let length = self.read_length()?; + let subtype = self.advance_byte()?; + let binary = self.advance_checked(length)?; + + Ok((BinarySubtype(subtype), binary)) + } + + pub fn read_element_type(&mut self) -> Result { + let raw_type = self.advance_byte()? as i8; + Ok(match raw_type { + 1 => ElementType::Double, + 2 => ElementType::String, + 3 => ElementType::Document, + 4 => ElementType::Array, + 5 => ElementType::Binary, + 6 => ElementType::Undefined, + 7 => ElementType::ObjectId, + 8 => ElementType::Boolean, + 9 => ElementType::DatetimeUtc, + 10 => ElementType::Null, + 16 => ElementType::Int32, + 17 => ElementType::Timestamp, + 18 => ElementType::Int64, + _ => return Err(self.error(ErrorKind::UnknownElementType(raw_type))), + }) + } + + fn subreader(&mut self, len: usize) -> Result, BsonError> { + let current_offset = self.offset; + let for_sub_reader = self.advance_checked(len)?; + Ok(Parser { + offset: current_offset, + remaining_input: for_sub_reader, + }) + } + + /// Reads a document header and skips over the contents of the document. + /// + /// Returns a new [Parser] that can only read contents of the document. 
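+    ///
+    /// BSON prefixes every document with an int32 size that counts the size bytes themselves and
+    /// the trailing `0x00`, so the smallest valid document is five bytes. A sketch of the scoping
+    /// behavior (illustrative, using the empty document):
+    ///
+    /// ```ignore
+    /// let mut parser = Parser::new(b"\x05\x00\x00\x00\x00");
+    /// let mut inner = parser.document_scope()?;
+    /// assert!(inner.end_document()?); // only the trailing zero byte was in scope
+    /// assert!(parser.remaining().is_empty()); // the outer parser has skipped the document
+    /// ```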
+ pub fn document_scope(&mut self) -> Result, BsonError> { + let total_size = self.read_length()?; + if total_size < 5 { + return Err(self.error(ErrorKind::InvalidSize))?; + } + + self.subreader(total_size - 4) + } + + /// Skips over a document at the current offset, returning the bytes making up the document. + pub fn skip_document(&mut self) -> Result<&'de [u8], BsonError> { + let Some(peek_size) = self.remaining_input.get(0..4) else { + return Err(self.error(ErrorKind::UnexpectedEoF)); + }; + + let parsed_size = u32::try_from(i32::from_le_bytes( + peek_size.try_into().expect("should have correct length"), + )) + .and_then(usize::try_from) + .map_err(|_| self.error(ErrorKind::InvalidSize))?; + + if parsed_size < 5 || parsed_size >= self.remaining_input.len() { + return Err(self.error(ErrorKind::InvalidSize))?; + } + + Ok(self.subreader(parsed_size)?.remaining()) + } + + /// If only a single byte is left in the current scope, validate that it is a zero byte. + /// + /// Otherwise returns false as we haven't reached the end of a document. + pub fn end_document(&mut self) -> Result { + Ok(if self.remaining_input.len() == 1 { + let trailing_zero = self.advance_byte()?; + if trailing_zero != 0 { + return Err(self.error(ErrorKind::InvalidEndOfDocument)); + } + + true + } else { + false + }) + } + + pub fn remaining(&self) -> &'de [u8] { + self.remaining_input + } +} + +#[repr(transparent)] +pub struct BinarySubtype(pub u8); + +#[derive(Clone, Copy, Debug)] +pub enum ElementType { + Double = 1, + String = 2, + Document = 3, + Array = 4, + Binary = 5, + Undefined = 6, + ObjectId = 7, + Boolean = 8, + DatetimeUtc = 9, + Null = 10, + Int32 = 16, + Timestamp = 17, + Int64 = 18, +} diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index e73fdd8..a2cf8dd 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -5,6 +5,8 @@ use alloc::{ use core::error::Error; use sqlite_nostd::{context, sqlite3, Connection, Context, ResultCode}; +use crate::bson::BsonError; + #[derive(Debug)] pub struct SQLiteError(pub ResultCode, pub Option); @@ -72,3 +74,9 @@ impl From for SQLiteError { SQLiteError(ResultCode::INTERNAL, Some(format!("{}", value))) } } + +impl From for SQLiteError { + fn from(value: BsonError) -> Self { + SQLiteError(ResultCode::ERROR, Some(value.to_string())) + } +} diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs index c8d9848..3b84791 100644 --- a/crates/core/src/kv.rs +++ b/crates/core/src/kv.rs @@ -19,17 +19,21 @@ fn powersync_client_id_impl( ) -> Result { let db = ctx.db_handle(); + client_id(db) +} + +pub fn client_id(db: *mut sqlite::sqlite3) -> Result { // language=SQLite let statement = db.prepare_v2("select value from ps_kv where key = 'client_id'")?; if statement.step()? 
== ResultCode::ROW { let client_id = statement.column_text(0)?; - return Ok(client_id.to_string()); + Ok(client_id.to_string()) } else { - return Err(SQLiteError( + Err(SQLiteError( ResultCode::ABORT, Some(format!("No client_id found in ps_kv")), - )); + )) } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index cd50ea3..76edd45 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -3,6 +3,7 @@ #![allow(internal_features)] #![feature(core_intrinsics)] #![feature(assert_matches)] +#![feature(strict_overflow_ops)] extern crate alloc; @@ -11,6 +12,7 @@ use core::ffi::{c_char, c_int}; use sqlite::ResultCode; use sqlite_nostd as sqlite; +mod bson; mod checkpoint; mod crud_vtab; mod diff; @@ -60,6 +62,7 @@ fn init_extension(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { crate::view_admin::register(db)?; crate::checkpoint::register(db)?; crate::kv::register(db)?; + sync::register(db)?; crate::schema::register(db)?; crate::operations_vtab::register(db)?; diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 74e7cd3..0c031b9 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,13 +1,9 @@ -use alloc::format; -use alloc::string::String; -use alloc::vec::Vec; -use num_traits::Zero; -use serde::Deserialize; - -use crate::error::{PSResult, SQLiteError}; +use crate::error::SQLiteError; use crate::sync::line::DataLine; use crate::sync::operations::insert_bucket_operations; -use crate::sync::Checksum; +use crate::sync::storage_adapter::StorageAdapter; +use alloc::vec::Vec; +use serde::Deserialize; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, ResultCode}; @@ -22,8 +18,10 @@ pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLi } let batch: BucketBatch = serde_json::from_str(data)?; + let adapter = StorageAdapter::new(db)?; + for line in &batch.buckets { - insert_bucket_operations(db, line)?; + insert_bucket_operations(&adapter, &line)?; } Ok(()) diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index 76a8c4a..96fb732 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -5,7 +5,7 @@ use alloc::vec::Vec; use serde::Deserialize; use sqlite::ResultCode; use sqlite_nostd as sqlite; -pub use table_info::{Column, DiffIncludeOld, Table, TableInfoFlags}; +pub use table_info::{DiffIncludeOld, Table, TableInfoFlags}; #[derive(Deserialize)] pub struct Schema { diff --git a/crates/core/src/sync/bucket_priority.rs b/crates/core/src/sync/bucket_priority.rs index bd685f7..a69f2a6 100644 --- a/crates/core/src/sync/bucket_priority.rs +++ b/crates/core/src/sync/bucket_priority.rs @@ -1,10 +1,10 @@ -use serde::{de::Visitor, Deserialize}; +use serde::{de::Visitor, Deserialize, Serialize}; use sqlite_nostd::ResultCode; use crate::error::SQLiteError; #[repr(transparent)] -#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct BucketPriority { pub number: i32, } @@ -45,7 +45,13 @@ impl Into for BucketPriority { impl PartialOrd for BucketPriority { fn partial_cmp(&self, other: &BucketPriority) -> Option { - Some(self.number.partial_cmp(&other.number)?.reverse()) + Some(self.cmp(other)) + } +} + +impl Ord for BucketPriority { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + self.number.cmp(&other.number).reverse() } } @@ -89,3 +95,12 @@ impl<'de> Deserialize<'de> for BucketPriority { deserializer.deserialize_i32(PriorityVisitor) } } + +impl Serialize for BucketPriority { + fn 
serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_i32(self.number) + } +} diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs new file mode 100644 index 0000000..beecf81 --- /dev/null +++ b/crates/core/src/sync/interface.rs @@ -0,0 +1,210 @@ +use core::cell::RefCell; +use core::ffi::{c_int, c_void}; + +use alloc::borrow::Cow; +use alloc::boxed::Box; +use alloc::rc::Rc; +use alloc::string::ToString; +use alloc::{string::String, vec::Vec}; +use serde::Serialize; +use sqlite::{ResultCode, Value}; +use sqlite_nostd::{self as sqlite, ColumnType}; +use sqlite_nostd::{Connection, Context}; + +use crate::error::SQLiteError; + +use super::streaming_sync::SyncClient; +use super::sync_status::DownloadSyncStatus; + +/// A request sent from a client SDK to the [SyncClient] with a `powersync_control` invocation. +pub enum SyncControlRequest<'a> { + /// The client requests to start a sync iteration. + /// + /// Earlier iterations are implicitly dropped when receiving this request. + StartSyncStream { + /// Bucket parameters to include in the request when opening a sync stream. + parameters: Option>, + }, + /// The client requests to stop the current sync iteration. + StopSyncStream, + /// The client is forwading a sync event to the core extension. + SyncEvent(SyncEvent<'a>), +} + +pub enum SyncEvent<'a> { + /// A synthetic event forwarded to the [SyncClient] after being started. + Initialize, + /// An event requesting the sync client to shut down. + TearDown, + /// Notifies the sync client that a token has been refreshed. + /// + /// In response, we'll stop the current iteration to begin another one with the new token. + DidRefreshToken, + /// Notifies the sync client that the current CRUD upload (for which the client SDK is + /// responsible) has finished. + /// + /// If pending CRUD entries have previously prevented a sync from completing, this even can be + /// used to try again. + UploadFinished, + /// Forward a text line (JSON) received from the sync service. + TextLine { data: &'a str }, + /// Forward a binary line (BSON) received from the sync service. + BinaryLine { data: &'a [u8] }, +} + +/// An instruction sent by the core extension to the SDK. +#[derive(Serialize)] +pub enum Instruction { + LogLine { + severity: LogSeverity, + line: Cow<'static, str>, + }, + /// Update the download status for the ongoing sync iteration. + UpdateSyncStatus { + status: Rc>, + }, + /// Connect to the sync service using the [StreamingSyncRequest] created by the core extension, + /// and then forward received lines via [SyncEvent::TextLine] and [SyncEvent::BinaryLine]. + EstablishSyncStream { request: StreamingSyncRequest }, + FetchCredentials { + /// Whether the credentials currently used have expired. + /// + /// If false, this is a pre-fetch. + did_expire: bool, + }, + // These are defined like this because deserializers in Kotlin can't support either an + // object or a literal value + /// Close the websocket / HTTP stream to the sync service. + CloseSyncStream {}, + /// Flush the file-system if it's non-durable (only applicable to the Dart SDK). + FlushFileSystem {}, + /// Notify that a sync has been completed, prompting client SDKs to clear earlier errors. 
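+    ///
+    /// With serde's default externally-tagged representation, these empty-struct variants
+    /// serialize as JSON objects rather than bare strings, which is what the comment above is
+    /// about. A sketch of the expected shape:
+    ///
+    /// ```ignore
+    /// let json = serde_json::to_string(&Instruction::DidCompleteSync {})?;
+    /// assert_eq!(json, r#"{"DidCompleteSync":{}}"#);
+    /// ```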
+ DidCompleteSync {}, +} + +#[derive(Serialize)] +pub enum LogSeverity { + DEBUG, + INFO, + WARNING, +} + +#[derive(Serialize)] +pub struct StreamingSyncRequest { + pub buckets: Vec, + pub include_checksum: bool, + pub raw_data: bool, + pub binary_data: bool, + pub client_id: String, + pub parameters: Option>, +} + +#[derive(Serialize)] +pub struct BucketRequest { + pub name: String, + pub after: String, +} + +/// Wrapper around a [SyncClient]. +/// +/// We allocate one instance of this per database (in [register]) - the [SyncClient] has an initial +/// empty state that doesn't consume any resources. +struct SqlController { + client: SyncClient, +} + +pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + extern "C" fn control( + ctx: *mut sqlite::context, + argc: c_int, + argv: *mut *mut sqlite::value, + ) -> () { + let result = (|| -> Result<(), SQLiteError> { + let controller = unsafe { ctx.user_data().cast::().as_mut() } + .ok_or_else(|| SQLiteError::from(ResultCode::INTERNAL))?; + + let args = sqlite::args!(argc, argv); + let [op, payload] = args else { + return Err(ResultCode::MISUSE.into()); + }; + + if op.value_type() != ColumnType::Text { + return Err(SQLiteError( + ResultCode::MISUSE, + Some("First argument must be a string".to_string()), + )); + } + + let op = op.text(); + let event = match op { + "start" => SyncControlRequest::StartSyncStream { + parameters: if payload.value_type() == ColumnType::Text { + Some(serde_json::from_str(payload.text())?) + } else { + None + }, + }, + "stop" => SyncControlRequest::StopSyncStream, + "line_text" => SyncControlRequest::SyncEvent(SyncEvent::TextLine { + data: if payload.value_type() == ColumnType::Text { + payload.text() + } else { + return Err(SQLiteError( + ResultCode::MISUSE, + Some("Second argument must be a string".to_string()), + )); + }, + }), + "line_binary" => SyncControlRequest::SyncEvent(SyncEvent::BinaryLine { + data: if payload.value_type() == ColumnType::Blob { + payload.blob() + } else { + return Err(SQLiteError( + ResultCode::MISUSE, + Some("Second argument must be a byte array".to_string()), + )); + }, + }), + "refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken), + "completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished), + _ => { + return Err(SQLiteError( + ResultCode::MISUSE, + Some("Unknown operation".to_string()), + )) + } + }; + + let instructions = controller.client.push_event(event)?; + let formatted = serde_json::to_string(&instructions)?; + ctx.result_text_transient(&formatted); + + Ok(()) + })(); + + if let Err(e) = result { + e.apply_to_ctx("powersync_control", ctx); + } + } + + unsafe extern "C" fn destroy(ptr: *mut c_void) { + drop(Box::from_raw(ptr.cast::())); + } + + let controller = Box::new(SqlController { + client: SyncClient::new(db), + }); + + db.create_function_v2( + "powersync_control", + 2, + sqlite::UTF8 | sqlite::DIRECTONLY, + Some(Box::into_raw(controller).cast()), + Some(control), + None, + None, + Some(destroy), + )?; + + Ok(()) +} diff --git a/crates/core/src/sync/line.rs b/crates/core/src/sync/line.rs index 771b6b9..ab8c199 100644 --- a/crates/core/src/sync/line.rs +++ b/crates/core/src/sync/line.rs @@ -2,17 +2,37 @@ use alloc::borrow::Cow; use alloc::vec::Vec; use serde::Deserialize; -use super::BucketPriority; -use super::Checksum; - use crate::util::{deserialize_optional_string_to_i64, deserialize_string_to_i64}; +use super::bucket_priority::BucketPriority; +use super::Checksum; + /// While we would like to always borrow 
strings for efficiency, that's not consistently possible. /// With the JSON decoder, borrowing from input data is only possible when the string contains no /// escape sequences (otherwise, the string is not a direct view of input data and we need an /// internal copy). type SyncLineStr<'a> = Cow<'a, str>; +#[derive(Deserialize, Debug)] + +pub enum SyncLine<'a> { + #[serde(rename = "checkpoint", borrow)] + Checkpoint(Checkpoint<'a>), + #[serde(rename = "checkpoint_diff", borrow)] + CheckpointDiff(CheckpointDiff<'a>), + + #[serde(rename = "checkpoint_complete")] + CheckpointComplete(CheckpointComplete), + #[serde(rename = "partial_checkpoint_complete")] + CheckpointPartiallyComplete(CheckpointPartiallyComplete), + + #[serde(rename = "data", borrow)] + Data(DataLine<'a>), + + #[serde(rename = "token_expires_in")] + KeepAlive(TokenExpiresIn), +} + #[derive(Deserialize, Debug)] pub struct Checkpoint<'a> { #[serde(deserialize_with = "deserialize_string_to_i64")] @@ -24,6 +44,32 @@ pub struct Checkpoint<'a> { pub buckets: Vec>, } +#[derive(Deserialize, Debug)] +pub struct CheckpointDiff<'a> { + #[serde(deserialize_with = "deserialize_string_to_i64")] + pub last_op_id: i64, + #[serde(borrow)] + pub updated_buckets: Vec>, + #[serde(borrow)] + pub removed_buckets: Vec>, + #[serde(default)] + #[serde(deserialize_with = "deserialize_optional_string_to_i64")] + pub write_checkpoint: Option, +} + +#[derive(Deserialize, Debug)] +pub struct CheckpointComplete { + // #[serde(deserialize_with = "deserialize_string_to_i64")] + // pub last_op_id: i64, +} + +#[derive(Deserialize, Debug)] +pub struct CheckpointPartiallyComplete { + // #[serde(deserialize_with = "deserialize_string_to_i64")] + // pub last_op_id: i64, + pub priority: BucketPriority, +} + #[derive(Deserialize, Debug)] pub struct BucketChecksum<'a> { #[serde(borrow)] @@ -82,6 +128,20 @@ pub enum OpType { REMOVE, } +#[repr(transparent)] +#[derive(Deserialize, Debug, Clone, Copy)] +pub struct TokenExpiresIn(pub i32); + +impl TokenExpiresIn { + pub fn is_expired(self) -> bool { + self.0 <= 0 + } + + pub fn should_prefetch(self) -> bool { + !self.is_expired() && self.0 <= 30 + } +} + impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> { fn deserialize(deserializer: D) -> Result where @@ -95,3 +155,154 @@ impl<'a, 'de: 'a> Deserialize<'de> for OplogData<'a> { }); } } + +#[cfg(test)] +mod tests { + use core::assert_matches::assert_matches; + + use super::*; + + fn deserialize(source: &str) -> SyncLine { + serde_json::from_str(source).expect("Should have deserialized") + } + + #[test] + fn parse_token_expires_in() { + assert_matches!( + deserialize(r#"{"token_expires_in": 123}"#), + SyncLine::KeepAlive(TokenExpiresIn(123)) + ); + } + + #[test] + fn parse_checkpoint() { + assert_matches!( + deserialize(r#"{"checkpoint": {"last_op_id": "10", "buckets": []}}"#), + SyncLine::Checkpoint(Checkpoint { + last_op_id: 10, + write_checkpoint: None, + buckets: _, + }) + ); + + let SyncLine::Checkpoint(checkpoint) = deserialize( + r#"{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": 10}]}}"#, + ) else { + panic!("Expected checkpoint"); + }; + + assert_eq!(checkpoint.buckets.len(), 1); + let bucket = &checkpoint.buckets[0]; + assert_eq!(bucket.bucket, "a"); + assert_eq!(bucket.checksum, 10u32.into()); + assert_eq!(bucket.priority, None); + + let SyncLine::Checkpoint(checkpoint) = deserialize( + r#"{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "priority": 1, "checksum": 10}]}}"#, + ) else { + panic!("Expected checkpoint"); + 
}; + + assert_eq!(checkpoint.buckets.len(), 1); + let bucket = &checkpoint.buckets[0]; + assert_eq!(bucket.bucket, "a"); + assert_eq!(bucket.checksum, 10u32.into()); + assert_eq!(bucket.priority, Some(BucketPriority { number: 1 })); + + assert_matches!( + deserialize( + r#"{"checkpoint":{"write_checkpoint":null,"last_op_id":"1","buckets":[{"bucket":"a","checksum":0,"priority":3,"count":1}]}}"# + ), + SyncLine::Checkpoint(Checkpoint { + last_op_id: 1, + write_checkpoint: None, + buckets: _, + }) + ); + } + + #[test] + fn parse_checkpoint_diff() { + let SyncLine::CheckpointDiff(diff) = deserialize( + r#"{"checkpoint_diff": {"last_op_id": "10", "buckets": [], "updated_buckets": [], "removed_buckets": [], "write_checkpoint": null}}"#, + ) else { + panic!("Expected checkpoint diff") + }; + + assert_eq!(diff.updated_buckets.len(), 0); + assert_eq!(diff.removed_buckets.len(), 0); + } + + #[test] + fn parse_checkpoint_diff_escape() { + let SyncLine::CheckpointDiff(diff) = deserialize( + r#"{"checkpoint_diff": {"last_op_id": "10", "buckets": [], "updated_buckets": [], "removed_buckets": ["foo\""], "write_checkpoint": null}}"#, + ) else { + panic!("Expected checkpoint diff") + }; + + assert_eq!(diff.removed_buckets[0], "foo\""); + } + + #[test] + fn parse_checkpoint_diff_no_write_checkpoint() { + let SyncLine::CheckpointDiff(_diff) = deserialize( + r#"{"checkpoint_diff":{"last_op_id":"12","updated_buckets":[{"bucket":"a","count":12,"checksum":0,"priority":3}],"removed_buckets":[]}}"#, + ) else { + panic!("Expected checkpoint diff") + }; + } + + #[test] + fn parse_checkpoint_complete() { + assert_matches!( + deserialize(r#"{"checkpoint_complete": {"last_op_id": "10"}}"#), + SyncLine::CheckpointComplete(CheckpointComplete { + // last_op_id: 10 + }) + ); + } + + #[test] + fn parse_checkpoint_partially_complete() { + assert_matches!( + deserialize(r#"{"partial_checkpoint_complete": {"last_op_id": "10", "priority": 1}}"#), + SyncLine::CheckpointPartiallyComplete(CheckpointPartiallyComplete { + //last_op_id: 10, + priority: BucketPriority { number: 1 } + }) + ); + } + + #[test] + fn parse_data() { + let SyncLine::Data(data) = deserialize( + r#"{"data": { + "bucket": "bkt", + "data": [{"checksum":10,"op_id":"1","object_id":"test","object_type":"users","op":"PUT","subkey":null,"data":"{\"name\":\"user 0\",\"email\":\"0@example.org\"}"}], + "after": null, + "next_after": null} + }"#, + ) else { + panic!("Expected data line") + }; + + assert_eq!(data.bucket, "bkt"); + + assert_eq!(data.data.len(), 1); + let entry = &data.data[0]; + assert_eq!(entry.checksum, 10u32.into()); + assert_matches!( + &data.data[0], + OplogEntry { + checksum: _, + op_id: 1, + object_id: Some(_), + object_type: Some(_), + op: OpType::PUT, + subkey: None, + data: _, + } + ); + } +} diff --git a/crates/core/src/sync/mod.rs b/crates/core/src/sync/mod.rs index afe3393..fb4f02c 100644 --- a/crates/core/src/sync/mod.rs +++ b/crates/core/src/sync/mod.rs @@ -1,8 +1,18 @@ +use sqlite_nostd::{self as sqlite, ResultCode}; + mod bucket_priority; pub mod checkpoint; mod checksum; +mod interface; pub mod line; pub mod operations; +pub mod storage_adapter; +mod streaming_sync; +mod sync_status; pub use bucket_priority::BucketPriority; pub use checksum::Checksum; + +pub fn register(db: *mut sqlite::sqlite3) -> Result<(), ResultCode> { + interface::register(db) +} diff --git a/crates/core/src/sync/operations.rs b/crates/core/src/sync/operations.rs index 7e7499a..29c23f3 100644 --- a/crates/core/src/sync/operations.rs +++ 
b/crates/core/src/sync/operations.rs @@ -10,33 +10,27 @@ use crate::{ }; use super::line::OplogData; -use super::line::{DataLine, OpType}; use super::Checksum; +use super::{ + line::{DataLine, OpType}, + storage_adapter::{BucketInfo, StorageAdapter}, +}; pub fn insert_bucket_operations( - db: *mut sqlite::sqlite3, + adapter: &StorageAdapter, data: &DataLine, ) -> Result<(), SQLiteError> { - // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. - // We can consider splitting this into separate SELECT and INSERT statements. - // language=SQLite - let bucket_statement = db.prepare_v2( - "INSERT INTO ps_buckets(name) - VALUES(?) - ON CONFLICT DO UPDATE - SET last_applied_op = last_applied_op - RETURNING id, last_applied_op", - )?; - bucket_statement.bind_text(1, &data.bucket, sqlite::Destructor::STATIC)?; - bucket_statement.step()?; - - let bucket_id = bucket_statement.column_int64(0); + let db = adapter.db; + let BucketInfo { + id: bucket_id, + last_applied_op, + } = adapter.lookup_bucket(&*data.bucket)?; // This is an optimization for initial sync - we can avoid persisting individual REMOVE // operations when last_applied_op = 0. // We do still need to do the "supersede_statement" step for this case, since a REMOVE // operation can supersede another PUT operation we're syncing at the same time. - let mut is_empty = bucket_statement.column_int64(1) == 0; + let mut is_empty = last_applied_op == 0; // Statement to supersede (replace) operations with the same key. // language=SQLite @@ -59,8 +53,6 @@ INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (? INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)", )?; - bucket_statement.reset()?; - let mut last_op: Option = None; let mut add_checksum = Checksum::zero(); let mut op_checksum = Checksum::zero(); diff --git a/crates/core/src/sync/storage_adapter.rs b/crates/core/src/sync/storage_adapter.rs new file mode 100644 index 0000000..ed71b79 --- /dev/null +++ b/crates/core/src/sync/storage_adapter.rs @@ -0,0 +1,314 @@ +use core::{assert_matches::debug_assert_matches, fmt::Display}; + +use alloc::{string::ToString, vec::Vec}; +use serde::Serialize; +use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode}; +use streaming_iterator::StreamingIterator; + +use crate::{ + error::SQLiteError, + ext::SafeManagedStmt, + operations::delete_bucket, + sync::checkpoint::{validate_checkpoint, ChecksumMismatch}, + sync_local::{PartialSyncOperation, SyncOperation}, +}; + +use super::{ + bucket_priority::BucketPriority, interface::BucketRequest, streaming_sync::OwnedCheckpoint, + sync_status::Timestamp, +}; + +/// An adapter for storing sync state. +/// +/// This is used to encapsulate some SQL queries used for the sync implementation, making the code +/// in `streaming_sync.rs` easier to read. It also allows caching some prepared statements that are +/// used frequently as an optimization, but we're not taking advantage of that yet. 
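+///
+/// A rough usage sketch (`db` and a validated `checkpoint` are assumed to be in scope):
+///
+/// ```ignore
+/// let adapter = StorageAdapter::new(db)?;
+/// // Ask which buckets we already have, to build the next StreamingSyncRequest.
+/// let requests = adapter.collect_bucket_requests()?;
+/// // Later, once a checkpoint has been fully received and validated:
+/// let result = adapter.sync_local(&checkpoint, None)?;
+/// ```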
+pub struct StorageAdapter { + pub db: *mut sqlite::sqlite3, + progress_stmt: ManagedStmt, + time_stmt: ManagedStmt, +} + +impl StorageAdapter { + pub fn new(db: *mut sqlite::sqlite3) -> Result { + // language=SQLite + let progress = + db.prepare_v2("SELECT name, count_at_last, count_since_last FROM ps_buckets")?; + + // language=SQLite + let time = db.prepare_v2("SELECT unixepoch()")?; + + Ok(Self { + db, + progress_stmt: progress, + time_stmt: time, + }) + } + + pub fn collect_bucket_requests(&self) -> Result, SQLiteError> { + // language=SQLite + let statement = self.db.prepare_v2( + "SELECT name, last_op FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'", + )?; + + let mut requests = Vec::::new(); + + while statement.step()? == ResultCode::ROW { + let bucket_name = statement.column_text(0)?.to_string(); + let last_op = statement.column_int64(1); + + requests.push(BucketRequest { + name: bucket_name.clone(), + after: last_op.to_string(), + }); + } + + Ok(requests) + } + + pub fn delete_buckets<'a>( + &self, + buckets: impl IntoIterator, + ) -> Result<(), SQLiteError> { + for bucket in buckets { + // TODO: This is a neat opportunity to create the statements here and cache them + delete_bucket(self.db, bucket)?; + } + + Ok(()) + } + + pub fn local_progress( + &self, + ) -> Result< + impl StreamingIterator>, + ResultCode, + > { + self.progress_stmt.reset()?; + + fn step(stmt: &ManagedStmt) -> Result, ResultCode> { + if stmt.step()? == ResultCode::ROW { + let bucket = stmt.column_text(0)?; + let count_at_last = stmt.column_int64(1); + let count_since_last = stmt.column_int64(2); + + return Ok(Some(PersistedBucketProgress { + bucket, + count_at_last, + count_since_last, + })); + } + + Ok(None) + } + + Ok(streaming_iterator::from_fn(|| { + match step(&self.progress_stmt) { + Err(e) => Some(Err(e)), + Ok(Some(other)) => Some(Ok(other)), + Ok(None) => None, + } + })) + } + + pub fn reset_progress(&self) -> Result<(), ResultCode> { + self.db + .exec_safe("UPDATE ps_buckets SET count_since_last = 0, count_at_last = 0;")?; + Ok(()) + } + + pub fn lookup_bucket(&self, bucket: &str) -> Result { + // We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows. + // We can consider splitting this into separate SELECT and INSERT statements. + // language=SQLite + let bucket_statement = self.db.prepare_v2( + "INSERT INTO ps_buckets(name) + VALUES(?) + ON CONFLICT DO UPDATE + SET last_applied_op = last_applied_op + RETURNING id, last_applied_op", + )?; + bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?; + let res = bucket_statement.step()?; + debug_assert_matches!(res, ResultCode::ROW); + + let bucket_id = bucket_statement.column_int64(0); + let last_applied_op = bucket_statement.column_int64(1); + + return Ok(BucketInfo { + id: bucket_id, + last_applied_op, + }); + } + + pub fn sync_local( + &self, + checkpoint: &OwnedCheckpoint, + priority: Option, + ) -> Result { + let mismatched_checksums = + validate_checkpoint(checkpoint.buckets.values(), priority, self.db)?; + + if !mismatched_checksums.is_empty() { + self.delete_buckets(mismatched_checksums.iter().map(|i| i.bucket_name.as_str()))?; + + return Ok(SyncLocalResult::ChecksumFailure(CheckpointResult { + failed_buckets: mismatched_checksums, + })); + } + + let update_bucket = self + .db + .prepare_v2("UPDATE ps_buckets SET last_op = ? 
WHERE name = ?")?; + + for bucket in checkpoint.buckets.values() { + if bucket.is_in_priority(priority) { + update_bucket.bind_int64(1, checkpoint.last_op_id)?; + update_bucket.bind_text(2, &bucket.bucket, sqlite::Destructor::STATIC)?; + update_bucket.exec()?; + } + } + + if let (None, Some(write_checkpoint)) = (&priority, &checkpoint.write_checkpoint) { + update_bucket.bind_int64(1, *write_checkpoint)?; + update_bucket.bind_text(2, "$local", sqlite::Destructor::STATIC)?; + update_bucket.exec()?; + } + + #[derive(Serialize)] + struct PartialArgs<'a> { + priority: BucketPriority, + buckets: Vec<&'a str>, + } + + let sync_result = match priority { + None => SyncOperation::new(self.db, None).apply(), + Some(priority) => { + let args = PartialArgs { + priority, + buckets: checkpoint + .buckets + .values() + .filter_map(|item| { + if item.is_in_priority(Some(priority)) { + Some(item.bucket.as_str()) + } else { + None + } + }) + .collect(), + }; + + // TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters. + let serialized_args = serde_json::to_string(&args)?; + SyncOperation::new( + self.db, + Some(PartialSyncOperation { + priority, + args: &serialized_args, + }), + ) + .apply() + } + }?; + + if sync_result == 1 { + if priority.is_none() { + // Reset progress counters. We only do this for a complete sync, as we want a + // download progress to always cover a complete checkpoint instead of resetting for + // partial completions. + let update = self.db.prepare_v2( + "UPDATE ps_buckets SET count_since_last = 0, count_at_last = ? WHERE name = ?", + )?; + + for bucket in checkpoint.buckets.values() { + if let Some(count) = bucket.count { + update.bind_int64(1, count)?; + update.bind_text(2, bucket.bucket.as_str(), sqlite::Destructor::STATIC)?; + + update.exec()?; + update.reset()?; + } + } + } + + Ok(SyncLocalResult::ChangesApplied) + } else { + Ok(SyncLocalResult::PendingLocalChanges) + } + } + + pub fn now(&self) -> Result { + self.time_stmt.reset()?; + self.time_stmt.step()?; + + Ok(Timestamp(self.time_stmt.column_int64(0))) + } +} + +pub struct BucketInfo { + pub id: i64, + pub last_applied_op: i64, +} + +pub struct CheckpointResult { + failed_buckets: Vec, +} + +impl CheckpointResult { + pub fn is_valid(&self) -> bool { + self.failed_buckets.is_empty() + } +} + +impl Display for CheckpointResult { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + if self.is_valid() { + write!(f, "Valid checkpoint result") + } else { + write!(f, "Checksums didn't match, failed for: ")?; + for (i, item) in self.failed_buckets.iter().enumerate() { + if i != 0 { + write!(f, ", ")?; + } + + item.fmt(f)?; + } + + Ok(()) + } + } +} + +impl Display for ChecksumMismatch { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let actual = self.actual_add_checksum + self.actual_op_checksum; + write!( + f, + "{} (expected {}, got {} = {} (op) + {} (add))", + self.bucket_name, + self.expected_checksum, + actual, + self.actual_op_checksum, + self.actual_add_checksum + ) + } +} + +pub enum SyncLocalResult { + /// Changes could not be applied due to a checksum mismatch. + ChecksumFailure(CheckpointResult), + /// Changes could not be applied because they would break consistency - we need to wait for + /// pending local CRUD data to be uploaded and acknowledged in a write checkpoint. + PendingLocalChanges, + /// The checkpoint has been applied and changes have been published. 
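+    ///
+    /// Callers are expected to branch on all three cases; a sketch of how the sync iteration
+    /// reacts (`adapter` and `checkpoint` assumed in scope, see `streaming_sync.rs` for the real
+    /// handling):
+    ///
+    /// ```ignore
+    /// match adapter.sync_local(&checkpoint, None)? {
+    ///     SyncLocalResult::ChangesApplied => { /* publish the new status */ }
+    ///     SyncLocalResult::PendingLocalChanges => { /* retry once the CRUD upload finishes */ }
+    ///     SyncLocalResult::ChecksumFailure(result) => { /* log `result`, start a new iteration */ }
+    /// }
+    /// ```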
+ ChangesApplied, +} + +/// Information about the amount of operations a bucket had at the last checkpoint and how many +/// operations have been inserted in the meantime. +pub struct PersistedBucketProgress<'a> { + pub bucket: &'a str, + pub count_at_last: i64, + pub count_since_last: i64, +} diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs new file mode 100644 index 0000000..e8b1624 --- /dev/null +++ b/crates/core/src/sync/streaming_sync.rs @@ -0,0 +1,562 @@ +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll, Waker}, +}; + +use alloc::{ + boxed::Box, + collections::{btree_map::BTreeMap, btree_set::BTreeSet}, + format, + string::{String, ToString}, + vec::Vec, +}; +use futures_lite::FutureExt; + +use crate::{bson, error::SQLiteError, kv::client_id, sync::checkpoint::OwnedBucketChecksum}; +use sqlite_nostd::{self as sqlite, ResultCode}; + +use super::{ + interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent}, + line::{Checkpoint, CheckpointDiff, SyncLine}, + operations::insert_bucket_operations, + storage_adapter::{StorageAdapter, SyncLocalResult}, + sync_status::{SyncDownloadProgress, SyncProgressFromCheckpoint, SyncStatusContainer}, +}; + +/// The sync client implementation, responsible for parsing lines received by the sync service and +/// persisting them to the database. +/// +/// The client consumes no resources and prepares no statements until a sync iteration is +/// initialized. +pub struct SyncClient { + db: *mut sqlite::sqlite3, + /// The current [ClientState] (essentially an optional [StreamingSyncIteration]). + state: ClientState, +} + +impl SyncClient { + pub fn new(db: *mut sqlite::sqlite3) -> Self { + Self { + db, + state: ClientState::Idle, + } + } + + pub fn push_event<'a>( + &mut self, + event: SyncControlRequest<'a>, + ) -> Result, SQLiteError> { + match event { + SyncControlRequest::StartSyncStream { parameters } => { + self.state.tear_down()?; + + let mut handle = SyncIterationHandle::new(self.db, parameters)?; + let instructions = handle.initialize()?; + self.state = ClientState::IterationActive(handle); + + Ok(instructions) + } + SyncControlRequest::SyncEvent(sync_event) => { + let mut active = ActiveEvent::new(sync_event); + + let ClientState::IterationActive(handle) = &mut self.state else { + return Err(SQLiteError( + ResultCode::MISUSE, + Some("No iteration is active".to_string()), + )); + }; + + match handle.run(&mut active) { + Err(e) => { + self.state = ClientState::Idle; + return Err(e); + } + Ok(done) => { + if done { + self.state = ClientState::Idle; + } + } + }; + + Ok(active.instructions) + } + SyncControlRequest::StopSyncStream => self.state.tear_down(), + } + } +} + +enum ClientState { + /// No sync iteration is currently active. + Idle, + /// A sync iteration has begun on the database. + IterationActive(SyncIterationHandle), +} + +impl ClientState { + fn tear_down(&mut self) -> Result, SQLiteError> { + let mut event = ActiveEvent::new(SyncEvent::TearDown); + + if let ClientState::IterationActive(old) = self { + old.run(&mut event)?; + }; + + *self = ClientState::Idle; + Ok(event.instructions) + } +} + +/// A handle that allows progressing a [StreamingSyncIteration]. +/// +/// The sync itertion itself is implemented as an `async` function, as this allows us to treat it +/// as a coroutine that preserves internal state between multiple `powersync_control` invocations. 
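+///
+/// A typical exchange as driven by a client SDK (the SQL below is illustrative; the payloads
+/// depend on the sync service):
+///
+/// ```sql
+/// -- Start an iteration; the returned instructions ask the SDK to open a sync stream.
+/// SELECT powersync_control('start', NULL);
+/// -- Forward each line received from the service; text lines are JSON, binary lines are BSON.
+/// SELECT powersync_control('line_text', '{"token_expires_in": 3600}');
+/// -- Stop the iteration when disconnecting.
+/// SELECT powersync_control('stop', NULL);
+/// ```
+///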
+/// At each invocation, the future is polled once (and gets access to context that allows it to +/// render [Instruction]s to return from the function). +struct SyncIterationHandle { + future: Pin>>>, +} + +impl SyncIterationHandle { + /// Creates a new sync iteration in a pending state by preparing statements for + /// [StorageAdapter] and setting up the initial downloading state for [StorageAdapter] . + fn new( + db: *mut sqlite::sqlite3, + parameters: Option>, + ) -> Result { + let runner = StreamingSyncIteration { + db, + parameters, + adapter: StorageAdapter::new(db)?, + status: SyncStatusContainer::new(), + }; + let future = runner.run().boxed_local(); + + Ok(Self { future }) + } + + /// Forwards a [SyncEvent::Initialize] to the current sync iteration, returning the initial + /// instructions generated. + fn initialize(&mut self) -> Result, SQLiteError> { + let mut event = ActiveEvent::new(SyncEvent::Initialize); + let result = self.run(&mut event)?; + assert!(!result, "Stream client aborted initialization"); + + Ok(event.instructions) + } + + fn run(&mut self, active: &mut ActiveEvent) -> Result { + // Using a noop waker because the only event thing StreamingSyncIteration::run polls on is + // the next incoming sync event. + let waker = unsafe { + Waker::new( + active as *const ActiveEvent as *const (), + Waker::noop().vtable(), + ) + }; + let mut context = Context::from_waker(&waker); + + Ok( + if let Poll::Ready(result) = self.future.poll(&mut context) { + result?; + + active.instructions.push(Instruction::CloseSyncStream {}); + true + } else { + false + }, + ) + } +} + +/// A [SyncEvent] currently being handled by a [StreamingSyncIteration]. +struct ActiveEvent<'a> { + handled: bool, + /// The event to handle + event: SyncEvent<'a>, + /// Instructions to forward to the client when the `powersync_control` invocation completes. + instructions: Vec, +} + +impl<'a> ActiveEvent<'a> { + pub fn new(event: SyncEvent<'a>) -> Self { + Self { + handled: false, + event, + instructions: Vec::new(), + } + } +} + +struct StreamingSyncIteration { + db: *mut sqlite::sqlite3, + adapter: StorageAdapter, + parameters: Option>, + status: SyncStatusContainer, +} + +impl StreamingSyncIteration { + fn receive_event<'a>() -> impl Future> { + struct Wait<'a> { + a: PhantomData<&'a StreamingSyncIteration>, + } + + impl<'a> Future for Wait<'a> { + type Output = &'a mut ActiveEvent<'a>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let context = cx.waker().data().cast_mut() as *mut ActiveEvent; + let context = unsafe { &mut *context }; + + if context.handled { + Poll::Pending + } else { + context.handled = true; + Poll::Ready(context) + } + } + } + + Wait { a: PhantomData } + } + + async fn run(mut self) -> Result<(), SQLiteError> { + let mut target = SyncTarget::BeforeCheckpoint(self.prepare_request().await?); + + // A checkpoint that has been fully received and validated, but couldn't be applied due to + // pending local data. We will retry applying this checkpoint when the client SDK informs us + // that it has finished uploading changes. + let mut validated_but_not_applied = None::; + + loop { + let event = Self::receive_event().await; + + let line: SyncLine = match event.event { + SyncEvent::Initialize { .. 
} => { + panic!("Initialize should only be emited once") + } + SyncEvent::TearDown => { + self.status + .update(|s| s.disconnect(), &mut event.instructions); + break; + } + SyncEvent::TextLine { data } => serde_json::from_str(data)?, + SyncEvent::BinaryLine { data } => bson::from_bytes(data)?, + SyncEvent::UploadFinished => { + if let Some(checkpoint) = validated_but_not_applied.take() { + let result = self.adapter.sync_local(&checkpoint, None)?; + + match result { + SyncLocalResult::ChangesApplied => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::DEBUG, + line: "Applied pending checkpoint after completed upload" + .into(), + }); + + self.handle_checkpoint_applied(event)?; + } + _ => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::WARNING, + line: "Could not apply pending checkpoint even after completed upload" + .into(), + }); + } + } + } + + continue; + } + SyncEvent::DidRefreshToken => { + // Break so that the client SDK starts another iteration. + break; + } + }; + + self.status.update_only(|s| s.mark_connected()); + + match line { + SyncLine::Checkpoint(checkpoint) => { + validated_but_not_applied = None; + let to_delete = target.track_checkpoint(&checkpoint); + + self.adapter + .delete_buckets(to_delete.iter().map(|b| b.as_str()))?; + let progress = self.load_progress(target.target_checkpoint().unwrap())?; + self.status.update( + |s| s.start_tracking_checkpoint(progress), + &mut event.instructions, + ); + } + SyncLine::CheckpointDiff(diff) => { + let Some(target) = target.target_checkpoint_mut() else { + return Err(SQLiteError( + ResultCode::ABORT, + Some( + "Received checkpoint_diff without previous checkpoint".to_string(), + ), + )); + }; + + target.apply_diff(&diff); + validated_but_not_applied = None; + self.adapter + .delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?; + + let progress = self.load_progress(target)?; + self.status.update( + |s| s.start_tracking_checkpoint(progress), + &mut event.instructions, + ); + } + SyncLine::CheckpointComplete(_) => { + let Some(target) = target.target_checkpoint_mut() else { + return Err(SQLiteError( + ResultCode::ABORT, + Some( + "Received checkpoint complete without previous checkpoint" + .to_string(), + ), + )); + }; + let result = self.adapter.sync_local(target, None)?; + + match result { + SyncLocalResult::ChecksumFailure(checkpoint_result) => { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + // await new Promise((resolve) => setTimeout(resolve, 50)); + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::WARNING, + line: format!("Could not apply checkpoint, {checkpoint_result}") + .into(), + }); + break; + } + SyncLocalResult::PendingLocalChanges => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::INFO, + line: "Could not apply checkpoint due to local data. 
Will retry at completed upload or next checkpoint.".into(), + }); + + validated_but_not_applied = Some(target.clone()); + } + SyncLocalResult::ChangesApplied => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::DEBUG, + line: "Validated and applied checkpoint".into(), + }); + event.instructions.push(Instruction::FlushFileSystem {}); + self.handle_checkpoint_applied(event)?; + } + } + } + SyncLine::CheckpointPartiallyComplete(complete) => { + let priority = complete.priority; + let Some(target) = target.target_checkpoint_mut() else { + return Err(SQLiteError( + ResultCode::ABORT, + Some( + "Received checkpoint complete without previous checkpoint" + .to_string(), + ), + )); + }; + let result = self.adapter.sync_local(target, Some(priority))?; + + match result { + SyncLocalResult::ChecksumFailure(checkpoint_result) => { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + // await new Promise((resolve) => setTimeout(resolve, 50)); + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::WARNING, + line: format!( + "Could not apply partial checkpoint, {checkpoint_result}" + ) + .into(), + }); + break; + } + SyncLocalResult::PendingLocalChanges => { + // If we have pending uploads, we can't complete new checkpoints outside + // of priority 0. We'll resolve this for a complete checkpoint later. + } + SyncLocalResult::ChangesApplied => { + let now = self.adapter.now()?; + event.instructions.push(Instruction::FlushFileSystem {}); + self.status.update( + |status| { + status.partial_checkpoint_complete(priority, now); + }, + &mut event.instructions, + ); + } + } + } + SyncLine::Data(data_line) => { + self.status + .update(|s| s.track_line(&data_line), &mut event.instructions); + insert_bucket_operations(&self.adapter, &data_line)?; + } + SyncLine::KeepAlive(token) => { + if token.is_expired() { + // Token expired already - stop the connection immediately. + event + .instructions + .push(Instruction::FetchCredentials { did_expire: true }); + break; + } else if token.should_prefetch() { + event + .instructions + .push(Instruction::FetchCredentials { did_expire: false }); + } + } + } + + self.status.emit_changes(&mut event.instructions); + } + + Ok(()) + } + + fn load_progress( + &self, + checkpoint: &OwnedCheckpoint, + ) -> Result { + let local_progress = self.adapter.local_progress()?; + let SyncProgressFromCheckpoint { + progress, + needs_counter_reset, + } = SyncDownloadProgress::for_checkpoint(checkpoint, local_progress)?; + + if needs_counter_reset { + self.adapter.reset_progress()?; + } + + Ok(progress) + } + + /// Prepares a sync iteration by handling the initial [SyncEvent::Initialize]. + /// + /// This prepares a [StreamingSyncRequest] by fetching local sync state and the requested bucket + /// parameters. 
+ async fn prepare_request(&mut self) -> Result, SQLiteError> { + let event = Self::receive_event().await; + let SyncEvent::Initialize = event.event else { + return Err(SQLiteError::from(ResultCode::MISUSE)); + }; + + self.status + .update(|s| s.start_connecting(), &mut event.instructions); + + let requests = self.adapter.collect_bucket_requests()?; + let local_bucket_names: Vec = requests.iter().map(|s| s.name.clone()).collect(); + let request = StreamingSyncRequest { + buckets: requests, + include_checksum: true, + raw_data: true, + binary_data: true, + client_id: client_id(self.db)?, + parameters: self.parameters.take(), + }; + + event + .instructions + .push(Instruction::EstablishSyncStream { request }); + Ok(local_bucket_names) + } + + fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent) -> Result<(), ResultCode> { + event.instructions.push(Instruction::DidCompleteSync {}); + + let now = self.adapter.now()?; + self.status.update( + |status| status.applied_checkpoint(now), + &mut event.instructions, + ); + + Ok(()) + } +} + +#[derive(Debug)] +enum SyncTarget { + /// We've received a checkpoint line towards the given checkpoint. The tracked checkpoint is + /// updated for subsequent checkpoint or checkpoint_diff lines. + Tracking(OwnedCheckpoint), + /// We have not received a checkpoint message yet. We still keep a list of local buckets around + /// so that we know which ones to delete depending on the first checkpoint message. + BeforeCheckpoint(Vec), +} + +impl SyncTarget { + fn target_checkpoint(&self) -> Option<&OwnedCheckpoint> { + match self { + Self::Tracking(cp) => Some(cp), + _ => None, + } + } + + fn target_checkpoint_mut(&mut self) -> Option<&mut OwnedCheckpoint> { + match self { + Self::Tracking(cp) => Some(cp), + _ => None, + } + } + + /// Starts tracking the received `Checkpoint`. + /// + /// This updates the internal state and returns a set of buckets to delete because they've been + /// tracked locally but not in the new checkpoint. 
+ fn track_checkpoint<'a>(&mut self, checkpoint: &Checkpoint<'a>) -> BTreeSet { + let mut to_delete: BTreeSet = match &self { + SyncTarget::Tracking(checkpoint) => checkpoint.buckets.keys().cloned().collect(), + SyncTarget::BeforeCheckpoint(buckets) => buckets.iter().cloned().collect(), + }; + + let mut buckets = BTreeMap::::new(); + for bucket in &checkpoint.buckets { + buckets.insert(bucket.bucket.to_string(), OwnedBucketChecksum::from(bucket)); + to_delete.remove(&*bucket.bucket); + } + + *self = SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets)); + to_delete + } +} + +#[derive(Debug, Clone)] +pub struct OwnedCheckpoint { + pub last_op_id: i64, + pub write_checkpoint: Option, + pub buckets: BTreeMap, +} + +impl OwnedCheckpoint { + fn from_checkpoint<'a>( + checkpoint: &Checkpoint<'a>, + buckets: BTreeMap, + ) -> Self { + Self { + last_op_id: checkpoint.last_op_id, + write_checkpoint: checkpoint.write_checkpoint, + buckets: buckets, + } + } + + fn apply_diff<'a>(&mut self, diff: &CheckpointDiff<'a>) { + for removed in &diff.removed_buckets { + self.buckets.remove(&**removed); + } + + for updated in &diff.updated_buckets { + let owned = OwnedBucketChecksum::from(updated); + self.buckets.insert(owned.bucket.clone(), owned); + } + + self.last_op_id = diff.last_op_id; + self.write_checkpoint = diff.write_checkpoint; + } +} diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs new file mode 100644 index 0000000..e6744fa --- /dev/null +++ b/crates/core/src/sync/sync_status.rs @@ -0,0 +1,246 @@ +use alloc::{collections::btree_map::BTreeMap, rc::Rc, string::String, vec::Vec}; +use core::{cell::RefCell, hash::BuildHasher}; +use rustc_hash::FxBuildHasher; +use serde::Serialize; +use sqlite_nostd::ResultCode; +use streaming_iterator::StreamingIterator; + +use super::{ + bucket_priority::BucketPriority, interface::Instruction, line::DataLine, + storage_adapter::PersistedBucketProgress, streaming_sync::OwnedCheckpoint, +}; + +/// Information about a progressing download. +#[derive(Serialize, Hash)] +pub struct DownloadSyncStatus { + /// Whether the socket to the sync service is currently open and connected. + /// + /// This starts being true once we receive the first line, and is set to false as the iteration + /// ends. + pub connected: bool, + /// Whether we've requested the client SDK to connect to the socket while not receiving sync + /// lines yet. + pub connecting: bool, + /// Provides stats over which bucket priorities have already been synced (or when they've last + /// been changed). + /// + /// Always sorted by descending [BucketPriority] in [SyncPriorityStatus] (or, in other words, + /// increasing priority numbers). + pub priority_status: Vec, + /// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been + /// received), information about how far the download has progressed. + pub downloading: Option, +} + +impl DownloadSyncStatus { + fn debug_assert_priority_status_is_sorted(&self) { + debug_assert!(self + .priority_status + .is_sorted_by(|a, b| a.priority >= b.priority)) + } + + pub fn disconnect(&mut self) { + self.connected = false; + self.connecting = false; + self.downloading = None; + } + + pub fn start_connecting(&mut self) { + self.connected = false; + self.downloading = None; + self.connecting = true; + } + + pub fn mark_connected(&mut self) { + self.connecting = false; + self.connected = true; + } + + /// Transitions state after receiving a checkpoint line. 
+ /// + /// This sets the [downloading] state to include [progress]. + pub fn start_tracking_checkpoint<'a>(&mut self, progress: SyncDownloadProgress) { + self.mark_connected(); + + self.downloading = Some(progress); + } + + /// Increments [SyncDownloadProgress] progress for the given [DataLine]. + pub fn track_line(&mut self, line: &DataLine) { + if let Some(ref mut downloading) = self.downloading { + downloading.increment_download_count(line); + } + } + + pub fn partial_checkpoint_complete(&mut self, priority: BucketPriority, now: Timestamp) { + self.debug_assert_priority_status_is_sorted(); + // We can delete entries with a higher priority because this partial sync includes them. + self.priority_status.retain(|i| i.priority < priority); + self.priority_status.insert( + 0, + SyncPriorityStatus { + priority: priority, + last_synced_at: Some(now), + has_synced: Some(true), + }, + ); + self.debug_assert_priority_status_is_sorted(); + } + + pub fn applied_checkpoint(&mut self, now: Timestamp) { + self.downloading = None; + self.priority_status.clear(); + + self.priority_status.push(SyncPriorityStatus { + priority: BucketPriority::SENTINEL, + last_synced_at: Some(now), + has_synced: Some(true), + }); + } +} + +impl Default for DownloadSyncStatus { + fn default() -> Self { + Self { + connected: false, + connecting: false, + downloading: None, + priority_status: Vec::new(), + } + } +} + +pub struct SyncStatusContainer { + status: Rc>, + last_published_hash: u64, +} + +impl SyncStatusContainer { + pub fn new() -> Self { + Self { + status: Rc::new(RefCell::new(Default::default())), + last_published_hash: 0, + } + } + + /// Invokes a function to update the sync status, then emits an [Instruction::UpdateSyncStatus] + /// if the function did indeed change the status. + pub fn update ()>( + &mut self, + apply: F, + instructions: &mut Vec, + ) { + self.update_only(apply); + self.emit_changes(instructions); + } + + /// Invokes a function to update the sync status without emitting a status event. + pub fn update_only ()>(&self, apply: F) { + let mut status = self.status.borrow_mut(); + apply(&mut *status); + } + + /// If the status has been changed since the last time an [Instruction::UpdateSyncStatus] event + /// was emitted, emit such an event now. + pub fn emit_changes(&mut self, instructions: &mut Vec) { + let status = self.status.borrow(); + let hash = FxBuildHasher.hash_one(&*status); + if hash != self.last_published_hash { + self.last_published_hash = hash; + instructions.push(Instruction::UpdateSyncStatus { + status: self.status.clone(), + }); + } + } +} + +#[repr(transparent)] +#[derive(Serialize, Hash, Clone, Copy)] +pub struct Timestamp(pub i64); + +#[derive(Serialize, Hash)] +pub struct SyncPriorityStatus { + priority: BucketPriority, + last_synced_at: Option, + has_synced: Option, +} + +/// Per-bucket download progress information. 
+#[derive(Serialize, Hash)] +pub struct BucketProgress { + pub priority: BucketPriority, + pub at_last: i64, + pub since_last: i64, + pub target_count: i64, +} + +#[derive(Serialize, Hash)] +pub struct SyncDownloadProgress { + buckets: BTreeMap, +} + +pub struct SyncProgressFromCheckpoint { + pub progress: SyncDownloadProgress, + pub needs_counter_reset: bool, +} + +impl SyncDownloadProgress { + pub fn for_checkpoint<'a>( + checkpoint: &OwnedCheckpoint, + mut local_progress: impl StreamingIterator< + Item = Result, ResultCode>, + >, + ) -> Result { + let mut buckets = BTreeMap::::new(); + let mut needs_reset = false; + for bucket in checkpoint.buckets.values() { + buckets.insert( + bucket.bucket.clone(), + BucketProgress { + priority: bucket.priority, + target_count: bucket.count.unwrap_or(0), + // Will be filled out later by iterating local_progress + at_last: 0, + since_last: 0, + }, + ); + } + + while let Some(row) = local_progress.next() { + let row = match row { + Ok(row) => row, + Err(e) => return Err(*e), + }; + + let Some(progress) = buckets.get_mut(row.bucket) else { + continue; + }; + + progress.at_last = row.count_at_last; + progress.since_last = row.count_since_last; + + if progress.target_count < row.count_at_last + row.count_since_last { + needs_reset = true; + // Either due to a defrag / sync rule deploy or a compactioon operation, the size + // of the bucket shrank so much that the local ops exceed the ops in the updated + // bucket. We can't possibly report progress in this case (it would overshoot 100%). + for (_, progress) in &mut buckets { + progress.at_last = 0; + progress.since_last = 0; + } + break; + } + } + + Ok(SyncProgressFromCheckpoint { + progress: Self { buckets }, + needs_counter_reset: needs_reset, + }) + } + + pub fn increment_download_count(&mut self, line: &DataLine) { + if let Some(info) = self.buckets.get_mut(&*line.bucket) { + info.since_last += line.data.len() as i64 + } + } +} diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs index 225796b..f884e88 100644 --- a/crates/core/src/sync_local.rs +++ b/crates/core/src/sync_local.rs @@ -13,30 +13,29 @@ use crate::ext::SafeManagedStmt; use crate::util::{internal_table_name, quote_internal_name}; pub fn sync_local(db: *mut sqlite::sqlite3, data: &V) -> Result { - let mut operation = SyncOperation::new(db, data)?; + let mut operation = SyncOperation::from_args(db, data)?; operation.apply() } -struct PartialSyncOperation<'a> { +pub struct PartialSyncOperation<'a> { /// The lowest priority part of the partial sync operation. - priority: BucketPriority, + pub priority: BucketPriority, /// The JSON-encoded arguments passed by the client SDK. This includes the priority and a list /// of bucket names in that (and higher) priorities. 
- args: &'a str, + pub args: &'a str, } -struct SyncOperation<'a> { +pub struct SyncOperation<'a> { db: *mut sqlite::sqlite3, data_tables: BTreeSet, partial: Option>, } impl<'a> SyncOperation<'a> { - fn new(db: *mut sqlite::sqlite3, data: &'a V) -> Result { - return Ok(Self { - db: db, - data_tables: BTreeSet::new(), - partial: match data.value_type() { + fn from_args(db: *mut sqlite::sqlite3, data: &'a V) -> Result { + Ok(Self::new( + db, + match data.value_type() { ColumnType::Text => { let text = data.text(); if text.len() > 0 { @@ -58,7 +57,15 @@ impl<'a> SyncOperation<'a> { } _ => None, }, - }); + )) + } + + pub fn new(db: *mut sqlite::sqlite3, partial: Option>) -> Self { + Self { + db, + data_tables: BTreeSet::new(), + partial, + } } fn can_apply_sync_changes(&self) -> Result { @@ -96,7 +103,7 @@ impl<'a> SyncOperation<'a> { Ok(true) } - fn apply(&mut self) -> Result { + pub fn apply(&mut self) -> Result { if !self.can_apply_sync_changes()? { return Ok(0); } diff --git a/dart/benchmark/apply_lines.dart b/dart/benchmark/apply_lines.dart new file mode 100644 index 0000000..ee0654b --- /dev/null +++ b/dart/benchmark/apply_lines.dart @@ -0,0 +1,48 @@ +import 'dart:io'; +import 'dart:typed_data'; + +import '../test/utils/native_test_utils.dart'; + +/// Usage: dart run benchmark/apply_lines.dart path/to/lines.bin +/// +/// This creates a new in-memory database and applies concatenated BSON sync +/// lines from a file. +void main(List args) { + if (args.length != 1) { + throw 'Usage: dart run benchmark/apply_lines.dart path/to/lines.bin'; + } + + final [path] = args; + final file = File(path).openSync(); + final db = openTestDatabase(); + + db + ..execute('select powersync_init()') + ..execute('select powersync_control(?, null)', ['start']); + + final stopwatch = Stopwatch()..start(); + + final lengthBuffer = Uint8List(4); + while (file.positionSync() < file.lengthSync()) { + // BSON document: + final bytesRead = file.readIntoSync(lengthBuffer); + if (bytesRead != 4) { + throw 'short read, expected length'; + } + final length = lengthBuffer.buffer.asByteData().getInt32(0, Endian.little); + file.setPositionSync(file.positionSync() - 4); + + final syncLineBson = file.readSync(length); + if (syncLineBson.length != length) { + throw 'short read for bson document'; + } + + db + ..execute('BEGIN') + ..execute('SELECT powersync_control(?, ?)', ['line_binary', syncLineBson]) + ..execute('COMMIT;'); + } + + stopwatch.stop(); + print('Applying $path took ${stopwatch.elapsed}'); +} diff --git a/dart/pubspec.lock b/dart/pubspec.lock index 2731635..5000e7e 100644 --- a/dart/pubspec.lock +++ b/dart/pubspec.lock @@ -41,6 +41,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.1.2" + bson: + dependency: "direct main" + description: + name: bson + sha256: "9b761248a3494fea594aecf5d6f369b5f04d7b082aa2b8c06579ade77f1a7e47" + url: "https://pub.dev" + source: hosted + version: "5.0.6" cli_config: dependency: transitive description: @@ -66,7 +74,7 @@ packages: source: hosted version: "1.19.1" convert: - dependency: transitive + dependency: "direct dev" description: name: convert sha256: b30acd5944035672bc15c6b7a8b47d773e41e2f17de064350988c5d02adb1c68 @@ -89,6 +97,14 @@ packages: url: "https://pub.dev" source: hosted version: "3.0.6" + decimal: + dependency: transitive + description: + name: decimal + sha256: "28239b8b929c1bd8618702e6dbc96e2618cf99770bbe9cb040d6cf56a11e4ec3" + url: "https://pub.dev" + source: hosted + version: "3.2.1" fake_async: dependency: "direct dev" description: @@ -113,6 
+129,14 @@ packages: url: "https://pub.dev" source: hosted version: "7.0.1" + fixnum: + dependency: transitive + description: + name: fixnum + sha256: b6dc7065e46c974bc7c5f143080a6764ec7a4be6da1285ececdc37be96de53be + url: "https://pub.dev" + source: hosted + version: "1.1.1" frontend_server_client: dependency: transitive description: @@ -145,6 +169,14 @@ packages: url: "https://pub.dev" source: hosted version: "4.1.2" + intl: + dependency: transitive + description: + name: intl + sha256: "3df61194eb431efc39c4ceba583b95633a403f46c9fd341e550ce0bfa50e9aa5" + url: "https://pub.dev" + source: hosted + version: "0.20.2" io: dependency: transitive description: @@ -178,7 +210,7 @@ packages: source: hosted version: "0.12.17" meta: - dependency: transitive + dependency: "direct dev" description: name: meta sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c @@ -209,6 +241,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.2.0" + packages_extensions: + dependency: transitive + description: + name: packages_extensions + sha256: "1fb328695a9828c80d275ce1650a2bb5947690070de082dfa1dfac7429378daf" + url: "https://pub.dev" + source: hosted + version: "0.1.1" path: dependency: transitive description: @@ -225,6 +265,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.5.1" + power_extensions: + dependency: transitive + description: + name: power_extensions + sha256: ad0e8b2420090d996fe8b7fd32cdf02b9b924b6d4fc0fb0b559ff6aa5e24d5b0 + url: "https://pub.dev" + source: hosted + version: "0.2.3" pub_semver: dependency: transitive description: @@ -233,6 +281,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.2.0" + rational: + dependency: transitive + description: + name: rational + sha256: cb808fb6f1a839e6fc5f7d8cb3b0a10e1db48b3be102de73938c627f0b636336 + url: "https://pub.dev" + source: hosted + version: "2.2.3" shelf: dependency: transitive description: @@ -289,6 +345,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.10.1" + sprintf: + dependency: transitive + description: + name: sprintf + sha256: "1fc9ffe69d4df602376b52949af107d8f5703b77cda567c4d7d86a0693120f23" + url: "https://pub.dev" + source: hosted + version: "7.0.0" sqlite3: dependency: "direct main" description: @@ -369,6 +433,14 @@ packages: url: "https://pub.dev" source: hosted version: "1.4.0" + uuid: + dependency: transitive + description: + name: uuid + sha256: a5be9ef6618a7ac1e964353ef476418026db906c4facdedaa299b7a2e71690ff + url: "https://pub.dev" + source: hosted + version: "4.5.1" vm_service: dependency: transitive description: diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml index 836e9ad..40b4648 100644 --- a/dart/pubspec.yaml +++ b/dart/pubspec.yaml @@ -6,8 +6,12 @@ environment: sdk: ^3.4.0 dependencies: sqlite3: ^2.7.6 + bson: ^5.0.5 + dev_dependencies: test: ^1.25.0 file: ^7.0.1 sqlite3_test: ^0.1.1 - fake_async: ^1.3.3 \ No newline at end of file + fake_async: ^1.3.3 + convert: ^3.1.2 + meta: ^1.16.0 diff --git a/dart/test/goldens/simple_iteration.json b/dart/test/goldens/simple_iteration.json new file mode 100644 index 0000000..ad3f358 --- /dev/null +++ b/dart/test/goldens/simple_iteration.json @@ -0,0 +1,168 @@ +[ + { + "operation": "start", + "data": null, + "output": [ + { + "UpdateSyncStatus": { + "status": { + "connected": false, + "connecting": true, + "priority_status": [], + "downloading": null + } + } + }, + { + "EstablishSyncStream": { + "request": { + "buckets": [], + "include_checksum": true, + "raw_data": true, + "binary_data": true, + 
"client_id": "test-test-test-test", + "parameters": null + } + } + } + ] + }, + { + "operation": "line_text", + "data": { + "checkpoint": { + "last_op_id": "1", + "write_checkpoint": null, + "buckets": [ + { + "bucket": "a", + "checksum": 0, + "priority": 3, + "count": 1 + } + ] + } + }, + "output": [ + { + "UpdateSyncStatus": { + "status": { + "connected": true, + "connecting": false, + "priority_status": [], + "downloading": { + "buckets": { + "a": { + "priority": 3, + "at_last": 0, + "since_last": 0, + "target_count": 1 + } + } + } + } + } + } + ] + }, + { + "operation": "line_text", + "data": { + "token_expires_in": 60 + }, + "output": [] + }, + { + "operation": "line_text", + "data": { + "data": { + "bucket": "a", + "has_more": false, + "after": null, + "next_after": null, + "data": [ + { + "op_id": "1", + "op": "PUT", + "object_type": "items", + "object_id": "1", + "checksum": 0, + "data": "{\"col\":\"hi\"}" + } + ] + } + }, + "output": [ + { + "UpdateSyncStatus": { + "status": { + "connected": true, + "connecting": false, + "priority_status": [], + "downloading": { + "buckets": { + "a": { + "priority": 3, + "at_last": 0, + "since_last": 1, + "target_count": 1 + } + } + } + } + } + } + ] + }, + { + "operation": "line_text", + "data": { + "checkpoint_complete": { + "last_op_id": "1" + } + }, + "output": [ + { + "LogLine": { + "severity": "DEBUG", + "line": "Validated and applied checkpoint" + } + }, + { + "FlushFileSystem": {} + }, + { + "DidCompleteSync": {} + }, + { + "UpdateSyncStatus": { + "status": { + "connected": true, + "connecting": false, + "priority_status": [ + { + "priority": 2147483647, + "last_synced_at": 1740823200, + "has_synced": true + } + ], + "downloading": null + } + } + } + ] + }, + { + "operation": "line_text", + "data": { + "token_expires_in": 10 + }, + "output": [ + { + "FetchCredentials": { + "did_expire": false + } + } + ] + } +] \ No newline at end of file diff --git a/dart/test/goldens/starting_stream.json b/dart/test/goldens/starting_stream.json new file mode 100644 index 0000000..110eada --- /dev/null +++ b/dart/test/goldens/starting_stream.json @@ -0,0 +1,30 @@ +[ + { + "operation": "start", + "data": null, + "output": [ + { + "UpdateSyncStatus": { + "status": { + "connected": false, + "connecting": true, + "priority_status": [], + "downloading": null + } + } + }, + { + "EstablishSyncStream": { + "request": { + "buckets": [], + "include_checksum": true, + "raw_data": true, + "binary_data": true, + "client_id": "test-test-test-test", + "parameters": null + } + } + } + ] + } +] \ No newline at end of file diff --git a/dart/test/legacy_sync_test.dart b/dart/test/legacy_sync_test.dart new file mode 100644 index 0000000..e67f7bb --- /dev/null +++ b/dart/test/legacy_sync_test.dart @@ -0,0 +1,341 @@ +import 'dart:convert'; + +import 'package:fake_async/fake_async.dart'; +import 'package:file/local.dart'; +import 'package:sqlite3/common.dart'; +import 'package:sqlite3/sqlite3.dart'; +import 'package:sqlite3_test/sqlite3_test.dart'; +import 'package:test/test.dart'; + +import 'utils/native_test_utils.dart'; + +/// Tests that the older sync interfaces requiring clients to decode and handle +/// sync lines still work. 
+void main() { + final vfs = TestSqliteFileSystem( + fs: const LocalFileSystem(), name: 'legacy-sync-test'); + + setUpAll(() { + loadExtension(); + sqlite3.registerVirtualFileSystem(vfs, makeDefault: false); + }); + tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs)); + + group('sync tests', () { + late CommonDatabase db; + + setUp(() async { + db = openTestDatabase(vfs: vfs) + ..select('select powersync_init();') + ..select('select powersync_replace_schema(?)', [json.encode(_schema)]); + }); + + tearDown(() { + db.dispose(); + }); + + void pushSyncData( + String bucket, + String opId, + String rowId, + Object op, + Object? data, { + Object? descriptions = _bucketDescriptions, + }) { + final encoded = json.encode({ + 'buckets': [ + { + 'bucket': bucket, + 'data': [ + { + 'op_id': opId, + 'op': op, + 'object_type': 'items', + 'object_id': rowId, + 'checksum': 0, + 'data': json.encode(data), + } + ], + } + ], + if (descriptions != null) 'descriptions': descriptions, + }); + + db.execute('insert into powersync_operations (op, data) VALUES (?, ?);', + ['save', encoded]); + } + + bool pushCheckpointComplete( + String lastOpId, String? writeCheckpoint, List checksums, + {int? priority}) { + final [row] = db.select('select powersync_validate_checkpoint(?) as r;', [ + json.encode({ + 'last_op_id': lastOpId, + 'write_checkpoint': writeCheckpoint, + 'buckets': [ + for (final cs in checksums.cast>()) + if (priority == null || cs['priority'] <= priority) cs + ], + 'priority': priority, + }) + ]); + + final decoded = json.decode(row['r']); + if (decoded['valid'] != true) { + fail(row['r']); + } + + db.execute( + 'UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))', + [ + lastOpId, + json.encode(checksums.map((e) => (e as Map)['bucket']).toList()) + ], + ); + + db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [ + 'sync_local', + priority != null + ? 
jsonEncode({ + 'priority': priority, + 'buckets': [ + for (final cs in checksums.cast>()) + if (cs['priority'] <= priority) cs['bucket'] + ], + }) + : null, + ]); + return db.lastInsertRowId == 1; + } + + ResultSet fetchRows() { + return db.select('select * from items'); + } + + test('does not publish until reaching checkpoint', () { + expect(fetchRows(), isEmpty); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect(fetchRows(), isEmpty); + + expect( + pushCheckpointComplete( + '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), + isTrue); + expect(fetchRows(), [ + {'id': 'row-0', 'col': 'hi'} + ]); + }); + + test('does not publish with pending local data', () { + expect(fetchRows(), isEmpty); + db.execute("insert into items (id, col) values ('local', 'data');"); + expect(fetchRows(), isNotEmpty); + + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect( + pushCheckpointComplete( + '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), + isFalse); + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'} + ]); + }); + + test('publishes with local data for prio=0 buckets', () { + expect(fetchRows(), isEmpty); + db.execute("insert into items (id, col) values ('local', 'data');"); + expect(fetchRows(), isNotEmpty); + + pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'}); + expect( + pushCheckpointComplete( + '1', + null, + [_bucketChecksum('prio0', 0, checksum: 0)], + priority: 0, + ), + isTrue, + ); + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'}, + {'id': 'row-0', 'col': 'hi'}, + ]); + }); + + test('can publish partial checkpoints under different priorities', () { + for (var i = 0; i < 4; i++) { + pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'}); + } + expect(fetchRows(), isEmpty); + + // Simulate a partial checkpoint complete for each of the buckets. + for (var i = 0; i < 4; i++) { + expect( + pushCheckpointComplete( + '1', + null, + [ + for (var j = 0; j <= 4; j++) + _bucketChecksum( + 'prio$j', + j, + // Give buckets outside of the current priority a wrong + // checksum. They should not be validated yet. + checksum: j <= i ? 0 : 1234, + ), + ], + priority: i, + ), + isTrue, + ); + + expect(fetchRows(), [ + for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'}, + ]); + + expect(db.select('select 1 from ps_sync_state where priority = ?', [i]), + isNotEmpty); + // A sync at this priority includes all higher priorities too, so they + // should be cleared. + expect(db.select('select 1 from ps_sync_state where priority < ?', [i]), + isEmpty); + } + }); + + test('can sync multiple times', () { + fakeAsync((controller) { + for (var i = 0; i < 10; i++) { + for (var prio in const [1, 2, 3, null]) { + pushCheckpointComplete('1', null, [], priority: prio); + + // Make sure there's only a single row in last_synced_at + expect( + db.select( + "SELECT datetime(last_synced_at, 'localtime') AS last_synced_at FROM ps_sync_state WHERE priority = ?", + [prio ?? 
2147483647]), + [ + {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} + ], + ); + + if (prio == null) { + expect( + db.select( + "SELECT datetime(powersync_last_synced_at(), 'localtime') AS last_synced_at"), + [ + {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} + ], + ); + } + } + + controller.elapse(const Duration(hours: 1)); + } + }, initialTime: DateTime(2025, 3, 1, 10)); + }); + + test('clearing database clears sync status', () { + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + + expect( + pushCheckpointComplete( + '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), + isTrue); + expect(db.select('SELECT powersync_last_synced_at() AS r').single, + {'r': isNotNull}); + expect(db.select('SELECT priority FROM ps_sync_state').single, + {'priority': 2147483647}); + + db.execute('SELECT powersync_clear(0)'); + expect(db.select('SELECT powersync_last_synced_at() AS r').single, + {'r': isNull}); + expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0)); + }); + + test('tracks download progress', () { + const bucket = 'bkt'; + void expectProgress(int atLast, int sinceLast) { + final [row] = db.select( + 'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?', + [bucket], + ); + final [actualAtLast, actualSinceLast] = row.values; + + expect(actualAtLast, atLast, reason: 'count_at_last mismatch'); + expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch'); + } + + pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'}); + expectProgress(0, 1); + + pushSyncData(bucket, '2', 'row-1', 'PUT', {'col': 'hi'}); + expectProgress(0, 2); + + expect( + pushCheckpointComplete( + '2', + null, + [_bucketChecksum(bucket, 1, checksum: 0)], + priority: 1, + ), + isTrue, + ); + + // Running partial or complete checkpoints should not reset stats, client + // SDKs are responsible for that. + expectProgress(0, 2); + expect(db.select('SELECT * FROM items'), isNotEmpty); + + expect( + pushCheckpointComplete( + '2', + null, + [_bucketChecksum(bucket, 1, checksum: 0)], + ), + isTrue, + ); + expectProgress(0, 2); + + db.execute(''' +UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name + WHERE ?1->name IS NOT NULL +''', [ + json.encode({bucket: 2}), + ]); + expectProgress(2, 0); + + // Run another iteration of this + pushSyncData(bucket, '3', 'row-3', 'PUT', {'col': 'hi'}); + expectProgress(2, 1); + db.execute(''' +UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name + WHERE ?1->name IS NOT NULL +''', [ + json.encode({bucket: 3}), + ]); + expectProgress(3, 0); + }); + }); +} + +Object? 
_bucketChecksum(String bucket, int prio, {int checksum = 0}) { + return {'bucket': bucket, 'priority': prio, 'checksum': checksum}; +} + +const _schema = { + 'tables': [ + { + 'name': 'items', + 'columns': [ + {'name': 'col', 'type': 'text'} + ], + } + ] +}; + +const _bucketDescriptions = { + 'prio0': {'priority': 0}, + 'prio1': {'priority': 1}, + 'prio2': {'priority': 2}, + 'prio3': {'priority': 3}, +}; diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 8eb832e..2b84a83 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -1,18 +1,30 @@ import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; +import 'package:bson/bson.dart'; import 'package:fake_async/fake_async.dart'; import 'package:file/local.dart'; +import 'package:meta/meta.dart'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/sqlite3.dart'; import 'package:sqlite3_test/sqlite3_test.dart'; import 'package:test/test.dart'; +import 'package:path/path.dart'; import 'utils/native_test_utils.dart'; +@isTest +void syncTest(String description, void Function(FakeAsync controller) body) { + return test(description, () { + // Give each test the same starting time to make goldens easier to compare. + fakeAsync(body, initialTime: DateTime.utc(2025, 3, 1, 10)); + }); +} + void main() { - // Needs an unique name per test file to avoid concurrency issues final vfs = - TestSqliteFileSystem(fs: const LocalFileSystem(), name: 'sync-test-vfs'); + TestSqliteFileSystem(fs: const LocalFileSystem(), name: 'vfs-sync-test'); setUpAll(() { loadExtension(); @@ -20,307 +32,647 @@ void main() { }); tearDownAll(() => sqlite3.unregisterVirtualFileSystem(vfs)); - group('sync tests', () { - late CommonDatabase db; + group('text lines', () { + _syncTests(vfs: vfs, isBson: false); + }); - setUp(() async { - db = openTestDatabase(vfs: vfs) - ..select('select powersync_init();') - ..select('select powersync_replace_schema(?)', [json.encode(_schema)]); - }); + group('bson lines', () { + _syncTests(vfs: vfs, isBson: true); + }); +} - tearDown(() { - db.dispose(); - }); +void _syncTests({ + required VirtualFileSystem vfs, + required bool isBson, +}) { + late CommonDatabase db; + late SyncLinesGoldenTest matcher; + + List invokeControlRaw(String operation, Object? data) { + final [row] = + db.select('SELECT powersync_control(?, ?)', [operation, data]); + return jsonDecode(row.columnAt(0)); + } + + List invokeControl(String operation, Object? data) { + if (matcher.enabled) { + // Trace through golden matcher + return matcher.invoke(operation, data); + } else { + final [row] = + db.select('SELECT powersync_control(?, ?)', [operation, data]); + return jsonDecode(row.columnAt(0)); + } + } - void pushSyncData( - String bucket, - String opId, - String rowId, - Object op, - Object? data, { - Object? descriptions = _bucketDescriptions, - }) { - final encoded = json.encode({ - 'buckets': [ + setUp(() async { + db = openTestDatabase(vfs: vfs) + ..select('select powersync_init();') + ..select('select powersync_replace_schema(?)', [json.encode(_schema)]) + ..execute('update ps_kv set value = ?2 where key = ?1', + ['client_id', 'test-test-test-test']); + + matcher = SyncLinesGoldenTest(isBson, invokeControlRaw); + }); + + tearDown(() { + matcher.finish(); + db.dispose(); + }); + + List syncLine(Object? 
line) { + if (isBson) { + final serialized = BsonCodec.serialize(line).byteList; + // print(serialized.asRustByteString); + return invokeControl('line_binary', serialized); + } else { + return invokeControl('line_text', jsonEncode(line)); + } + } + + List pushSyncData( + String bucket, String opId, String rowId, Object op, Object? data, + {int checksum = 0}) { + return syncLine({ + 'data': { + 'bucket': bucket, + 'has_more': false, + 'after': null, + 'next_after': null, + 'data': [ { - 'bucket': bucket, - 'data': [ - { - 'op_id': opId, - 'op': op, - 'object_type': 'items', - 'object_id': rowId, - 'checksum': 0, - 'data': json.encode(data), - } - ], + 'op_id': opId, + 'op': op, + 'object_type': 'items', + 'object_id': rowId, + 'checksum': checksum, + 'data': json.encode(data), } ], - if (descriptions != null) 'descriptions': descriptions, - }); + }, + }); + } + + List pushCheckpoint( + {int lastOpId = 1, List buckets = const []}) { + return syncLine({ + 'checkpoint': { + 'last_op_id': '$lastOpId', + 'write_checkpoint': null, + 'buckets': buckets, + }, + }); + } - db.execute('insert into powersync_operations (op, data) VALUES (?, ?);', - ['save', encoded]); - } + List pushCheckpointComplete({int? priority, String lastOpId = '1'}) { + return syncLine({ + priority == null ? 'checkpoint_complete' : 'partial_checkpoint_complete': + { + 'last_op_id': lastOpId, + if (priority != null) 'priority': priority, + }, + }); + } - bool pushCheckpointComplete( - String lastOpId, String? writeCheckpoint, List checksums, - {int? priority}) { - final [row] = db.select('select powersync_validate_checkpoint(?) as r;', [ - json.encode({ - 'last_op_id': lastOpId, - 'write_checkpoint': writeCheckpoint, + ResultSet fetchRows() { + return db.select('select * from items'); + } + + group('goldens', () { + syncTest('starting stream', (_) { + matcher.load('starting_stream'); + invokeControl('start', null); + }); + + syncTest('simple sync iteration', (_) { + matcher.load('simple_iteration'); + invokeControl('start', null); + + syncLine({ + 'checkpoint': { + 'last_op_id': '1', + 'write_checkpoint': null, 'buckets': [ - for (final cs in checksums.cast>()) - if (priority == null || cs['priority'] <= priority) cs + { + 'bucket': 'a', + 'checksum': 0, + 'priority': 3, + 'count': 1, + } ], - 'priority': priority, - }) - ]); + }, + }); + syncLine({'token_expires_in': 60}); + pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}); - final decoded = json.decode(row['r']); - if (decoded['valid'] != true) { - fail(row['r']); - } + syncLine({ + 'checkpoint_complete': {'last_op_id': '1'}, + }); - db.execute( - 'UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))', - [ - lastOpId, - json.encode(checksums.map((e) => (e as Map)['bucket']).toList()) - ], - ); + syncLine({'token_expires_in': 10}); + }); + }); - db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [ - 'sync_local', - priority != null - ? 
jsonEncode({ - 'priority': priority, - 'buckets': [ - for (final cs in checksums.cast>()) - if (cs['priority'] <= priority) cs['bucket'] - ], - }) - : null, - ]); - return db.lastInsertRowId == 1; - } + test('does not publish until reaching checkpoint', () { + invokeControl('start', null); + pushCheckpoint(buckets: priorityBuckets); + expect(fetchRows(), isEmpty); + db.execute("insert into items (id, col) values ('local', 'data');"); - ResultSet fetchRows() { - return db.select('select * from items'); - } + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); - test('does not publish until reaching checkpoint', () { - expect(fetchRows(), isEmpty); - pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); - expect(fetchRows(), isEmpty); + pushCheckpointComplete(); + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'} + ]); + }); - expect( - pushCheckpointComplete( - '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), - isTrue); - expect(fetchRows(), [ - {'id': 'row-0', 'col': 'hi'} - ]); - }); + test('publishes with local data for prio=0 buckets', () { + invokeControl('start', null); + pushCheckpoint(buckets: priorityBuckets); + expect(fetchRows(), isEmpty); + db.execute("insert into items (id, col) values ('local', 'data');"); - test('does not publish with pending local data', () { - expect(fetchRows(), isEmpty); - db.execute("insert into items (id, col) values ('local', 'data');"); - expect(fetchRows(), isNotEmpty); + pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'}); - pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); - expect( - pushCheckpointComplete( - '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), - isFalse); - expect(fetchRows(), [ - {'id': 'local', 'col': 'data'} - ]); - }); + pushCheckpointComplete(priority: 0); + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'}, + {'id': 'row-0', 'col': 'hi'}, + ]); + }); - test('publishes with local data for prio=0 buckets', () { - expect(fetchRows(), isEmpty); - db.execute("insert into items (id, col) values ('local', 'data');"); - expect(fetchRows(), isNotEmpty); + test('does not publish with pending local data', () { + invokeControl('start', null); + pushCheckpoint(buckets: priorityBuckets); + db.execute("insert into items (id, col) values ('local', 'data');"); + expect(fetchRows(), isNotEmpty); - pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'}); - expect( - pushCheckpointComplete( - '1', - null, - [_bucketChecksum('prio0', 0, checksum: 0)], - priority: 0, - ), - isTrue, + pushCheckpoint(buckets: priorityBuckets); + pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + pushCheckpointComplete(); + + expect(fetchRows(), [ + {'id': 'local', 'col': 'data'} + ]); + }); + + test('can publish partial checkpoints under different priorities', () { + invokeControl('start', null); + pushCheckpoint(buckets: priorityBuckets); + for (var i = 0; i < 4; i++) { + pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'}); + } + + expect(fetchRows(), isEmpty); + + // Simulate a partial checkpoint complete for each of the buckets. 
+ for (var i = 0; i < 4; i++) { + pushCheckpointComplete( + priority: i, ); + expect(fetchRows(), [ - {'id': 'local', 'col': 'data'}, - {'id': 'row-0', 'col': 'hi'}, + for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'}, ]); - }); - test('can publish partial checkpoints under different priorities', () { - for (var i = 0; i < 4; i++) { - pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'}); - } - expect(fetchRows(), isEmpty); + expect(db.select('select 1 from ps_sync_state where priority = ?', [i]), + isNotEmpty); + // A sync at this priority includes all higher priorities too, so they + // should be cleared. + expect(db.select('select 1 from ps_sync_state where priority < ?', [i]), + isEmpty); + } + }); + + syncTest('can sync multiple times', (controller) { + invokeControl('start', null); + + for (var i = 0; i < 10; i++) { + pushCheckpoint(buckets: priorityBuckets); - // Simulate a partial checkpoint complete for each of the buckets. - for (var i = 0; i < 4; i++) { + for (var prio in const [1, 2, 3, null]) { + pushCheckpointComplete(priority: prio); + + // Make sure there's only a single row in last_synced_at expect( - pushCheckpointComplete( - '1', - null, + db.select( + "SELECT datetime(last_synced_at) AS last_synced_at FROM ps_sync_state WHERE priority = ?", + [prio ?? 2147483647]), + [ + {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} + ], + ); + + if (prio == null) { + expect( + db.select( + "SELECT datetime(powersync_last_synced_at()) AS last_synced_at"), [ - for (var j = 0; j <= 4; j++) - _bucketChecksum( - 'prio$j', - j, - // Give buckets outside of the current priority a wrong - // checksum. They should not be validated yet. - checksum: j <= i ? 0 : 1234, - ), + {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} ], - priority: i, - ), - isTrue, - ); + ); + } + } - expect(fetchRows(), [ - for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'}, - ]); + controller.elapse(const Duration(hours: 1)); + } + }); - expect(db.select('select 1 from ps_sync_state where priority = ?', [i]), - isNotEmpty); - // A sync at this priority includes all higher priorities too, so they - // should be cleared. 
- expect(db.select('select 1 from ps_sync_state where priority < ?', [i]), - isEmpty); - } + test('clearing database clears sync status', () { + invokeControl('start', null); + pushCheckpoint(buckets: priorityBuckets); + pushCheckpointComplete(); + + expect(db.select('SELECT powersync_last_synced_at() AS r').single, + {'r': isNotNull}); + expect(db.select('SELECT priority FROM ps_sync_state').single, + {'priority': 2147483647}); + + db.execute('SELECT powersync_clear(0)'); + expect(db.select('SELECT powersync_last_synced_at() AS r').single, + {'r': isNull}); + expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0)); + }); + + test('persists download progress', () { + const bucket = 'bkt'; + void expectProgress(int atLast, int sinceLast) { + final [row] = db.select( + 'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?', + [bucket], + ); + final [actualAtLast, actualSinceLast] = row.values; + + expect(actualAtLast, atLast, reason: 'count_at_last mismatch'); + expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch'); + } + + invokeControl('start', null); + pushCheckpoint(buckets: [bucketDescription(bucket, count: 2)]); + pushCheckpointComplete(); + + pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'}); + expectProgress(0, 1); + + pushSyncData(bucket, '1', 'row-1', 'PUT', {'col': 'hi again'}); + expectProgress(0, 2); + + pushCheckpointComplete(lastOpId: '2'); + expectProgress(2, 0); + }); + + test('deletes old buckets', () { + for (final name in ['one', 'two', 'three', r'$local']) { + db.execute('INSERT INTO ps_buckets (name) VALUES (?)', [name]); + } + + expect( + invokeControl('start', null), + contains( + containsPair( + 'EstablishSyncStream', + containsPair('request', containsPair('buckets', hasLength(3))), + ), + ), + ); + + syncLine({ + 'checkpoint': { + 'last_op_id': '1', + 'write_checkpoint': null, + 'buckets': [ + { + 'bucket': 'one', + 'checksum': 0, + 'priority': 3, + 'count': 1, + } + ], + }, }); - test('can sync multiple times', () { - fakeAsync((controller) { - for (var i = 0; i < 10; i++) { - for (var prio in const [1, 2, 3, null]) { - pushCheckpointComplete('1', null, [], priority: prio); - - // Make sure there's only a single row in last_synced_at - expect( - db.select( - "SELECT datetime(last_synced_at, 'localtime') AS last_synced_at FROM ps_sync_state WHERE priority = ?", - [prio ?? 2147483647]), - [ - {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} - ], - ); - - if (prio == null) { - expect( - db.select( - "SELECT datetime(powersync_last_synced_at(), 'localtime') AS last_synced_at"), - [ - {'last_synced_at': '2025-03-01 ${10 + i}:00:00'} - ], - ); + // Should delete the old buckets two and three + expect(db.select('select name from ps_buckets order by id'), [ + {'name': 'one'}, + {'name': r'$local'} + ]); + }); + + if (isBson) { + test('can parse checksums from JS numbers', () { + invokeControl('start', null); + pushCheckpoint(buckets: [bucketDescription('global[]')]); + + syncLine({ + 'data': { + 'bucket': 'a', + 'has_more': false, + 'after': null, + 'next_after': null, + 'data': [ + { + 'op_id': '1', + 'op': 'PUT', + 'object_type': 'items', + 'object_id': 'id', + 'checksum': 3573495687.0, + 'data': '{}', } + ], + }, + }); + }); + } + + group('progress', () { + Map? 
progress = null; + var lastOpId = 0; + + setUp(() { + lastOpId = 0; + return progress = null; + }); + + (int, int) totalProgress() { + return progress!.values.downloadAndTargetCount(); + } + + (int, int) priorityProgress(int priority) { + return progress!.values + .where((e) => e.priority <= priority) + .downloadAndTargetCount(); + } + + void applyInstructions(List instructions) { + for (final instruction in instructions.cast()) { + if (instruction['UpdateSyncStatus'] case final updateStatus?) { + final downloading = updateStatus['status']['downloading']; + if (downloading == null) { + progress = null; + } else { + progress = { + for (final MapEntry(:key, :value) + in downloading['buckets'].entries) + key: ( + atLast: value['at_last'] as int, + sinceLast: value['since_last'] as int, + targetCount: value['target_count'] as int, + priority: value['priority'] as int, + ), + }; } + } + } + } + + void pushSyncData(String bucket, int amount) { + final instructions = syncLine({ + 'data': { + 'bucket': bucket, + 'has_more': false, + 'after': null, + 'next_after': null, + 'data': [ + for (var i = 0; i < amount; i++) + { + 'op_id': (++lastOpId).toString(), + 'op': 'PUT', + 'object_type': 'items', + 'object_id': '$lastOpId', + 'checksum': 0, + 'data': '{}', + } + ], + }, + }); + + applyInstructions(instructions); + } - controller.elapse(const Duration(hours: 1)); + void addCheckpointComplete({int? priority}) { + applyInstructions( + pushCheckpointComplete(priority: priority, lastOpId: '$lastOpId')); + } + + test('without priorities', () { + applyInstructions(invokeControl('start', null)); + expect(progress, isNull); + + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 10)], lastOpId: 10)); + expect(totalProgress(), (0, 10)); + + pushSyncData('a', 10); + expect(totalProgress(), (10, 10)); + + addCheckpointComplete(); + expect(progress, isNull); + + // Emit new data, progress should be 0/2 instead of 10/12 + applyInstructions(syncLine({ + 'checkpoint_diff': { + 'last_op_id': '12', + 'updated_buckets': [ + { + 'bucket': 'a', + 'priority': 3, + 'checksum': 0, + 'count': 12, + 'last_op_id': null + }, + ], + 'removed_buckets': [], + 'write_checkpoint': null, } - }, initialTime: DateTime(2025, 3, 1, 10)); + })); + expect(totalProgress(), (0, 2)); + + pushSyncData('a', 2); + expect(totalProgress(), (2, 2)); + + addCheckpointComplete(); + expect(progress, isNull); }); - test('clearing database clears sync status', () { - pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'}); + test('interrupted sync', () { + applyInstructions(invokeControl('start', null)); + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 10)], lastOpId: 10)); + expect(totalProgress(), (0, 10)); - expect( - pushCheckpointComplete( - '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]), - isTrue); - expect(db.select('SELECT powersync_last_synced_at() AS r').single, - {'r': isNotNull}); - expect(db.select('SELECT priority FROM ps_sync_state').single, - {'priority': 2147483647}); - - db.execute('SELECT powersync_clear(0)'); - expect(db.select('SELECT powersync_last_synced_at() AS r').single, - {'r': isNull}); - expect(db.select('SELECT * FROM ps_sync_state'), hasLength(0)); + pushSyncData('a', 5); + expect(totalProgress(), (5, 10)); + + // Emulate stream closing + applyInstructions(invokeControl('stop', null)); + expect(progress, isNull); + + applyInstructions(invokeControl('start', null)); + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 10)], lastOpId: 
10)); + expect(totalProgress(), (5, 10)); + + pushSyncData('a', 5); + expect(totalProgress(), (10, 10)); + addCheckpointComplete(); + expect(progress, isNull); }); - test('tracks download progress', () { - const bucket = 'bkt'; - void expectProgress(int atLast, int sinceLast) { - final [row] = db.select( - 'SELECT count_at_last, count_since_last FROM ps_buckets WHERE name = ?', - [bucket], - ); - final [actualAtLast, actualSinceLast] = row.values; + test('interrupted sync with new checkpoint', () { + applyInstructions(invokeControl('start', null)); + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 10)], lastOpId: 10)); + expect(totalProgress(), (0, 10)); - expect(actualAtLast, atLast, reason: 'count_at_last mismatch'); - expect(actualSinceLast, sinceLast, reason: 'count_since_last mismatch'); + pushSyncData('a', 5); + expect(totalProgress(), (5, 10)); + + // Emulate stream closing + applyInstructions(invokeControl('stop', null)); + expect(progress, isNull); + + applyInstructions(invokeControl('start', null)); + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 12)], lastOpId: 12)); + expect(totalProgress(), (5, 12)); + + pushSyncData('a', 7); + expect(totalProgress(), (12, 12)); + addCheckpointComplete(); + expect(progress, isNull); + }); + + test('interrupt and defrag', () { + applyInstructions(invokeControl('start', null)); + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 10)], lastOpId: 10)); + expect(totalProgress(), (0, 10)); + + pushSyncData('a', 5); + expect(totalProgress(), (5, 10)); + + // Emulate stream closing + applyInstructions(invokeControl('stop', null)); + expect(progress, isNull); + + applyInstructions(invokeControl('start', null)); + // A defrag in the meantime shrank the bucket. + applyInstructions(pushCheckpoint( + buckets: [bucketDescription('a', count: 4)], lastOpId: 14)); + // So we shouldn't report 5/4. + expect(totalProgress(), (0, 4)); + + // This should also reset the persisted progress counters. 
+ final [bucket] = db.select('SELECT * FROM ps_buckets'); + expect(bucket, containsPair('count_since_last', 0)); + expect(bucket, containsPair('count_at_last', 0)); + }); + + test('different priorities', () { + void expectProgress((int, int) prio0, (int, int) prio2) { + expect(priorityProgress(0), prio0); + expect(priorityProgress(1), prio0); + expect(priorityProgress(2), prio2); + expect(totalProgress(), prio2); } - pushSyncData(bucket, '1', 'row-0', 'PUT', {'col': 'hi'}); - expectProgress(0, 1); + applyInstructions(invokeControl('start', null)); + applyInstructions(pushCheckpoint(buckets: [ + bucketDescription('a', count: 5, priority: 0), + bucketDescription('b', count: 5, priority: 2), + ], lastOpId: 10)); + expectProgress((0, 5), (0, 10)); + + pushSyncData('a', 5); + expectProgress((5, 5), (5, 10)); + + pushSyncData('b', 2); + expectProgress((5, 5), (7, 10)); + + // Before syncing b fully, send a new checkpoint + applyInstructions(pushCheckpoint(buckets: [ + bucketDescription('a', count: 8, priority: 0), + bucketDescription('b', count: 6, priority: 2), + ], lastOpId: 14)); + expectProgress((5, 8), (7, 14)); + + pushSyncData('a', 3); + expectProgress((8, 8), (10, 14)); + pushSyncData('b', 4); + expectProgress((8, 8), (14, 14)); + + addCheckpointComplete(); + expect(progress, isNull); + }); + }); - pushSyncData(bucket, '2', 'row-1', 'PUT', {'col': 'hi'}); - expectProgress(0, 2); + group('errors', () { + syncTest('diff without prior checkpoint', (_) { + invokeControl('start', null); expect( - pushCheckpointComplete( - '2', - null, - [_bucketChecksum(bucket, 1, checksum: 0)], - priority: 1, + () => syncLine({ + 'checkpoint_diff': { + 'last_op_id': '1', + 'write_checkpoint': null, + 'updated_buckets': [], + 'removed_buckets': [], + }, + }), + throwsA( + isA().having( + (e) => e.message, + 'message', + contains('checkpoint_diff without previous checkpoint'), + ), ), - isTrue, ); + }); - // Running partial or complete checkpoints should not reset stats, client - // SDKs are responsible for that. - expectProgress(0, 2); - expect(db.select('SELECT * FROM items'), isNotEmpty); + syncTest('checksum mismatch', (_) { + invokeControl('start', null); + + syncLine({ + 'checkpoint': { + 'last_op_id': '1', + 'write_checkpoint': null, + 'buckets': [ + { + 'bucket': 'a', + 'checksum': 1234, + 'priority': 3, + 'count': 1, + } + ], + }, + }); + pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}, checksum: 4321); + + expect(db.select('SELECT * FROM ps_buckets'), hasLength(1)); expect( - pushCheckpointComplete( - '2', - null, - [_bucketChecksum(bucket, 1, checksum: 0)], - ), - isTrue, + syncLine({ + 'checkpoint_complete': {'last_op_id': '1'}, + }), + [ + { + 'LogLine': { + 'severity': 'WARNING', + 'line': contains( + "Checksums didn't match, failed for: a (expected 0x000004d2, got 0x000010e1 = 0x000010e1 (op) + 0x00000000 (add))") + } + }, + {'CloseSyncStream': {}}, + ], ); - expectProgress(0, 2); - db.execute(''' -UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name - WHERE ?1->name IS NOT NULL -''', [ - json.encode({bucket: 2}), - ]); - expectProgress(2, 0); - - // Run another iteration of this - pushSyncData(bucket, '3', 'row-3', 'PUT', {'col': 'hi'}); - expectProgress(2, 1); - db.execute(''' -UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name - WHERE ?1->name IS NOT NULL -''', [ - json.encode({bucket: 3}), - ]); - expectProgress(3, 0); + // Should delete bucket with checksum mismatch + expect(db.select('SELECT * FROM ps_buckets'), isEmpty); }); }); } -Object? 
_bucketChecksum(String bucket, int prio, {int checksum = 0}) { - return {'bucket': bucket, 'priority': prio, 'checksum': checksum}; -} - const _schema = { 'tables': [ { @@ -332,9 +684,167 @@ const _schema = { ] }; -const _bucketDescriptions = { - 'prio0': {'priority': 0}, - 'prio1': {'priority': 1}, - 'prio2': {'priority': 2}, - 'prio3': {'priority': 3}, -}; +Object bucketDescription(String name, + {int checksum = 0, int priority = 3, int count = 1}) { + return { + 'bucket': name, + 'checksum': checksum, + 'priority': priority, + 'count': count, + }; +} + +final priorityBuckets = [ + for (var i = 0; i < 4; i++) bucketDescription('prio$i', priority: i) +]; + +typedef BucketProgress = ({ + int priority, + int atLast, + int sinceLast, + int targetCount +}); + +extension on Iterable { + (int, int) downloadAndTargetCount() { + return fold((0, 0), (counters, entry) { + final (downloaded, total) = counters; + + return ( + downloaded + entry.sinceLast, + total + entry.targetCount - entry.atLast + ); + }); + } +} + +extension on Uint8List { + // ignore: unused_element + String get asRustByteString { + final buffer = StringBuffer('b"'); + + for (final byte in this) { + switch (byte) { + case >= 32 && < 127: + buffer.writeCharCode(byte); + default: + // Escape + buffer.write('\\x${byte.toRadixString(16).padLeft(2, '0')}'); + } + } + + buffer.write('"'); + return buffer.toString(); + } +} + +final class SyncLinesGoldenTest { + static bool _update = Platform.environment['UPDATE_GOLDENS'] == '1'; + + final List Function(String operation, Object? data) _invokeControl; + + String? name; + + final bool isBson; + final List expectedLines = []; + final List actualLines = []; + + String get path => join('test', 'goldens', '$name.json'); + + bool get enabled => name != null; + + SyncLinesGoldenTest(this.isBson, this._invokeControl); + + ExpectedSyncLine get _nextExpectation { + return expectedLines[actualLines.length]; + } + + void _checkMismatch(void Function() compare) { + try { + compare(); + } catch (e) { + print( + 'Golden test for sync lines failed, set UPDATE_GOLDENS=1 to update'); + rethrow; + } + } + + void load(String name) { + this.name = name; + final file = File(path); + try { + final loaded = json.decode(file.readAsStringSync()); + for (final entry in loaded) { + expectedLines.add(ExpectedSyncLine.fromJson(entry)); + } + } catch (e) { + if (!_update) { + rethrow; + } + } + } + + List invoke(String operation, Object? data) { + final matchData = switch (data) { + final String s => json.decode(s), + _ => data, + }; + + if (_update) { + final result = _invokeControl(operation, data); + actualLines.add(ExpectedSyncLine(operation, matchData, result)); + return result; + } else { + final expected = _nextExpectation; + if (!isBson) { + // We only want to compare the JSON inputs. We compare outputs + // regardless of the encoding mode. + _checkMismatch(() { + expect(operation, expected.operation); + expect(matchData, expected.data); + }); + } + + final result = _invokeControl(operation, data); + _checkMismatch(() { + expect(result, expected.output); + }); + + actualLines.add(ExpectedSyncLine(operation, matchData, result)); + return result; + } + } + + void finish() { + if (_update && enabled) { + if (!isBson) { + File(path).writeAsStringSync( + JsonEncoder.withIndent(' ').convert(actualLines)); + } + } else { + _checkMismatch( + () => expect(actualLines, hasLength(expectedLines.length))); + } + } +} + +final class ExpectedSyncLine { + final String operation; + final Object? 
data;
+  final List output;
+
+  ExpectedSyncLine(this.operation, this.data, this.output);
+
+  factory ExpectedSyncLine.fromJson(Map json) {
+    return ExpectedSyncLine(
+        json['operation'] as String, json['data'], json['output'] as List);
+  }
+
+  Map toJson() {
+    return {
+      'operation': operation,
+      'data': data,
+      'output': output,
+    };
+  }
+}
diff --git a/docs/sync.md b/docs/sync.md
new file mode 100644
index 0000000..fa6a1e3
--- /dev/null
+++ b/docs/sync.md
@@ -0,0 +1,79 @@
+## Sync interface
+
+The core extension implements the state machine and necessary SQL handling to decode and apply
+sync lines sent from a PowerSync service instance.
+
+After registering the PowerSync extension, this client is available through the `powersync_control`
+function, which takes two arguments: a command (text) and a payload (text, blob, or null).
+
+The following commands are supported:
+
+1. `start`: Payload is a JSON-encoded object. This requests the client to start a sync iteration, specifying
+   parameters.
+2. `stop`: No payload; requests the current sync iteration (if any) to be shut down.
+3. `line_text`: Payload is a serialized JSON object received from the sync service.
+4. `line_binary`: Payload is a BSON-encoded object received from the sync service.
+5. `refreshed_token`: Notify the sync client that the JWT used to authenticate to the PowerSync service has
+   changed.
+   - The client will emit an instruction to stop the current stream; clients should restart by sending another `start`
+     command.
+6. `completed_upload`: Notify the sync implementation that all local changes have been uploaded.
+
+`powersync_control` returns a JSON-encoded array of instructions for the client:
+
+```typescript
+type Instruction = { LogLine: LogLine }
+  | { UpdateSyncStatus: UpdateSyncStatus }
+  | { EstablishSyncStream: EstablishSyncStream }
+  | { FetchCredentials: FetchCredentials }
+  // Close a connection previously started after EstablishSyncStream
+  | { CloseSyncStream: {} }
+  // For the Dart web client, flush the (otherwise non-durable) file system.
+  | { FlushFileSystem: {} }
+  // Notify clients that a checkpoint was completed. Clients can clear the
+  // download error state in response to this.
+  | { DidCompleteSync: {} }
+
+interface LogLine {
+  severity: 'DEBUG' | 'INFO' | 'WARNING',
+  line: String,
+}
+
+// Instructs client SDKs to open a connection to the sync service.
+interface EstablishSyncStream {
+  request: any // The JSON-encoded StreamingSyncRequest to send to the sync service
+}
+
+// Instructs SDKs to update the downloading state of their SyncStatus.
+interface UpdateSyncStatus {
+  connected: boolean,
+  connecting: boolean,
+  priority_status: SyncPriorityStatus[],
+  downloading: null | DownloadProgress,
+}
+
+// Instructs SDKs to refresh credentials from the backend connector.
+// They don't necessarily have to close the connection; a CloseSyncStream instruction
+// will be sent when the token has already expired.
+interface FetchCredentials {
+  // Set as an option in case fetching and prefetching should be handled differently.
+  did_expire: boolean
+}
+
+interface SyncPriorityStatus {
+  priority: int,
+  last_synced_at: null | int,
+  has_synced: null | boolean,
+}
+
+interface DownloadProgress {
+  buckets: Record<string, BucketProgress>
+}
+
+interface BucketProgress {
+  priority: int,
+  at_last: int,
+  since_last: int,
+  target_count: int
+}
+```
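+
+As a rough sketch of the command side, the snippet below drives `powersync_control` through a
+hypothetical synchronous SQLite binding. The `db.select` helper, the `serializedLine` value, and
+the empty `'{}'` start payload are assumptions made for this example and are not part of the
+extension; only the `powersync_control` calls themselves reflect the interface described above.
+
+```typescript
+// Assumed binding: db.select(sql, params) runs a query on a SQLite connection
+// that already has the PowerSync core extension registered and returns the rows.
+declare const db: { select(sql: string, params: unknown[]): any[] };
+declare const serializedLine: string; // a JSON sync line received from the service
+
+type ControlPayload = string | Uint8Array | null;
+
+function control(op: string, payload: ControlPayload): Instruction[] {
+  const [row] = db.select('SELECT powersync_control(?, ?) AS r', [op, payload]);
+  // powersync_control returns a JSON-encoded array of instructions.
+  return JSON.parse(row.r) as Instruction[];
+}
+
+// Start a sync iteration; '{}' stands in for the JSON-encoded parameters object.
+const afterStart = control('start', '{}');
+// Forward a text line from the service (use 'line_binary' for BSON lines).
+const afterLine = control('line_text', serializedLine);
+// Ask the active iteration to shut down.
+const afterStop = control('stop', null);
+```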
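+
+Clients are expected to process the returned instructions in order. A minimal dispatch loop could
+look like the following sketch; the hook methods (`connectToService`, `updateStatus`, and so on)
+are placeholders for SDK-specific behaviour and are not part of this crate.
+
+```typescript
+// Hypothetical SDK hooks invoked in response to instructions.
+interface ClientHooks {
+  connectToService(request: any): void; // open the stream described by EstablishSyncStream
+  closeStream(): void;
+  updateStatus(status: UpdateSyncStatus): void;
+  fetchCredentials(didExpire: boolean): void;
+  flushFileSystem(): void;
+  didCompleteSync(): void;
+}
+
+function handleInstructions(instructions: Instruction[], hooks: ClientHooks): void {
+  for (const instruction of instructions) {
+    if ('LogLine' in instruction) {
+      console.log(`[${instruction.LogLine.severity}] ${instruction.LogLine.line}`);
+    } else if ('EstablishSyncStream' in instruction) {
+      hooks.connectToService(instruction.EstablishSyncStream.request);
+    } else if ('UpdateSyncStatus' in instruction) {
+      hooks.updateStatus(instruction.UpdateSyncStatus);
+    } else if ('FetchCredentials' in instruction) {
+      hooks.fetchCredentials(instruction.FetchCredentials.did_expire);
+    } else if ('CloseSyncStream' in instruction) {
+      hooks.closeStream();
+    } else if ('FlushFileSystem' in instruction) {
+      hooks.flushFileSystem();
+    } else if ('DidCompleteSync' in instruction) {
+      hooks.didCompleteSync();
+    }
+  }
+}
+```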