Skip to content

Commit b25c03c

Browse files
committed
Allow subscribing to streams
1 parent 0916b45 commit b25c03c

File tree

6 files changed

+172
-24
lines changed

6 files changed

+172
-24
lines changed

crates/core/src/migrations.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,11 @@ CREATE TABLE ps_stream_subscriptions (
392392
active INTEGER NOT NULL DEFAULT FALSE,
393393
is_default INTEGER NOT NULL DEFAULT FALSE,
394394
local_priority INTEGER,
395-
local_params TEXT,
395+
local_params TEXT NOT NULL DEFAULT 'null',
396396
ttl INTEGER,
397397
expires_at INTEGER,
398-
last_synced_at INTEGER
398+
last_synced_at INTEGER,
399+
UNIQUE (stream_name, local_params)
399400
) STRICT;
400401
401402
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(

crates/core/src/sync/interface.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use sqlite_nostd::{Connection, Context};
1515
use crate::error::PowerSyncError;
1616
use crate::schema::Schema;
1717
use crate::state::DatabaseState;
18+
use crate::sync::subscriptions::{apply_subscriptions, SubscriptionChangeRequest};
1819
use crate::sync::BucketPriority;
1920

2021
use super::streaming_sync::SyncClient;
@@ -216,6 +217,11 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
216217
}),
217218
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
218219
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
220+
"subscriptions" => {
221+
let request = serde_json::from_str(payload.text())
222+
.map_err(PowerSyncError::as_argument_error)?;
223+
return apply_subscriptions(ctx.db_handle(), request);
224+
}
219225
_ => {
220226
return Err(PowerSyncError::argument_error("Unknown operation"));
221227
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub struct StorageAdapter {
3535
pub db: *mut sqlite::sqlite3,
3636
pub progress_stmt: ManagedStmt,
3737
time_stmt: ManagedStmt,
38+
delete_subscription: ManagedStmt,
3839
}
3940

4041
impl StorageAdapter {
@@ -47,10 +48,15 @@ impl StorageAdapter {
4748
// language=SQLite
4849
let time = db.prepare_v2("SELECT unixepoch()")?;
4950

51+
// language=SQLite
52+
let delete_subscription =
53+
db.prepare_v2("DELETE FROM ps_stream_subscriptions WHERE id = ?")?;
54+
5055
Ok(Self {
5156
db,
5257
progress_stmt: progress,
5358
time_stmt: time,
59+
delete_subscription,
5460
})
5561
}
5662

@@ -269,6 +275,8 @@ impl StorageAdapter {
269275
fn read_stream_subscription(
270276
stmt: &ManagedStmt,
271277
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
278+
let raw_params = stmt.column_text(5)?;
279+
272280
Ok(LocallyTrackedSubscription {
273281
id: stmt.column_int64(0),
274282
stream_name: stmt.column_text(1)?.to_string(),
@@ -277,9 +285,11 @@ impl StorageAdapter {
277285
local_priority: column_nullable(&stmt, 4, || {
278286
BucketPriority::try_from(stmt.column_int(4))
279287
})?,
280-
local_params: column_nullable(&stmt, 5, || {
281-
JsonString::from_string(stmt.column_text(5)?.to_string())
282-
})?,
288+
local_params: if raw_params == "null" {
289+
None
290+
} else {
291+
Some(JsonString::from_string(stmt.column_text(5)?.to_string())?)
292+
},
283293
ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?,
284294
expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?,
285295
last_synced_at: column_nullable(&stmt, 8, || Ok(stmt.column_int64(8)))?,
@@ -313,6 +323,13 @@ impl StorageAdapter {
313323
Err(PowerSyncError::unknown_internal())
314324
}
315325
}
326+
327+
pub fn delete_subscription(&self, id: i64) -> Result<(), PowerSyncError> {
328+
let _ = self.delete_subscription.reset();
329+
self.delete_subscription.bind_int64(1, id)?;
330+
self.delete_subscription.exec()?;
331+
Ok(())
332+
}
316333
}
317334

318335
pub struct BucketInfo {

crates/core/src/sync/streaming_sync.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -573,25 +573,41 @@ impl StreamingSyncIteration {
573573
let mut tracked_subscriptions: Vec<LocallyTrackedSubscription> = Vec::new();
574574

575575
// Load known subscriptions from database
576-
self.adapter.iterate_local_subscriptions(|sub| {
576+
self.adapter.iterate_local_subscriptions(|mut sub| {
577+
// We will mark it as active again if it's part of the streams included in the
578+
// checkpoint.
579+
sub.active = false;
580+
577581
tracked_subscriptions.push(sub);
578582
})?;
579583

580584
// If they don't exist already, create default subscriptions included in checkpoint
581585
for subscription in &tracked.streams {
582-
if subscription.is_default {
583-
let found = tracked_subscriptions
584-
.iter()
585-
.filter(|s| s.stream_name == subscription.name && s.local_params.is_none())
586-
.next();
586+
let matching_local_subscriptions = tracked_subscriptions
587+
.iter_mut()
588+
.filter(|s| s.stream_name == subscription.name);
589+
590+
let mut has_local = false;
591+
for subscription in matching_local_subscriptions {
592+
subscription.active = true;
593+
has_local = true;
594+
}
587595

588-
if found.is_none() {
589-
let subscription = self.adapter.create_default_subscription(subscription)?;
590-
tracked_subscriptions.push(subscription);
591-
}
596+
if !has_local && subscription.is_default {
597+
let subscription = self.adapter.create_default_subscription(subscription)?;
598+
tracked_subscriptions.push(subscription);
592599
}
593600
}
594601

602+
// Clean up default subscriptions that are no longer active.
603+
for subscription in &tracked_subscriptions {
604+
if subscription.is_default && !subscription.active {
605+
self.adapter.delete_subscription(subscription.id)?;
606+
}
607+
}
608+
tracked_subscriptions
609+
.retain(|subscription| !subscription.is_default || subscription.active);
610+
595611
debug_assert!(tracked_subscriptions.is_sorted_by_key(|s| s.id));
596612

597613
let mut resolved: Vec<ActiveStreamSubscription> =
@@ -609,8 +625,6 @@ impl StreamingSyncIteration {
609625
}
610626
}
611627

612-
// TODO: Cleanup old default subscriptions?
613-
614628
// Iterate over buckets to associate them with subscriptions
615629
for bucket in tracked.checkpoint.buckets.values() {
616630
match &bucket.subscriptions {

crates/core/src/sync/subscriptions.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ use core::{cmp::Ordering, hash::Hash, time::Duration};
33
use alloc::{boxed::Box, string::String};
44
use serde::Deserialize;
55
use serde_with::{serde_as, DurationSeconds};
6+
use sqlite_nostd::{self as sqlite, Connection};
67

7-
use crate::{sync::BucketPriority, util::JsonString};
8+
use crate::{
9+
error::{PSResult, PowerSyncError},
10+
ext::SafeManagedStmt,
11+
sync::BucketPriority,
12+
util::JsonString,
13+
};
814

915
/// A key that uniquely identifies a stream subscription.
1016
#[derive(Debug, PartialEq, PartialOrd, Eq, Ord)]
@@ -38,16 +44,20 @@ impl LocallyTrackedSubscription {
3844
/// A request sent from a PowerSync SDK to alter the subscriptions managed by this client.
3945
#[derive(Deserialize)]
4046
pub enum SubscriptionChangeRequest {
47+
#[serde(rename = "subscribe")]
4148
Subscribe(SubscribeToStream),
4249
}
4350

4451
#[serde_as]
4552
#[derive(Deserialize)]
4653
pub struct SubscribeToStream {
4754
pub stream: String,
55+
#[serde(default)]
4856
pub params: Option<Box<serde_json::value::RawValue>>,
4957
#[serde_as(as = "Option<DurationSeconds>")]
58+
#[serde(default)]
5059
pub ttl: Option<Duration>,
60+
#[serde(default)]
5161
pub priority: Option<BucketPriority>,
5262
}
5363

@@ -57,3 +67,37 @@ pub struct UnsubscribeFromStream {
5767
pub params: Option<Box<serde_json::value::RawValue>>,
5868
pub immediate: bool,
5969
}
70+
71+
pub fn apply_subscriptions(
72+
db: *mut sqlite::sqlite3,
73+
subscription: SubscriptionChangeRequest,
74+
) -> Result<(), PowerSyncError> {
75+
match subscription {
76+
SubscriptionChangeRequest::Subscribe(subscription) => {
77+
let stmt = db
78+
.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params, ttl) VALUES (?, ?2, ?, ?4) ON CONFLICT DO UPDATE SET local_priority = min(coalesce(?2, local_priority), local_priority), ttl = ?4, is_default = FALSE")
79+
.into_db_result(db)?;
80+
81+
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;
82+
match &subscription.priority {
83+
Some(priority) => stmt.bind_int(2, priority.number),
84+
None => stmt.bind_null(2),
85+
}?;
86+
stmt.bind_text(
87+
3,
88+
match &subscription.params {
89+
Some(params) => params.get(),
90+
None => "null",
91+
},
92+
sqlite::Destructor::STATIC,
93+
)?;
94+
match &subscription.ttl {
95+
Some(ttl) => stmt.bind_int64(4, ttl.as_secs() as i64),
96+
None => stmt.bind_null(4),
97+
}?;
98+
stmt.exec()?;
99+
}
100+
}
101+
102+
Ok(())
103+
}

dart/test/sync_stream_test.dart

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,19 @@ void main() {
4747

4848
db.execute('commit');
4949
final [row] = result;
50-
final instructions = jsonDecode(row.columnAt(0)) as List;
51-
for (final instruction in instructions) {
52-
if (instruction case {'UpdateSyncStatus': final status}) {
53-
lastStatus = status['status']!;
50+
51+
final rawResult = row.columnAt(0);
52+
if (rawResult is String) {
53+
final instructions = jsonDecode(row.columnAt(0)) as List;
54+
for (final instruction in instructions) {
55+
if (instruction case {'UpdateSyncStatus': final status}) {
56+
lastStatus = status['status']!;
57+
}
5458
}
59+
return instructions;
60+
} else {
61+
return const [];
5562
}
56-
57-
return instructions;
5863
}
5964

6065
group('default streams', () {
@@ -109,5 +114,66 @@ void main() {
109114
final [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
110115
expect(stored, containsPair('last_synced_at', 1740823200));
111116
});
117+
118+
syncTest('are deleted', (_) {
119+
control('start', null);
120+
121+
for (final stream in ['s1', 's2']) {
122+
control(
123+
'line_text',
124+
json.encode(
125+
checkpoint(
126+
lastOpId: 1,
127+
buckets: [
128+
bucketDescription('a', subscriptions: stream, priority: 1),
129+
],
130+
streams: [(stream, true)],
131+
),
132+
),
133+
);
134+
control(
135+
'line_text',
136+
json.encode(checkpointComplete(priority: 1)),
137+
);
138+
}
139+
140+
expect(
141+
lastStatus,
142+
containsPair(
143+
'streams',
144+
[containsPair('name', 's2')],
145+
),
146+
);
147+
});
148+
149+
syncTest('can be made explicit', (_) {
150+
control('start', null);
151+
control(
152+
'line_text',
153+
json.encode(
154+
checkpoint(
155+
lastOpId: 1,
156+
buckets: [
157+
bucketDescription('a', subscriptions: 'a', priority: 1),
158+
],
159+
streams: [('a', true)],
160+
),
161+
),
162+
);
163+
164+
var [stored] = db.select('SELECT * FROM ps_stream_subscriptions');
165+
expect(stored, containsPair('is_default', 1));
166+
167+
control(
168+
'subscriptions',
169+
json.encode({
170+
'subscribe': {'stream': 'a'},
171+
}),
172+
);
173+
174+
[stored] = db.select('SELECT * FROM ps_stream_subscriptions');
175+
expect(stored, containsPair('active', 1));
176+
expect(stored, containsPair('is_default', 0));
177+
});
112178
});
113179
}

0 commit comments

Comments
 (0)