Skip to content

Commit d3d3e42

Browse files
committed
Track subscriptions
1 parent a8d1d56 commit d3d3e42

File tree

9 files changed

+421
-28
lines changed

9 files changed

+421
-28
lines changed

crates/core/src/migrations.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,13 @@ INSERT INTO ps_migration(id, down_migrations) VALUES (10, json_array(
389389
CREATE TABLE ps_stream_subscriptions (
390390
id NOT NULL INTEGER PRIMARY KEY,
391391
stream_name TEXT NOT NULL,
392-
is_default INTEGER NOT NULL,
392+
active INTEGER NOT NULL DEFAULT FALSE,
393+
is_default INTEGER NOT NULL DEFAULT FALSE,
393394
local_priority INTEGER,
394395
local_params TEXT,
395-
ttl INTEGER
396+
ttl INTEGER,
397+
expires_at INTEGER
396398
) STRICT;
397-
ALTER TABLE ps_buckets ADD COLUMN from_subscriptions TEXT NOT NULL DEFAULT '[null]';
398399
399400
INSERT INTO ps_migration(id, down_migrations) VALUES(11, json_array(
400401
json_object('sql', 'todo down migration'),

crates/core/src/sync/checkpoint.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use alloc::{string::String, vec::Vec};
22
use num_traits::Zero;
33

4-
use crate::sync::{line::BucketChecksum, BucketPriority, Checksum};
4+
use crate::sync::{
5+
line::{BucketChecksum, BucketSubscriptionReason},
6+
BucketPriority, Checksum,
7+
};
58
use sqlite_nostd::{self as sqlite, Connection, ResultCode};
69

710
/// A structure cloned from [BucketChecksum]s with an owned bucket name instead of one borrowed from
@@ -12,6 +15,7 @@ pub struct OwnedBucketChecksum {
1215
pub checksum: Checksum,
1316
pub priority: BucketPriority,
1417
pub count: Option<i64>,
18+
pub subscriptions: BucketSubscriptionReason,
1519
}
1620

1721
impl OwnedBucketChecksum {
@@ -30,6 +34,7 @@ impl From<&'_ BucketChecksum<'_>> for OwnedBucketChecksum {
3034
checksum: value.checksum,
3135
priority: value.priority.unwrap_or(BucketPriority::FALLBACK),
3236
count: value.count,
37+
subscriptions: value.subscriptions.clone(),
3338
}
3439
}
3540
}

crates/core/src/sync/interface.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use sqlite_nostd::{Connection, Context};
1515
use crate::error::PowerSyncError;
1616
use crate::schema::Schema;
1717
use crate::state::DatabaseState;
18+
use crate::sync::BucketPriority;
1819

1920
use super::streaming_sync::SyncClient;
2021
use super::sync_status::DownloadSyncStatus;
@@ -141,6 +142,7 @@ pub struct RequestedStreamSubscription {
141142
pub stream: String,
142143
/// Parameters to make available in the stream's definition.
143144
pub parameters: Box<serde_json::value::RawValue>,
145+
pub override_priority: Option<BucketPriority>,
144146
#[serde_as(as = "DisplayFromStr")]
145147
pub client_id: i64,
146148
}

crates/core/src/sync/line.rs

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use alloc::borrow::Cow;
2+
use alloc::string::{String, ToString};
23
use alloc::vec::Vec;
3-
use serde::de::{IgnoredAny, VariantAccess, Visitor};
4+
use serde::de::{Error, IgnoredAny, VariantAccess, Visitor};
45
use serde::Deserialize;
56
use serde_with::{serde_as, DisplayFromStr};
67

@@ -82,6 +83,14 @@ pub struct Checkpoint<'a> {
8283
pub write_checkpoint: Option<i64>,
8384
#[serde(borrow)]
8485
pub buckets: Vec<BucketChecksum<'a>>,
86+
#[serde(default, borrow)]
87+
pub streams: Vec<StreamDefinition<'a>>,
88+
}
89+
90+
#[derive(Deserialize, Debug)]
91+
pub struct StreamDefinition<'a> {
92+
pub name: SyncLineStr<'a>,
93+
pub is_default: bool,
8594
}
8695

8796
#[serde_as]
@@ -120,14 +129,67 @@ pub struct BucketChecksum<'a> {
120129
pub priority: Option<BucketPriority>,
121130
#[serde(default)]
122131
pub count: Option<i64>,
123-
#[serde_as(as = "Vec<Option<DisplayFromStr>>")]
124132
#[serde(default)]
125-
pub subscriptions: Vec<Option<i64>>,
133+
pub subscriptions: BucketSubscriptionReason,
126134
// #[serde(default)]
127135
// #[serde(deserialize_with = "deserialize_optional_string_to_i64")]
128136
// pub last_op_id: Option<i64>,
129137
}
130138

