@@ -4,6 +4,7 @@ use alloc::{
4
4
string:: { String , ToString } ,
5
5
vec:: Vec ,
6
6
} ;
7
+ use num_traits:: Zero ;
7
8
use serde:: Serialize ;
8
9
use sqlite_nostd:: { self as sqlite, Connection , ManagedStmt , ResultCode } ;
9
10
use streaming_iterator:: StreamingIterator ;
@@ -145,56 +146,39 @@ impl StorageAdapter {
145
146
) -> Result < CheckpointResult , SQLiteError > {
146
147
// language=SQLite
147
148
let statement = self . db . prepare_v2 (
148
- "WITH
149
- bucket_list(bucket, checksum) AS (
149
+ "
150
150
SELECT
151
- json_extract(json_each.value, '$.bucket') as bucket,
152
- json_extract(json_each.value, '$.checksum') as checksum
153
- FROM json_each(?1)
154
- )
155
- SELECT
156
- bucket_list.bucket as bucket,
157
- IFNULL(buckets.add_checksum, 0) as add_checksum,
158
- IFNULL(buckets.op_checksum, 0) as oplog_checksum,
159
- bucket_list.checksum as expected_checksum
160
- FROM bucket_list
161
- LEFT OUTER JOIN ps_buckets AS buckets ON
162
- buckets.name = bucket_list.bucket
163
- GROUP BY bucket_list.bucket" ,
151
+ ps_buckets.add_checksum as add_checksum,
152
+ ps_buckets.op_checksum as oplog_checksum
153
+ FROM ps_buckets WHERE name = ?;" ,
164
154
) ?;
165
155
166
- #[ derive( Serialize ) ]
167
- struct BucketInfo < ' a > {
168
- bucket : & ' a str ,
169
- checksum : Checksum ,
170
- }
171
-
172
- let mut buckets = Vec :: < BucketInfo > :: new ( ) ;
156
+ let mut failures: Vec < ChecksumMismatch > = Vec :: new ( ) ;
173
157
for bucket in checkpoint. buckets . values ( ) {
174
158
if bucket. is_in_priority ( priority) {
175
- buckets. push ( BucketInfo {
176
- bucket : & bucket. bucket ,
177
- checksum : bucket. checksum ,
178
- } ) ;
179
- }
180
- }
159
+ statement. bind_text ( 1 , & bucket. bucket , sqlite_nostd:: Destructor :: STATIC ) ?;
181
160
182
- let bucket_desc = serde_json:: to_string ( & buckets) ?;
183
- statement. bind_text ( 1 , & bucket_desc, sqlite:: Destructor :: STATIC ) ?;
161
+ let ( add_checksum, oplog_checksum) = match statement. step ( ) ? {
162
+ ResultCode :: ROW => {
163
+ let add_checksum = Checksum :: from_i32 ( statement. column_int ( 0 ) ) ;
164
+ let oplog_checksum = Checksum :: from_i32 ( statement. column_int ( 1 ) ) ;
165
+ ( add_checksum, oplog_checksum)
166
+ }
167
+ _ => ( Checksum :: zero ( ) , Checksum :: zero ( ) ) ,
168
+ } ;
184
169
185
- let mut failures: Vec < String > = Vec :: new ( ) ;
186
- while statement. step ( ) ? == ResultCode :: ROW {
187
- let name = statement. column_text ( 0 ) ?;
188
- // checksums with column_int are wrapped to i32 by SQLite
189
- let add_checksum = statement. column_int ( 1 ) ;
190
- let oplog_checksum = statement. column_int ( 2 ) ;
191
- let expected_checksum = statement. column_int ( 3 ) ;
170
+ let actual = add_checksum + oplog_checksum;
192
171
193
- // wrapping add is like +, but safely overflows
194
- let checksum = oplog_checksum. wrapping_add ( add_checksum) ;
172
+ if actual != bucket. checksum {
173
+ failures. push ( ChecksumMismatch {
174
+ bucket_name : bucket. bucket . clone ( ) ,
175
+ expected_checksum : bucket. checksum ,
176
+ actual_add_checksum : add_checksum,
177
+ actual_op_checksum : oplog_checksum,
178
+ } ) ;
179
+ }
195
180
196
- if checksum != expected_checksum {
197
- failures. push ( String :: from ( name) ) ;
181
+ statement. reset ( ) ?;
198
182
}
199
183
}
200
184
@@ -211,7 +195,12 @@ GROUP BY bucket_list.bucket",
211
195
let checksums = self . validate_checkpoint ( checkpoint, priority) ?;
212
196
213
197
if !checksums. is_valid ( ) {
214
- self . delete_buckets ( checksums. failed_buckets . iter ( ) . map ( |i| i. as_str ( ) ) ) ?;
198
+ self . delete_buckets (
199
+ checksums
200
+ . failed_buckets
201
+ . iter ( )
202
+ . map ( |i| i. bucket_name . as_str ( ) ) ,
203
+ ) ?;
215
204
return Ok ( SyncLocalResult :: ChecksumFailure ( checksums) ) ;
216
205
}
217
206
@@ -312,7 +301,14 @@ pub struct BucketInfo {
312
301
}
313
302
314
303
pub struct CheckpointResult {
315
- failed_buckets : Vec < String > ,
304
+ failed_buckets : Vec < ChecksumMismatch > ,
305
+ }
306
+
307
+ pub struct ChecksumMismatch {
308
+ bucket_name : String ,
309
+ expected_checksum : Checksum ,
310
+ actual_op_checksum : Checksum ,
311
+ actual_add_checksum : Checksum ,
316
312
}
317
313
318
314
impl CheckpointResult {
@@ -340,6 +336,21 @@ impl Display for CheckpointResult {
340
336
}
341
337
}
342
338
339
+ impl Display for ChecksumMismatch {
340
+ fn fmt ( & self , f : & mut core:: fmt:: Formatter < ' _ > ) -> core:: fmt:: Result {
341
+ let actual = self . actual_add_checksum + self . actual_op_checksum ;
342
+ write ! (
343
+ f,
344
+ "{} (expected {}, got {} = {} (op) + {} (add))" ,
345
+ self . bucket_name,
346
+ self . expected_checksum,
347
+ actual,
348
+ self . actual_op_checksum,
349
+ self . actual_add_checksum
350
+ )
351
+ }
352
+ }
353
+
343
354
pub enum SyncLocalResult {
344
355
/// Changes could not be applied due to a checksum mismatch.
345
356
ChecksumFailure ( CheckpointResult ) ,
0 commit comments