Skip to content

Commit b07f22e

Browse files
authored
Log every second a reducer takes to run (#2738)
1 parent 1c65d7b commit b07f22e

File tree

4 files changed

+42
-3
lines changed

4 files changed

+42
-3
lines changed

crates/client-api-messages/src/energy.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ impl fmt::Debug for EnergyBalance {
130130
pub struct ReducerBudget(u64);
131131

132132
impl ReducerBudget {
133-
pub const DEFAULT_BUDGET: Self = ReducerBudget(1_000_000_000_000_000_000);
133+
// 1 second of wasm runtime is roughly 2 TeV, so this is
134+
// roughly 1 minute of wasm runtime
135+
pub const DEFAULT_BUDGET: Self = ReducerBudget(120_000_000_000_000);
134136

135137
pub const ZERO: Self = ReducerBudget(0);
136138
pub const MAX: Self = ReducerBudget(u64::MAX);

crates/core/src/host/wasmtime/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::borrow::Cow;
2+
use std::time::Duration;
23

34
use anyhow::Context;
45
use spacetimedb_paths::server::{ServerDataDir, WasmtimeCacheDir};
@@ -23,12 +24,17 @@ pub struct WasmtimeRuntime {
2324
linker: Box<Linker<WasmInstanceEnv>>,
2425
}
2526

27+
const EPOCH_TICK_LENGTH: Duration = Duration::from_millis(10);
28+
29+
const EPOCH_TICKS_PER_SECOND: u64 = Duration::from_secs(1).div_duration_f64(EPOCH_TICK_LENGTH) as u64;
30+
2631
impl WasmtimeRuntime {
2732
pub fn new(data_dir: Option<&ServerDataDir>) -> Self {
2833
let mut config = wasmtime::Config::new();
2934
config
3035
.cranelift_opt_level(wasmtime::OptLevel::Speed)
3136
.consume_fuel(true)
37+
.epoch_interruption(true)
3238
.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
3339

3440
// Offer a compile-time flag for enabling perfmap generation,
@@ -45,6 +51,16 @@ impl WasmtimeRuntime {
4551

4652
let engine = Engine::new(&config).unwrap();
4753

54+
let weak_engine = engine.weak();
55+
tokio::spawn(async move {
56+
let mut interval = tokio::time::interval(EPOCH_TICK_LENGTH);
57+
loop {
58+
interval.tick().await;
59+
let Some(engine) = weak_engine.upgrade() else { break };
60+
engine.increment_epoch();
61+
}
62+
});
63+
4864
let mut linker = Box::new(Linker::new(&engine));
4965
WasmtimeModule::link_imports(&mut linker).unwrap();
5066

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl WasmInstanceEnv {
9797
timing_spans: Default::default(),
9898
reducer_start,
9999
call_times: CallTimes::new(),
100-
reducer_name: String::from(""),
100+
reducer_name: String::from("<initializing>"),
101101
chunk_pool: <_>::default(),
102102
}
103103
}
@@ -159,6 +159,16 @@ impl WasmInstanceEnv {
159159
(args, errors)
160160
}
161161

162+
/// Returns the name of the most recent reducer to be run in this environment.
163+
pub fn reducer_name(&self) -> &str {
164+
&self.reducer_name
165+
}
166+
167+
/// Returns the name of the most recent reducer to be run in this environment.
168+
pub fn reducer_start(&self) -> Instant {
169+
self.reducer_start
170+
}
171+
162172
/// Signal to this `WasmInstanceEnv` that a reducer call is over.
163173
/// This resets all of the state associated to a single reducer call,
164174
/// and returns instrumentation records.

crates/core/src/host/wasmtime/wasmtime_module.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use self::module_host_actor::ReducerOp;
22

33
use super::wasm_instance_env::WasmInstanceEnv;
4-
use super::{Mem, WasmtimeFuel};
4+
use super::{Mem, WasmtimeFuel, EPOCH_TICKS_PER_SECOND};
55
use crate::energy::ReducerBudget;
66
use crate::host::instance_env::InstanceEnv;
77
use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError};
@@ -99,8 +99,18 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule {
9999
let mem = Mem::extract(&instance, &mut store).unwrap();
100100
store.data_mut().instantiate(mem);
101101

102+
store.epoch_deadline_callback(|store| {
103+
let env = store.data();
104+
let database = env.instance_env().replica_ctx.database_identity;
105+
let reducer = env.reducer_name();
106+
let dur = env.reducer_start().elapsed();
107+
tracing::warn!(reducer, ?database, "Wasm has been running for {dur:?}");
108+
Ok(wasmtime::UpdateDeadline::Continue(EPOCH_TICKS_PER_SECOND))
109+
});
110+
102111
// Note: this budget is just for initializers
103112
set_store_fuel(&mut store, ReducerBudget::DEFAULT_BUDGET.into());
113+
store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND);
104114

105115
for preinit in &func_names.preinits {
106116
let func = instance.get_typed_func::<(), ()>(&mut store, preinit).unwrap();
@@ -193,6 +203,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
193203
// otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate.
194204
set_store_fuel(store, budget.into());
195205
let original_fuel = get_store_fuel(store);
206+
store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND);
196207

197208
// Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays.
198209
let [sender_0, sender_1, sender_2, sender_3] = bytemuck::must_cast(op.caller_identity.to_byte_array());

0 commit comments

Comments
 (0)