139+
/// The reason for why a bucket was included in a checkpoint.
140+
#[derive(Debug, Default, Clone)]
141+
pub enum BucketSubscriptionReason {
142+
/// A bucket was created for all of the subscription ids we've explicitly requested in the sync
143+
/// request.
144+
ExplicitlySubscribed { subscriptions: Vec<i64> },
145+
/// A bucket was created from a default stream.
146+
IsDefault { stream_name: String },
147+
/// We're talking to an older sync service not sending the reason.
148+
#[default]
149+
Unknown,
150+
}
151+
152+
impl<'de> Deserialize<'de> for BucketSubscriptionReason {
153+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
154+
where
155+
D: serde::Deserializer<'de>,
156+
{
157+
struct MyVisitor;
158+
159+
impl<'de> Visitor<'de> for MyVisitor {
160+
type Value = BucketSubscriptionReason;
161+
162+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
163+
write!(formatter, "a subscription reason")
164+
}
165+
166+
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
167+
where
168+
A: serde::de::SeqAccess<'de>,
169+
{
170+
let mut subscriptions = Vec::<i64>::new();
171+
172+
while let Some(item) = seq.next_element::<&'de str>()? {
173+
subscriptions.push(item.parse().map_err(|_| A::Error::custom("not an int"))?);
174+
}
175+
176+
Ok(BucketSubscriptionReason::ExplicitlySubscribed { subscriptions })
177+
}
178+
179+
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
180+
where
181+
E: serde::de::Error,
182+
{
183+
Ok(BucketSubscriptionReason::IsDefault {
184+
stream_name: v.to_string(),
185+
})
186+
}
187+
}
188+
189+
deserializer.deserialize_any(MyVisitor)
190+
}
191+
}
192+
131193
#[derive(Deserialize, Debug)]
132194
pub struct DataLine<'a> {
133195
#[serde(borrow)]
@@ -229,6 +291,7 @@ mod tests {
229291
last_op_id: 10,
230292
write_checkpoint: None,
231293
buckets: _,
294+
streams: _,
232295
})
233296
);
234297

@@ -264,6 +327,7 @@ mod tests {
264327
last_op_id: 1,
265328
write_checkpoint: None,
266329
buckets: _,
330+
streams: _,
267331
})
268332
);
269333
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::{assert_matches::debug_assert_matches, fmt::Display};
22

33
use alloc::{string::ToString, vec::Vec};
44
use serde::Serialize;
5+
use serde_json::value::RawValue;
56
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
67

78
use crate::{
@@ -13,8 +14,11 @@ use crate::{
1314
sync::{
1415
checkpoint::{validate_checkpoint, ChecksumMismatch},
1516
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
17+
streaming_sync::OwnedStreamDefinition,
18+
subscriptions::LocallyTrackedSubscription,
1619
},
1720
sync_local::{PartialSyncOperation, SyncOperation},
21+
util::{column_nullable, JsonString},
1822
};
1923

2024
use super::{
@@ -261,6 +265,55 @@ impl StorageAdapter {
261265

262266
Ok(res)
263267
}
268+
269+
fn read_stream_subscription(
270+
stmt: &ManagedStmt,
271+
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
272+
Ok(LocallyTrackedSubscription {
273+
id: stmt.column_int64(0),
274+
stream_name: stmt.column_text(1)?.to_string(),
275+
active: stmt.column_int(2) != 0,
276+
is_default: stmt.column_int(3) != 0,
277+
local_priority: column_nullable(&stmt, 4, || {
278+
BucketPriority::try_from(stmt.column_int(4))
279+
})?,
280+
local_params: column_nullable(&stmt, 5, || {
281+
JsonString::from_string(stmt.column_text(5)?.to_string())
282+
})?,
283+
ttl: column_nullable(&stmt, 6, || Ok(stmt.column_int64(6)))?,
284+
expires_at: column_nullable(&stmt, 7, || Ok(stmt.column_int64(7)))?,
285+
})
286+
}
287+
288+
pub fn iterate_local_subscriptions<F: FnMut(LocallyTrackedSubscription) -> ()>(
289+
&self,
290+
mut action: F,
291+
) -> Result<(), PowerSyncError> {
292+
let stmt = self
293+
.db
294+
.prepare_v2("SELECT * FROM ps_stream_subscriptions ORDER BY id ASC")?;
295+
296+
while stmt.step()? == ResultCode::ROW {
297+
action(Self::read_stream_subscription(&stmt)?);
298+
}
299+
300+
stmt.finalize()?;
301+
Ok(())
302+
}
303+
304+
pub fn create_default_subscription(
305+
&self,
306+
stream: &OwnedStreamDefinition,
307+
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
308+
let stmt = self.db.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, active, is_default) VALUES (?, TRUE, TRUE) RETURNING *;")?;
309+
stmt.bind_text(1, &stream.name, sqlite_nostd::Destructor::STATIC)?;
310+
311+
if stmt.step()? == ResultCode::ROW {
312+
Self::read_stream_subscription(&stmt)
313+
} else {
314+
Err(PowerSyncError::unknown_internal())
315+
}
316+
}
264317
}
265318

266319
pub struct BucketInfo {

0 commit comments

Comments
 (0)