Skip to content

Commit c7e4858

Browse files
committed
Persist sync lines
1 parent 898de16 commit c7e4858

14 files changed

+266
-267
lines changed

crates/core/src/bson/de.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ impl<'de, 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
154154
fn deserialize_enum<V>(
155155
self,
156156
name: &'static str,
157-
variants: &'static [&'static str],
157+
_variants: &'static [&'static str],
158158
visitor: V,
159159
) -> Result<V::Value, Self::Error>
160160
where
@@ -283,7 +283,7 @@ impl<'a, 'de> VariantAccess<'de> for &'a mut Deserializer<'de> {
283283
seed.deserialize(self)
284284
}
285285

286-
fn tuple_variant<V>(self, len: usize, visitor: V) -> Result<V::Value, Self::Error>
286+
fn tuple_variant<V>(self, _len: usize, visitor: V) -> Result<V::Value, Self::Error>
287287
where
288288
V: Visitor<'de>,
289289
{
@@ -292,8 +292,8 @@ impl<'a, 'de> VariantAccess<'de> for &'a mut Deserializer<'de> {
292292
}
293293

294294
fn struct_variant<V>(
295-
mut self,
296-
fields: &'static [&'static str],
295+
self,
296+
_fields: &'static [&'static str],
297297
visitor: V,
298298
) -> Result<V::Value, Self::Error>
299299
where

crates/core/src/bson/error.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ use alloc::{
66
};
77
use serde::de::{self, StdError};
88

9-
use crate::error::SQLiteError;
10-
119
use super::parser::ElementType;
1210

1311
#[derive(Debug)]

