Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/client-api-messages/src/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ impl fmt::Debug for EnergyBalance {
pub struct ReducerBudget(u64);

impl ReducerBudget {
pub const DEFAULT_BUDGET: Self = ReducerBudget(1_000_000_000_000_000_000);
// 1 second of wasm runtime is roughly 2 TeV, so this is
// roughly 1 minute of wasm runtime
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);

pub const ZERO: Self = ReducerBudget(0);
pub const MAX: Self = ReducerBudget(u64::MAX);
Expand Down
16 changes: 16 additions & 0 deletions crates/core/src/host/wasmtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::Cow;
use std::time::Duration;

use anyhow::Context;
use spacetimedb_paths::server::{ServerDataDir, WasmtimeCacheDir};
Expand All @@ -23,12 +24,17 @@ pub struct WasmtimeRuntime {
linker: Box<Linker<WasmInstanceEnv>>,
}

const EPOCH_TICK_LENGTH: Duration = Duration::from_millis(10);

const EPOCH_TICKS_PER_SECOND: u64 = Duration::from_secs(1).div_duration_f64(EPOCH_TICK_LENGTH) as u64;

impl WasmtimeRuntime {
pub fn new(data_dir: Option<&ServerDataDir>) -> Self {
let mut config = wasmtime::Config::new();
config
.cranelift_opt_level(wasmtime::OptLevel::Speed)
.consume_fuel(true)
.epoch_interruption(true)
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);

// Offer a compile-time flag for enabling perfmap generation,
Expand All @@ -45,6 +51,16 @@ impl WasmtimeRuntime {

let engine = Engine::new(&config).unwrap();

let weak_engine = engine.weak();
tokio::spawn(async move {
let mut interval = tokio::time::interval(EPOCH_TICK_LENGTH);
loop {
interval.tick().await;
let Some(engine) = weak_engine.upgrade() else { break };
engine.increment_epoch();
}
});

let mut linker = Box::new(Linker::new(&engine));
WasmtimeModule::link_imports(&mut linker).unwrap();

Expand Down
12 changes: 11 additions & 1 deletion crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl WasmInstanceEnv {
timing_spans: Default::default(),
reducer_start,
call_times: CallTimes::new(),
reducer_name: String::from(""),
reducer_name: String::from("<initializing>"),
chunk_pool: <_>::default(),
}
}
Expand Down Expand Up @@ -159,6 +159,16 @@ impl WasmInstanceEnv {
(args, errors)
}

/// Returns the name of the most recent reducer to be run in this environment.
pub fn reducer_name(&self) -> &str {
&self.reducer_name
}

/// Returns the name of the most recent reducer to be run in this environment.
pub fn reducer_start(&self) -> Instant {
self.reducer_start
}

/// Signal to this `WasmInstanceEnv` that a reducer call is over.
/// This resets all of the state associated to a single reducer call,
/// and returns instrumentation records.
Expand Down
13 changes: 12 additions & 1 deletion crates/core/src/host/wasmtime/wasmtime_module.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use self::module_host_actor::ReducerOp;

use super::wasm_instance_env::WasmInstanceEnv;
use super::{Mem, WasmtimeFuel};
use super::{Mem, WasmtimeFuel, EPOCH_TICKS_PER_SECOND};
use crate::energy::ReducerBudget;
use crate::host::instance_env::InstanceEnv;
use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError};
Expand Down Expand Up @@ -99,8 +99,18 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule {
let mem = Mem::extract(&instance, &mut store).unwrap();
store.data_mut().instantiate(mem);

store.epoch_deadline_callback(|store| {
let env = store.data();
let database = env.instance_env().replica_ctx.database_identity;
let reducer = env.reducer_name();
let dur = env.reducer_start().elapsed();
tracing::warn!(reducer, ?database, "Wasm has been running for {dur:?}");
Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND))
});

// Note: this budget is just for initializers
set_store_fuel(&mut store, ReducerBudget::DEFAULT_BUDGET.into());
store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND);

for preinit in &func_names.preinits {
let func = instance.get_typed_func::<(), ()>(&mut store, preinit).unwrap();
Expand Down Expand Up @@ -193,6 +203,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
// otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate.
set_store_fuel(store, budget.into());
let original_fuel = get_store_fuel(store);
store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND);

// Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays.
let [sender_0, sender_1, sender_2, sender_3] = bytemuck::must_cast(op.caller_identity.to_byte_array());
Expand Down
Loading