crates/core/src/db/relational_db.rs (10 changes: 3 additions & 7 deletions)

@@ -1397,13 +1397,9 @@ impl RelationalDB {
     }
 
     /// Clear all rows from a table without dropping it.
-    pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> {
-        let relation = self
-            .iter_mut(tx, table_id)?
-            .map(|row_ref| row_ref.pointer())
-            .collect::<Vec<_>>();
-        self.delete(tx, table_id, relation);
-        Ok(())
+    pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<usize, DBError> {
+        let rows_deleted = tx.clear_table(table_id)?;
+        Ok(rows_deleted)
     }
 
     pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
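The hunk above makes `clear_table` delegate to the transaction's own `clear_table` and report the number of rows removed, instead of collecting every row pointer only to discard the count. A minimal self-contained sketch of the new contract; `Db`, `DbError`, and the `HashMap`-backed storage below are illustrative stand-ins, not SpacetimeDB types:

use std::collections::HashMap;

#[derive(Debug)]
struct DbError;

struct Db {
    /// `table_id -> rows`; a stand-in for the real storage.
    tables: HashMap<u32, Vec<String>>,
}

impl Db {
    /// Clear all rows from a table without dropping it,
    /// reporting how many rows were removed (the new contract).
    fn clear_table(&mut self, table_id: u32) -> Result<usize, DbError> {
        let table = self.tables.get_mut(&table_id).ok_or(DbError)?;
        let rows_deleted = table.len();
        table.clear();
        Ok(rows_deleted)
    }
}

fn main() -> Result<(), DbError> {
    let mut db = Db {
        tables: HashMap::from([(1, vec!["a".into(), "b".into()])]),
    };
    assert_eq!(db.clear_table(1)?, 2); // the caller can now observe the count
    assert_eq!(db.clear_table(1)?, 0); // the table still exists, just empty
    Ok(())
}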
crates/datastore/src/locking_tx_datastore/committed_state.rs (129 changes: 97 additions & 32 deletions)

@@ -24,7 +24,7 @@ use crate::{
 use anyhow::anyhow;
 use core::{convert::Infallible, ops::RangeBounds};
 use itertools::Itertools;
-use spacetimedb_data_structures::map::{HashSet, IntMap};
+use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
 use spacetimedb_lib::{
     db::auth::{StAccess, StTableType},
     Identity,
@@ -44,6 +44,7 @@ use spacetimedb_table::{
 };
 use std::collections::BTreeMap;
 use std::sync::Arc;
+use thin_vec::ThinVec;
 
 /// Contains the live, in-memory snapshot of a database. This structure
 /// is exposed in order to support tools wanting to process the commit
@@ -65,6 +66,11 @@ pub struct CommittedState {
     /// Pages are shared between all modules running on a particular host,
     /// not allocated per-module.
     pub(super) page_pool: PagePool,
+    /// Whether the table was dropped during replay.
+    /// TODO(centril): Only used during bootstrap and is otherwise unused.
+    /// We should split `CommittedState` into two types
+    /// where one, e.g., `ReplayCommittedState`, has this field.
+    table_dropped: IntSet<TableId>,
 }
 
 impl MemoryUsage for CommittedState {
@@ -75,9 +81,14 @@ impl MemoryUsage for CommittedState {
             blob_store,
             index_id_map,
             page_pool: _,
+            table_dropped,
         } = self;
         // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
-        next_tx_offset.heap_usage() + tables.heap_usage() + blob_store.heap_usage() + index_id_map.heap_usage()
+        next_tx_offset.heap_usage()
+            + tables.heap_usage()
+            + blob_store.heap_usage()
+            + index_id_map.heap_usage()
+            + table_dropped.heap_usage()
     }
 }
 
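This hunk relies on a pattern worth naming: `heap_usage` destructures `self` exhaustively, so adding `table_dropped` does not compile until the new field is either counted or explicitly ignored (as `page_pool` is). A minimal sketch of the pattern, with a stand-in `HeapUsage` trait rather than the real `MemoryUsage`:

/// Stand-in for the real `MemoryUsage` trait.
trait HeapUsage {
    fn heap_usage(&self) -> usize;
}

struct State {
    names: Vec<String>,
    dropped: Vec<u32>,
}

impl HeapUsage for State {
    fn heap_usage(&self) -> usize {
        // Exhaustive destructuring: adding a field to `State` turns this
        // into a compile error, so no field can be silently forgotten.
        let State { names, dropped } = self;
        names.capacity() * std::mem::size_of::<String>()
            + names.iter().map(|s| s.capacity()).sum::<usize>()
            + dropped.capacity() * std::mem::size_of::<u32>()
    }
}

fn main() {
    let s = State {
        names: vec!["hello".into()],
        dropped: vec![1, 2, 3],
    };
    println!("approx heap bytes: {}", s.heap_usage());
}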
@@ -156,6 +167,7 @@ impl CommittedState {
             tables: <_>::default(),
             blob_store: <_>::default(),
             index_id_map: <_>::default(),
+            table_dropped: <_>::default(),
             page_pool,
         }
     }
@@ -333,17 +345,30 @@ impl CommittedState {
         Ok(())
     }
 
-    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
-        let table = self
-            .tables
-            .get_mut(&table_id)
-            .ok_or(TableError::IdNotFoundState(table_id))?;
+    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
+        // Get the table for mutation.
+        // If it was dropped, avoid an error and just ignore the row instead.
+        let table = match self.tables.get_mut(&table_id) {
+            Some(t) => t,
+            None if self.table_dropped.contains(&table_id) => return Ok(()),
+            None => return Err(TableError::IdNotFoundState(table_id).into()),
+        };
 
+        // Delete the row.
         let blob_store = &mut self.blob_store;
         table
-            .delete_equal_row(&self.page_pool, blob_store, rel)
+            .delete_equal_row(&self.page_pool, blob_store, row)
             .map_err(TableError::Bflatn)?
             .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;
+
+        if table_id == ST_TABLE_ID {
+            // A row was removed from `st_table`, so a table was dropped.
+            // Remove that table from the in-memory structures.
+            self.tables
+                .remove(&Self::read_table_id(row))
+                .expect("table to remove should exist");
+        }
+
         Ok(())
     }
 
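The rule this hunk encodes: during replay, a delete aimed at a missing table is an error unless `table_dropped` records that the table was dropped earlier in the log, in which case the row is silently skipped. A self-contained sketch of that logic, with plain collections standing in for the real table and blob-store machinery:

use std::collections::{HashMap, HashSet};

/// Replay one delete, tolerating tables that were dropped earlier in the log.
fn replay_delete(
    tables: &mut HashMap<u32, Vec<u64>>,
    dropped: &HashSet<u32>,
    table_id: u32,
    row: u64,
) -> Result<(), String> {
    let table = match tables.get_mut(&table_id) {
        Some(t) => t,
        // The table was legitimately dropped: ignore the row instead of failing.
        None if dropped.contains(&table_id) => return Ok(()),
        None => return Err(format!("table {table_id} not found")),
    };
    let pos = table
        .iter()
        .position(|&r| r == row)
        .ok_or("delete for non-existent row when replaying transaction")?;
    table.swap_remove(pos);
    Ok(())
}

fn main() {
    let mut tables = HashMap::from([(7, vec![1, 2, 3])]);
    let dropped = HashSet::from([9]);
    assert!(replay_delete(&mut tables, &dropped, 7, 2).is_ok()); // normal delete
    assert!(replay_delete(&mut tables, &dropped, 9, 1).is_ok()); // dropped table: skipped
    assert!(replay_delete(&mut tables, &dropped, 8, 1).is_err()); // unknown table: error
}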
@@ -378,8 +403,7 @@ impl CommittedState {
     ///
     /// The `row_ptr` is a pointer to `row`.
     fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> {
-        let target_table_id = TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
-            .expect("first field in `st_column` should decode to a `TableId`");
+        let target_table_id = Self::read_table_id(row);
         let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1]))
             .expect("second field in `st_column` should decode to a `ColId`");
 
@@ -410,6 +434,12 @@
         Ok(())
     }
 
+    /// Assuming that a `TableId` is stored as the first field in `row`, read it.
+    fn read_table_id(row: &ProductValue) -> TableId {
+        TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
+            .expect("first field in `st_column` should decode to a `TableId`")
+    }
+
     pub(super) fn build_sequence_state(&mut self, sequence_state: &mut SequencesState) -> Result<()> {
         let st_sequences = self.tables.get(&ST_SEQUENCE_ID).unwrap();
         for row_ref in st_sequences.scan_rows(&self.blob_store) {
@@ -594,7 +624,7 @@ impl CommittedState {
         let mut tx_data = TxData::default();
 
         // First, apply deletes. This will free up space in the committed tables.
-        self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);
+        self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);
 
         // Then, apply inserts. This will re-fill the holes freed by deletions
         // before allocating new pages.
@@ -610,33 +640,68 @@
         tx_data
     }
 
-    fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
+    fn merge_apply_deletes(
+        &mut self,
+        tx_data: &mut TxData,
+        delete_tables: BTreeMap<TableId, DeleteTable>,
+        pending_schema_changes: ThinVec<PendingSchemaChange>,
+    ) {
+        fn delete_rows(
+            tx_data: &mut TxData,
+            table_id: TableId,
+            table: &mut Table,
+            blob_store: &mut dyn BlobStore,
+            row_ptrs_len: usize,
+            row_ptrs: impl Iterator<Item = RowPointer>,
+        ) {
+            let mut deletes = Vec::with_capacity(row_ptrs_len);
+
+            // Note: we maintain the invariant that the delete_tables
+            // holds only committed rows which should be deleted,
+            // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
+            // so no need to check before applying the deletes.
+            for row_ptr in row_ptrs {
+                debug_assert!(row_ptr.squashed_offset().is_committed_state());
+
+                // TODO: re-write `TxData` to remove `ProductValue`s
+                let pv = table
+                    .delete(blob_store, row_ptr, |row| row.to_product_value())
+                    .expect("Delete for non-existent row!");
+                deletes.push(pv);
+            }
+
+            if !deletes.is_empty() {
+                let table_name = &*table.get_schema().table_name;
+                // TODO(centril): Pass this along to record truncated tables.
+                let _truncated = table.row_count == 0;
+                tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
+            }
+        }
+
         for (table_id, row_ptrs) in delete_tables {
             if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
-                let mut deletes = Vec::with_capacity(row_ptrs.len());
-
-                // Note: we maintain the invariant that the delete_tables
-                // holds only committed rows which should be deleted,
-                // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
-                // so no need to check before applying the deletes.
-                for row_ptr in row_ptrs.iter() {
-                    debug_assert!(row_ptr.squashed_offset().is_committed_state());
-
-                    // TODO: re-write `TxData` to remove `ProductValue`s
-                    let pv = table
-                        .delete(blob_store, row_ptr, |row| row.to_product_value())
-                        .expect("Delete for non-existent row!");
-                    deletes.push(pv);
-                }
-
-                if !deletes.is_empty() {
-                    let table_name = &*table.get_schema().table_name;
-                    tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
-                }
+                delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
             } else if !row_ptrs.is_empty() {
                 panic!("Deletion for non-existent table {table_id:?}... huh?");
             }
         }
+
+        // Delete all tables marked for deletion.
+        // The order here does not matter as once a `table_id` has been dropped
+        // it will never be re-created.
+        for change in pending_schema_changes {
+            if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
+                let row_ptrs = table.scan_all_row_ptrs();
+                delete_rows(
+                    tx_data,
+                    table_id,
+                    &mut table,
+                    &mut self.blob_store,
+                    row_ptrs.len(),
+                    row_ptrs.into_iter(),
+                );
+            }
+        }
     }
 
     fn merge_apply_inserts(
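The refactor above extracts a local `delete_rows` helper so one deletion-and-recording path serves two sources of rows: the per-table delete sets, and every remaining row of a table dropped via `PendingSchemaChange::TableRemoved`. A sketch of that shape, with stand-in types in place of `TxData`, `Table`, and row pointers:

fn merge_apply_deletes(
    deleted: &mut Vec<(u32, u64)>,
    delete_sets: Vec<(u32, Vec<u64>)>,
    dropped_tables: Vec<(u32, Vec<u64>)>,
) {
    // Local helper shared by both loops, so the "delete + record for
    // observers" logic exists exactly once.
    fn delete_rows(deleted: &mut Vec<(u32, u64)>, table_id: u32, rows: impl Iterator<Item = u64>) {
        for row in rows {
            deleted.push((table_id, row)); // record each delete for observers
        }
    }

    // Source 1: rows explicitly deleted from live tables.
    for (table_id, rows) in delete_sets {
        delete_rows(deleted, table_id, rows.into_iter());
    }
    // Source 2: every remaining row of a table dropped this transaction.
    // Order does not matter: a dropped table id is never re-created.
    for (table_id, rows) in dropped_tables {
        delete_rows(deleted, table_id, rows.into_iter());
    }
}

fn main() {
    let mut deleted = Vec::new();
    merge_apply_deletes(
        &mut deleted,
        vec![(1, vec![10, 11])],
        vec![(2, vec![20, 21, 22])],
    );
    assert_eq!(deleted.len(), 5);
}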