Skip to content

Commit 8da6dbe

Browse files
committed
send tx offset for procedures & abort dangling anon txes
1 parent b0b2323 commit 8da6dbe

File tree

11 files changed

+168
-86
lines changed

11 files changed

+168
-86
lines changed

crates/client-api/src/routes/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ async fn procedure<S: ControlStateDelegate + NodeDelegate>(
281281
let result = match module
282282
.call_procedure(caller_identity, Some(connection_id), None, &procedure, args)
283283
.await
284+
.result
284285
{
285286
Ok(res) => Ok(res),
286287
Err(e) => {

crates/core/src/client/client_connection.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use super::{message_handlers, ClientActorId, MessageHandleError};
1212
use crate::db::relational_db::RelationalDB;
1313
use crate::error::DBError;
1414
use crate::host::module_host::ClientConnectedError;
15-
use crate::host::{FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError, ReducerCallResult};
15+
use crate::host::{CallProcedureReturn, FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError, ReducerCallResult};
1616
use crate::messages::websocket::Subscribe;
1717
use crate::subscription::module_subscription_manager::BroadcastError;
1818
use crate::util::asyncify;
@@ -848,7 +848,7 @@ impl ClientConnection {
848848
request_id: RequestId,
849849
timer: Instant,
850850
) -> Result<(), BroadcastError> {
851-
let res = self
851+
let CallProcedureReturn { result, tx_offset } = self
852852
.module()
853853
.call_procedure(
854854
self.id.identity,
@@ -859,9 +859,11 @@ impl ClientConnection {
859859
)
860860
.await;
861861

862+
let message = ProcedureResultMessage::from_result(&result, request_id);
863+
862864
self.module()
863865
.subscriptions()
864-
.send_procedure_message(self.sender(), ProcedureResultMessage::from_result(&res, request_id))
866+
.send_procedure_message(self.sender(), message, tx_offset)
865867
}
866868

867869
pub async fn subscribe_single(

crates/core/src/host/host_controller.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use crate::db::{self, spawn_tx_metrics_recorder};
1010
use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor};
1111
use crate::host::module_host::ModuleRuntime as _;
1212
use crate::host::v8::V8Runtime;
13+
use crate::host::ProcedureCallError;
1314
use crate::messages::control_db::{Database, HostType};
1415
use crate::module_host_context::ModuleCreationContext;
1516
use crate::replica_context::ReplicaContext;
1617
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
17-
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
18+
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset};
1819
use crate::util::asyncify;
1920
use crate::util::jobs::{JobCores, SingleCoreExecutor};
2021
use crate::worker_metrics::WORKER_METRICS;
@@ -178,6 +179,11 @@ pub struct ProcedureCallResult {
178179
pub start_timestamp: Timestamp,
179180
}
180181

182+
pub struct CallProcedureReturn {
183+
pub result: Result<ProcedureCallResult, ProcedureCallError>,
184+
pub tx_offset: Option<TransactionOffset>,
185+
}
186+
181187
impl HostController {
182188
pub fn new(
183189
data_dir: Arc<ServerDataDir>,

crates/core/src/host/instance_env.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,8 @@ impl InstanceEnv {
230230
);
231231
}
232232

233-
/// End a console timer by logging the span at INFO level.
234-
pub(crate) fn console_timer_end(&self, span: &TimingSpan, function: Option<&str>) {
235-
let elapsed = span.start.elapsed();
236-
let message = format!("Timing span {:?}: {:?}", &span.name, elapsed);
237-
233+
/// Logs a simple `message` at `level`.
234+
pub(crate) fn console_log_simple_message(&self, level: LogLevel, function: Option<&str>, message: &str) {
238235
/// A backtrace provider that provides nothing.
239236
struct Noop;
240237
impl BacktraceProvider for Noop {
@@ -254,9 +251,17 @@ impl InstanceEnv {
254251
filename: None,
255252
line_number: None,
256253
function,
257-
message: &message,
254+
message,
258255
};
259-
self.console_log(LogLevel::Info, &record, &Noop);
256+
self.console_log(level, &record, &Noop);
257+
}
258+
259+
/// End a console timer by logging the span at INFO level.
260+
pub(crate) fn console_timer_end(&self, span: &TimingSpan, function: Option<&str>) {
261+
let elapsed = span.start.elapsed();
262+
let message = format!("Timing span {:?}: {:?}", &span.name, elapsed);
263+
264+
self.console_log_simple_message(LogLevel::Info, function, &message);
260265
}
261266

262267
/// Returns the current time suitable for logging.
@@ -622,7 +627,7 @@ impl TxSlot {
622627
MutexGuard::try_map(self.inner.lock(), |map| map.as_mut()).map_err(|_| GetTxError)
623628
}
624629

625-
/// Steals th tx from the slot.
630+
/// Steals the tx from the slot.
626631
pub fn take(&self) -> Result<MutTxId, GetTxError> {
627632
self.inner.lock().take().ok_or(GetTxError)
628633
}

crates/core/src/host/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ mod wasm_common;
2424

2525
pub use disk_storage::DiskStorage;
2626
pub use host_controller::{
27-
extract_schema, ExternalDurability, ExternalStorage, HostController, MigratePlanResult, ProcedureCallResult,
28-
ProgramStorage, ReducerCallResult, ReducerOutcome,
27+
extract_schema, CallProcedureReturn, ExternalDurability, ExternalStorage, HostController, MigratePlanResult,
28+
ProcedureCallResult, ProgramStorage, ReducerCallResult, ReducerOutcome,
2929
};
3030
pub use module_host::{ModuleHost, NoSuchModule, ProcedureCallError, ReducerCallError, UpdateDatabaseResult};
3131
pub use scheduler::Scheduler;

crates/core/src/host/module_host.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{
2-
ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ProcedureCallResult,
3-
ReducerCallResult, ReducerId, ReducerOutcome, Scheduler,
2+
ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ReducerCallResult, ReducerId,
3+
ReducerOutcome, Scheduler,
44
};
55
use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage};
66
use crate::client::{ClientActorId, ClientConnectionSender};
@@ -10,6 +10,7 @@ use crate::energy::EnergyQuanta;
1010
use crate::error::DBError;
1111
use crate::estimation::estimate_rows_scanned;
1212
use crate::hash::Hash;
13+
use crate::host::host_controller::CallProcedureReturn;
1314
use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem};
1415
use crate::host::v8::JsInstance;
1516
use crate::host::wasmtime::ModuleInstance;
@@ -401,7 +402,7 @@ impl Instance {
401402
}
402403
}
403404

404-
async fn call_procedure(&mut self, params: CallProcedureParams) -> Result<ProcedureCallResult, ProcedureCallError> {
405+
async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
405406
match self {
406407
Instance::Wasm(inst) => inst.call_procedure(params).await,
407408
Instance::Js(inst) => inst.call_procedure(params).await,
@@ -1550,7 +1551,7 @@ impl ModuleHost {
15501551
timer: Option<Instant>,
15511552
procedure_name: &str,
15521553
args: FunctionArgs,
1553-
) -> Result<ProcedureCallResult, ProcedureCallError> {
1554+
) -> CallProcedureReturn {
15541555
let res = async {
15551556
let (procedure_id, procedure_def) = self
15561557
.info
@@ -1569,7 +1570,15 @@ impl ModuleHost {
15691570
}
15701571
.await;
15711572

1572-
let log_message = match &res {
1573+
let ret = match res {
1574+
Ok(ret) => ret,
1575+
Err(err) => CallProcedureReturn {
1576+
result: Err(err),
1577+
tx_offset: None,
1578+
},
1579+
};
1580+
1581+
let log_message = match &ret.result {
15731582
Err(ProcedureCallError::NoSuchProcedure) => Some(no_such_function_log_message("procedure", procedure_name)),
15741583
Err(ProcedureCallError::Args(_)) => Some(args_error_log_message("procedure", procedure_name)),
15751584
_ => None,
@@ -1579,7 +1588,7 @@ impl ModuleHost {
15791588
self.inject_logs(LogLevel::Error, procedure_name, &log_message)
15801589
}
15811590

1582-
res
1591+
ret
15831592
}
15841593

15851594
async fn call_procedure_inner(
@@ -1590,10 +1599,10 @@ impl ModuleHost {
15901599
procedure_id: ProcedureId,
15911600
procedure_def: &ProcedureDef,
15921601
args: FunctionArgs,
1593-
) -> Result<ProcedureCallResult, ProcedureCallError> {
1602+
) -> Result<CallProcedureReturn, ProcedureCallError> {
15941603
let procedure_seed = ArgsSeed(self.info.module_def.typespace().with_type(procedure_def));
1595-
let args = args.into_tuple(procedure_seed).map_err(InvalidProcedureArguments)?;
15961604
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
1605+
let args = args.into_tuple(procedure_seed).map_err(InvalidProcedureArguments)?;
15971606

15981607
self.call_async_with_instance(&procedure_def.name, async move |mut inst| {
15991608
let res = inst
@@ -1608,7 +1617,8 @@ impl ModuleHost {
16081617
.await;
16091618
(res, inst)
16101619
})
1611-
.await?
1620+
.await
1621+
.map_err(Into::into)
16121622
}
16131623

16141624
// Scheduled reducers require a different function here to call their reducer

crates/core/src/host/v8/mod.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use super::module_common::{build_common_module_from_raw, run_describer, ModuleCo
1313
use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime};
1414
use super::UpdateDatabaseResult;
1515
use crate::client::ClientActorId;
16+
use crate::host::host_controller::CallProcedureReturn;
1617
use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
1718
use crate::host::module_host::{
1819
call_identity_connected, call_scheduled_reducer, init_database, CallViewParams, ClientConnectedError, Instance,
@@ -29,6 +30,7 @@ use crate::host::wasm_common::{RowIters, TimingSpanSet};
2930
use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler};
3031
use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited};
3132
use crate::replica_context::ReplicaContext;
33+
use crate::subscription::module_subscription_manager::TransactionOffset;
3234
use crate::util::asyncify;
3335
use anyhow::Context as _;
3436
use core::any::type_name;
@@ -380,10 +382,7 @@ impl JsInstance {
380382
.await
381383
}
382384

383-
pub async fn call_procedure(
384-
&mut self,
385-
_params: CallProcedureParams,
386-
) -> Result<super::ProcedureCallResult, super::ProcedureCallError> {
385+
pub async fn call_procedure(&mut self, _params: CallProcedureParams) -> CallProcedureReturn {
387386
todo!("JS/TS module procedure support")
388387
}
389388

@@ -781,7 +780,11 @@ impl WasmInstance for V8Instance<'_, '_, '_> {
781780
log_traceback(self.replica_ctx, func_type, func, trap)
782781
}
783782

784-
async fn call_procedure(&mut self, _op: ProcedureOp, _budget: FunctionBudget) -> ProcedureExecuteResult {
783+
async fn call_procedure(
784+
&mut self,
785+
_op: ProcedureOp,
786+
_budget: FunctionBudget,
787+
) -> (ProcedureExecuteResult, Option<TransactionOffset>) {
785788
todo!("JS/TS module procedure support")
786789
}
787790
}

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use super::*;
33
use crate::client::ClientActorId;
44
use crate::database_logger;
55
use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint};
6+
use crate::host::host_controller::CallProcedureReturn;
67
use crate::host::instance_env::{InstanceEnv, TxSlot};
78
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
89
use crate::host::module_host::{
@@ -20,6 +21,7 @@ use crate::messages::control_db::HostType;
2021
use crate::module_host_context::ModuleCreationContextLimited;
2122
use crate::replica_context::ReplicaContext;
2223
use crate::subscription::module_subscription_actor::commit_and_broadcast_event;
24+
use crate::subscription::module_subscription_manager::TransactionOffset;
2325
use crate::util::prometheus_handle::{HistogramExt, TimerGuard};
2426
use crate::worker_metrics::WORKER_METRICS;
2527
use bytes::Bytes;
@@ -79,7 +81,7 @@ pub trait WasmInstance {
7981
&mut self,
8082
op: ProcedureOp,
8183
budget: FunctionBudget,
82-
) -> impl Future<Output = ProcedureExecuteResult>;
84+
) -> impl Future<Output = (ProcedureExecuteResult, Option<TransactionOffset>)>;
8385
}
8486

8587
pub struct EnergyStats {
@@ -378,15 +380,12 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
378380
res
379381
}
380382

381-
pub async fn call_procedure(
382-
&mut self,
383-
params: CallProcedureParams,
384-
) -> Result<ProcedureCallResult, ProcedureCallError> {
385-
let res = self.common.call_procedure(params, &mut self.instance).await;
386-
if res.is_err() {
383+
pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
384+
let ret = self.common.call_procedure(params, &mut self.instance).await;
385+
if ret.result.is_err() {
387386
self.trapped = true;
388387
}
389-
res
388+
ret
390389
}
391390
}
392391

@@ -575,7 +574,7 @@ impl InstanceCommon {
575574
&mut self,
576575
params: CallProcedureParams,
577576
inst: &mut I,
578-
) -> Result<ProcedureCallResult, ProcedureCallError> {
577+
) -> CallProcedureReturn {
579578
let CallProcedureParams {
580579
timestamp,
581580
caller_identity,
@@ -612,7 +611,7 @@ impl InstanceCommon {
612611
// TODO(procedure-energy): replace with call to separate function `procedure_budget`.
613612
let budget = self.energy_monitor.reducer_budget(&energy_fingerprint);
614613

615-
let result = inst.call_procedure(op, budget).await;
614+
let (result, tx_offset) = inst.call_procedure(op, budget).await;
616615

617616
let ProcedureExecuteResult {
618617
stats:
@@ -630,7 +629,7 @@ impl InstanceCommon {
630629
self.allocated_memory = memory_allocation;
631630
}
632631

633-
match call_result {
632+
let result = match call_result {
634633
Err(err) => {
635634
inst.log_traceback("procedure", &procedure_def.name, &err);
636635

@@ -655,16 +654,17 @@ impl InstanceCommon {
655654
Ok(return_val) => {
656655
let return_type = &procedure_def.return_type;
657656
let seed = spacetimedb_sats::WithTypespace::new(self.info.module_def.typespace(), return_type);
658-
let return_val = seed
659-
.deserialize(bsatn::Deserializer::new(&mut &return_val[..]))
660-
.map_err(|err| ProcedureCallError::InternalError(format!("{err}")))?;
661-
Ok(ProcedureCallResult {
662-
return_val,
663-
execution_duration: timer.map(|timer| timer.elapsed()).unwrap_or_default(),
664-
start_timestamp: timestamp,
665-
})
657+
seed.deserialize(bsatn::Deserializer::new(&mut &return_val[..]))
658+
.map_err(|err| ProcedureCallError::InternalError(format!("{err}")))
659+
.map(|return_val| ProcedureCallResult {
660+
return_val,
661+
execution_duration: timer.map(|timer| timer.elapsed()).unwrap_or_default(),
662+
start_timestamp: timestamp,
663+
})
666664
}
667-
}
665+
};
666+
667+
CallProcedureReturn { result, tx_offset }
668668
}
669669

670670
/// Execute a reducer.

0 commit comments

Comments
 (0)