Skip to content

Commit f06dd35

Browse files
committed
Apply checkpoint results
1 parent c7e4858 commit f06dd35

File tree

8 files changed

+359
-30
lines changed

8 files changed

+359
-30
lines changed

crates/core/src/checkpoint.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
extern crate alloc;
22

3-
use alloc::format;
43
use alloc::string::String;
54
use alloc::vec::Vec;
65
use core::ffi::c_int;

crates/core/src/operations.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use alloc::format;
2-
31
use crate::error::SQLiteError;
42
use crate::sync::line::DataLine;
53
use crate::sync::operations::insert_bucket_operations;

crates/core/src/sync/bucket_priority.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,13 @@ impl Into<i32> for BucketPriority {
4646

4747
impl PartialOrd<BucketPriority> for BucketPriority {
4848
fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
49-
Some(self.number.partial_cmp(&other.number)?.reverse())
49+
Some(self.cmp(other))
50+
}
51+
}
52+
53+
impl Ord for BucketPriority {
54+
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
55+
self.number.cmp(&other.number).reverse()
5056
}
5157
}
5258

crates/core/src/sync/storage_adapter.rs

Lines changed: 206 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
1-
use core::assert_matches::debug_assert_matches;
1+
use core::{assert_matches::debug_assert_matches, fmt::Display};
22

33
use alloc::{
44
collections::btree_map::BTreeMap,
55
string::{String, ToString},
66
vec::Vec,
77
};
8+
use serde::Serialize;
89
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
910
use streaming_iterator::StreamingIterator;
1011

11-
use crate::{error::SQLiteError, operations::delete_bucket};
12+
use crate::{
13+
error::SQLiteError,
14+
ext::SafeManagedStmt,
15+
operations::delete_bucket,
16+
sync_local::{PartialSyncOperation, SyncOperation},
17+
};
1218

13-
use super::{bucket_priority::BucketPriority, interface::BucketRequest};
19+
use super::{
20+
bucket_priority::BucketPriority, interface::BucketRequest, streaming_sync::OwnedCheckpoint,
21+
sync_status::Timestamp,
22+
};
1423

1524
/// An adapter for storing sync state.
1625
///
@@ -19,16 +28,22 @@ use super::{bucket_priority::BucketPriority, interface::BucketRequest};
1928
pub struct StorageAdapter {
2029
pub db: *mut sqlite::sqlite3,
2130
progress_stmt: ManagedStmt,
31+
time_stmt: ManagedStmt,
2232
}
2333

