Skip to content

Fix existing data (remove dangling rows) #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ inherits = "release"
inherits = "wasm"

[workspace.package]
version = "0.3.5"
version = "0.3.6"
edition = "2021"
authors = ["JourneyApps"]
keywords = ["sqlite", "powersync"]
Expand Down
2 changes: 1 addition & 1 deletion android/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = "co.powersync"
version = "0.3.5"
version = "0.3.6"
description = "PowerSync Core SQLite Extension"

repositories {
Expand Down
2 changes: 1 addition & 1 deletion android/src/prefab/prefab.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"name": "powersync_sqlite_core",
"schema_version": 2,
"dependencies": [],
"version": "0.3.5"
"version": "0.3.6"
}
47 changes: 47 additions & 0 deletions crates/core/src/fix035.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use alloc::format;

use crate::error::{PSResult, SQLiteError};
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, ResultCode};

use crate::ext::SafeManagedStmt;
use crate::util::quote_identifier;

// Apply a data migration to fix any existing data affected by the issue
// fixed in v0.3.5.
//
// The issue was that the `ps_updated_rows` table was not being populated
// with remove operations in some cases. This causes the rows to be removed
// from ps_oplog, but not from the ps_data__tables, resulting in dangling rows.
//
// The fix here is to find these dangling rows, and add them to ps_updated_rows.
// The next time the sync_local operation is run, these rows will be removed.
pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result<i64, SQLiteError> {
// language=SQLite
let statement = db
.prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'")
.into_db_result(db)?;

while statement.step()? == ResultCode::ROW {
let full_name = statement.column_text(0)?;
let short_name = statement.column_text(1)?;
let quoted = quote_identifier(full_name);

// language=SQLite
let statement = db.prepare_v2(&format!(
"
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
SELECT ?1, id FROM {}
WHERE NOT EXISTS (
SELECT 1 FROM ps_oplog
WHERE row_type = ?1 AND row_id = {}.id
);",
quoted, quoted
))?;
statement.bind_text(1, short_name, sqlite::Destructor::STATIC)?;

statement.exec()?;
}

Ok(1)
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod crud_vtab;
mod diff;
mod error;
mod ext;
mod fix035;
mod kv;
mod macros;
mod migrations;
Expand Down
20 changes: 20 additions & 0 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::error::{PSResult, SQLiteError};
use crate::fix035::apply_v035_fix;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -283,5 +284,24 @@ VALUES(5,
.into_db_result(local_db)?;
}

if current_version < 6 && target_version >= 6 {
if current_version != 0 {
// Remove dangling rows, but skip if the database is created from scratch.
apply_v035_fix(local_db)?;
}

local_db
.exec_safe(
"\
INSERT INTO ps_migration(id, down_migrations)
VALUES(6,
json_array(
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 6')
));
",
)
.into_db_result(local_db)?;
}

Ok(())
}
1 change: 0 additions & 1 deletion crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id) VALUES(?1, ?2)",

while supersede_statement.step()? == ResultCode::ROW {
// Superseded (deleted) a previous operation, add the checksum
let superseded_op = supersede_statement.column_int64(0)?;
let supersede_checksum = supersede_statement.column_int(1)?;
add_checksum = add_checksum.wrapping_add(supersede_checksum);
op_checksum = op_checksum.wrapping_sub(supersede_checksum);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/view_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fn powersync_init_impl(

setup_internal_views(local_db)?;

powersync_migrate(ctx, 5)?;
powersync_migrate(ctx, 6)?;

Ok(String::from(""))
}
Expand Down
36 changes: 36 additions & 0 deletions dart/test/migration_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:test/test.dart';

import 'utils/native_test_utils.dart';
import 'utils/migration_fixtures.dart' as fixtures;
import 'utils/fix_035_fixtures.dart' as fix035;
import 'utils/schema.dart';

void main() {
Expand Down Expand Up @@ -175,5 +176,40 @@ void main() {
'${fixtures.expectedState[3]!.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schemaDown3.trim()}';
expect(schema, equals(expected));
});

test('migrate from 5 with broken data', () async {
var tableSchema = {
'tables': [
{
'name': 'lists',
'columns': [
{'name': 'description', 'type': 'TEXT'}
]
},
{
'name': 'todos',
'columns': [
{'name': 'description', 'type': 'TEXT'}
]
}
]
};
db.select('select powersync_init()');
db.select(
'select powersync_replace_schema(?)', [jsonEncode(tableSchema)]);

db.select('select powersync_test_migration(5)');
db.execute(fix035.dataBroken);

db.select('select powersync_init()');
final data = getData(db);
expect(data, equals(fix035.dataMigrated.trim()));

db.select('insert into powersync_operations(op, data) values(?, ?)',
['sync_local', '']);

final data2 = getData(db);
expect(data2, equals(fix035.dataFixed.trim()));
});
});
}
54 changes: 54 additions & 0 deletions dart/test/utils/fix_035_fixtures.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/// Data with some records in actual tables but not in ps_oplog
const dataBroken = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_data__lists(id, data) VALUES
('l1', '{}'),
('l3', '{}')
;INSERT INTO ps_data__todos(id, data) VALUES
('t1', '{}'),
('t2', '{}'),
('t3', '{}')
''';

/// Data after applying the migration fix, but before sync_local
const dataMigrated = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l3'),
('todos', 't3')
;INSERT INTO ps_data__lists(id, data) VALUES
('l1', '{}'),
('l3', '{}')
;INSERT INTO ps_data__todos(id, data) VALUES
('t1', '{}'),
('t2', '{}'),
('t3', '{}')
''';