crates/core/src/bson/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl BsonWriter {
2525
pub fn put_str(&mut self, name: &str, value: &str) {
2626
self.put_entry(ElementType::String, name);
2727

28-
let bytes = name.as_bytes();
28+
let bytes = value.as_bytes();
2929
self.output.put_i32_le(bytes.len() as i32);
3030
self.output.put_slice(bytes);
3131
self.output.push(0);

crates/core/src/diff.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
extern crate alloc;
22

3-
use alloc::format;
43
use alloc::string::{String, ToString};
54
use core::ffi::c_int;
65

crates/core/src/json_merge.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
extern crate alloc;
22

3-
use alloc::format;
43
use alloc::string::{String, ToString};
54
use core::ffi::c_int;
65

crates/core/src/operations.rs

Lines changed: 7 additions & 243 deletions
Original file line numberDiff line numberDiff line change
@@ -1,255 +1,19 @@
11
use alloc::format;
2-
use alloc::string::String;
32

4-
use crate::error::{PSResult, SQLiteError};
3+
use crate::error::SQLiteError;
4+
use crate::sync::line::DataLine;
5+
use crate::sync::operations::insert_bucket_operations;
6+
use crate::sync::storage_adapter::StorageAdapter;
57
use sqlite_nostd as sqlite;
68
use sqlite_nostd::{Connection, ResultCode};
79

810
use crate::ext::SafeManagedStmt;
911

1012
// Run inside a transaction
1113
pub fn insert_operation(db: *mut sqlite::sqlite3, data: &str) -> Result<(), SQLiteError> {
12-
// language=SQLite
13-
let statement = db.prepare_v2(
14-
"\
15-
SELECT
16-
json_extract(e.value, '$.bucket') as bucket,
17-
json_extract(e.value, '$.data') as data,
18-
json_extract(e.value, '$.has_more') as has_more,
19-
json_extract(e.value, '$.after') as after,
20-
json_extract(e.value, '$.next_after') as next_after
21-
FROM json_each(json_extract(?1, '$.buckets')) e",
22-
)?;
23-
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
24-
25-
while statement.step()? == ResultCode::ROW {
26-
let bucket = statement.column_text(0)?;
27-
let data = statement.column_text(1)?;
28-
// let _has_more = statement.column_int(2)? != 0;
29-
// let _after = statement.column_text(3)?;
30-
// let _next_after = statement.column_text(4)?;
31-
32-
insert_bucket_operations(db, bucket, data)?;
33-
}
34-
35-
Ok(())
36-
}
37-
38-
pub fn insert_bucket_operations(
39-
db: *mut sqlite::sqlite3,
40-
bucket: &str,
41-
data: &str,
42-
) -> Result<(), SQLiteError> {
43-
// Statement to insert new operations (only for PUT and REMOVE).
44-
// language=SQLite
45-
let iterate_statement = db.prepare_v2(
46-
"\
47-
SELECT
48-
json_extract(e.value, '$.op_id') as op_id,
49-
json_extract(e.value, '$.op') as op,
50-
json_extract(e.value, '$.object_type') as object_type,
51-
json_extract(e.value, '$.object_id') as object_id,
52-
json_extract(e.value, '$.checksum') as checksum,
53-
json_extract(e.value, '$.data') as data,
54-
json_extract(e.value, '$.subkey') as subkey
55-
FROM json_each(?) e",
56-
)?;
57-
iterate_statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
58-
59-
// We do an ON CONFLICT UPDATE simply so that the RETURNING bit works for existing rows.
60-
// We can consider splitting this into separate SELECT and INSERT statements.
61-
// language=SQLite
62-
let bucket_statement = db.prepare_v2(
63-
"INSERT INTO ps_buckets(name)
64-
VALUES(?)
65-
ON CONFLICT DO UPDATE
66-
SET last_applied_op = last_applied_op
67-
RETURNING id, last_applied_op",
68-
)?;
69-
bucket_statement.bind_text(1, bucket, sqlite::Destructor::STATIC)?;
70-
bucket_statement.step()?;
71-
72-
let bucket_id = bucket_statement.column_int64(0);
73-
74-
// This is an optimization for initial sync - we can avoid persisting individual REMOVE
75-
// operations when last_applied_op = 0.
76-
// We do still need to do the "supersede_statement" step for this case, since a REMOVE
77-
// operation can supersede another PUT operation we're syncing at the same time.
78-
let mut is_empty = bucket_statement.column_int64(1) == 0;
79-
80-
// Statement to supersede (replace) operations with the same key.
81-
// language=SQLite
82-
let supersede_statement = db.prepare_v2(
83-
"\
84-
DELETE FROM ps_oplog
85-
WHERE unlikely(ps_oplog.bucket = ?1)
86-
AND ps_oplog.key = ?2
87-
RETURNING op_id, hash",
88-
)?;
89-
supersede_statement.bind_int64(1, bucket_id)?;
90-
91-
// language=SQLite
92-
let insert_statement = db.prepare_v2("\
93-
INSERT INTO ps_oplog(bucket, op_id, key, row_type, row_id, data, hash) VALUES (?, ?, ?, ?, ?, ?, ?)")?;
94-
insert_statement.bind_int64(1, bucket_id)?;
95-
96-
let updated_row_statement = db.prepare_v2(
97-
"\
98-
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",
99-
)?;
100-
101-
bucket_statement.reset()?;
102-
103-
let mut last_op: Option<i64> = None;
104-
let mut add_checksum: i32 = 0;
105-
let mut op_checksum: i32 = 0;
106-
let mut added_ops: i32 = 0;
107-
108-
while iterate_statement.step()? == ResultCode::ROW {
109-
let op_id = iterate_statement.column_int64(0);
110-
let op = iterate_statement.column_text(1)?;
111-
let object_type = iterate_statement.column_text(2);
112-
let object_id = iterate_statement.column_text(3);
113-
let checksum = iterate_statement.column_int(4);
114-
let op_data = iterate_statement.column_text(5);
115-
116-
last_op = Some(op_id);
117-
added_ops += 1;
118-
119-
if op == "PUT" || op == "REMOVE" {
120-
let key: String;
121-
if let (Ok(object_type), Ok(object_id)) = (object_type.as_ref(), object_id.as_ref()) {
122-
let subkey = iterate_statement.column_text(6).unwrap_or("null");
123-
key = format!("{}/{}/{}", &object_type, &object_id, subkey);
124-
} else {
125-
key = String::from("");
126-
}
127-
128-
supersede_statement.bind_text(2, &key, sqlite::Destructor::STATIC)?;
129-
130-
let mut superseded = false;
131-
132-
while supersede_statement.step()? == ResultCode::ROW {
133-
// Superseded (deleted) a previous operation, add the checksum
134-
let supersede_checksum = supersede_statement.column_int(1);
135-
add_checksum = add_checksum.wrapping_add(supersede_checksum);
136-
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
137-
138-
// Superseded an operation, only skip if the bucket was empty
139-
// Previously this checked "superseded_op <= last_applied_op".
140-
// However, that would not account for a case where a previous
141-
// PUT operation superseded the original PUT operation in this
142-
// same batch, in which case superseded_op is not accurate for this.
143-
if !is_empty {
144-
superseded = true;
145-
}
146-
}
147-
supersede_statement.reset()?;
148-
149-
if op == "REMOVE" {
150-
let should_skip_remove = !superseded;
151-
152-
add_checksum = add_checksum.wrapping_add(checksum);
153-
154-
if !should_skip_remove {
155-
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
156-
updated_row_statement.bind_text(
157-
1,
158-
object_type,
159-
sqlite::Destructor::STATIC,
160-
)?;
161-
updated_row_statement.bind_text(
162-
2,
163-
object_id,
164-
sqlite::Destructor::STATIC,
165-
)?;
166-
updated_row_statement.exec()?;
167-
}
168-
}
169-
170-
continue;
171-
}
172-
173-
insert_statement.bind_int64(2, op_id)?;
174-
if key != "" {
175-
insert_statement.bind_text(3, &key, sqlite::Destructor::STATIC)?;
176-
} else {
177-
insert_statement.bind_null(3)?;
178-
}
179-
180-
if let (Ok(object_type), Ok(object_id)) = (object_type, object_id) {
181-
insert_statement.bind_text(4, object_type, sqlite::Destructor::STATIC)?;
182-
insert_statement.bind_text(5, object_id, sqlite::Destructor::STATIC)?;
183-
} else {
184-
insert_statement.bind_null(4)?;
185-
insert_statement.bind_null(5)?;
186-
}
187-
if let Ok(data) = op_data {
188-
insert_statement.bind_text(6, data, sqlite::Destructor::STATIC)?;
189-
} else {
190-
insert_statement.bind_null(6)?;
191-
}
192-
193-
insert_statement.bind_int(7, checksum)?;
194-
insert_statement.exec()?;
195-
196-
op_checksum = op_checksum.wrapping_add(checksum);
197-
} else if op == "MOVE" {
198-
add_checksum = add_checksum.wrapping_add(checksum);
199-
} else if op == "CLEAR" {
200-
// Any remaining PUT operations should get an implicit REMOVE
201-
// language=SQLite
202-
let clear_statement1 = db
203-
.prepare_v2(
204-
"INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
205-
SELECT row_type, row_id
206-
FROM ps_oplog
207-
WHERE bucket = ?1",
208-
)
209-
.into_db_result(db)?;
210-
clear_statement1.bind_int64(1, bucket_id)?;
211-
clear_statement1.exec()?;
212-
213-
let clear_statement2 = db
214-
.prepare_v2("DELETE FROM ps_oplog WHERE bucket = ?1")
215-
.into_db_result(db)?;
216-
clear_statement2.bind_int64(1, bucket_id)?;
217-
clear_statement2.exec()?;
218-
219-
// And we need to re-apply all of those.
220-
// We also replace the checksum with the checksum of the CLEAR op.
221-
// language=SQLite
222-
let clear_statement2 = db.prepare_v2(
223-
"UPDATE ps_buckets SET last_applied_op = 0, add_checksum = ?1, op_checksum = 0 WHERE id = ?2",
224-
)?;
225-
clear_statement2.bind_int64(2, bucket_id)?;
226-
clear_statement2.bind_int(1, checksum)?;
227-
clear_statement2.exec()?;
228-
229-
add_checksum = 0;
230-
is_empty = true;
231-
op_checksum = 0;
232-
}
233-
}
234-
235-
if let Some(last_op) = &last_op {
236-
// language=SQLite
237-
let statement = db.prepare_v2(
238-
"UPDATE ps_buckets
239-
SET last_op = ?2,
240-
add_checksum = (add_checksum + ?3) & 0xffffffff,
241-
op_checksum = (op_checksum + ?4) & 0xffffffff,
242-
count_since_last = count_since_last + ?5
243-
WHERE id = ?1",
244-
)?;
245-
statement.bind_int64(1, bucket_id)?;
246-
statement.bind_int64(2, *last_op)?;
247-
statement.bind_int(3, add_checksum)?;
248-
statement.bind_int(4, op_checksum)?;
249-
statement.bind_int(5, added_ops)?;
250-
251-
statement.exec()?;
252-
}
14+
let line = serde_json::from_str::<DataLine>(data)?;
15+
let adapter = StorageAdapter::new(db)?;
16+
insert_bucket_operations(&adapter, &line)?;
25317

25418
Ok(())
25519
}

crates/core/src/sync/line.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
use core::any::TypeId;
2-
1+
use alloc::borrow::Cow;
32
use alloc::string::ToString;
43
use alloc::vec::Vec;
5-
use alloc::{borrow::Cow, string::String};
64
use serde::de::DeserializeSeed;
75
use serde::{de::Visitor, Deserialize};
86

@@ -108,12 +106,12 @@ pub struct OplogEntry<'a> {
108106
}
109107

110108
#[derive(Debug)]
111-
enum OplogData<'a> {
109+
pub enum OplogData<'a> {
112110
JsonString { data: Cow<'a, str> },
113111
BsonDocument { data: Cow<'a, [u8]> },
114112
}
115113

116-
#[derive(Deserialize, Debug, Clone, Copy)]
114+
#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
117115
pub enum OpType {
118116
CLEAR,
119117
MOVE,

crates/core/src/sync/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use sqlite_nostd::{self as sqlite, ResultCode};
33
pub mod bucket_priority;
44
mod interface;
55
pub mod line;
6-
mod storage_adapter;
6+
pub mod operations;
7+
pub mod storage_adapter;
78
mod streaming_sync;
89
mod sync_status;
910

0 commit comments

Comments
 (0)