2434
impl StorageAdapter {
2535
pub fn new(db: *mut sqlite::sqlite3) -> Result<Self, ResultCode> {
36+
// language=SQLite
2637
let progress =
2738
db.prepare_v2("SELECT name, count_at_last, count_since_last FROM ps_buckets")?;
2839

40+
// language=SQLite
41+
let time = db.prepare_v2("SELECT unixepoch()")?;
42+
2943
Ok(Self {
3044
db,
3145
progress_stmt: progress,
46+
time_stmt: time,
3247
})
3348
}
3449

@@ -131,12 +146,200 @@ impl StorageAdapter {
131146
last_applied_op,
132147
});
133148
}
149+
150+
fn validate_checkpoint(
151+
&self,
152+
checkpoint: &OwnedCheckpoint,
153+
priority: Option<BucketPriority>,
154+
) -> Result<CheckpointResult, SQLiteError> {
155+
// language=SQLite
156+
let statement = self.db.prepare_v2(
157+
"WITH
158+
bucket_list(bucket, checksum) AS (
159+
SELECT
160+
json_extract(json_each.value, '$.bucket') as bucket,
161+
json_extract(json_each.value, '$.checksum') as checksum
162+
FROM json_each(?1)
163+
)
164+
SELECT
165+
bucket_list.bucket as bucket,
166+
IFNULL(buckets.add_checksum, 0) as add_checksum,
167+
IFNULL(buckets.op_checksum, 0) as oplog_checksum,
168+
bucket_list.checksum as expected_checksum
169+
FROM bucket_list
170+
LEFT OUTER JOIN ps_buckets AS buckets ON
171+
buckets.name = bucket_list.bucket
172+
GROUP BY bucket_list.bucket",
173+
)?;
174+
175+
#[derive(Serialize)]
176+
struct BucketInfo<'a> {
177+
bucket: &'a str,
178+
checksum: i32,
179+
}
180+
181+
let mut buckets = Vec::<BucketInfo>::new();
182+
for bucket in &checkpoint.buckets {
183+
if bucket.is_in_priority(priority) {
184+
buckets.push(BucketInfo {
185+
bucket: &bucket.bucket,
186+
checksum: bucket.checksum,
187+
});
188+
}
189+
}
190+
191+
let bucket_desc = serde_json::to_string(&buckets)?;
192+
statement.bind_text(1, &bucket_desc, sqlite::Destructor::STATIC)?;
193+
194+
let mut failures: Vec<String> = Vec::new();
195+
while statement.step()? == ResultCode::ROW {
196+
let name = statement.column_text(0)?;
197+
// checksums with column_int are wrapped to i32 by SQLite
198+
let add_checksum = statement.column_int(1);
199+
let oplog_checksum = statement.column_int(2);
200+
let expected_checksum = statement.column_int(3);
201+
202+
// wrapping add is like +, but safely overflows
203+
let checksum = oplog_checksum.wrapping_add(add_checksum);
204+
205+
if checksum != expected_checksum {
206+
failures.push(String::from(name));
207+
}
208+
}
209+
210+
Ok(CheckpointResult {
211+
failed_buckets: failures,
212+
})
213+
}
214+
215+
pub fn sync_local(
216+
&self,
217+
checkpoint: &OwnedCheckpoint,
218+
priority: Option<BucketPriority>,
219+
) -> Result<SyncLocalResult, SQLiteError> {
220+
let checksums = self.validate_checkpoint(checkpoint, priority)?;
221+
222+
if !checksums.is_valid() {
223+
self.delete_buckets(checksums.failed_buckets.iter().map(|i| i.as_str()))?;
224+
return Ok(SyncLocalResult::ChecksumFailure(checksums));
225+
}
226+
227+
let update_bucket = self
228+
.db
229+
.prepare_v2("UPDATE ps_buckets SET last_op = ? WHERE name = ?")?;
230+
231+
for bucket in &checkpoint.buckets {
232+
if bucket.is_in_priority(priority) {
233+
update_bucket.bind_int64(1, checkpoint.last_op_id)?;
234+
update_bucket.bind_text(2, &bucket.bucket, sqlite::Destructor::STATIC)?;
235+
update_bucket.exec()?;
236+
}
237+
}
238+
239+
if let (None, Some(write_checkpoint)) = (&priority, &checkpoint.write_checkpoint) {
240+
update_bucket.bind_int64(1, *write_checkpoint)?;
241+
update_bucket.bind_text(2, "$local", sqlite::Destructor::STATIC)?;
242+
update_bucket.exec()?;
243+
}
244+
245+
#[derive(Serialize)]
246+
struct PartialArgs<'a> {
247+
priority: BucketPriority,
248+
buckets: Vec<&'a str>,
249+
}
250+
251+
let sync_result = match priority {
252+
None => SyncOperation::new(self.db, None).apply(),
253+
Some(priority) => {
254+
let args = PartialArgs {
255+
priority,
256+
buckets: checkpoint
257+
.buckets
258+
.iter()
259+
.filter_map(|item| {
260+
if item.is_in_priority(Some(priority)) {
261+
Some(item.bucket.as_str())
262+
} else {
263+
None
264+
}
265+
})
266+
.collect(),
267+
};
268+
269+
// TODO: Avoid this serialization, it's currently used to bind JSON SQL parameters.
270+
let serialized_args = serde_json::to_string(&args)?;
271+
SyncOperation::new(
272+
self.db,
273+
Some(PartialSyncOperation {
274+
priority,
275+
args: &serialized_args,
276+
}),
277+
)
278+
.apply()
279+
}
280+
}?;
281+
282+
if sync_result == 1 {
283+
// TODO: Force compact
284+
285+
Ok(SyncLocalResult::ChangesApplied)
286+
} else {
287+
Ok(SyncLocalResult::PendingLocalChanges)
288+
}
289+
}
290+
291+
pub fn now(&self) -> Result<Timestamp, ResultCode> {
292+
self.time_stmt.reset()?;
293+
self.time_stmt.step()?;
294+
295+
Ok(Timestamp(self.time_stmt.column_int64(0)))
296+
}
134297
}
135298

136299
pub struct BucketInfo {
137300
pub id: i64,
138301
pub last_applied_op: i64,
139302
}
303+
304+
pub struct CheckpointResult {
305+
failed_buckets: Vec<String>,
306+
}
307+
308+
impl CheckpointResult {
309+
pub fn is_valid(&self) -> bool {
310+
self.failed_buckets.is_empty()
311+
}
312+
}
313+
314+
impl Display for CheckpointResult {
315+
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
316+
if self.is_valid() {
317+
write!(f, "Valid checkpoint result")
318+
} else {
319+
write!(f, "Checksums didn't match, failed for: ")?;
320+
for (i, item) in self.failed_buckets.iter().enumerate() {
321+
if i != 0 {
322+
write!(f, ", ")?;
323+
}
324+
325+
item.fmt(f)?;
326+
}
327+
328+
Ok(())
329+
}
330+
}
331+
}
332+
333+
pub enum SyncLocalResult {
334+
/// Changes could not be applied due to a checksum mismatch.
335+
ChecksumFailure(CheckpointResult),
336+
/// Changes could not be applied because they would break consistency - we need to wait for
337+
/// pending local CRUD data to be uploaded and acknowledged in a write checkpoint.
338+
PendingLocalChanges,
339+
/// The checkpoint has been applied and changes have been published.
340+
ChangesApplied,
341+
}
342+
140343
/// Information about the amount of operations a bucket had at the last checkpoint and how many
141344
/// operations have been inserted in the meantime.
142345
pub struct PersistedBucketProgress<'a> {

0 commit comments

Comments
 (0)