/// Data after applying the migration fix and sync_local
const dataFixed = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_data__lists(id, data) VALUES
('l1', '{}')
;INSERT INTO ps_data__todos(id, data) VALUES
('t1', '{}'),
('t2', '{}')
''';
57 changes: 54 additions & 3 deletions dart/test/utils/migration_fixtures.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// The current database version
const databaseVersion = 5;
const databaseVersion = 6;

/// This is the base database state that we expect at various schema versions.
/// Generated by loading the specific library version, and exporting the schema.
Expand Down Expand Up @@ -133,6 +133,45 @@ const expectedState = <int, String>{
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
''',
6: r'''
;CREATE TABLE ps_buckets(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
last_applied_op INTEGER NOT NULL DEFAULT 0,
last_op INTEGER NOT NULL DEFAULT 0,
target_op INTEGER NOT NULL DEFAULT 0,
add_checksum INTEGER NOT NULL DEFAULT 0,
op_checksum INTEGER NOT NULL DEFAULT 0,
pending_delete INTEGER NOT NULL DEFAULT 0
) STRICT
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
;CREATE TABLE ps_oplog(
bucket INTEGER NOT NULL,
op_id INTEGER NOT NULL,
row_type TEXT,
row_id TEXT,
key TEXT,
data TEXT,
hash INTEGER NOT NULL) STRICT
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
;CREATE TABLE ps_updated_rows(
row_type TEXT,
row_id TEXT,
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
'''
};

Expand Down Expand Up @@ -180,13 +219,24 @@ const data1 = <int, String>{
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
''',
6: r'''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 1005, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
'''
};

/// data to test "down" migrations
/// This is slightly different from the above,
/// since we don't preserve all data in the migration process
const dataDown1 = <int, String>{
final dataDown1 = <int, String>{
2: r'''
;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES
('$local', 0, 0, 9223372036854775807, 0, 1),
Expand Down Expand Up @@ -219,7 +269,8 @@ const dataDown1 = <int, String>{
('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0),
('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0),
('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0)
'''
''',
5: data1[5]!
};

final finalData1 = data1[databaseVersion]!;
Expand Down
8 changes: 8 additions & 0 deletions dart/test/utils/schema.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ String getData(CommonDatabase db) {
{
'table': 'ps_updated_rows',
'query': 'select * from ps_updated_rows order by row_type, row_id'
},
{
'table': 'ps_data__lists',
'query': 'select * from ps_data__lists order by id'
},
{
'table': 'ps_data__todos',
'query': 'select * from ps_data__todos order by id'
}
];
List<String> result = [];
Expand Down
2 changes: 1 addition & 1 deletion powersync-sqlite-core.podspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Pod::Spec.new do |s|
s.name = 'powersync-sqlite-core'
s.version = '0.3.5'
s.version = '0.3.6'
s.summary = 'PowerSync SQLite Extension'
s.description = <<-DESC
PowerSync extension for SQLite.
Expand Down
4 changes: 2 additions & 2 deletions tool/build_xcframework.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ function createXcframework() {
<key>MinimumOSVersion</key>
<string>11.0</string>
<key>CFBundleVersion</key>
<string>0.3.5</string>
<string>0.3.6</string>
<key>CFBundleShortVersionString</key>
<string>0.3.5</string>
<string>0.3.6</string>
</dict>
</plist>
EOF
Expand Down
Loading