Skip to content

Commit 6b43769

Browse files
authored
Views: cleanup unsubscribed views (#3651)
# Description of Changes A background task to cleanup unsubscribed views. fixes #3587 # API and ABI breaking changes NA # Expected complexity level and risk 2 # Testing Added a test --------- Signed-off-by: Shubham Mishra <[email protected]>
1 parent 4fc095c commit 6b43769

File tree

4 files changed

+238
-14
lines changed

4 files changed

+238
-14
lines changed

crates/core/src/db/relational_db.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,50 @@ impl RelationalDB {
10731073
}
10741074
}
10751075

1076+
/// Duration after which expired unused views are cleaned up.
1077+
/// Value is chosen arbitrarily; can be tuned later if needed.
1078+
const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10 * 60);
1079+
1080+
/// Duration to budget for each view cleanup job,
1081+
/// so that it doesn't hold transaction lock for to long.
1082+
//TODO: Make this value configurable
1083+
const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10);
1084+
1085+
/// Spawn a background task that periodically cleans up expired views
1086+
pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandle {
1087+
tokio::spawn(async move {
1088+
let db = &db;
1089+
loop {
1090+
match db.with_auto_commit(Workload::Internal, |tx| {
1091+
tx.clear_expired_views(VIEWS_EXPIRATION, VIEW_CLEANUP_BUDGET)
1092+
.map_err(DBError::from)
1093+
}) {
1094+
Ok((cleared, total_expired)) => {
1095+
if cleared != total_expired {
1096+
//TODO: Report it as metric
1097+
log::info!(
1098+
"[{}] DATABASE: cleared {} expired views ({} remaining)",
1099+
db.database_identity(),
1100+
cleared,
1101+
total_expired - cleared
1102+
);
1103+
}
1104+
}
1105+
Err(e) => {
1106+
log::error!(
1107+
"[{}] DATABASE: failed to clear expired views: {}",
1108+
db.database_identity(),
1109+
e
1110+
);
1111+
}
1112+
}
1113+
1114+
tokio::time::sleep(VIEWS_EXPIRATION).await;
1115+
}
1116+
})
1117+
.abort_handle()
1118+
}
1119+
10761120
impl RelationalDB {
10771121
pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
10781122
Ok(self.inner.create_table_mut_tx(tx, schema)?)
@@ -2368,6 +2412,7 @@ mod tests {
23682412
use std::fs::OpenOptions;
23692413
use std::path::PathBuf;
23702414
use std::rc::Rc;
2415+
use std::time::Instant;
23712416

23722417
use super::tests_utils::begin_mut_tx;
23732418
use super::*;
@@ -2608,6 +2653,88 @@ mod tests {
26082653
Ok(())
26092654
}
26102655

2656+
#[test]
2657+
fn test_views() -> ResultTest<()> {
2658+
let stdb = TestDB::durable()?;
2659+
2660+
let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
2661+
let row_type = view_def.product_type_ref;
2662+
let typespace = module_def.typespace();
2663+
2664+
let sender1 = Identity::ONE;
2665+
2666+
// Sender 1 insert
2667+
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?;
2668+
2669+
assert_eq!(
2670+
project_views(&stdb, table_id, sender1)[0],
2671+
product![42u8],
2672+
"View row not inserted correctly"
2673+
);
2674+
2675+
// Sender 2 insert
2676+
let sender2 = Identity::ZERO;
2677+
let before_sender2 = Instant::now();
2678+
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?;
2679+
2680+
assert_eq!(
2681+
project_views(&stdb, table_id, sender2)[0],
2682+
product![84u8],
2683+
"Sender 2 view row not inserted correctly"
2684+
);
2685+
2686+
// Restart database (view rows should NOT persist)
2687+
let stdb = stdb.reopen()?;
2688+
2689+
assert!(
2690+
project_views(&stdb, table_id, sender1).is_empty(),
2691+
"Sender 1 rows should NOT persist after reopen"
2692+
);
2693+
assert!(
2694+
project_views(&stdb, table_id, sender2).is_empty(),
2695+
"Sender 2 rows should NOT persist after reopen"
2696+
);
2697+
2698+
let tx = begin_mut_tx(&stdb);
2699+
let st = tx.lookup_st_view_subs(view_id)?;
2700+
assert!(st.is_empty(), "st_view_subs should also be cleared after restart");
2701+
stdb.commit_tx(tx)?;
2702+
2703+
// Reinsert after restart
2704+
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?;
2705+
assert_eq!(
2706+
project_views(&stdb, table_id, sender2)[0],
2707+
product![91u8],
2708+
"Post-restart inserted rows must appear"
2709+
);
2710+
2711+
// Clean expired rows
2712+
let mut tx = begin_mut_tx(&stdb);
2713+
tx.clear_expired_views(
2714+
Instant::now().saturating_duration_since(before_sender2),
2715+
VIEW_CLEANUP_BUDGET,
2716+
)?;
2717+
stdb.commit_tx(tx)?;
2718+
2719+
// Only sender2 exists after reinsertion
2720+
assert!(
2721+
project_views(&stdb, table_id, sender1).is_empty(),
2722+
"Sender 1 should remain empty"
2723+
);
2724+
assert_eq!(
2725+
project_views(&stdb, table_id, sender2)[0],
2726+
product![91u8],
2727+
"Sender 2 row should remain"
2728+
);
2729+
2730+
// And st_view_subs must reflect only sender2
2731+
let tx = begin_mut_tx(&stdb);
2732+
let st_after = tx.lookup_st_view_subs(view_id)?;
2733+
assert_eq!(st_after.len(), 1);
2734+
assert_eq!(st_after[0].identity.0, sender2);
2735+
2736+
Ok(())
2737+
}
26112738
#[test]
26122739
fn test_table_name() -> ResultTest<()> {
26132740
let stdb = TestDB::durable()?;

crates/core/src/host/host_controller.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use super::{Scheduler, UpdateDatabaseResult};
55
use crate::client::{ClientActorId, ClientName};
66
use crate::database_logger::DatabaseLogger;
77
use crate::db::persistence::PersistenceProvider;
8-
use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata};
8+
use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata};
99
use crate::db::{self, spawn_tx_metrics_recorder};
1010
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
1111
use crate::host::module_host::ModuleRuntime as _;
@@ -746,6 +746,9 @@ struct Host {
746746
/// Handle to the task responsible for recording metrics for each transaction.
747747
/// The task is aborted when [`Host`] is dropped.
748748
tx_metrics_recorder_task: AbortHandle,
749+
/// Handle to the task responsible for cleaning up old views.
750+
/// The task is aborted when [`Host`] is dropped.
751+
view_cleanup_task: AbortHandle,
749752
}
750753

751754
impl Host {
@@ -870,19 +873,17 @@ impl Host {
870873

871874
scheduler_starter.start(&module_host)?;
872875
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
876+
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone());
873877

874878
let module = watch::Sender::new(module_host);
875-
//TODO(shub): Below code interfere with `exit_module` code,
876-
// I suspect channel internally holds a reference to the module,
877-
// even after we drop the sender.
878-
//
879-
// replica_ctx.subscriptions.init(module.subscribe());
879+
880880
Ok(Host {
881881
module,
882882
replica_ctx,
883883
scheduler,
884884
disk_metrics_recorder_task,
885885
tx_metrics_recorder_task,
886+
view_cleanup_task,
886887
})
887888
}
888889

@@ -1059,6 +1060,7 @@ impl Drop for Host {
10591060
fn drop(&mut self) {
10601061
self.disk_metrics_recorder_task.abort();
10611062
self.tx_metrics_recorder_task.abort();
1063+
self.view_cleanup_task.abort();
10621064
}
10631065
}
10641066

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -669,8 +669,8 @@ impl CommittedState {
669669
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
670670
}
671671

672-
pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId) {
673-
self.read_sets.remove_view(view_id)
672+
pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option<Identity>) {
673+
self.read_sets.remove_view(view_id, sender)
674674
}
675675

