Skip to content

Commit fcaf181

Browse files
committed
add ProcedureContext::with_transaction
1 parent 210d6e2 commit fcaf181

File tree

6 files changed

+233
-56
lines changed

6 files changed

+233
-56
lines changed

crates/bindings-sys/src/lib.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -674,14 +674,15 @@ pub mod raw {
674674
///
675675
/// # Traps
676676
///
677-
/// This function does not trap.
677+
/// Traps if:
678+
/// - `out` is NULL or `out[..size_of::<i64>()]` is not in bounds of WASM memory.
678679
///
679680
/// # Errors
680681
///
681682
/// Returns an error:
682683
///
683684
/// - `WOULD_BLOCK_TRANSACTION`, if there's already an ongoing transaction.
684-
pub fn procedure_start_mut_transaction() -> u16;
685+
pub fn procedure_start_mut_transaction(out: *mut i64) -> u16;
685686

686687
/// Commits a mutable transaction,
687688
/// suspending execution of this WASM instance until
@@ -1322,7 +1323,7 @@ impl Drop for RowIter {
13221323
pub mod procedure {
13231324
//! Side-effecting or asynchronous operations which only procedures are allowed to perform.
13241325
1325-
use super::{call_no_ret, raw, Result};
1326+
use super::{call, call_no_ret, raw, Result};
13261327

13271328
#[inline]
13281329
pub fn sleep_until(wake_at_timestamp: i64) -> i64 {
@@ -1335,7 +1336,7 @@ pub mod procedure {
13351336
/// suspending execution of this WASM instance until
13361337
/// a mutable transaction lock is aquired.
13371338
///
1338-
/// Upon resuming, returns `Ok(())` on success,
1339+
/// Upon resuming, returns `Ok(timestamp)` on success,
13391340
/// enabling further calls that require a pending transaction,
13401341
/// or [`Errno`] otherwise.
13411342
///
@@ -1345,8 +1346,8 @@ pub mod procedure {
13451346
///
13461347
/// - `WOULD_BLOCK_TRANSACTION`, if there's already an ongoing transaction.
13471348
#[inline]
1348-
pub fn procedure_start_mut_transaction() -> Result<()> {
1349-
call_no_ret(|| unsafe { raw::procedure_start_mut_transaction() })
1349+
pub fn procedure_start_mut_transaction() -> Result<i64> {
1350+
unsafe { call(|out| raw::procedure_start_mut_transaction(out)) }
13501351
}
13511352

13521353
/// Commits a mutable transaction,

crates/bindings/src/lib.rs

Lines changed: 133 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use core::cell::{LazyCell, OnceCell, RefCell};
55
use core::ops::Deref;
66
use spacetimedb_lib::bsatn;
7+
use std::rc::Rc;
78

89
#[cfg(feature = "unstable")]
910
mod client_visibility_filter;
@@ -888,8 +889,6 @@ pub struct ViewContext {
888889
/// number generation.
889890
///
890891
/// Implements the `DbContext` trait for accessing views into a database.
891-
/// Currently, being this generic is only meaningful in clients,
892-
/// as `ReducerContext` is the only implementor of `DbContext` within modules.
893892
#[non_exhaustive]
894893
pub struct ReducerContext {
895894
/// The `Identity` of the client that invoked the reducer.
@@ -904,6 +903,8 @@ pub struct ReducerContext {
904903
/// including `init` and scheduled reducers.
905904
pub connection_id: Option<ConnectionId>,
906905

906+
sender_auth: AuthCtx,
907+
907908
/// Allows accessing the local database attached to a module.
908909
///
909910
/// This slightly strange type appears to have no methods, but that is misleading.
@@ -942,8 +943,6 @@ pub struct ReducerContext {
942943
/// See the [`#[table]`](macro@crate::table) macro for more information.
943944
pub db: Local,
944945

945-
sender_auth: AuthCtx,
946-
947946
#[cfg(feature = "rand08")]
948947
rng: std::cell::OnceCell<StdbRng>,
949948
}
@@ -964,16 +963,12 @@ impl ReducerContext {
964963

965964
#[doc(hidden)]
966965
fn new(db: Local, sender: Identity, connection_id: Option<ConnectionId>, timestamp: Timestamp) -> Self {
967-
let sender_auth = match connection_id {
968-
Some(cid) => AuthCtx::from_connection_id(cid),
969-
None => AuthCtx::internal(),
970-
};
971966
Self {
972967
db,
973968
sender,
974969
timestamp,
975970
connection_id,
976-
sender_auth,
971+
sender_auth: AuthCtx::from_connection_id_opt(connection_id),
977972
#[cfg(feature = "rand08")]
978973
rng: std::cell::OnceCell::new(),
979974
}
@@ -1011,6 +1006,57 @@ impl ReducerContext {
10111006
}
10121007
}
10131008

1009+
/// The context that an anonymous transaction
1010+
/// in [`ProcedureContext::with_transaction`] is provided with.
1011+
///
1012+
/// Includes information about the client starting the transaction
1013+
/// and the time of the procedure/reducer,
1014+
/// as well as a view into the module's database.
1015+
///
1016+
/// If the crate was compiled with the `rand` feature, also includes faculties for random
1017+
/// number generation.
1018+
///
1019+
/// Implements the `DbContext` trait for accessing views into a database.
1020+
pub struct TxContext(ReducerContext);
1021+
1022+
impl AsRef<ReducerContext> for TxContext {
1023+
fn as_ref(&self) -> &ReducerContext {
1024+
&self.0
1025+
}
1026+
}
1027+
1028+
impl Deref for TxContext {
1029+
type Target = ReducerContext;
1030+
1031+
fn deref(&self) -> &Self::Target {
1032+
&self.0
1033+
}
1034+
}
1035+
1036+
/// Values which knows whether they signify an ok state as opposed to error.
1037+
pub trait IsOk {
1038+
/// Returns whether the current state of `self` is "ok".
1039+
fn is_ok(&self) -> bool;
1040+
}
1041+
1042+
impl IsOk for () {
1043+
fn is_ok(&self) -> bool {
1044+
true
1045+
}
1046+
}
1047+
1048+
impl<T> IsOk for Option<T> {
1049+
fn is_ok(&self) -> bool {
1050+
self.is_some()
1051+
}
1052+
}
1053+
1054+
impl<T, E> IsOk for Result<T, E> {
1055+
fn is_ok(&self) -> bool {
1056+
self.is_ok()
1057+
}
1058+
}
1059+
10141060
/// The context that any procedure is provided with.
10151061
///
10161062
/// Each procedure must accept `&mut ProcedureContext` as its first argument.
@@ -1074,6 +1120,65 @@ impl ProcedureContext {
10741120
let new_time = Timestamp::from_micros_since_unix_epoch(new_time);
10751121
self.timestamp = new_time;
10761122
}
1123+
1124+
/// Acquire a mutable transaction
1125+
/// and execute `body` with read-write access to the database.
1126+
///
1127+
/// When `body().is_ok()`,
1128+
/// the transaction will be committed and its mutations persisted.
1129+
/// When `!body().is_ok()`,
1130+
/// the transaction will be rolled back and its mutations discarded.
1131+
///
1132+
/// Regardless of the transaction's success or failure,
1133+
/// the return value of `body` is not persisted to the commitlog
1134+
/// or broadcast to subscribed clients.
1135+
/// Clients attribute mutations performed by this transaction to `Event::UnknownTransaction`.
1136+
///
1137+
/// If the transaction fails to commit after `body` returns,
1138+
/// e.g., due to a conflict with a concurrent transaction,
1139+
/// this method will re-invoke `body` with a new transaction in order to retry.
1140+
/// This is done once. On the second failure, a panic will occur.
1141+
pub fn with_transaction<R: IsOk>(&mut self, body: impl Fn(&TxContext) -> R) -> R {
1142+
let run = || {
1143+
// Start the transaction.
1144+
let timestamp = sys::procedure::procedure_start_mut_transaction().expect(
1145+
"holding `&mut ProcedureContext`, so should not be in a tx already; called manually elsewhere?",
1146+
);
1147+
let timestamp = Timestamp::from_micros_since_unix_epoch(timestamp);
1148+
1149+
// We've resumed, so do the work.
1150+
let tx = ReducerContext::new(Local {}, self.sender, self.connection_id, timestamp);
1151+
let tx = TxContext(tx);
1152+
body(&tx)
1153+
};
1154+
1155+
let mut res = run();
1156+
let abort = || {
1157+
sys::procedure::procedure_abort_mut_transaction()
1158+
.expect("should have a pending mutable anon tx as `procedure_start_mut_transaction` preceded")
1159+
};
1160+
1161+
// Commit or roll back?
1162+
if res.is_ok() {
1163+
if sys::procedure::procedure_commit_mut_transaction().is_err() {
1164+
log::warn!("committing anonymous transaction failed");
1165+
1166+
// NOTE(procedure,centril): there's no actual guarantee that `body`
1167+
// does the exact same as the time before, as the timestamps differ
1168+
// and due to interior mutability.
1169+
res = run();
1170+
if res.is_ok() {
1171+
sys::procedure::procedure_commit_mut_transaction().expect("transaction retry failed again")
1172+
} else {
1173+
abort();
1174+
}
1175+
}
1176+
} else {
1177+
abort();
1178+
}
1179+
1180+
res
1181+
}
10771182
}
10781183

10791184
/// A handle on a database with a particular table schema.
@@ -1102,9 +1207,16 @@ impl DbContext for ReducerContext {
11021207
}
11031208
}
11041209

1105-
// `ProcedureContext` is *not* a `DbContext`. We will add a `TxContext`
1106-
// which can be obtained from `ProcedureContext::start_tx`,
1107-
// and that will be a `DbContext`.
1210+
impl DbContext for TxContext {
1211+
type DbView = Local;
1212+
1213+
fn db(&self) -> &Self::DbView {
1214+
&self.db
1215+
}
1216+
}
1217+
1218+
// `ProcedureContext` is *not* a `DbContext`
1219+
// but a `TxContext` derived from it is.
11081220

11091221
/// Allows accessing the local database attached to the module.
11101222
///
@@ -1125,18 +1237,25 @@ pub struct JwtClaims {
11251237
}
11261238

11271239
/// Authentication information for the caller of a reducer.
1240+
#[derive(Clone)]
11281241
pub struct AuthCtx {
11291242
is_internal: bool,
11301243
// NOTE(jsdt): cannot directly use a `LazyCell` without making this struct generic,
11311244
// which would cause `ReducerContext` to become generic as well.
1132-
jwt: Box<dyn Deref<Target = Option<JwtClaims>>>,
1245+
jwt: Rc<dyn Deref<Target = Option<JwtClaims>>>,
11331246
}
11341247

11351248
impl AuthCtx {
1249+
/// Creates an [`AuthCtx`] both for cases where there's a [`ConnectionId`]
1250+
/// and for when there isn't.
1251+
fn from_connection_id_opt(conn_id: Option<ConnectionId>) -> Self {
1252+
conn_id.map(Self::from_connection_id).unwrap_or_else(Self::internal)
1253+
}
1254+
11361255
fn new(is_internal: bool, jwt_fn: impl FnOnce() -> Option<JwtClaims> + 'static) -> Self {
11371256
AuthCtx {
11381257
is_internal,
1139-
jwt: Box::new(LazyCell::new(jwt_fn)),
1258+
jwt: Rc::new(LazyCell::new(jwt_fn)),
11401259
}
11411260
}
11421261

crates/bindings/src/rng.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
1-
use std::cell::UnsafeCell;
2-
use std::marker::PhantomData;
3-
4-
use crate::rand;
5-
1+
use crate::{rand, ReducerContext};
2+
use core::cell::UnsafeCell;
3+
use core::marker::PhantomData;
64
use rand::distributions::{Distribution, Standard};
75
use rand::rngs::StdRng;
86
use rand::{RngCore, SeedableRng};
9-
10-
use crate::ReducerContext;
7+
use spacetimedb_lib::Timestamp;
118

129
impl ReducerContext {
1310
/// Generates a random value.
@@ -48,10 +45,7 @@ impl ReducerContext {
4845
///
4946
/// For more information, see [`StdbRng`] and [`rand::Rng`].
5047
pub fn rng(&self) -> &StdbRng {
51-
self.rng.get_or_init(|| StdbRng {
52-
rng: StdRng::seed_from_u64(self.timestamp.to_micros_since_unix_epoch() as u64).into(),
53-
_marker: PhantomData,
54-
})
48+
self.rng.get_or_init(|| StdbRng::seed_from_ts(self.timestamp))
5549
}
5650
}
5751

@@ -82,6 +76,16 @@ pub struct StdbRng {
8276
_marker: PhantomData<*mut ()>,
8377
}
8478

79+
impl StdbRng {
80+
/// Seeds a [`StdbRng`] from a timestamp.
81+
fn seed_from_ts(timestamp: Timestamp) -> Self {
82+
Self {
83+
rng: StdRng::seed_from_u64(timestamp.to_micros_since_unix_epoch() as u64).into(),
84+
_marker: PhantomData,
85+
}
86+
}
87+
}
88+
8589
impl RngCore for StdbRng {
8690
fn next_u32(&mut self) -> u32 {
8791
(&*self).next_u32()

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,7 +1454,8 @@ impl WasmInstanceEnv {
14541454
///
14551455
/// # Traps
14561456
///
1457-
/// This function does not trap.
1457+
/// Traps if:
1458+
/// - `out` is NULL or `out[..size_of::<i64>()]` is not in bounds of WASM memory.
14581459
///
14591460
/// # Errors
14601461
///
@@ -1463,21 +1464,27 @@ impl WasmInstanceEnv {
14631464
/// - `WOULD_BLOCK_TRANSACTION`, if there's already an ongoing transaction.
14641465
pub fn procedure_start_mut_transaction<'caller>(
14651466
caller: Caller<'caller, Self>,
1466-
(): (),
1467+
(out,): (WasmPtr<u64>,),
14671468
) -> Fut<'caller, RtResult<u32>> {
1468-
Self::async_with_span(caller, AbiCall::ProcedureStartMutTransaction, |mut caller| async {
1469-
let (_, env) = Self::mem_env(&mut caller);
1470-
let res = env.instance_env.start_mutable_tx().await;
1471-
1472-
let result = res
1473-
.map(|()| {
1474-
env.in_anon_tx = true;
1475-
0u16.into()
1476-
})
1477-
.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureStartMutTransaction, err.into()));
1478-
1479-
(caller, result)
1480-
})
1469+
Self::async_with_span(
1470+
caller,
1471+
AbiCall::ProcedureStartMutTransaction,
1472+
move |mut caller| async move {
1473+
let (mem, env) = Self::mem_env(&mut caller);
1474+
let res = env.instance_env.start_mutable_tx().await.map_err(Into::into);
1475+
let timestamp = Timestamp::now().to_micros_since_unix_epoch() as u64;
1476+
let res = res.and_then(|()| Ok(timestamp.write_to(mem, out)?));
1477+
1478+
let result = res
1479+
.map(|()| {
1480+
env.in_anon_tx = true;
1481+
0u16.into()
1482+
})
1483+
.or_else(|err| Self::convert_wasm_result(AbiCall::ProcedureStartMutTransaction, err));
1484+
1485+
(caller, result)
1486+
},
1487+
)
14811488
}
14821489

14831490
/// Commits a mutable transaction,

0 commit comments

Comments
 (0)