Skip to content

Commit f719fe2

Browse files
authored
Support in-memory I/O APIs (#493)
* Support providers with in-memory I/O buffers * Add ValidatedModule abstraction * Reset fuel consumed after finalization * Reset fuel in initialize * Use iterator methods
1 parent bc1a102 commit f719fe2

19 files changed

Lines changed: 581 additions & 77 deletions

Cargo.lock

Lines changed: 48 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ members = [
88
"tests/fixtures/messagepack-valid",
99
"tests/fixtures/messagepack-invalid",
1010
"tests/fixtures/wasm_api_v1",
11+
"tests/fixtures/wasm_api_v2",
1112
]
1213

1314
[package]

providers/shopify_function_v2.wasm

73.4 KB
Binary file not shown.
1.11 MB
Binary file not shown.

src/engine.rs

Lines changed: 27 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use anyhow::{anyhow, Result};
2-
use rust_embed::RustEmbed;
2+
use std::path::PathBuf;
33
use std::string::String;
4-
use std::{collections::HashSet, path::PathBuf};
54
use wasmtime::{AsContextMut, Config, Engine, Linker, Module, ResourceLimiter, Store};
6-
use wasmtime_wasi::pipe::{MemoryInputPipe, MemoryOutputPipe};
75
use wasmtime_wasi::preview1::WasiP1Ctx;
8-
use wasmtime_wasi::{I32Exit, WasiCtxBuilder};
6+
use wasmtime_wasi::I32Exit;
97

108
use crate::function_run_result::FunctionRunResult;
9+
use crate::io::{IOHandler, OutputAndLogs};
10+
use crate::validated_module::ValidatedModule;
1111
use crate::{BytesContainer, BytesContainerType};
1212

1313
#[derive(Clone)]
@@ -16,44 +16,15 @@ pub struct ProfileOpts {
1616
pub out: PathBuf,
1717
}
1818

19-
#[derive(RustEmbed)]
20-
#[folder = "providers/"]
21-
struct StandardProviders;
22-
2319
pub fn uses_msgpack_provider(module: &Module) -> bool {
2420
module.imports().map(|i| i.module()).any(|module| {
25-
module.starts_with("shopify_function_v") || module == "shopify_functions_javy_v2"
21+
module.starts_with("shopify_function_v")
22+
|| module
23+
.strip_prefix("shopify_functions_javy_v")
24+
.is_some_and(|v| v.parse::<usize>().is_ok_and(|v| v >= 2))
2625
})
2726
}
2827

29-
fn import_modules<T>(
30-
module: &Module,
31-
engine: &Engine,
32-
linker: &mut Linker<T>,
33-
mut store: &mut Store<T>,
34-
) {
35-
let imported_modules: HashSet<String> =
36-
module.imports().map(|i| i.module().to_string()).collect();
37-
38-
imported_modules.iter().for_each(|module_name| {
39-
let provider_path = format!("{module_name}.wasm");
40-
let imported_module_bytes = StandardProviders::get(&provider_path);
41-
42-
if let Some(bytes) = imported_module_bytes {
43-
let imported_module = Module::from_binary(engine, &bytes.data)
44-
.unwrap_or_else(|_| panic!("Failed to load module {module_name}"));
45-
46-
let imported_module_instance = linker
47-
.instantiate(&mut store, &imported_module)
48-
.expect("Failed to instantiate imported instance");
49-
50-
linker
51-
.instance(&mut store, module_name, imported_module_instance)
52-
.expect("Failed to import module");
53-
}
54-
});
55-
}
56-
5728
pub struct FunctionRunParams<'a> {
5829
pub function_path: PathBuf,
5930
pub input: BytesContainer,
@@ -68,12 +39,12 @@ const STARTING_FUEL: u64 = u64::MAX;
6839
const MAXIMUM_MEMORIES: usize = 2; // 1 for the module, 1 for Javy's provider
6940

7041
struct FunctionContext {
71-
wasi: WasiP1Ctx,
42+
wasi: Option<WasiP1Ctx>,
7243
limiter: MemoryLimiter,
7344
}
7445

7546
impl FunctionContext {
76-
fn new(wasi: WasiP1Ctx) -> Self {
47+
fn new(wasi: Option<WasiP1Ctx>) -> Self {
7748
Self {
7849
wasi,
7950
limiter: Default::default(),
@@ -128,33 +99,29 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
12899
module,
129100
} = params;
130101

131-
let input_stream = MemoryInputPipe::new(input.raw.clone());
132-
let output_stream = MemoryOutputPipe::new(usize::MAX);
133-
let error_stream = MemoryOutputPipe::new(usize::MAX);
102+
let mut io_handler = IOHandler::new(ValidatedModule::new(module)?, input.clone());
134103

135104
let mut error_logs: String = String::new();
136105

137106
let mut linker = Linker::new(&engine);
138-
wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| {
139-
&mut ctx.wasi
140-
})?;
141-
deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?;
142-
let mut wasi_builder = WasiCtxBuilder::new();
143-
wasi_builder.stdin(input_stream);
144-
wasi_builder.stdout(output_stream.clone());
145-
wasi_builder.stderr(error_stream.clone());
146-
deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder);
147-
let wasi = wasi_builder.build_p1();
107+
let wasi = io_handler.wasi();
108+
if wasi.is_some() {
109+
wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| {
110+
ctx.wasi.as_mut().expect("Should have WASI context")
111+
})?;
112+
deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?;
113+
}
114+
148115
let function_context = FunctionContext::new(wasi);
149116
let mut store = Store::new(&engine, function_context);
150117
store.limiter(|s| &mut s.limiter);
118+
119+
io_handler.initialize(&engine, &mut linker, &mut store)?;
120+
151121
store.set_fuel(STARTING_FUEL)?;
152122
store.set_epoch_deadline(1);
153123

154-
import_modules(&module, &engine, &mut linker, &mut store);
155-
156-
linker.module(&mut store, "Function", &module)?;
157-
let instance = linker.instantiate(&mut store, &module)?;
124+
let instance = linker.instantiate(&mut store, io_handler.module())?;
158125

159126
let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?;
160127

@@ -163,7 +130,6 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
163130
.frequency(profile_opts.interval)
164131
.weight_unit(wasmprof::WeightUnit::Fuel)
165132
.profile(|store| func.call(store.as_context_mut(), ()));
166-
167133
(
168134
result,
169135
Some(profile_data.into_collapsed_stacks().to_string()),
@@ -191,18 +157,14 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
191157
}
192158
}
193159

194-
drop(store);
195-
196-
let mut logs = error_stream
197-
.try_into_inner()
198-
.expect("Log stream reference still exists");
160+
let OutputAndLogs {
161+
output: raw_output,
162+
mut logs,
163+
} = io_handler.finalize(store)?;
199164

200165
logs.extend_from_slice(error_logs.as_bytes());
201166

202167
let output_codec = input.codec;
203-
let raw_output = output_stream
204-
.try_into_inner()
205-
.expect("Output stream reference still exists");
206168
let output = BytesContainer::new(
207169
BytesContainerType::Output,
208170
output_codec,

src/function_run_result.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use colored::Colorize;
33
use serde::{Deserialize, Serialize};
44
use std::fmt;
55

6-
const FUNCTION_LOG_LIMIT: usize = 1_000;
6+
pub(crate) const FUNCTION_LOG_LIMIT: usize = 1_000;
77

88
#[derive(Serialize, Deserialize, Clone, Debug)]
99
pub struct InvalidOutput {

0 commit comments

Comments
 (0)