676676
pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData {

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,9 @@ impl ViewReadSets {
107107
}
108108

109109
/// Removes keys for `view_id` from the read set
110-
pub fn remove_view(&mut self, view_id: ViewId) {
110+
pub fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
111111
self.tables.retain(|_, readset| {
112-
readset.remove_view(view_id);
112+
readset.remove_view(view_id, sender);
113113
!readset.is_empty()
114114
});
115115
}
@@ -144,9 +144,14 @@ impl TableReadSet {
144144
self.table_scans.is_empty()
145145
}
146146

147-
/// Removes keys for `view_id` from the read set
148-
fn remove_view(&mut self, view_id: ViewId) {
149-
self.table_scans.retain(|info| info.view_id != view_id);
147+
/// Removes keys for `view_id` from the read set, optionally filtering by `sender`
148+
fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
149+
if let Some(identity) = sender {
150+
self.table_scans
151+
.retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity)));
152+
} else {
153+
self.table_scans.retain(|call| call.view_id != view_id);
154+
}
150155
}
151156

152157
/// Merge or union two read sets for this table
@@ -221,7 +226,13 @@ impl MutTxId {
221226
/// Removes keys for `view_id` from the committed read set.
222227
/// Used for dropping views in an auto-migration.
223228
pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {
224-
self.committed_state_write_lock.drop_view_from_read_sets(view_id)
229+
self.committed_state_write_lock.drop_view_from_read_sets(view_id, None)
230+
}
231+
232+
/// Removes a specific view call from the committed read set.
233+
pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) {
234+
self.committed_state_write_lock
235+
.drop_view_from_read_sets(view_id, Some(sender))
225236
}
226237
}
227238

