Skip to content

Commit 5e5c105

Browse files
committed
Implement json parsing of sync stream data.
1 parent c9d7b4e commit 5e5c105

File tree

6 files changed

+250
-46
lines changed

6 files changed

+250
-46
lines changed

crates/core/src/checkpoint.rs

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,56 +15,14 @@ use sqlite_nostd::{Connection, Context, Value};
1515

1616
use crate::create_sqlite_text_fn;
1717
use crate::error::SQLiteError;
18-
19-
#[derive(Serialize, Deserialize)]
20-
struct Checkpoint {
21-
#[serde(deserialize_with = "deserialize_string_to_i64")]
22-
last_op_id: i64,
23-
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
24-
write_checkpoint: Option<i64>,
25-
buckets: Vec<BucketChecksum>,
26-
}
27-
28-
#[derive(Serialize, Deserialize)]
29-
struct BucketChecksum {
30-
bucket: String,
31-
checksum: i32,
32-
}
18+
use crate::sync_types::Checkpoint;
3319

3420
#[derive(Serialize, Deserialize)]
3521
struct CheckpointResult {
3622
valid: bool,
3723
failed_buckets: Vec<String>,
3824
}
3925

40-
fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result<i64, D::Error>
41-
where
42-
D: serde::Deserializer<'de>,
43-
{
44-
let value = json::Value::deserialize(deserializer)?;
45-
46-
match value {
47-
json::Value::String(s) => s.parse::<i64>().map_err(serde::de::Error::custom),
48-
_ => Err(serde::de::Error::custom("Expected a string.")),
49-
}
50-
}
51-
52-
fn deserialize_optional_string_to_i64<'de, D>(deserializer: D) -> Result<Option<i64>, D::Error>
53-
where
54-
D: serde::Deserializer<'de>,
55-
{
56-
let value = json::Value::deserialize(deserializer)?;
57-
58-
match value {
59-
json::Value::Null => Ok(None),
60-
json::Value::String(s) => s.parse::<i64>()
61-
.map(Some)
62-
.map_err(serde::de::Error::custom),
63-
_ => Err(serde::de::Error::custom("Expected a string or null.")),
64-
}
65-
}
66-
67-
6826
fn powersync_validate_checkpoint_impl(
6927
ctx: *mut sqlite::context,
7028
args: &[*mut sqlite::value],

crates/core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![feature(vec_into_raw_parts)]
33
#![feature(core_intrinsics)]
44
#![feature(error_in_core)]
5+
#![feature(assert_matches)]
56

67
extern crate alloc;
78

@@ -26,6 +27,7 @@ mod vtab_util;
2627
mod sync_local;
2728
mod checkpoint;
2829
mod version;
30+
mod sync_types;
2931

3032
#[no_mangle]
3133
pub extern "C" fn sqlite3_powersync_init(

crates/core/src/operations.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
use alloc::format;
22
use alloc::string::{String, ToString};
3+
use alloc::vec::Vec;
4+
use serde::{Deserialize, Deserializer, Serialize};
5+
use serde_json as json;
36

47
use sqlite_nostd as sqlite;
58
use sqlite_nostd::{Connection, ResultCode};
69
use uuid::Uuid;
710
use crate::error::{SQLiteError, PSResult};
811

912
use crate::ext::SafeManagedStmt;
13+
use crate::sync_types::{BucketChecksum, Checkpoint, StreamingSyncLine};
14+
use crate::util::*;
1015

1116
// Run inside a transaction
1217
pub fn insert_operation(
@@ -290,3 +295,14 @@ pub fn delete_bucket(
290295

291296
Ok(())
292297
}
298+
299+
300+
pub fn stream_operation(
301+
db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
302+
303+
let line: StreamingSyncLine = serde_json::from_str(data)?;
304+
305+
Ok(())
306+
}
307+
308+

crates/core/src/operations_vtab.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,22 @@ use core::slice;
77
use sqlite::{Connection, ResultCode, Value};
88
use sqlite_nostd as sqlite;
99

10-
use crate::operations::{clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation};
10+
use crate::operations::{clear_remove_ops, delete_bucket, delete_pending_buckets, insert_operation, stream_operation};
1111
use crate::sync_local::sync_local;
12+
use crate::sync_types::Checkpoint;
1213
use crate::vtab_util::*;
1314

1415
#[repr(C)]
1516
struct VirtualTable {
1617
base: sqlite::vtab,
1718
db: *mut sqlite::sqlite3,
19+
20+
target_checkpoint: Option<Checkpoint>,
21+
target_applied: bool,
22+
target_validated: bool
1823
}
1924

25+
2026
extern "C" fn connect(
2127
db: *mut sqlite::sqlite3,
2228
_aux: *mut c_void,
@@ -38,6 +44,9 @@ extern "C" fn connect(
3844
zErrMsg: core::ptr::null_mut(),
3945
},
4046
db,
47+
target_checkpoint: None,
48+
target_validated: false,
49+
target_applied: false
4150
}));
4251
*vtab = tab.cast::<sqlite::vtab>();
4352
let _ = sqlite::vtab_config(db, 0);
@@ -93,7 +102,10 @@ extern "C" fn update(
93102
} else if op == "delete_bucket" {
94103
let result = delete_bucket(db, data);
95104
vtab_result(vtab, result)
96-
} else {
105+
} else if op == "stream" {
106+
let result = stream_operation(db, data);
107+
vtab_result(vtab, result)
108+
} else {
97109
ResultCode::MISUSE as c_int
98110
}
99111
} else {

crates/core/src/sync_types.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
use alloc::string::String;
2+
use alloc::vec::Vec;
3+
use serde::{de, Deserialize, Deserializer, Serialize};
4+
use serde_json as json;
5+
6+
use crate::util::{deserialize_string_to_i64, deserialize_optional_string_to_i64};
7+
use alloc::format;
8+
use alloc::string::{ToString};
9+
use core::fmt;
10+
use serde::de::{MapAccess, Visitor};
11+
12+
use sqlite_nostd as sqlite;
13+
use sqlite_nostd::{Connection, ResultCode};
14+
use uuid::Uuid;
15+
use crate::error::{SQLiteError, PSResult};
16+
17+
use crate::ext::SafeManagedStmt;
18+
19+
#[derive(Serialize, Deserialize, Debug)]
20+
pub struct Checkpoint {
21+
#[serde(deserialize_with = "deserialize_string_to_i64")]
22+
pub last_op_id: i64,
23+
#[serde(default)]
24+
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
25+
pub write_checkpoint: Option<i64>,
26+
pub buckets: Vec<BucketChecksum>,
27+
}
28+
29+
#[derive(Serialize, Deserialize, Debug)]
30+
pub struct BucketChecksum {
31+
pub bucket: String,
32+
pub checksum: i32,
33+
}
34+
35+
36+
#[derive(Serialize, Deserialize, Debug)]
37+
pub struct CheckpointComplete {
38+
#[serde(deserialize_with = "deserialize_string_to_i64")]
39+
last_op_id: i64
40+
}
41+
42+
#[derive(Serialize, Deserialize, Debug)]
43+
pub struct SyncBucketData {
44+
// TODO: complete this
45+
bucket: String
46+
}
47+
48+
#[derive(Serialize, Deserialize, Debug)]
49+
pub struct Keepalive {
50+
token_expires_in: i32
51+
}
52+
53+
#[derive(Serialize, Deserialize, Debug)]
54+
pub struct CheckpointDiff {
55+
#[serde(deserialize_with = "deserialize_string_to_i64")]
56+
last_op_id: i64,
57+
updated_buckets: Vec<BucketChecksum>,
58+
removed_buckets: Vec<String>,
59+
#[serde(default)]
60+
#[serde(deserialize_with = "deserialize_optional_string_to_i64")]
61+
write_checkpoint: Option<i64>
62+
}
63+
64+
65+
66+
#[derive(Debug)]
67+
pub enum StreamingSyncLine {
68+
CheckpointLine(Checkpoint),
69+
CheckpointDiffLine(CheckpointDiff),
70+
CheckpointCompleteLine(CheckpointComplete),
71+
SyncBucketDataLine(SyncBucketData),
72+
KeepaliveLine(i32),
73+
Unknown
74+
}
75+
76+
// Serde does not supporting ignoring unknown fields in externally-tagged enums, so we use our own
77+
// serializer.
78+
79+
struct StreamingSyncLineVisitor;
80+
81+
impl<'de> Visitor<'de> for StreamingSyncLineVisitor {
82+
type Value = StreamingSyncLine;
83+
84+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
85+
formatter.write_str("sync data")
86+
}
87+
88+
fn visit_map<A>(self, mut access: A) -> Result<Self::Value, A::Error>
89+
where
90+
A: MapAccess<'de>,
91+
{
92+
let mut r = StreamingSyncLine::Unknown;
93+
while let Some((key, value)) = access.next_entry::<String, json::Value>()? {
94+
if !matches!(r, StreamingSyncLine::Unknown) {
95+
// Generally, we don't expect to receive multiple in one line.
96+
// But if it does happen, we keep the first one.
97+
continue;
98+
}
99+
match key.as_str() {
100+
"checkpoint" => {
101+
r = StreamingSyncLine::CheckpointLine(
102+
serde_json::from_value(value).map_err(de::Error::custom)?,
103+
);
104+
}
105+
"checkpoint_diff" => {
106+
r = StreamingSyncLine::CheckpointDiffLine(
107+
serde_json::from_value(value).map_err(de::Error::custom)?,
108+
);
109+
}
110+
"checkpoint_complete" => {
111+
r = StreamingSyncLine::CheckpointCompleteLine(
112+
serde_json::from_value(value).map_err(de::Error::custom)?,
113+
);
114+
}
115+
"data" => {
116+
r = StreamingSyncLine::SyncBucketDataLine(
117+
serde_json::from_value(value).map_err(de::Error::custom)?,
118+
);
119+
}
120+
"token_expires_in" => {
121+
r = StreamingSyncLine::KeepaliveLine(
122+
serde_json::from_value(value).map_err(de::Error::custom)?,
123+
);
124+
}
125+
_ => {}
126+
}
127+
}
128+
129+
Ok(r)
130+
}
131+
}
132+
133+
impl<'de> Deserialize<'de> for StreamingSyncLine {
134+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135+
where
136+
D: Deserializer<'de>,
137+
{
138+
deserializer.deserialize_map(StreamingSyncLineVisitor)
139+
}
140+
}
141+
142+
143+
#[cfg(test)]
144+
mod tests {
145+
use core::assert_matches::assert_matches;
146+
use super::*;
147+
148+
#[test]
149+
fn json_parsing_test() {
150+
let line: StreamingSyncLine = serde_json::from_str(r#"{"token_expires_in": 42}"#).unwrap();
151+
assert_matches!(line, StreamingSyncLine::KeepaliveLine(42));
152+
153+
let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint_complete": {"last_op_id": "123"}}"#).unwrap();
154+
assert_matches!(line, StreamingSyncLine::CheckpointCompleteLine(CheckpointComplete { last_op_id: 123 }));
155+
156+
let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint_complete": {"last_op_id": "123", "other": "foo"}}"#).unwrap();
157+
assert_matches!(line, StreamingSyncLine::CheckpointCompleteLine(CheckpointComplete { last_op_id: 123 }));
158+
159+
let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint": {"last_op_id": "123", "buckets": []}}"#).unwrap();
160+
assert_matches!(line, StreamingSyncLine::CheckpointLine(Checkpoint { last_op_id: 123, .. }));
161+
162+
let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint": {"last_op_id": "123", "write_checkpoint": "42", "buckets": []}}"#).unwrap();
163+
assert_matches!(line, StreamingSyncLine::CheckpointLine(Checkpoint { last_op_id: 123, write_checkpoint: Some(42), .. }));
164+
165+
let line: StreamingSyncLine = serde_json::from_str(r#"{"checkpoint_diff": {"last_op_id": "123", "updated_buckets": [], "removed_buckets": []}}"#).unwrap();
166+
assert_matches!(line, StreamingSyncLine::CheckpointDiffLine(CheckpointDiff { last_op_id: 123, .. }));
167+
168+
// Additional/unknown fields
169+
let line: StreamingSyncLine = serde_json::from_str(r#"{"token_expires_in": 42, "foo": 1}"#).unwrap();
170+
assert_matches!(line, StreamingSyncLine::KeepaliveLine(42));
171+
let line: StreamingSyncLine = serde_json::from_str(r#"{}"#).unwrap();
172+
assert_matches!(line, StreamingSyncLine::Unknown);
173+
let line: StreamingSyncLine = serde_json::from_str(r#"{"other":"test"}"#).unwrap();
174+
assert_matches!(line, StreamingSyncLine::Unknown);
175+
176+
// Multiple - keep the first one
177+
let line: StreamingSyncLine = serde_json::from_str(r#"{"token_expires_in": 42, "checkpoint_complete": {"last_op_id": "123"}}"#).unwrap();
178+
assert_matches!(line, StreamingSyncLine::KeepaliveLine(42));
179+
180+
// Test error handling
181+
let line: Result<StreamingSyncLine, _> = serde_json::from_str(r#"{"token_expires_in": "42"}"#);
182+
assert!(line.is_err());
183+
}
184+
}

crates/core/src/util.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ extern crate alloc;
33
use alloc::format;
44
use alloc::string::String;
55

6+
use serde::{Deserialize};
7+
use serde_json as json;
8+
69
use sqlite::{Connection, ResultCode};
710
use sqlite_nostd as sqlite;
8-
use sqlite_nostd::{ManagedStmt};
11+
use sqlite_nostd::ManagedStmt;
912

1013
use crate::error::SQLiteError;
1114

@@ -52,6 +55,35 @@ pub fn extract_table_info(db: *mut sqlite::sqlite3, data: &str) -> Result<Manage
5255
Ok(statement)
5356
}
5457

58+
59+
pub fn deserialize_string_to_i64<'de, D>(deserializer: D) -> Result<i64, D::Error>
60+
where
61+
D: serde::Deserializer<'de>,
62+
{
63+
let value = json::Value::deserialize(deserializer)?;
64+
65+
match value {
66+
json::Value::String(s) => s.parse::<i64>().map_err(serde::de::Error::custom),
67+
_ => Err(serde::de::Error::custom("Expected a string.")),
68+
}
69+
}
70+
71+
pub fn deserialize_optional_string_to_i64<'de, D>(deserializer: D) -> Result<Option<i64>, D::Error>
72+
where
73+
D: serde::Deserializer<'de>,
74+
{
75+
let value = json::Value::deserialize(deserializer)?;
76+
77+
match value {
78+
json::Value::Null => Ok(None),
79+
json::Value::String(s) => s.parse::<i64>()
80+
.map(Some)
81+
.map_err(serde::de::Error::custom),
82+
_ => Err(serde::de::Error::custom("Expected a string or null.")),
83+
}
84+
}
85+
86+
5587
pub const MAX_OP_ID: &str = "9223372036854775807";
5688

5789
#[cfg(test)]

0 commit comments

Comments
 (0)