diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 5e53f7d39cc..3a5399e5a7e 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -821,9 +821,18 @@ impl RelationalDB { Txdata, }; + let is_not_ephemeral_table = |table_id: &TableId| -> bool { + tx_data + .ephemeral_tables() + .map(|etables| !etables.contains(table_id)) + .unwrap_or(true) + }; + if tx_data.tx_offset().is_some() { let inserts: Box<_> = tx_data .inserts() + // Skip ephemeral tables + .filter(|(table_id, _)| is_not_ephemeral_table(table_id)) .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), @@ -834,6 +843,7 @@ impl RelationalDB { let deletes: Box<_> = tx_data .deletes() + .filter(|(table_id, _)| is_not_ephemeral_table(table_id)) .map(|(table_id, rowdata)| Ops { table_id: *table_id, rowdata: rowdata.clone(), @@ -842,6 +852,8 @@ impl RelationalDB { .filter(|ops| !truncates.contains(&ops.table_id)) .collect(); + let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect(); + let inputs = reducer_context.map(|rcx| rcx.into()); let txdata = Txdata { @@ -850,7 +862,7 @@ impl RelationalDB { mutations: Some(Mutations { inserts, deletes, - truncates: truncates.into_iter().collect(), + truncates, }), }; @@ -2408,6 +2420,27 @@ mod tests { TableSchema::from_module_def(&def, table, (), TableId::SENTINEL) } + fn view_module_def() -> ModuleDef { + let mut builder = RawModuleDefV9Builder::new(); + + let return_type_ref = builder.add_algebraic_type( + [], + "my_view_return_type", + AlgebraicType::product([("b", AlgebraicType::U8)]), + true, + ); + builder.add_view( + "my_view", + 0, + true, + false, + ProductType::unit(), + AlgebraicType::array(AlgebraicType::Ref(return_type_ref)), + ); + let raw = builder.finish(); + raw.try_into().expect("table validation failed") + } + fn table_auto_inc() -> TableSchema { table( "MyTable", @@ -2492,6 +2525,89 @@ mod tests { Ok(()) } + fn setup_view(stdb: &TestDB) -> ResultTest<(ViewId, TableId, ModuleDef, ViewDef)> { + let module_def = view_module_def(); + let view_def = module_def.view("my_view").unwrap(); + + let mut tx = begin_mut_tx(stdb); + let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; + stdb.commit_tx(tx)?; + + Ok((view_id, table_id, module_def.clone(), view_def.clone())) + } + + fn insert_view_row( + stdb: &TestDB, + view_id: ViewId, + table_id: TableId, + typespace: &Typespace, + row_type: AlgebraicTypeRef, + sender: Identity, + v: u8, + ) -> ResultTest<()> { + let to_bsatn = |pv: &ProductValue| { + Bytes::from(bsatn::to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed")) + }; + + let row_pv = |v: u8| product![v]; + + let mut tx = begin_mut_tx(stdb); + tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; + stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bsatn(&row_pv(v)), typespace)?; + stdb.commit_tx(tx)?; + + Ok(()) + } + + fn project_views(stdb: &TestDB, table_id: TableId, sender: Identity) -> Vec { + let tx = begin_tx(stdb); + + stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into()) + .unwrap() + .map(|row| { + let pv = row.to_product_value(); + ProductValue { + elements: pv.elements.iter().skip(1).cloned().collect(), + } + }) + .collect() + } + + #[test] + fn test_view_tables_are_ephemeral() -> ResultTest<()> { + let stdb = TestDB::durable()?; + + let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?; + let row_type = view_def.product_type_ref; + let typespace = module_def.typespace(); + + // Write some rows (reusing the same helper) + insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ONE, 10)?; + insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ZERO, 20)?; + + assert!( + !project_views(&stdb, table_id, Identity::ZERO).is_empty(), + "View table should NOT be empty after insert" + ); + + // Reopen the database — view tables must not persist + let stdb = stdb.reopen()?; + + // Validate that the view's backing table has been removed + assert!( + project_views(&stdb, table_id, Identity::ZERO).is_empty(), + "View table should be empty after reopening the database" + ); + + let tx = begin_mut_tx(&stdb); + let subs_rows = tx.lookup_st_view_subs(view_id)?; + assert!( + subs_rows.is_empty(), + "st_view_subs should be empty after reopening the database" + ); + Ok(()) + } + #[test] fn test_table_name() -> ResultTest<()> { let stdb = TestDB::durable()?; diff --git a/crates/datastore/src/error.rs b/crates/datastore/src/error.rs index a89f092e7bc..5c892d2e82a 100644 --- a/crates/datastore/src/error.rs +++ b/crates/datastore/src/error.rs @@ -1,7 +1,7 @@ use super::system_tables::SystemTable; use enum_as_inner::EnumAsInner; use spacetimedb_lib::db::raw_def::{v9::RawSql, RawIndexDefV8}; -use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId, TableId, ViewId}; use spacetimedb_sats::buffer::DecodeError; use spacetimedb_sats::{product_value::InvalidFieldError, satn::Satn}; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue}; @@ -41,6 +41,8 @@ pub enum DatastoreError { pub enum ViewError { #[error("view '{0}' not found")] NotFound(Box), + #[error("Table backing View '{0}' not found")] + TableNotFound(ViewId), #[error("failed to deserialize view arguments from row")] DeserializeArgs, #[error("failed to deserialize view return value: {0}")] diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 22e4d4d00ad..5d42000b0c3 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -8,18 +8,18 @@ use super::{ }; use crate::{ db_metrics::DB_METRICS, - error::{DatastoreError, IndexError, TableError}, + error::{DatastoreError, IndexError, TableError, ViewError}, execution_context::ExecutionContext, locking_tx_datastore::{mut_tx::ViewReadSets, state_view::iter_st_column_for_table}, system_tables::{ system_tables, StColumnRow, StConstraintData, StConstraintRow, StIndexRow, StSequenceRow, StTableFields, - StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, + StTableRow, StViewRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_IDX, ST_INDEX_NAME, ST_MODULE_ID, ST_MODULE_IDX, ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_IDX, ST_SCHEDULED_ID, ST_SCHEDULED_IDX, ST_SEQUENCE_ID, ST_SEQUENCE_IDX, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_IDX, ST_VAR_ID, ST_VAR_IDX, ST_VIEW_ARG_ID, ST_VIEW_ARG_IDX, }, - traits::TxData, + traits::{EphemeralTables, TxData}, }; use crate::{ locking_tx_datastore::ViewCallInfo, @@ -80,6 +80,12 @@ pub struct CommittedState { /// Any overlap will trigger a re-evaluation of the affected view, /// and its read set will be updated accordingly. read_sets: ViewReadSets, + + /// Tables which do not need to be made persistent. + /// These include: + /// - system tables: `st_view_sub`, `st_view_arg` + /// - Tables which back views. + pub(super) ephemeral_tables: EphemeralTables, } impl CommittedState { @@ -99,6 +105,7 @@ impl MemoryUsage for CommittedState { page_pool: _, table_dropped, read_sets, + ephemeral_tables, } = self; // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource. next_tx_offset.heap_usage() @@ -107,6 +114,7 @@ impl MemoryUsage for CommittedState { + index_id_map.heap_usage() + table_dropped.heap_usage() + read_sets.heap_usage() + + ephemeral_tables.heap_usage() } } @@ -171,6 +179,7 @@ impl CommittedState { table_dropped: <_>::default(), read_sets: <_>::default(), page_pool, + ephemeral_tables: <_>::default(), } } @@ -518,6 +527,32 @@ impl CommittedState { Ok(()) } + pub(super) fn collect_ephemeral_tables(&mut self) -> Result<()> { + self.ephemeral_tables = self.ephemeral_tables()?.into_iter().collect(); + Ok(()) + } + + fn ephemeral_tables(&self) -> Result> { + let mut tables = vec![ST_VIEW_SUB_ID, ST_VIEW_ARG_ID]; + + let Some(st_view) = self.tables.get(&ST_VIEW_ID) else { + return Ok(tables); + }; + let backing_tables = st_view + .scan_rows(&self.blob_store) + .map(|row_ref| { + let view_row = StViewRow::try_from(row_ref)?; + view_row + .table_id + .ok_or_else(|| DatastoreError::View(ViewError::TableNotFound(view_row.view_id))) + }) + .collect::>>()?; + + tables.extend(backing_tables); + + Ok(tables) + } + /// After replaying all old transactions, /// inserts and deletes into the system tables /// might not be reflected in the schemas of the built tables. @@ -675,6 +710,8 @@ impl CommittedState { self.next_tx_offset += 1; } + tx_data.set_ephemeral_tables(&self.ephemeral_tables); + tx_data } @@ -847,10 +884,16 @@ impl CommittedState { } // A table was removed. Add it back. TableRemoved(table_id, table) => { + let is_view_table = table.schema.is_view(); // We don't need to deal with sub-components. // That is, we don't need to add back indices and such. // Instead, there will be separate pending schema changes like `IndexRemoved`. self.tables.insert(table_id, table); + + // Incase, the table was ephemeral, add it back to that set as well. + if is_view_table { + self.ephemeral_tables.insert(table_id); + } } // A table was added. Remove it. TableAdded(table_id) => { @@ -858,6 +901,8 @@ impl CommittedState { // That is, we don't need to remove indices and such. // Instead, there will be separate pending schema changes like `IndexAdded`. self.tables.remove(&table_id); + // Incase, the table was ephemeral, remove it from that set as well. + self.ephemeral_tables.remove(&table_id); } // A table's access was changed. Change back to the old one. TableAlterAccess(table_id, access) => { diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 4a434e9ffb8..5766b4b7df5 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -146,6 +146,8 @@ impl Locking { committed_state.build_indexes()?; // Figure out where to pick up for each sequence. *self.sequence_state.lock() = committed_state.build_sequence_state()?; + + committed_state.collect_ephemeral_tables()?; Ok(()) } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index aadae882c56..d21d453b7af 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -384,6 +384,8 @@ impl MutTxId { self.insert_into_st_view_param(view_id, param_columns)?; self.insert_into_st_view_column(view_id, return_columns)?; + self.committed_state_write_lock.ephemeral_tables.insert(table_id); + Ok((view_id, table_id)) } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index b04b2444863..e1161b5a11b 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -166,6 +166,8 @@ pub enum IsolationLevel { Serializable, } +pub type EphemeralTables = IntSet; + /// A record of all the operations within a transaction. /// /// Some extra information is embedded here @@ -191,6 +193,11 @@ pub struct TxData { /// `None` implies that `inserts` and `deletes` are both empty, /// but `Some` does not necessarily imply that either is non-empty. tx_offset: Option, + + /// Set of ephemeral tables modified in this transaction (only populated when a view is executed). + /// These tables do not need to be persisted to disk. + /// Every table listed here must appear in either `inserts` or `deletes`. + ephemeral_tables: Option, } impl TxData { @@ -226,6 +233,25 @@ impl TxData { self.truncates.extend(truncated_tables); } + /// Determines which ephemeral tables were modified in this transaction. + /// + /// Iterates over the tables updated in this transaction and records those that + /// also appear in `all_ephemeral_tables`. + /// `self.ephemeral_tables` remains `None` if no ephemeral tables were modified. + pub fn set_ephemeral_tables(&mut self, all_ephemeral_tables: &EphemeralTables) { + for tid in self.tables.keys() { + if all_ephemeral_tables.contains(tid) { + self.ephemeral_tables + .get_or_insert_with(EphemeralTables::default) + .insert(*tid); + } + } + } + + pub fn ephemeral_tables(&self) -> Option<&EphemeralTables> { + self.ephemeral_tables.as_ref() + } + /// Obtain an iterator over the inserted rows per table. pub fn inserts(&self) -> impl Iterator)> + '_ { self.inserts.iter()