@@ -1961,6 +1972,90 @@ impl MutTxId {
19611972
Ok(())
19621973
}
19631974

1975+
/// Clean up views that have no subscribers and haven’t been called recently.
1976+
///
1977+
/// This function will scan for subscription entries in `st_view_sub` where:
1978+
/// - `has_subscribers == false`, `num_subscribers == 0`.
1979+
/// - `last_called` is older than `expiration_duration`.
1980+
///
1981+
/// For each such expired view:
1982+
/// 1. It clears the backing table,
1983+
/// 2. Removes the view from the committed read set, and
1984+
/// 3. Deletes the subscription row.
1985+
///
1986+
/// The cleanup is bounded by a total `max_duration`. The function stops when either:
1987+
/// - all expired views have been processed, or
1988+
/// - the `max_duration` budget is reached.
1989+
///
1990+
/// Returns a tuple `(cleaned, total_expired)`:
1991+
/// - `cleaned`: Number of views actually cleaned (deleted) in this run.
1992+
/// - `total_expired`: Total number of expired views found (even if not all were cleaned due to time budget).
1993+
pub fn clear_expired_views(
1994+
&mut self,
1995+
expiration_duration: Duration,
1996+
max_duration: Duration,
1997+
) -> Result<(usize, usize)> {
1998+
let start = std::time::Instant::now();
1999+
let now = Timestamp::now();
2000+
let expiration_threshold = now - expiration_duration;
2001+
let mut cleaned_count = 0;
2002+
2003+
// Collect all expired views from st_view_sub
2004+
let expired_items: Vec<(ViewId, Identity, RowPointer)> = self
2005+
.iter_by_col_eq(
2006+
ST_VIEW_SUB_ID,
2007+
StViewSubFields::HasSubscribers,
2008+
&AlgebraicValue::from(false),
2009+
)?
2010+
.filter_map(|row_ref| {
2011+
let row = StViewSubRow::try_from(row_ref).expect("Failed to deserialize st_view_sub row");
2012+
2013+
if !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold {
2014+
Some((row.view_id, row.identity.into(), row_ref.pointer()))
2015+
} else {
2016+
None
2017+
}
2018+
})
2019+
.collect();
2020+
2021+
let total_expired = expired_items.len();
2022+
2023+
// For each expired view subscription, clear the backing table and delete the subscription
2024+
for (view_id, sender, sub_row_ptr) in expired_items {
2025+
// Check if we've exceeded our time budget
2026+
if start.elapsed() >= max_duration {
2027+
break;
2028+
}
2029+
2030+
let StViewRow {
2031+
table_id, is_anonymous, ..
2032+
} = self.lookup_st_view(view_id)?;
2033+
let table_id = table_id.expect("views have backing table");
2034+
2035+
if is_anonymous {
2036+
self.clear_table(table_id)?;
2037+
self.drop_view_from_committed_read_set(view_id);
2038+
} else {
2039+
let rows_to_delete = self
2040+
.iter_by_col_eq(table_id, 0, &sender.into())?
2041+
.map(|res| res.pointer())
2042+
.collect::<Vec<_>>();
2043+
2044+
for row_ptr in rows_to_delete {
2045+
self.delete(table_id, row_ptr)?;
2046+
}
2047+
2048+
self.drop_view_with_sender_from_committed_read_set(view_id, sender);
2049+
}
2050+
2051+
// Finally, delete the subscription row
2052+
self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?;
2053+
cleaned_count += 1;
2054+
}
2055+
2056+
Ok((cleaned_count, total_expired))
2057+
}
2058+
19642059
/// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view.
19652060
pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> {
19662061
use StViewSubFields::*;

0 commit comments

Comments
 (0)