diff --git a/Cargo.lock b/Cargo.lock
index b7cb8d1..4de1b34 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -533,6 +533,12 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
 
+[[package]]
+name = "cfg_aliases"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
+
 [[package]]
 name = "chacha20"
 version = "0.10.0"
@@ -829,6 +835,27 @@ dependencies = [
  "hybrid-array",
 ]
 
+[[package]]
+name = "csv"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938"
+dependencies = [
+ "csv-core",
+ "itoa",
+ "ryu",
+ "serde_core",
+]
+
+[[package]]
+name = "csv-core"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "ctutils"
 version = "0.4.2"
@@ -1479,8 +1506,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0"
 dependencies = [
  "cfg-if",
+ "js-sys",
  "libc",
  "wasi 0.11.1+wasi-snapshot-preview1",
+ "wasm-bindgen",
 ]
@@ -1757,6 +1786,7 @@ dependencies = [
  "tokio",
  "tokio-rustls",
  "tower-service",
+ "webpki-roots 1.0.7",
 ]
@@ -2227,7 +2257,7 @@ dependencies = [
  "referencing",
  "regex",
  "regex-syntax",
- "reqwest",
+ "reqwest 0.13.3",
  "rustls",
  "serde",
  "serde_json",
@@ -2339,6 +2369,12 @@ version = "0.4.29"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
 
+[[package]]
+name = "lru-slab"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
+
 [[package]]
 name = "ls-types"
 version = "0.0.6"
@@ -3137,6 +3173,61 @@ version = "1.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
 
+[[package]]
+name = "quinn"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20"
+dependencies = [
+ "bytes",
+ "cfg_aliases",
+ "pin-project-lite",
+ "quinn-proto",
+ "quinn-udp",
+ "rustc-hash",
+ "rustls",
+ "socket2",
+ "thiserror 2.0.18",
+ "tokio",
+ "tracing",
+ "web-time",
+]
+
+[[package]]
+name = "quinn-proto"
+version = "0.11.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
+dependencies = [
+ "bytes",
+ "getrandom 0.3.4",
+ "lru-slab",
+ "rand 0.9.4",
+ "ring",
+ "rustc-hash",
+ "rustls",
+ "rustls-pki-types",
+ "slab",
+ "thiserror 2.0.18",
+ "tinyvec",
+ "tracing",
+ "web-time",
+]
+
+[[package]]
+name = "quinn-udp"
+version = "0.5.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd"
+dependencies = [
+ "cfg_aliases",
+ "libc",
+ "once_cell",
+ "socket2",
+ "tracing",
+ "windows-sys 0.60.2",
+]
+
 [[package]]
 name = "quote"
 version = "1.0.45"
@@ -3371,6 +3462,44 @@ version = "0.5.3"
"registry+https://github.com/rust-lang/crates.io-index" checksum = "cadadef317c2f20755a64d7fdc48f9e7178ee6b0e1f7fce33fa60f1d68a276e6" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 1.0.7", +] + [[package]] name = "reqwest" version = "0.13.3" @@ -3459,6 +3588,7 @@ version = "0.9.0" dependencies = [ "assert_cmd", "async-nats", + "async-trait", "axum", "bytes", "chrono", @@ -3577,15 +3707,24 @@ version = "0.9.0" dependencies = [ "arc-swap", "async-nats", + "async-trait", "chrono", "criterion", + "csv", "evtx", + "futures", + "jaq-interpret", + "jaq-parse", + "notify", "opentelemetry-proto", "parking_lot", "prost", "rand 0.10.1", + "regex", + "reqwest 0.12.28", "rsigma-eval", "rsigma-parser", + "rusqlite", "serde_json", "syslog_loose", "tempfile", @@ -3596,6 +3735,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "yaml_serde", ] [[package]] @@ -3685,6 +3825,7 @@ version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" dependencies = [ + "web-time", "zeroize", ] diff --git a/crates/rsigma-cli/Cargo.toml b/crates/rsigma-cli/Cargo.toml index f15ca07..06bb4b9 100644 --- a/crates/rsigma-cli/Cargo.toml +++ b/crates/rsigma-cli/Cargo.toml @@ -11,7 +11,7 @@ homepage.workspace = true [features] default = ["daemon"] -daemon = ["rsigma-runtime", "tokio", "axum", "prometheus", "notify", "rusqlite"] +daemon = ["rsigma-runtime", "tokio", "axum", "async-trait", "prometheus", "notify", "rusqlite"] daemon-nats = ["daemon", "rsigma-runtime/nats", "async-nats", "tokio-stream", "time"] daemon-otlp = ["daemon", "rsigma-runtime/otlp", "prost", "tonic", "flate2", "tokio-stream"] logfmt = ["rsigma-runtime/logfmt"] @@ -43,6 +43,7 @@ chrono = { version = "0.4", default-features = false, features = ["std", "now"] # daemon mode dependencies tokio = { version = "1", features = ["full"], optional = true } axum = { version = "0.8", features = ["json"], optional = true } +async-trait = { version = "0.1", optional = true } prometheus = { version = "0.14", default-features = false, optional = true } notify = { version = "8.2", optional = true } rusqlite = { version = "0.39", features = ["bundled"], optional = true } diff --git a/crates/rsigma-cli/src/commands/mod.rs b/crates/rsigma-cli/src/commands/mod.rs index b267904..2f8c900 100644 --- a/crates/rsigma-cli/src/commands/mod.rs +++ b/crates/rsigma-cli/src/commands/mod.rs @@ -3,6 +3,7 @@ mod eval; mod fields; mod lint; mod parse; +mod resolve; mod validate; pub(crate) use convert::{cmd_convert, cmd_list_formats, cmd_list_targets}; @@ -10,4 +11,5 @@ pub(crate) use eval::cmd_eval; pub(crate) use fields::cmd_fields; pub(crate) use lint::cmd_lint; pub(crate) use parse::{cmd_condition, cmd_parse, cmd_stdin}; +pub(crate) use resolve::cmd_resolve; pub(crate) use validate::cmd_validate; diff --git a/crates/rsigma-cli/src/commands/resolve.rs 
new file mode 100644
index 0000000..4434dec
--- /dev/null
+++ b/crates/rsigma-cli/src/commands/resolve.rs
@@ -0,0 +1,104 @@
+//! CLI `resolve` command: test dynamic source resolution offline.
+
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use rsigma_eval::parse_pipeline_file;
+use rsigma_runtime::DefaultSourceResolver;
+use rsigma_runtime::sources::SourceResolver;
+
+pub fn cmd_resolve(pipeline_paths: Vec<PathBuf>, source_filter: Option<String>, pretty: bool) {
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap_or_else(|e| {
+            eprintln!("Failed to start async runtime: {e}");
+            std::process::exit(crate::exit_code::CONFIG_ERROR);
+        });
+
+    rt.block_on(async { resolve_async(pipeline_paths, source_filter, pretty).await });
+}
+
+async fn resolve_async(pipeline_paths: Vec<PathBuf>, source_filter: Option<String>, pretty: bool) {
+    let mut all_sources = Vec::new();
+
+    for path in &pipeline_paths {
+        let pipeline = match parse_pipeline_file(path) {
+            Ok(p) => p,
+            Err(e) => {
+                eprintln!("Error reading pipeline {}: {e}", path.display());
+                std::process::exit(crate::exit_code::RULE_ERROR);
+            }
+        };
+
+        if !pipeline.is_dynamic() {
+            eprintln!(
+                "Pipeline '{}' has no dynamic sources, skipping.",
+                pipeline.name
+            );
+            continue;
+        }
+
+        for source in &pipeline.sources {
+            if let Some(ref filter) = source_filter
+                && source.id != *filter
+            {
+                continue;
+            }
+            all_sources.push((pipeline.name.clone(), source.clone()));
+        }
+    }
+
+    if all_sources.is_empty() {
+        if source_filter.is_some() {
+            eprintln!("No sources matched the filter.");
+        } else {
+            eprintln!("No dynamic sources found in the provided pipelines.");
+        }
+        std::process::exit(crate::exit_code::RULE_ERROR);
+    }
+
+    let resolver = Arc::new(DefaultSourceResolver::new());
+    let mut results = Vec::new();
+    let mut had_error = false;
+
+    for (pipeline_name, source) in &all_sources {
+        let source_id = source.id.clone();
+        match resolver.resolve(source).await {
+            Ok(value) => {
+                results.push(serde_json::json!({
+                    "pipeline": pipeline_name,
+                    "source_id": source_id,
+                    "status": "ok",
+                    "data": value.data,
+                }));
+            }
+            Err(e) => {
+                had_error = true;
+                results.push(serde_json::json!({
+                    "pipeline": pipeline_name,
+                    "source_id": source_id,
+                    "status": "error",
+                    "error": e.to_string(),
+                }));
+            }
+        }
+    }
+
+    let output = if results.len() == 1 {
+        results.into_iter().next().unwrap()
+    } else {
+        serde_json::Value::Array(results)
+    };
+
+    let json_str = if pretty {
+        serde_json::to_string_pretty(&output).unwrap()
+    } else {
+        serde_json::to_string(&output).unwrap()
+    };
+    println!("{json_str}");
+
+    if had_error {
+        std::process::exit(1);
+    }
+}
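With the subcommand wired into `main.rs` further down in this patch, an offline smoke test could look like the following (binary, file, and source names are illustrative, not part of the patch):

```sh
# Resolve every dynamic source declared in a pipeline and pretty-print the data.
rsigma resolve -p pipelines/threat-intel.yml --pretty

# Resolve a single source by ID; the process exits non-zero if resolution fails.
rsigma resolve -p pipelines/threat-intel.yml --source feodo_ips
```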
diff --git a/crates/rsigma-cli/src/daemon/instrumented_resolver.rs b/crates/rsigma-cli/src/daemon/instrumented_resolver.rs
new file mode 100644
index 0000000..c11fcb3
--- /dev/null
+++ b/crates/rsigma-cli/src/daemon/instrumented_resolver.rs
@@ -0,0 +1,73 @@
+//! Wraps `DefaultSourceResolver` with Prometheus instrumentation.
+
+use std::sync::Arc;
+use std::time::Instant;
+
+use rsigma_eval::pipeline::sources::{DynamicSource, SourceType};
+use rsigma_runtime::sources::{ResolvedValue, SourceError, SourceResolver};
+
+use super::metrics::Metrics;
+
+/// A source resolver that delegates to [`rsigma_runtime::DefaultSourceResolver`]
+/// and records Prometheus metrics for each resolution attempt.
+pub struct InstrumentedResolver {
+    inner: rsigma_runtime::DefaultSourceResolver,
+    metrics: Arc<Metrics>,
+}
+
+impl InstrumentedResolver {
+    pub fn new(metrics: Arc<Metrics>) -> Self {
+        Self {
+            inner: rsigma_runtime::DefaultSourceResolver::new(),
+            metrics,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl SourceResolver for InstrumentedResolver {
+    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
+        let source_type_label = source_type_label(&source.source_type);
+        self.metrics
+            .source_resolves_total
+            .with_label_values(&[source.id.as_str(), source_type_label])
+            .inc();
+
+        let start = Instant::now();
+        let result = self.inner.resolve(source).await;
+        let elapsed = start.elapsed().as_secs_f64();
+
+        self.metrics.source_resolve_latency.observe(elapsed);
+
+        match &result {
+            Ok(value) => {
+                if value.from_cache {
+                    self.metrics.source_cache_hits.inc();
+                }
+            }
+            Err(e) => {
+                let error_kind = match &e.kind {
+                    rsigma_runtime::SourceErrorKind::Fetch(_) => "Fetch",
+                    rsigma_runtime::SourceErrorKind::Parse(_) => "Parse",
+                    rsigma_runtime::SourceErrorKind::Extract(_) => "Extract",
+                    rsigma_runtime::SourceErrorKind::Timeout => "Timeout",
+                };
+                self.metrics
+                    .source_resolve_errors
+                    .with_label_values(&[source.id.as_str(), error_kind])
+                    .inc();
+            }
+        }
+
+        result
+    }
+}
+
+fn source_type_label(st: &SourceType) -> &'static str {
+    match st {
+        SourceType::File { .. } => "file",
+        SourceType::Command { .. } => "command",
+        SourceType::Http { .. } => "http",
+        SourceType::Nats { .. } => "nats",
+    }
+}
diff --git a/crates/rsigma-cli/src/daemon/metrics.rs b/crates/rsigma-cli/src/daemon/metrics.rs
index 7249cf5..4511c62 100644
--- a/crates/rsigma-cli/src/daemon/metrics.rs
+++ b/crates/rsigma-cli/src/daemon/metrics.rs
@@ -26,6 +26,10 @@ pub struct Metrics {
     pub dlq_events: IntCounter,
     pub detection_matches_by_rule: IntCounterVec,
     pub correlation_matches_by_rule: IntCounterVec,
+    pub source_resolves_total: IntCounterVec,
+    pub source_resolve_errors: IntCounterVec,
+    pub source_resolve_latency: Histogram,
+    pub source_cache_hits: IntCounter,
     #[cfg(feature = "daemon-otlp")]
     pub otlp_requests: IntCounterVec,
     #[cfg(feature = "daemon-otlp")]
@@ -202,6 +206,49 @@ impl Metrics {
             .register(Box::new(correlation_matches_by_rule.clone()))
             .unwrap();
 
+        let source_resolves_total = IntCounterVec::new(
+            Opts::new(
+                "rsigma_source_resolves_total",
+                "Total dynamic source resolution attempts",
+            ),
+            &["source_id", "source_type"],
+        )
+        .unwrap();
+        let source_resolve_errors = IntCounterVec::new(
+            Opts::new(
+                "rsigma_source_resolve_errors_total",
+                "Failed dynamic source resolutions",
+            ),
+            &["source_id", "error_kind"],
+        )
+        .unwrap();
+        let source_resolve_latency = Histogram::with_opts(
+            HistogramOpts::new(
+                "rsigma_source_resolve_seconds",
+                "Dynamic source resolution latency",
+            )
+            .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]),
+        )
+        .unwrap();
+        let source_cache_hits = IntCounter::with_opts(Opts::new(
+            "rsigma_source_cache_hits_total",
+            "Times cached source data was served on resolution failure",
+        ))
+        .unwrap();
+
+        registry
+            .register(Box::new(source_resolves_total.clone()))
+            .unwrap();
+        registry
+            .register(Box::new(source_resolve_errors.clone()))
+            .unwrap();
+        registry
+            .register(Box::new(source_resolve_latency.clone()))
+            .unwrap();
+        registry
+            .register(Box::new(source_cache_hits.clone()))
+            .unwrap();
+
         #[cfg(feature = "daemon-otlp")]
         let otlp_requests = IntCounterVec::new(
             Opts::new(
@@ -254,6 +301,10 @@ impl Metrics {
             dlq_events,
             detection_matches_by_rule,
             correlation_matches_by_rule,
+            source_resolves_total,
+            source_resolve_errors,
+            source_resolve_latency,
+            source_cache_hits,
             #[cfg(feature = "daemon-otlp")]
             otlp_requests,
             #[cfg(feature = "daemon-otlp")]
diff --git a/crates/rsigma-cli/src/daemon/mod.rs b/crates/rsigma-cli/src/daemon/mod.rs
index bb3a88f..d29d857 100644
--- a/crates/rsigma-cli/src/daemon/mod.rs
+++ b/crates/rsigma-cli/src/daemon/mod.rs
@@ -1,4 +1,5 @@
 mod health;
+mod instrumented_resolver;
 mod metrics;
 mod reload;
 pub(crate) mod server;
diff --git a/crates/rsigma-cli/src/daemon/server.rs b/crates/rsigma-cli/src/daemon/server.rs
index 0b7086c..a4d7dd5 100644
--- a/crates/rsigma-cli/src/daemon/server.rs
+++ b/crates/rsigma-cli/src/daemon/server.rs
@@ -53,6 +53,8 @@ struct AppState {
     start_time: Instant,
     /// Channel for HTTP event ingestion. Set when --input is http.
     event_tx: Option<mpsc::Sender<String>>,
+    /// Channel for on-demand source resolution triggers.
+    sources_trigger_tx: Option<mpsc::Sender<rsigma_runtime::sources::refresh::RefreshTrigger>>,
     /// Channel for OTLP event ingestion. Always set when daemon-otlp is compiled in.
     #[cfg(feature = "daemon-otlp")]
     otlp_event_tx: mpsc::Sender,
@@ -108,17 +110,36 @@ pub async fn run_daemon(config: DaemonConfig) {
     );
     engine.set_pipeline_paths(config.pipeline_paths.clone());
 
-    for pipeline in &config.pipelines {
-        if pipeline.is_dynamic() {
-            for source in &pipeline.sources {
-                tracing::warn!(
-                    pipeline = %pipeline.name,
-                    source_id = %source.id,
-                    refresh = ?source.refresh,
-                    required = source.required,
-                    "Dynamic source detected -- source resolution not yet supported"
-                );
-            }
+    // Set up dynamic source resolver if any pipeline has dynamic sources
+    let has_dynamic = config.pipelines.iter().any(|p| p.is_dynamic());
+    let mut sources_trigger_tx_val: Option<
+        mpsc::Sender<rsigma_runtime::sources::refresh::RefreshTrigger>,
+    > = None;
+
+    if has_dynamic {
+        let resolver: Arc<dyn rsigma_runtime::SourceResolver> = Arc::new(
+            super::instrumented_resolver::InstrumentedResolver::new(metrics.clone()),
+        );
+        engine.set_source_resolver(resolver.clone());
+
+        // Resolve dynamic sources at startup (blocks on required sources)
+        if let Err(e) = engine.resolve_dynamic_pipelines().await {
+            tracing::error!(error = %e, "Failed to resolve required dynamic sources at startup");
+            std::process::exit(crate::exit_code::CONFIG_ERROR);
+        }
+
+        // Collect all dynamic sources for the refresh scheduler
+        let all_sources: Vec<_> = config
+            .pipelines
+            .iter()
+            .filter(|p| p.is_dynamic())
+            .flat_map(|p| p.sources.iter().cloned())
+            .collect();
+
+        if !all_sources.is_empty() {
+            let scheduler = rsigma_runtime::sources::refresh::RefreshScheduler::new();
+            sources_trigger_tx_val = Some(scheduler.trigger_sender());
+            scheduler.run(all_sources, resolver);
         }
     }
@@ -232,6 +253,7 @@ pub async fn run_daemon(config: DaemonConfig) {
         reload_tx: reload_tx.clone(),
         start_time,
         event_tx: http_event_tx,
+        sources_trigger_tx: sources_trigger_tx_val,
        #[cfg(feature = "daemon-otlp")]
         otlp_event_tx,
     };
@@ -243,7 +265,13 @@ pub async fn run_daemon(config: DaemonConfig) {
         .route("/api/v1/rules", get(list_rules))
         .route("/api/v1/status", get(status))
         .route("/api/v1/reload", post(trigger_reload))
-        .route("/api/v1/events", post(ingest_events));
+        .route("/api/v1/events", post(ingest_events))
+        .route("/api/v1/sources", get(list_sources))
+        .route("/api/v1/sources/resolve", post(resolve_sources))
+        .route(
+            "/api/v1/sources/resolve/{source_id}",
+            post(resolve_source_by_id),
+        );
 
     #[cfg(feature = "daemon-otlp")]
     let app = app.route("/v1/logs", post(otlp_http_logs));
@@ -999,6 +1027,74 @@ async fn trigger_reload(State(state): State<AppState>) -> impl IntoResponse {
     }
 }
 
+async fn list_sources(State(state): State<AppState>) -> impl IntoResponse {
+    let snapshot = state.processor.engine_snapshot();
+    let guard = snapshot.lock();
+    let pipelines = guard.pipelines();
+
+    let mut sources_info = Vec::new();
+    for pipeline in pipelines {
+        for source in &pipeline.sources {
+            sources_info.push(serde_json::json!({
+                "source_id": source.id,
+                "pipeline": pipeline.name,
+                "type": format!("{:?}", source.source_type).split('{').next().unwrap_or("Unknown").trim(),
+                "refresh": format!("{:?}", source.refresh),
+                "required": source.required,
+            }));
+        }
+    }
+
+    Json(serde_json::json!({ "sources": sources_info }))
+}
+
+async fn resolve_sources(State(state): State<AppState>) -> impl IntoResponse {
+    use rsigma_runtime::sources::refresh::RefreshTrigger;
+
+    let Some(tx) = &state.sources_trigger_tx else {
+        return (
+            StatusCode::NOT_FOUND,
+            Json(serde_json::json!({ "error": "no dynamic sources configured" })),
+        );
+    };
+
+    match tx.try_send(RefreshTrigger::All) {
+        Ok(()) => (
+            StatusCode::OK,
+            Json(serde_json::json!({ "status": "resolve_triggered" })),
+        ),
+        Err(_) => (
+            StatusCode::TOO_MANY_REQUESTS,
+            Json(serde_json::json!({ "status": "resolve_already_pending" })),
+        ),
+    }
+}
+
+async fn resolve_source_by_id(
+    State(state): State<AppState>,
+    axum::extract::Path(source_id): axum::extract::Path<String>,
+) -> impl IntoResponse {
+    use rsigma_runtime::sources::refresh::RefreshTrigger;
+
+    let Some(tx) = &state.sources_trigger_tx else {
+        return (
+            StatusCode::NOT_FOUND,
+            Json(serde_json::json!({ "error": "no dynamic sources configured" })),
+        );
+    };
+
+    match tx.try_send(RefreshTrigger::Single(source_id.clone())) {
+        Ok(()) => (
+            StatusCode::OK,
+            Json(serde_json::json!({ "status": "resolve_triggered", "source_id": source_id })),
+        ),
+        Err(_) => (
+            StatusCode::TOO_MANY_REQUESTS,
+            Json(serde_json::json!({ "status": "resolve_already_pending" })),
+        ),
+    }
+}
+
 /// Accept events via HTTP POST for processing.
 /// Each non-empty line in the request body is parsed using the configured
 /// `--input-format` and forwarded to the engine.
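The resolve endpoints are fire-and-forget triggers over a bounded channel, so a second trigger while one is still queued comes back as 429 (`resolve_already_pending`). A sketch of the API surface, with host, port, and source ID assumed:

```sh
curl -s http://127.0.0.1:8080/api/v1/sources                            # list declared sources
curl -s -X POST http://127.0.0.1:8080/api/v1/sources/resolve            # re-resolve all sources
curl -s -X POST http://127.0.0.1:8080/api/v1/sources/resolve/feodo_ips  # re-resolve one source
```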
diff --git a/crates/rsigma-cli/src/main.rs b/crates/rsigma-cli/src/main.rs
index 0f1a386..09594d6 100644
--- a/crates/rsigma-cli/src/main.rs
+++ b/crates/rsigma-cli/src/main.rs
@@ -451,6 +451,25 @@ enum Commands {
         target: String,
     },
 
+    /// Resolve dynamic pipeline sources and display their data
+    ///
+    /// Test source resolution without running the daemon. Resolves all
+    /// dynamic sources declared in the given pipeline(s) and prints
+    /// the resulting data as JSON.
+    Resolve {
+        /// Processing pipeline(s) containing dynamic sources
+        #[arg(short = 'p', long = "pipeline", required = true)]
+        pipelines: Vec<PathBuf>,
+
+        /// Resolve only a specific source by ID
+        #[arg(short, long)]
+        source: Option<String>,
+
+        /// Pretty-print JSON output
+        #[arg(long)]
+        pretty: bool,
+    },
+
     /// List all fields referenced by Sigma rules
     ///
     /// Extracts field names from detection blocks, correlation group-by/condition
@@ -707,6 +726,11 @@ fn main() {
             no_filters,
             json,
         } => commands::cmd_fields(rules, pipelines, no_filters, json),
+        Commands::Resolve {
+            pipelines,
+            source,
+            pretty,
+        } => commands::cmd_resolve(pipelines, source, pretty),
     }
 }
diff --git a/crates/rsigma-eval/src/lib.rs b/crates/rsigma-eval/src/lib.rs
index 4d32dde..76978db 100644
--- a/crates/rsigma-eval/src/lib.rs
+++ b/crates/rsigma-eval/src/lib.rs
@@ -117,10 +117,10 @@ pub use error::{EvalError, Result};
 pub use event::{Event, EventValue, JsonEvent, KvEvent, MapEvent, PlainEvent};
 pub use matcher::CompiledMatcher;
 pub use pipeline::{
-    Pipeline, apply_pipelines, apply_pipelines_with_state,
+    Pipeline, TransformationItem, apply_pipelines, apply_pipelines_with_state,
     builtin::{
         builtin_names as builtin_pipeline_names, resolve_builtin as resolve_builtin_pipeline,
     },
-    merge_pipelines, parse_pipeline, parse_pipeline_file,
+    merge_pipelines, parse_pipeline, parse_pipeline_file, parse_transformation_items,
 };
 pub use result::{FieldMatch, MatchResult};
diff --git a/crates/rsigma-eval/src/pipeline/mod.rs b/crates/rsigma-eval/src/pipeline/mod.rs
index ead64c2..eb1defe 100644
--- a/crates/rsigma-eval/src/pipeline/mod.rs
+++ b/crates/rsigma-eval/src/pipeline/mod.rs
@@ -57,7 +57,7 @@ pub use conditions::{
     eval_condition_expr,
 };
 pub use finalizers::Finalizer;
-pub use parsing::{parse_pipeline, parse_pipeline_file};
+pub use parsing::{parse_pipeline, parse_pipeline_file, parse_transformation_items};
 pub use state::PipelineState;
 pub use transformations::Transformation;
diff --git a/crates/rsigma-eval/src/pipeline/parsing.rs b/crates/rsigma-eval/src/pipeline/parsing.rs
index 1a647d1..4d95d7b 100644
--- a/crates/rsigma-eval/src/pipeline/parsing.rs
+++ b/crates/rsigma-eval/src/pipeline/parsing.rs
@@ -112,7 +112,8 @@ fn parse_vars(value: Option<&serde_yaml::Value>) -> HashMap<String, Vec<String>>
     vars
 }
 
-fn parse_transformation_items(value: &serde_yaml::Value) -> Result<Vec<TransformationItem>> {
+/// Parse a YAML value as a sequence of transformation items.
+pub fn parse_transformation_items(value: &serde_yaml::Value) -> Result<Vec<TransformationItem>> {
     let items = value.as_sequence().ok_or_else(|| {
         EvalError::InvalidModifiers("transformations must be a sequence".to_string())
     })?;
diff --git a/crates/rsigma-runtime/Cargo.toml b/crates/rsigma-runtime/Cargo.toml
index 9b51d4e..017077a 100644
--- a/crates/rsigma-runtime/Cargo.toml
+++ b/crates/rsigma-runtime/Cargo.toml
@@ -11,7 +11,7 @@ homepage.workspace = true
 
 [features]
 default = []
-nats = ["async-nats", "tokio-stream", "time"]
+nats = ["async-nats", "tokio-stream", "time", "futures"]
 otlp = ["opentelemetry-proto", "prost"]
 logfmt = []
 cef = []
@@ -20,18 +20,28 @@ evtx = ["dep:evtx"]
 
 [dependencies]
 rsigma-parser = { path = "../rsigma-parser", version = "0.9.0" }
 rsigma-eval = { path = "../rsigma-eval", version = "0.9.0", features = ["parallel"] }
-tokio = { version = "1", features = ["rt-multi-thread", "sync", "macros", "io-util", "io-std"] }
+tokio = { version = "1", features = ["rt-multi-thread", "sync", "macros", "io-util", "io-std", "process", "fs"] }
 serde_json = "1"
+serde_yaml = { package = "yaml_serde", version = "0.10" }
 thiserror = "2"
 tracing = "0.1"
 arc-swap = "1"
 parking_lot = "0.12"
 syslog_loose = "0.23"
 chrono = { version = "0.4", default-features = false, features = ["std"] }
+async-trait = "0.1"
+regex = "1"
+csv = "1"
+jaq-interpret = "1.5.0"
+jaq-parse = "1.0.3"
+reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] }
+rusqlite = { version = "0.39", features = ["bundled"] }
+notify = "8.2"
 
 # optional
 tokio-stream = { version = "0.1", optional = true }
 async-nats = { version = "0.47", optional = true }
+futures = { version = "0.3", optional = true }
 time = { version = "0.3", optional = true }
 evtx = { version = "0.11", optional = true, default-features = false }
 opentelemetry-proto = { version = "0.31", default-features = false, features = ["gen-tonic", "logs", "with-serde"], optional = true }
diff --git a/crates/rsigma-runtime/src/engine.rs b/crates/rsigma-runtime/src/engine.rs
index 2ffb20e..ae1d938 100644
--- a/crates/rsigma-runtime/src/engine.rs
+++ b/crates/rsigma-runtime/src/engine.rs
@@ -1,4 +1,5 @@
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 
 use rsigma_eval::event::Event;
 use rsigma_eval::{
@@ -7,6 +8,8 @@ use rsigma_eval::{
 };
 use rsigma_parser::SigmaCollection;
 
+use crate::sources::{self, SourceResolver, TemplateExpander};
+
 /// Wraps a CorrelationEngine (or a plain Engine) and provides the interface
 /// the runtime needs: process events, reload rules, and query state.
 pub struct RuntimeEngine {
@@ -16,6 +19,8 @@ pub struct RuntimeEngine {
     rules_path: std::path::PathBuf,
     corr_config: CorrelationConfig,
     include_event: bool,
+    source_resolver: Option<Arc<dyn SourceResolver>>,
+    allow_remote_include: bool,
 }
 
 enum EngineVariant {
@@ -44,9 +49,34 @@ impl RuntimeEngine {
             rules_path,
             corr_config,
             include_event,
+            source_resolver: None,
+            allow_remote_include: false,
         }
     }
 
+    /// Set a source resolver for dynamic pipeline sources.
+    ///
+    /// When set, `load_rules()` resolves dynamic sources and expands
+    /// `${source.*}` templates before compiling rules.
+    pub fn set_source_resolver(&mut self, resolver: Arc<dyn SourceResolver>) {
+        self.source_resolver = Some(resolver);
+    }
+
+    /// Get the source resolver, if one is configured.
+    pub fn source_resolver(&self) -> Option<&Arc<dyn SourceResolver>> {
+        self.source_resolver.as_ref()
+    }
+
+    /// Allow `include` directives to reference HTTP/NATS sources.
+    pub fn set_allow_remote_include(&mut self, allow: bool) {
+        self.allow_remote_include = allow;
+    }
+
+    /// Whether remote includes are allowed.
+    pub fn allow_remote_include(&self) -> bool {
+        self.allow_remote_include
+    }
+
     /// Set the pipeline file paths used for hot-reload.
     ///
     /// When paths are set, `load_rules()` re-reads pipeline YAML from disk
@@ -61,6 +91,45 @@ impl RuntimeEngine {
         &self.pipeline_paths
     }
 
+    /// Resolve dynamic sources in all pipelines and expand templates.
+    ///
+    /// This is the async entry point for source resolution. Call this before
+    /// `load_rules()` when you have an async context available, or let
+    /// `load_rules()` handle it synchronously via `tokio::runtime::Handle`.
+    pub async fn resolve_dynamic_pipelines(&mut self) -> Result<(), String> {
+        let Some(resolver) = &self.source_resolver else {
+            return Ok(());
+        };
+
+        let mut resolved_pipelines = Vec::with_capacity(self.pipelines.len());
+        for pipeline in &self.pipelines {
+            if pipeline.is_dynamic() {
+                match sources::resolve_all(resolver.as_ref(), &pipeline.sources).await {
+                    Ok(resolved_data) => {
+                        let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
+                        // Expand include directives
+                        sources::include::expand_includes(
+                            &mut expanded,
+                            &resolved_data,
+                            self.allow_remote_include,
+                        )?;
+                        resolved_pipelines.push(expanded);
+                    }
+                    Err(e) => {
+                        return Err(format!(
+                            "Failed to resolve dynamic pipeline '{}': {e}",
+                            pipeline.name
+                        ));
+                    }
+                }
+            } else {
+                resolved_pipelines.push(pipeline.clone());
+            }
+        }
+        self.pipelines = resolved_pipelines;
+        Ok(())
+    }
+
     /// Load (or reload) rules from the configured path.
     ///
     /// On reload, correlation state is exported before replacing the engine
@@ -71,11 +140,34 @@ impl RuntimeEngine {
     /// pipelines are re-read from disk before rebuilding the engine. If any
     /// pipeline file fails to parse, the entire reload is aborted and the
     /// old engine remains active.
+    ///
+    /// Dynamic pipeline sources are resolved if a source resolver is configured.
     pub fn load_rules(&mut self) -> Result<EngineStats, String> {
         if !self.pipeline_paths.is_empty() {
             self.pipelines = reload_pipelines(&self.pipeline_paths)?;
         }
 
+        // Resolve dynamic sources if a resolver is set
+        if self.source_resolver.is_some() && self.pipelines.iter().any(|p| p.is_dynamic()) {
+            if let Ok(handle) = tokio::runtime::Handle::try_current() {
+                let pipelines = std::mem::take(&mut self.pipelines);
+                let resolver = self.source_resolver.clone().unwrap();
+                let allow_remote = self.allow_remote_include;
+                let resolved = handle.block_on(async {
+                    resolve_pipelines_async(&resolver, &pipelines, allow_remote).await
+                });
+                match resolved {
+                    Ok(p) => self.pipelines = p,
+                    Err(e) => {
+                        self.pipelines = pipelines;
+                        tracing::warn!(error = %e, "Dynamic source resolution failed, using unresolved pipelines");
+                    }
+                }
+            } else {
+                tracing::warn!("No tokio runtime available for dynamic source resolution");
+            }
+        }
+
         let previous_state = self.export_state();
         let collection = load_collection(&self.rules_path)?;
         let has_correlations = !collection.correlations.is_empty();
@@ -228,3 +320,30 @@ fn reload_pipelines(paths: &[PathBuf]) -> Result<Vec<Pipeline>, String> {
     pipelines.sort_by_key(|p| p.priority);
     Ok(pipelines)
 }
+
+/// Resolve dynamic sources in pipelines asynchronously.
+async fn resolve_pipelines_async(
+    resolver: &Arc<dyn SourceResolver>,
+    pipelines: &[Pipeline],
+    allow_remote_include: bool,
+) -> Result<Vec<Pipeline>, String> {
+    let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
+    for pipeline in pipelines {
+        if pipeline.is_dynamic() {
+            let resolved_data = sources::resolve_all(resolver.as_ref(), &pipeline.sources)
+                .await
+                .map_err(|e| {
+                    format!(
+                        "Failed to resolve dynamic pipeline '{}': {e}",
+                        pipeline.name
+                    )
+                })?;
+            let mut expanded = TemplateExpander::expand(pipeline, &resolved_data);
+            sources::include::expand_includes(&mut expanded, &resolved_data, allow_remote_include)?;
+            resolved_pipelines.push(expanded);
+        } else {
+            resolved_pipelines.push(pipeline.clone());
+        }
+    }
+    Ok(resolved_pipelines)
+}
diff --git a/crates/rsigma-runtime/src/lib.rs b/crates/rsigma-runtime/src/lib.rs
index 0257979..7938507 100644
--- a/crates/rsigma-runtime/src/lib.rs
+++ b/crates/rsigma-runtime/src/lib.rs
@@ -49,6 +49,7 @@ pub mod io;
 pub mod metrics;
 pub mod parse;
 pub mod processor;
+pub mod sources;
 
 pub use engine::{EngineStats, RuntimeEngine};
 pub use error::RuntimeError;
@@ -60,6 +61,11 @@ pub use metrics::{MetricsHook, NoopMetrics};
 pub use processor::{EventFilter, LogProcessor};
 pub use rsigma_eval::ProcessResult;
+pub use sources::refresh::{RefreshResult, RefreshScheduler, RefreshTrigger};
+pub use sources::{
+    DefaultSourceResolver, ResolvedValue, SourceCache, SourceError, SourceErrorKind,
+    SourceResolver, TemplateExpander,
+};
 
 #[cfg(feature = "nats")]
 pub use io::{NatsConnectConfig, NatsSink, NatsSource, ReplayPolicy};
diff --git a/crates/rsigma-runtime/src/processor.rs b/crates/rsigma-runtime/src/processor.rs
index d80822b..86341fc 100644
--- a/crates/rsigma-runtime/src/processor.rs
+++ b/crates/rsigma-runtime/src/processor.rs
@@ -256,7 +256,16 @@ impl LogProcessor {
     ///
     /// If pipeline or rule loading fails, the old engine remains active.
     pub fn reload_rules(&self) -> Result<EngineStats, String> {
-        let (old_state, rules_path, pipelines, pipeline_paths, corr_config, include_event) = {
+        let (
+            old_state,
+            rules_path,
+            pipelines,
+            pipeline_paths,
+            corr_config,
+            include_event,
+            resolver,
+            allow_remote_include,
+        ) = {
             let snapshot = self.engine.load();
             let old = snapshot.lock();
             (
@@ -266,11 +275,17 @@ impl LogProcessor {
                 old.pipeline_paths().to_vec(),
                 old.corr_config().clone(),
                 old.include_event(),
+                old.source_resolver().cloned(),
+                old.allow_remote_include(),
             )
         };
 
         let mut new_engine = RuntimeEngine::new(rules_path, pipelines, corr_config, include_event);
         new_engine.set_pipeline_paths(pipeline_paths);
+        new_engine.set_allow_remote_include(allow_remote_include);
+        if let Some(resolver) = resolver {
+            new_engine.set_source_resolver(resolver);
+        }
 
         let stats = new_engine.load_rules()?;
 
         if let Some(state) = old_state
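For embedders, the intended call order is: install a resolver, resolve sources while an async context is available, then compile rules. A minimal sketch under those assumptions (not part of this patch; the engine value is constructed elsewhere, and `load_rules` only falls back to `Handle::block_on` when this step is skipped):

```rust
use std::sync::Arc;

use rsigma_runtime::{DefaultSourceResolver, RuntimeEngine};

// Sketch: not part of this patch. Assumes an already-constructed engine.
async fn prepare(mut engine: RuntimeEngine) -> Result<RuntimeEngine, String> {
    engine.set_source_resolver(Arc::new(DefaultSourceResolver::new()));
    engine.set_allow_remote_include(false); // keep HTTP/NATS includes opt-in
    // Expand `${source.*}` templates and include directives up front...
    engine.resolve_dynamic_pipelines().await?;
    // ...so this compile step sees fully materialized pipelines.
    engine.load_rules()?;
    Ok(engine)
}
```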
diff --git a/crates/rsigma-runtime/src/sources/cache.rs b/crates/rsigma-runtime/src/sources/cache.rs
new file mode 100644
index 0000000..25ed521
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/cache.rs
@@ -0,0 +1,140 @@
+//! Source resolution cache with in-memory and optional SQLite persistence.
+//!
+//! Stores last-known-good values so that `on_error: use_cached` can serve
+//! stale data when a source fetch fails.
+
+use std::collections::HashMap;
+use std::path::Path;
+use std::sync::Mutex;
+
+/// Thread-safe cache for resolved source data.
+///
+/// Provides an in-memory layer with optional SQLite-backed disk persistence.
+pub struct SourceCache {
+    entries: Mutex<HashMap<String, serde_json::Value>>,
+    db: Option<Mutex<rusqlite::Connection>>,
+}
+
+impl SourceCache {
+    /// Create a new in-memory-only cache.
+    pub fn new() -> Self {
+        Self {
+            entries: Mutex::new(HashMap::new()),
+            db: None,
+        }
+    }
+
+    /// Create a cache backed by a SQLite database at the given path.
+    ///
+    /// The table is created if it does not exist. Existing cached values
+    /// are loaded into memory on construction.
+    pub fn with_sqlite(path: &Path) -> Result<Self, String> {
+        let conn = rusqlite::Connection::open(path)
+            .map_err(|e| format!("failed to open source cache DB: {e}"))?;
+
+        conn.execute_batch(
+            "CREATE TABLE IF NOT EXISTS source_cache (
+                source_id TEXT PRIMARY KEY,
+                value TEXT NOT NULL,
+                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
+            )",
+        )
+        .map_err(|e| format!("failed to create source_cache table: {e}"))?;
+
+        // Load existing entries into memory
+        let entries = {
+            let mut map = HashMap::new();
+            let mut stmt = conn
+                .prepare("SELECT source_id, value FROM source_cache")
+                .map_err(|e| format!("failed to prepare SELECT: {e}"))?;
+            let rows = stmt
+                .query_map([], |row| {
+                    let id: String = row.get(0)?;
+                    let val: String = row.get(1)?;
+                    Ok((id, val))
+                })
+                .map_err(|e| format!("failed to query source_cache: {e}"))?;
+
+            for (id, val_str) in rows.flatten() {
+                if let Ok(val) = serde_json::from_str(&val_str) {
+                    map.insert(id, val);
+                }
+            }
+            map
+        };
+
+        Ok(Self {
+            entries: Mutex::new(entries),
+            db: Some(Mutex::new(conn)),
+        })
+    }
+
+    /// Store a resolved value in the cache (memory + disk if available).
+    pub fn store(&self, source_id: &str, value: &serde_json::Value) {
+        {
+            let mut entries = self.entries.lock().unwrap();
+            entries.insert(source_id.to_string(), value.clone());
+        }
+
+        if let Some(db) = &self.db {
+            let conn = db.lock().unwrap();
+            let val_str = serde_json::to_string(value).unwrap_or_default();
+            let _ = conn.execute(
+                "INSERT OR REPLACE INTO source_cache (source_id, value, updated_at) VALUES (?1, ?2, datetime('now'))",
+                rusqlite::params![source_id, val_str],
+            );
+        }
+    }
+
+    /// Retrieve a cached value for a source.
+    pub fn get(&self, source_id: &str) -> Option<serde_json::Value> {
+        let entries = self.entries.lock().unwrap();
+        entries.get(source_id).cloned()
+    }
+
+    /// Remove a cached entry (memory + disk).
+    pub fn invalidate(&self, source_id: &str) {
+        {
+            let mut entries = self.entries.lock().unwrap();
+            entries.remove(source_id);
+        }
+
+        if let Some(db) = &self.db {
+            let conn = db.lock().unwrap();
+            let _ = conn.execute(
+                "DELETE FROM source_cache WHERE source_id = ?1",
+                rusqlite::params![source_id],
+            );
+        }
+    }
+
+    /// Clear all cached entries (memory + disk).
+    pub fn clear(&self) {
+        {
+            let mut entries = self.entries.lock().unwrap();
+            entries.clear();
+        }
+
+        if let Some(db) = &self.db {
+            let conn = db.lock().unwrap();
+            let _ = conn.execute("DELETE FROM source_cache", []);
+        }
+    }
+
+    /// Returns the number of cached entries.
+    pub fn len(&self) -> usize {
+        let entries = self.entries.lock().unwrap();
+        entries.len()
+    }
+
+    /// Returns true if the cache is empty.
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+impl Default for SourceCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
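A usage sketch for the cache (path and IDs are illustrative): persisting to SQLite lets `use_cached` serve a last-known-good value even on the first resolution attempt after a restart.

```rust
use std::path::Path;

use rsigma_runtime::SourceCache;

// Sketch: not part of this patch.
fn warm_cache() -> Result<(), String> {
    let cache = SourceCache::with_sqlite(Path::new("/var/lib/rsigma/sources.db"))?;
    cache.store("feodo_ips", &serde_json::json!(["198.51.100.7"]));
    // Entries survive process restarts: a fresh SourceCache::with_sqlite on
    // the same path reloads them into memory.
    assert_eq!(cache.get("feodo_ips"), Some(serde_json::json!(["198.51.100.7"])));
    Ok(())
}
```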
diff --git a/crates/rsigma-runtime/src/sources/command.rs b/crates/rsigma-runtime/src/sources/command.rs
new file mode 100644
index 0000000..bff35c2
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/command.rs
@@ -0,0 +1,70 @@
+//! Command source resolver: runs a local command and captures stdout.
+
+use std::time::Instant;
+
+use rsigma_eval::pipeline::sources::DataFormat;
+
+use super::extract::apply_extract;
+use super::file::parse_data;
+use super::{ResolvedValue, SourceError, SourceErrorKind};
+
+/// Resolve a command source by executing it and parsing stdout.
+pub async fn resolve_command(
+    command: &[String],
+    format: DataFormat,
+    extract_expr: Option<&str>,
+) -> Result<ResolvedValue, SourceError> {
+    if command.is_empty() {
+        return Err(SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch("command is empty".into()),
+        });
+    }
+
+    let output = tokio::process::Command::new(&command[0])
+        .args(&command[1..])
+        .stdout(std::process::Stdio::piped())
+        .stderr(std::process::Stdio::piped())
+        .spawn()
+        .map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!("failed to spawn '{}': {e}", command[0])),
+        })?
+        .wait_with_output()
+        .await
+        .map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!("command execution failed: {e}")),
+        })?;
+
+    if !output.status.success() {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        return Err(SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!(
+                "command exited with {}: {}",
+                output.status,
+                stderr.trim()
+            )),
+        });
+    }
+
+    let stdout = String::from_utf8(output.stdout).map_err(|e| SourceError {
+        source_id: String::new(),
+        kind: SourceErrorKind::Parse(format!("command output is not valid UTF-8: {e}")),
+    })?;
+
+    let parsed = parse_data(&stdout, format)?;
+
+    let data = if let Some(expr) = extract_expr {
+        apply_extract(&parsed, expr)?
+    } else {
+        parsed
+    };
+
+    Ok(ResolvedValue {
+        data,
+        resolved_at: Instant::now(),
+        from_cache: false,
+    })
+}
diff --git a/crates/rsigma-runtime/src/sources/extract.rs b/crates/rsigma-runtime/src/sources/extract.rs
new file mode 100644
index 0000000..95f4e08
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/extract.rs
@@ -0,0 +1,87 @@
+//! Expression-based data extraction for dynamic sources.
+//!
+//! Supports three extraction languages with dual syntax:
+//! - Plain string: always jq (the common case)
+//! - Structured object `{ expr, type }`: explicit language selection
+//!
+//! Supported types: `jq` (default), `jsonpath`, `cel`.
+
+use super::{SourceError, SourceErrorKind};
+
+/// Apply an extract expression to parsed source data.
+///
+/// The expression is always treated as jq in Phase 2a. JSONPath and CEL
+/// support will be added in later sub-phases.
+pub fn apply_extract(
+    data: &serde_json::Value,
+    expr: &str,
+) -> Result<serde_json::Value, SourceError> {
+    apply_jq(data, expr)
+}
+
+/// Apply a jq expression using jaq.
+fn apply_jq(data: &serde_json::Value, expr: &str) -> Result<serde_json::Value, SourceError> {
+    use jaq_interpret::{Ctx, FilterT, RcIter, Val};
+
+    let mut defs = jaq_interpret::ParseCtx::new(vec![]);
+    let (filter, errs) = jaq_parse::parse(expr, jaq_parse::main());
+
+    if !errs.is_empty() || filter.is_none() {
+        return Err(SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Extract(format!("invalid jq expression: {expr}")),
+        });
+    }
+
+    let filter = defs.compile(filter.unwrap());
+    let inputs = RcIter::new(std::iter::empty());
+    let val = Val::from(data.clone());
+
+    let ctx = Ctx::new([], &inputs);
+    let results: Vec<Val> = filter
+        .run((ctx, val))
+        .collect::<Result<Vec<_>, _>>()
+        .map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Extract(format!("jq execution error: {e}")),
+        })?;
+
+    match results.len() {
+        0 => Ok(serde_json::Value::Null),
+        1 => Ok(val_to_json(&results[0])),
+        _ => {
+            let arr: Vec<serde_json::Value> = results.iter().map(val_to_json).collect();
+            Ok(serde_json::Value::Array(arr))
+        }
+    }
+}
+
+/// Convert a jaq `Val` to a `serde_json::Value`.
+fn val_to_json(val: &jaq_interpret::Val) -> serde_json::Value {
+    match val {
+        jaq_interpret::Val::Null => serde_json::Value::Null,
+        jaq_interpret::Val::Bool(b) => serde_json::Value::Bool(*b),
+        jaq_interpret::Val::Int(i) => serde_json::json!(i),
+        jaq_interpret::Val::Float(f) => serde_json::json!(f),
+        jaq_interpret::Val::Num(n) => {
+            if let Ok(i) = n.parse::<i64>() {
+                serde_json::json!(i)
+            } else if let Ok(f) = n.parse::<f64>() {
+                serde_json::json!(f)
+            } else {
+                serde_json::Value::String(n.to_string())
+            }
+        }
+        jaq_interpret::Val::Str(s) => serde_json::Value::String(s.to_string()),
+        jaq_interpret::Val::Arr(arr) => {
+            serde_json::Value::Array(arr.iter().map(val_to_json).collect())
+        }
+        jaq_interpret::Val::Obj(obj) => {
+            let map: serde_json::Map<String, serde_json::Value> = obj
+                .iter()
+                .map(|(k, v)| (k.to_string(), val_to_json(v)))
+                .collect();
+            serde_json::Value::Object(map)
+        }
+    }
+}
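Result cardinality is worth calling out for reviewers: zero jq results map to `null`, one result is returned as-is, and several results are collected into an array. A small sketch (data illustrative):

```rust
use rsigma_runtime::sources::extract::apply_extract;

// Sketch: not part of this patch.
fn demo_extract() {
    let data = serde_json::json!({
        "items": [ { "ip": "203.0.113.5" }, { "ip": "203.0.113.9" } ]
    });
    // `.items[].ip` produces two results, so they come back as an array.
    let ips = apply_extract(&data, ".items[].ip").unwrap();
    assert_eq!(ips, serde_json::json!(["203.0.113.5", "203.0.113.9"]));
}
```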
diff --git a/crates/rsigma-runtime/src/sources/file.rs b/crates/rsigma-runtime/src/sources/file.rs
new file mode 100644
index 0000000..df9e421
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/file.rs
@@ -0,0 +1,91 @@
+//! File source resolver: reads data from local files.
+
+use std::path::Path;
+use std::time::Instant;
+
+use rsigma_eval::pipeline::sources::DataFormat;
+
+use super::extract::apply_extract;
+use super::{ResolvedValue, SourceError, SourceErrorKind};
+
+/// Resolve a file source by reading and parsing the file at `path`.
+pub async fn resolve_file(
+    path: &Path,
+    format: DataFormat,
+    extract_expr: Option<&str>,
+) -> Result<ResolvedValue, SourceError> {
+    let contents = tokio::fs::read_to_string(path)
+        .await
+        .map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!("failed to read {}: {e}", path.display())),
+        })?;
+
+    let parsed = parse_data(&contents, format)?;
+
+    let data = if let Some(expr) = extract_expr {
+        apply_extract(&parsed, expr)?
+    } else {
+        parsed
+    };
+
+    Ok(ResolvedValue {
+        data,
+        resolved_at: Instant::now(),
+        from_cache: false,
+    })
+}
+
+/// Parse raw string data according to the specified format.
+pub fn parse_data(raw: &str, format: DataFormat) -> Result<serde_json::Value, SourceError> {
+    match format {
+        DataFormat::Json => serde_json::from_str(raw).map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Parse(format!("invalid JSON: {e}")),
+        }),
+        DataFormat::Yaml => {
+            let yaml: serde_yaml::Value = serde_yaml::from_str(raw).map_err(|e| SourceError {
+                source_id: String::new(),
+                kind: SourceErrorKind::Parse(format!("invalid YAML: {e}")),
+            })?;
+            Ok(super::yaml_value_to_json(&yaml))
+        }
+        DataFormat::Lines => {
+            let lines: Vec<serde_json::Value> = raw
+                .lines()
+                .filter(|l| !l.is_empty())
+                .map(|l| serde_json::Value::String(l.to_string()))
+                .collect();
+            Ok(serde_json::Value::Array(lines))
+        }
+        DataFormat::Csv => {
+            let mut reader = csv::ReaderBuilder::new()
+                .has_headers(true)
+                .from_reader(raw.as_bytes());
+            let headers: Vec<String> = reader
+                .headers()
+                .map_err(|e| SourceError {
+                    source_id: String::new(),
+                    kind: SourceErrorKind::Parse(format!("CSV header error: {e}")),
+                })?
+                .iter()
+                .map(|h| h.to_string())
+                .collect();
+
+            let mut rows = Vec::new();
+            for result in reader.records() {
+                let record = result.map_err(|e| SourceError {
+                    source_id: String::new(),
+                    kind: SourceErrorKind::Parse(format!("CSV row error: {e}")),
+                })?;
+                let obj: serde_json::Map<String, serde_json::Value> = headers
+                    .iter()
+                    .zip(record.iter())
+                    .map(|(h, v)| (h.clone(), serde_json::Value::String(v.to_string())))
+                    .collect();
+                rows.push(serde_json::Value::Object(obj));
+            }
+            Ok(serde_json::Value::Array(rows))
+        }
+    }
+}
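Of the four formats, `csv` is the only one that reshapes the data: each record becomes an object keyed by the header row, with every value kept as a string. A sketch (data illustrative):

```rust
use rsigma_eval::pipeline::sources::DataFormat;
use rsigma_runtime::sources::file::parse_data;

// Sketch: not part of this patch.
fn demo_csv() {
    let raw = "ip,severity\n203.0.113.5,high\n";
    let rows = parse_data(raw, DataFormat::Csv).unwrap();
    assert_eq!(
        rows,
        serde_json::json!([{ "ip": "203.0.113.5", "severity": "high" }])
    );
}
```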
diff --git a/crates/rsigma-runtime/src/sources/http.rs b/crates/rsigma-runtime/src/sources/http.rs
new file mode 100644
index 0000000..7d3368b
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/http.rs
@@ -0,0 +1,95 @@
+//! HTTP source resolver: fetches data from HTTP endpoints.
+
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+use rsigma_eval::pipeline::sources::DataFormat;
+
+use super::extract::apply_extract;
+use super::file::parse_data;
+use super::{ResolvedValue, SourceError, SourceErrorKind};
+
+/// Resolve an HTTP source by fetching the URL and parsing the response.
+pub async fn resolve_http(
+    url: &str,
+    method: Option<&str>,
+    headers: &HashMap<String, String>,
+    format: DataFormat,
+    extract_expr: Option<&str>,
+    timeout: Option<Duration>,
+) -> Result<ResolvedValue, SourceError> {
+    let client = reqwest::Client::builder()
+        .timeout(timeout.unwrap_or(Duration::from_secs(30)))
+        .build()
+        .map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!("failed to build HTTP client: {e}")),
+        })?;
+
+    let method_str = method.unwrap_or("GET");
+    let reqwest_method = method_str
+        .parse::<reqwest::Method>()
+        .map_err(|e| SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!("invalid HTTP method '{method_str}': {e}")),
+        })?;
+
+    let mut request = client.request(reqwest_method, url);
+
+    for (key, value) in headers {
+        let expanded_value = expand_env_vars(value);
+        request = request.header(key.as_str(), expanded_value);
+    }
+
+    let response = request.send().await.map_err(|e| {
+        if e.is_timeout() {
+            SourceError {
+                source_id: String::new(),
+                kind: SourceErrorKind::Timeout,
+            }
+        } else {
+            SourceError {
+                source_id: String::new(),
+                kind: SourceErrorKind::Fetch(format!("HTTP request failed: {e}")),
+            }
+        }
+    })?;
+
+    let status = response.status();
+    if !status.is_success() {
+        let body = response.text().await.unwrap_or_default();
+        return Err(SourceError {
+            source_id: String::new(),
+            kind: SourceErrorKind::Fetch(format!("HTTP {status}: {}", body.trim())),
+        });
+    }
+
+    let body = response.text().await.map_err(|e| SourceError {
+        source_id: String::new(),
+        kind: SourceErrorKind::Fetch(format!("failed to read response body: {e}")),
+    })?;
+
+    let parsed = parse_data(&body, format)?;
+
+    let data = if let Some(expr) = extract_expr {
+        apply_extract(&parsed, expr)?
+    } else {
+        parsed
+    };
+
+    Ok(ResolvedValue {
+        data,
+        resolved_at: Instant::now(),
+        from_cache: false,
+    })
+}
+
+/// Expand `${ENV_VAR}` references in a string with environment variable values.
+fn expand_env_vars(s: &str) -> String {
+    let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
+    re.replace_all(s, |caps: &regex::Captures| {
+        let var_name = caps.get(1).unwrap().as_str();
+        std::env::var(var_name).unwrap_or_default()
+    })
+    .to_string()
+}
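Header values go through `${ENV_VAR}` expansion before the request is sent, which keeps credentials out of pipeline files; unset variables expand to the empty string. A sketch of the substitution behavior, mirroring `expand_env_vars` above (header and variable names assumed):

```rust
// Sketch: not part of this patch.
fn demo_env_expansion() {
    // A configured header value such as
    //     Authorization: "Bearer ${API_TOKEN}"
    // is sent as "Bearer <token>" when API_TOKEN is set.
    std::env::set_var("API_TOKEN", "s3cr3t");
    let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").unwrap();
    let sent = re.replace_all("Bearer ${API_TOKEN}", |caps: &regex::Captures| {
        std::env::var(&caps[1]).unwrap_or_default()
    });
    assert_eq!(sent, "Bearer s3cr3t");
}
```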
diff --git a/crates/rsigma-runtime/src/sources/include.rs b/crates/rsigma-runtime/src/sources/include.rs
new file mode 100644
index 0000000..7b57bfe
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/include.rs
@@ -0,0 +1,121 @@
+//! Include expansion for dynamic pipelines.
+//!
+//! Expands `Transformation::Include { template }` directives by fetching the
+//! referenced source and parsing it as a list of transformation YAML objects,
+//! then splicing them into the transformations list.
+
+use std::collections::HashMap;
+
+use rsigma_eval::pipeline::sources::SourceType;
+use rsigma_eval::pipeline::transformations::Transformation;
+use rsigma_eval::{Pipeline, TransformationItem};
+
+/// Expand all `Include` transformations in a pipeline.
+///
+/// For each `Include { template }`, the template references a source ID.
+/// The resolved source data is expected to be a YAML array of transformation
+/// objects. These are parsed and spliced into the pipeline at the include position.
+///
+/// Security: if `allow_remote_include` is false, includes referencing HTTP or NATS
+/// sources produce an error.
+pub fn expand_includes(
+    pipeline: &mut Pipeline,
+    resolved: &HashMap<String, serde_json::Value>,
+    allow_remote_include: bool,
+) -> Result<(), String> {
+    let mut expanded_transformations = Vec::new();
+    let mut had_include = false;
+
+    for item in &pipeline.transformations {
+        if let Transformation::Include { template } = &item.transformation {
+            had_include = true;
+            let source_id = extract_source_id(template);
+
+            // Security check: block remote includes if not allowed
+            if !allow_remote_include
+                && let Some(source) = pipeline.sources.iter().find(|s| s.id == source_id)
+            {
+                match &source.source_type {
+                    SourceType::Http { .. } | SourceType::Nats { .. } => {
+                        return Err(format!(
+                            "include references remote source '{}'; use --allow-remote-include to permit",
+                            source_id
+                        ));
+                    }
+                    _ => {}
+                }
+            }
+
+            if let Some(data) = resolved.get(&source_id) {
+                let items = parse_transformation_array(data)?;
+                expanded_transformations.extend(items);
+            } else {
+                return Err(format!(
+                    "include references unresolved source '{source_id}'"
+                ));
+            }
+        } else {
+            expanded_transformations.push(item.clone());
+        }
+    }
+
+    if had_include {
+        pipeline.transformations = expanded_transformations;
+    }
+
+    Ok(())
+}
+
+/// Extract the source ID from a template string like `${source.my_transforms}`.
+fn extract_source_id(template: &str) -> String {
+    let trimmed = template.trim();
+    if let Some(inner) = trimmed.strip_prefix("${source.")
+        && let Some(id) = inner.strip_suffix('}')
+    {
+        return id.split('.').next().unwrap_or(id).to_string();
+    }
+    trimmed.to_string()
+}
+
+/// Parse a JSON value as an array of transformation objects.
+///
+/// Each element should be a JSON object with at minimum a "type" field.
+/// Uses rsigma-eval's `parse_transformation_items` to handle the full
+/// transformation grammar.
+fn parse_transformation_array(data: &serde_json::Value) -> Result<Vec<TransformationItem>, String> {
+    if !data.is_array() {
+        return Err("include source data must be an array of transformation objects".to_string());
+    }
+
+    // Convert JSON -> YAML string -> serde_yaml::Value, then use the eval parser
+    let yaml_str =
+        serde_json::to_string(data).map_err(|e| format!("include serialization: {e}"))?;
+    let yaml_val: serde_yaml::Value = serde_yaml::from_str(&yaml_str)
+        .map_err(|e| format!("include data is not valid YAML: {e}"))?;
+
+    rsigma_eval::parse_transformation_items(&yaml_val)
+        .map_err(|e| format!("include parse error: {e}"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn extract_source_id_simple() {
+        assert_eq!(
+            extract_source_id("${source.my_transforms}"),
+            "my_transforms"
+        );
+    }
+
+    #[test]
+    fn extract_source_id_with_path() {
+        assert_eq!(extract_source_id("${source.config.transforms}"), "config");
+    }
+
+    #[test]
+    fn extract_source_id_plain_string() {
+        assert_eq!(extract_source_id("my_source"), "my_source");
+    }
+}
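Tying the pieces together, a dynamic pipeline using an include might look roughly like the fragment below. The YAML key spellings are inferred from the types in this patch (`DynamicSource`, `SourceType::File`, `Transformation::Include`), not quoted from documentation:

```rust
// Sketch: not part of this patch. Field spellings are assumptions.
let _pipeline_yaml = r#"
name: dynamic-example
priority: 50
sources:
  - id: my_transforms            # referenced below as ${source.my_transforms}
    type: file
    path: /etc/rsigma/extra-transforms.yml
    format: yaml
transformations:
  - type: include
    template: ${source.my_transforms}
"#;
```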
diff --git a/crates/rsigma-runtime/src/sources/mod.rs b/crates/rsigma-runtime/src/sources/mod.rs
new file mode 100644
index 0000000..808752c
--- /dev/null
+++ b/crates/rsigma-runtime/src/sources/mod.rs
@@ -0,0 +1,275 @@
+//! Dynamic source resolution for Sigma pipelines.
+//!
+//! This module provides the [`SourceResolver`] trait and a [`DefaultSourceResolver`]
+//! implementation that fetches data from file, command, HTTP, and NATS sources
+//! declared in a pipeline's `sources` section.
+
+pub mod cache;
+pub mod command;
+pub mod extract;
+pub mod file;
+pub mod http;
+pub mod include;
+#[cfg(feature = "nats")]
+pub mod nats;
+pub mod refresh;
+pub mod template;
+
+use std::time::Instant;
+
+use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
+
+pub use cache::SourceCache;
+pub use template::TemplateExpander;
+
+/// The result of successfully resolving a dynamic source.
+#[derive(Debug, Clone)]
+pub struct ResolvedValue {
+    /// The resolved data as a JSON value (can be a scalar, array, or object).
+    pub data: serde_json::Value,
+    /// When this value was resolved.
+    pub resolved_at: Instant,
+    /// Whether this value was served from cache rather than freshly fetched.
+    pub from_cache: bool,
+}
+
+/// An error that occurred while resolving a dynamic source.
+#[derive(Debug, Clone)]
+pub struct SourceError {
+    /// The source ID that failed.
+    pub source_id: String,
+    /// What went wrong.
+    pub kind: SourceErrorKind,
+}
+
+impl std::fmt::Display for SourceError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "source '{}': {}", self.source_id, self.kind)
+    }
+}
+
+impl std::error::Error for SourceError {}
+
+/// The kind of error that occurred during source resolution.
+#[derive(Debug, Clone)]
+pub enum SourceErrorKind {
+    /// Failed to fetch/read the source data.
+    Fetch(String),
+    /// Failed to parse the fetched data into the expected format.
+    Parse(String),
+    /// The `extract` expression failed or returned no data.
+    Extract(String),
+    /// The fetch exceeded the configured timeout.
+    Timeout,
+}
+
+impl std::fmt::Display for SourceErrorKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Fetch(msg) => write!(f, "fetch failed: {msg}"),
+            Self::Parse(msg) => write!(f, "parse failed: {msg}"),
+            Self::Extract(msg) => write!(f, "extract failed: {msg}"),
+            Self::Timeout => write!(f, "timed out"),
+        }
+    }
+}
+
+/// Trait for resolving dynamic pipeline sources.
+///
+/// Implementations fetch data from external sources (files, commands, HTTP, NATS)
+/// and return it as a JSON value that can be injected into the pipeline.
+#[async_trait::async_trait]
+pub trait SourceResolver: Send + Sync {
+    /// Resolve a single dynamic source, returning the fetched data.
+    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
+}
+
+/// Default source resolver that dispatches to file, command, and HTTP resolvers.
+pub struct DefaultSourceResolver {
+    cache: SourceCache,
+}
+
+impl DefaultSourceResolver {
+    /// Create a new resolver with an in-memory cache.
+    pub fn new() -> Self {
+        Self {
+            cache: SourceCache::new(),
+        }
+    }
+
+    /// Create a new resolver with the given cache.
+    pub fn with_cache(cache: SourceCache) -> Self {
+        Self { cache }
+    }
+
+    /// Get a reference to the cache (for inspection/testing).
+    pub fn cache(&self) -> &SourceCache {
+        &self.cache
+    }
+}
+
+impl Default for DefaultSourceResolver {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[async_trait::async_trait]
+impl SourceResolver for DefaultSourceResolver {
+    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
+        let result = match &source.source_type {
+            SourceType::File { path, format } => file::resolve_file(path, *format, None).await,
+            SourceType::Command {
+                command,
+                format,
+                extract: extract_expr,
+            } => command::resolve_command(command, *format, extract_expr.as_deref()).await,
+            SourceType::Http {
+                url,
+                method,
+                headers,
+                format,
+                extract: extract_expr,
+            } => {
+                http::resolve_http(
+                    url,
+                    method.as_deref(),
+                    headers,
+                    *format,
+                    extract_expr.as_deref(),
+                    source.timeout,
+                )
+                .await
+            }
+            #[cfg(feature = "nats")]
+            SourceType::Nats {
+                url,
+                subject,
+                format,
+                extract: extract_expr,
+            } => nats::resolve_nats_initial(url, subject, *format, extract_expr.as_deref()).await,
+            #[cfg(not(feature = "nats"))]
+            SourceType::Nats { .. } => {
+                return Err(SourceError {
+                    source_id: source.id.clone(),
+                    kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
+                });
+            }
+        };
+
+        match result {
+            Ok(value) => {
+                self.cache.store(&source.id, &value.data);
+                Ok(value)
+            }
+            Err(mut err) => {
+                err.source_id = source.id.clone();
+                match source.on_error {
+                    ErrorPolicy::UseCached => {
+                        if let Some(cached) = self.cache.get(&source.id) {
+                            tracing::warn!(
+                                source_id = %source.id,
+                                error = %err,
+                                "Source resolution failed, using cached value"
+                            );
+                            Ok(ResolvedValue {
+                                data: cached,
+                                resolved_at: Instant::now(),
+                                from_cache: true,
+                            })
+                        } else {
+                            Err(err)
+                        }
+                    }
+                    ErrorPolicy::UseDefault => {
+                        if let Some(default) = &source.default {
+                            tracing::warn!(
+                                source_id = %source.id,
+                                error = %err,
+                                "Source resolution failed, using default value"
+                            );
+                            let json_default = yaml_value_to_json(default);
+                            Ok(ResolvedValue {
+                                data: json_default,
+                                resolved_at: Instant::now(),
+                                from_cache: false,
+                            })
+                        } else {
+                            Err(err)
+                        }
+                    }
+                    ErrorPolicy::Fail => Err(err),
+                }
+            }
+        }
+    }
+}
+
+/// Resolve all sources in a pipeline, returning a map of source_id -> resolved data.
+///
+/// Applies error policies: `use_cached`, `use_default`, or `fail`.
+/// Required sources with `Fail` policy propagate errors immediately.
+/// Optional sources (required=false) that fail are logged and skipped
+/// with a Null fallback value.
+pub async fn resolve_all(
+    resolver: &dyn SourceResolver,
+    sources: &[DynamicSource],
+) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
+    let mut resolved = std::collections::HashMap::new();
+    for source in sources {
+        match resolver.resolve(source).await {
+            Ok(value) => {
+                resolved.insert(source.id.clone(), value.data);
+            }
+            Err(e) => {
+                if source.required {
+                    return Err(e);
+                }
+                tracing::warn!(
+                    source_id = %source.id,
+                    error = %e,
+                    "Optional source resolution failed, using null"
+                );
+                resolved.insert(source.id.clone(), serde_json::Value::Null);
+            }
+        }
+    }
+    Ok(resolved)
+}
+
+/// Convert a `serde_yaml::Value` to a `serde_json::Value`.
+pub fn yaml_value_to_json(yaml: &serde_yaml::Value) -> serde_json::Value {
+    match yaml {
+        serde_yaml::Value::Null => serde_json::Value::Null,
+        serde_yaml::Value::Bool(b) => serde_json::Value::Bool(*b),
+        serde_yaml::Value::Number(n) => {
+            if let Some(i) = n.as_i64() {
+                serde_json::Value::Number(i.into())
+            } else if let Some(u) = n.as_u64() {
+                serde_json::Value::Number(u.into())
+            } else if let Some(f) = n.as_f64() {
+                serde_json::json!(f)
+            } else {
+                serde_json::Value::Null
+            }
+        }
+        serde_yaml::Value::String(s) => serde_json::Value::String(s.clone()),
+        serde_yaml::Value::Sequence(seq) => {
+            serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
+        }
+        serde_yaml::Value::Mapping(map) => {
+            let obj = map
+                .iter()
+                .map(|(k, v)| {
+                    let key = match k {
+                        serde_yaml::Value::String(s) => s.clone(),
+                        other => format!("{other:?}"),
+                    };
+                    (key, yaml_value_to_json(v))
+                })
+                .collect();
+            serde_json::Value::Object(obj)
+        }
+        serde_yaml::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
+    }
+}
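Because resolution is behind a trait object, hosts can substitute their own fetch logic (tests, proxies, instrumentation); `InstrumentedResolver` in the CLI crate is one such wrapper. A minimal stub under the trait shown above:

```rust
use std::time::Instant;

use rsigma_eval::pipeline::sources::DynamicSource;
use rsigma_runtime::sources::{ResolvedValue, SourceError, SourceResolver};

// Sketch: not part of this patch. Returns a fixed value for every source.
struct StaticResolver(serde_json::Value);

#[async_trait::async_trait]
impl SourceResolver for StaticResolver {
    async fn resolve(&self, _source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
        Ok(ResolvedValue {
            data: self.0.clone(),
            resolved_at: Instant::now(),
            from_cache: false,
        })
    }
}
```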
+#[cfg(feature = "nats")] +pub fn parse_nats_message( + payload: &[u8], + format: DataFormat, + extract_expr: Option<&str>, +) -> Result { + let raw = std::str::from_utf8(payload).map_err(|e| SourceError { + source_id: String::new(), + kind: SourceErrorKind::Parse(format!("NATS message is not valid UTF-8: {e}")), + })?; + let parsed = parse_data(raw, format)?; + if let Some(expr) = extract_expr { + apply_extract(&parsed, expr) + } else { + Ok(parsed) + } +} diff --git a/crates/rsigma-runtime/src/sources/refresh.rs b/crates/rsigma-runtime/src/sources/refresh.rs new file mode 100644 index 0000000..98dea24 --- /dev/null +++ b/crates/rsigma-runtime/src/sources/refresh.rs @@ -0,0 +1,339 @@ +//! Background refresh scheduler for dynamic pipeline sources. +//! +//! Manages per-source refresh loops based on `RefreshPolicy`: +//! - `Interval(duration)`: re-fetches on a timer +//! - `Watch`: uses file system notifications (via `notify`) +//! - `Push`: receives updates from external triggers (NATS) +//! - `OnDemand`: only refreshes when explicitly triggered via API/signal + +use std::collections::HashMap; +use std::sync::Arc; + +use rsigma_eval::pipeline::sources::{DynamicSource, RefreshPolicy, SourceType}; +use tokio::sync::{mpsc, watch}; + +use super::{SourceResolver, resolve_all}; + +/// A message requesting source re-resolution. +#[derive(Debug, Clone)] +pub enum RefreshTrigger { + /// Re-resolve all sources. + All, + /// Re-resolve a specific source by ID. + Single(String), + /// A NATS push message arrived with pre-parsed data for a specific source. + #[cfg(feature = "nats")] + NatsPush { + source_id: String, + data: serde_json::Value, + }, +} + +/// Notification sent when sources have been refreshed. +#[derive(Debug, Clone)] +pub struct RefreshResult { + /// The newly resolved source data (source_id -> value). + pub resolved: HashMap, +} + +/// Manages background refresh tasks for dynamic sources. +/// +/// The scheduler spawns per-source tasks based on their refresh policy and +/// sends `RefreshResult` notifications whenever source data changes. +pub struct RefreshScheduler { + /// Channel for on-demand refresh triggers (from API, SIGHUP, NATS control). + trigger_tx: mpsc::Sender, + /// Receiver for on-demand triggers (consumed by the run loop). + trigger_rx: Option>, + /// Watch channel sender for notifying consumers of updated source data. + result_tx: watch::Sender>, + /// Watch channel receiver for consumers. + result_rx: watch::Receiver>, +} + +impl RefreshScheduler { + /// Create a new scheduler. + pub fn new() -> Self { + let (trigger_tx, trigger_rx) = mpsc::channel(32); + let (result_tx, result_rx) = watch::channel(None); + Self { + trigger_tx, + trigger_rx: Some(trigger_rx), + result_tx, + result_rx, + } + } + + /// Get a sender for triggering on-demand resolution. + pub fn trigger_sender(&self) -> mpsc::Sender { + self.trigger_tx.clone() + } + + /// Get a receiver that is notified when sources are refreshed. + pub fn result_receiver(&self) -> watch::Receiver> { + self.result_rx.clone() + } + + /// Start the scheduler background loop. + /// + /// Takes ownership of the trigger receiver and spawns per-source interval tasks. + /// Returns a `JoinHandle` for the main coordination task. + /// + /// When a refresh occurs (via interval timer or on-demand trigger), all sources + /// are re-resolved and the result is published on the watch channel. 
+
+    /// Start the scheduler background loop.
+    ///
+    /// Takes ownership of the trigger receiver and spawns per-source interval tasks.
+    /// Returns a `JoinHandle` for the main coordination task.
+    ///
+    /// When a refresh occurs (via interval timer or on-demand trigger), all sources
+    /// are re-resolved and the result is published on the watch channel.
+    pub fn run(
+        mut self,
+        sources: Vec<DynamicSource>,
+        resolver: Arc<dyn SourceResolver>,
+    ) -> tokio::task::JoinHandle<()> {
+        let trigger_rx = self
+            .trigger_rx
+            .take()
+            .expect("run() can only be called once");
+
+        tokio::spawn(async move {
+            Self::run_loop(
+                sources,
+                resolver,
+                trigger_rx,
+                self.trigger_tx,
+                self.result_tx,
+            )
+            .await;
+        })
+    }
+
+    async fn run_loop(
+        sources: Vec<DynamicSource>,
+        resolver: Arc<dyn SourceResolver>,
+        mut trigger_rx: mpsc::Receiver<RefreshTrigger>,
+        trigger_tx: mpsc::Sender<RefreshTrigger>,
+        result_tx: watch::Sender<Option<RefreshResult>>,
+    ) {
+        // Spawn interval timers
+        for source in &sources {
+            if let RefreshPolicy::Interval(duration) = &source.refresh {
+                let tx = trigger_tx.clone();
+                let id = source.id.clone();
+                let interval = *duration;
+                tokio::spawn(async move {
+                    let mut timer = tokio::time::interval(interval);
+                    timer.tick().await; // skip immediate first tick
+                    loop {
+                        timer.tick().await;
+                        if tx.send(RefreshTrigger::Single(id.clone())).await.is_err() {
+                            break;
+                        }
+                    }
+                });
+            }
+        }
+
+        // Spawn NATS push subscriptions
+        #[cfg(feature = "nats")]
+        for source in &sources {
+            if source.refresh == RefreshPolicy::Push
+                && let SourceType::Nats {
+                    url,
+                    subject,
+                    format,
+                    extract: extract_expr,
+                } = &source.source_type
+            {
+                let tx = trigger_tx.clone();
+                let id = source.id.clone();
+                let url = url.clone();
+                let subject = subject.clone();
+                let format = *format;
+                let extract_expr = extract_expr.clone();
+                tokio::spawn(async move {
+                    if let Err(e) =
+                        nats_push_loop(&url, &subject, format, extract_expr.as_deref(), &id, &tx)
+                            .await
+                    {
+                        tracing::error!(
+                            source_id = %id,
+                            error = %e,
+                            "NATS push subscription failed"
+                        );
+                    }
+                });
+            }
+        }
+
+        // Spawn file watchers for Watch policy sources
+        for source in &sources {
+            if source.refresh == RefreshPolicy::Watch
+                && let SourceType::File { path, .. } = &source.source_type
+            {
+                let tx = trigger_tx.clone();
+                let id = source.id.clone();
+                let path = path.clone();
+                tokio::spawn(async move {
+                    file_watch_loop(&path, &id, &tx).await;
+                });
+            }
+        }
+
+        // Main loop: wait for triggers and resolve
+        while let Some(trigger) = trigger_rx.recv().await {
+            // Handle NATS push with pre-parsed data (no re-resolution needed)
+            #[cfg(feature = "nats")]
+            if let RefreshTrigger::NatsPush { source_id, data } = trigger {
+                let mut resolved = HashMap::new();
+                resolved.insert(source_id, data);
+                let _ = result_tx.send(Some(RefreshResult { resolved }));
+                continue;
+            }
+
+            let to_resolve: Vec<&DynamicSource> = match &trigger {
+                RefreshTrigger::All => sources.iter().collect(),
+                RefreshTrigger::Single(id) => sources.iter().filter(|s| s.id == *id).collect(),
+                #[cfg(feature = "nats")]
+                RefreshTrigger::NatsPush { .. } => unreachable!(),
+            };
+
+            if to_resolve.is_empty() {
+                continue;
+            }
+
+            match resolve_all(
+                resolver.as_ref(),
+                &to_resolve.iter().map(|s| (*s).clone()).collect::<Vec<_>>(),
+            )
+            .await
+            {
+                Ok(resolved) => {
+                    let _ = result_tx.send(Some(RefreshResult { resolved }));
+                }
+                Err(e) => {
+                    tracing::warn!(error = %e, "Background source refresh failed");
+                }
+            }
+        }
+    }
+}
+
+impl Default for RefreshScheduler {
+    fn default() -> Self {
+        Self::new()
+    }
+}
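// Usage sketch (annotation, not part of the patch): starting the scheduler.
// `run` consumes the scheduler, spawns the per-source tasks shown above (and
// the watcher loops defined below), and publishes each re-resolution on the
// watch channel. Keep a trigger handle before `run` takes ownership.
fn start_scheduler_example(
    sources: Vec<DynamicSource>,
    resolver: std::sync::Arc<DefaultSourceResolver>,
) -> (
    tokio::sync::mpsc::Sender<RefreshTrigger>,
    tokio::task::JoinHandle<()>,
) {
    let scheduler = RefreshScheduler::new();
    let trigger = scheduler.trigger_sender();
    let handle = scheduler.run(sources, resolver);
    (trigger, handle)
}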
+#[cfg(feature = "nats")] +async fn nats_push_loop( + url: &str, + subject: &str, + format: rsigma_eval::pipeline::sources::DataFormat, + extract_expr: Option<&str>, + source_id: &str, + trigger_tx: &mpsc::Sender, +) -> Result<(), String> { + use futures::StreamExt; + + let client = async_nats::connect(url) + .await + .map_err(|e| format!("NATS connect failed: {e}"))?; + + let mut subscriber = client + .subscribe(subject.to_string()) + .await + .map_err(|e| format!("NATS subscribe failed: {e}"))?; + + tracing::info!( + source_id = %source_id, + subject = %subject, + "NATS push subscription active" + ); + + while let Some(msg) = subscriber.next().await { + match super::nats::parse_nats_message(&msg.payload, format, extract_expr) { + Ok(data) => { + let trigger = RefreshTrigger::NatsPush { + source_id: source_id.to_string(), + data, + }; + if trigger_tx.send(trigger).await.is_err() { + break; + } + } + Err(e) => { + tracing::warn!( + source_id = %source_id, + error = %e, + "Failed to parse NATS push message" + ); + } + } + } + + Ok(()) +} + +/// Watch a file for changes and send refresh triggers. +async fn file_watch_loop( + path: &std::path::Path, + source_id: &str, + trigger_tx: &mpsc::Sender, +) { + use notify::{Event, EventKind, RecommendedWatcher, Watcher}; + use tokio::sync::mpsc as tokio_mpsc; + + let (notify_tx, mut notify_rx) = tokio_mpsc::channel::<()>(4); + + let _watcher = { + let tx = notify_tx.clone(); + match RecommendedWatcher::new( + move |res: Result| { + if let Ok(event) = res + && matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_)) + { + let _ = tx.try_send(()); + } + }, + notify::Config::default(), + ) { + Ok(mut w) => { + if let Err(e) = w.watch(path, notify::RecursiveMode::NonRecursive) { + tracing::warn!( + source_id = %source_id, + path = %path.display(), + error = %e, + "Could not watch source file" + ); + return; + } + tracing::info!( + source_id = %source_id, + path = %path.display(), + "Watching source file for changes" + ); + Some(w) + } + Err(e) => { + tracing::warn!( + source_id = %source_id, + error = %e, + "Could not create file watcher for source" + ); + return; + } + } + }; + + while notify_rx.recv().await.is_some() { + // Debounce: wait a short period for additional changes + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Drain any queued notifications + while notify_rx.try_recv().is_ok() {} + + if trigger_tx + .send(RefreshTrigger::Single(source_id.to_string())) + .await + .is_err() + { + break; + } + } +} diff --git a/crates/rsigma-runtime/src/sources/template.rs b/crates/rsigma-runtime/src/sources/template.rs new file mode 100644 index 0000000..d5d72d7 --- /dev/null +++ b/crates/rsigma-runtime/src/sources/template.rs @@ -0,0 +1,293 @@ +//! Template expansion for dynamic pipeline sources. +//! +//! Walks a pipeline's fields and replaces `${source.X}` and `${source.X.path.to.field}` +//! references with resolved data from the source resolution map. + +use std::collections::HashMap; + +use regex::Regex; +use rsigma_eval::Pipeline; +use std::sync::LazyLock; + +static SOURCE_TEMPLATE_RE: LazyLock = + LazyLock::new(|| Regex::new(r"\$\{source\.([a-zA-Z0-9_]+)(?:\.([a-zA-Z0-9_.]+))?\}").unwrap()); + +/// Expands `${source.*}` template references in a pipeline using resolved source data. +pub struct TemplateExpander; + +impl TemplateExpander { + /// Expand all `${source.*}` references in the pipeline's vars and return an updated pipeline. 
+
+impl TemplateExpander {
+    /// Expand all `${source.*}` references in the pipeline's vars and return an updated pipeline.
+    ///
+    /// The pipeline's `vars` values are expanded by replacing template expressions
+    /// with data from the `resolved` map. Transformation fields containing templates
+    /// are left in place (they are handled at apply-time, not here) since transformations
+    /// use typed structures rather than raw strings.
+    pub fn expand(pipeline: &Pipeline, resolved: &HashMap<String, serde_json::Value>) -> Pipeline {
+        let mut expanded = pipeline.clone();
+
+        // Expand vars
+        for (_var_name, values) in expanded.vars.iter_mut() {
+            let mut new_values = Vec::new();
+            for val in values.iter() {
+                if let Some(expanded_vals) = Self::expand_string_value(val, resolved) {
+                    new_values.extend(expanded_vals);
+                } else {
+                    new_values.push(val.clone());
+                }
+            }
+            *values = new_values;
+        }
+
+        expanded
+    }
+
+    /// Try to expand a single string value containing `${source.*}` templates.
+    ///
+    /// Returns `None` if the string contains no templates.
+    /// Returns `Some(vec)` with the expanded values if templates were found.
+    fn expand_string_value(
+        value: &str,
+        resolved: &HashMap<String, serde_json::Value>,
+    ) -> Option<Vec<String>> {
+        if !value.contains("${source.") {
+            return None;
+        }
+
+        // If the entire value is a single template reference, replace it directly
+        if let Some(caps) = SOURCE_TEMPLATE_RE.captures(value)
+            && caps.get(0).unwrap().as_str() == value
+        {
+            let source_id = caps.get(1).unwrap().as_str();
+            let sub_path = caps.get(2).map(|m| m.as_str());
+
+            if let Some(data) = resolved.get(source_id) {
+                let target = if let Some(path) = sub_path {
+                    navigate_path(data, path)
+                } else {
+                    Some(data)
+                };
+
+                if let Some(val) = target {
+                    return Some(json_to_string_vec(val));
+                }
+            }
+
+            return None;
+        }
+
+        // Otherwise, do substring replacement (inline templates within larger strings)
+        let result = SOURCE_TEMPLATE_RE
+            .replace_all(value, |caps: &regex::Captures| {
+                let source_id = caps.get(1).unwrap().as_str();
+                let sub_path = caps.get(2).map(|m| m.as_str());
+
+                if let Some(data) = resolved.get(source_id) {
+                    let target = if let Some(path) = sub_path {
+                        navigate_path(data, path)
+                    } else {
+                        Some(data)
+                    };
+
+                    if let Some(val) = target {
+                        return json_to_single_string(val);
+                    }
+                }
+
+                caps.get(0).unwrap().as_str().to_string()
+            })
+            .to_string();
+
+        Some(vec![result])
+    }
+}
+
+/// Navigate a dot-separated path into a JSON value.
+///
+/// E.g., `"field_mapping.sysmon"` navigates `data["field_mapping"]["sysmon"]`.
+fn navigate_path<'a>(data: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
+    let mut current = data;
+    for segment in path.split('.') {
+        match current {
+            serde_json::Value::Object(map) => {
+                current = map.get(segment)?;
+            }
+            serde_json::Value::Array(arr) => {
+                let idx: usize = segment.parse().ok()?;
+                current = arr.get(idx)?;
+            }
+            _ => return None,
+        }
+    }
+    Some(current)
+}
+
+/// Convert a JSON value to a vector of strings for use in pipeline vars.
+///
+/// Arrays are flattened into multiple string entries.
+/// Objects are serialized as JSON strings.
+/// Scalars become single-element vectors.
+fn json_to_string_vec(val: &serde_json::Value) -> Vec<String> {
+    match val {
+        serde_json::Value::Array(arr) => arr.iter().map(json_to_single_string).collect(),
+        serde_json::Value::Null => vec![],
+        other => vec![json_to_single_string(other)],
+    }
+}
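// Illustration (annotation, not part of the patch): navigate_path walks
// object keys and numeric array indices, mirroring the tests below.
fn navigate_path_example() {
    let data = serde_json::json!({"hosts": [{"name": "web01"}]});
    assert_eq!(
        navigate_path(&data, "hosts.0.name"),
        Some(&serde_json::json!("web01"))
    );
    // Missing keys and non-numeric indices yield None rather than panicking.
    assert_eq!(navigate_path(&data, "hosts.x.name"), None);
}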
+
+/// Convert a single JSON value to a string representation.
+fn json_to_single_string(val: &serde_json::Value) -> String {
+    match val {
+        serde_json::Value::String(s) => s.clone(),
+        serde_json::Value::Null => String::new(),
+        serde_json::Value::Bool(b) => b.to_string(),
+        serde_json::Value::Number(n) => n.to_string(),
+        other => other.to_string(),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn expand_simple_var() {
+        let mut vars = HashMap::new();
+        vars.insert(
+            "admin_emails".to_string(),
+            vec!["${source.admin_emails}".to_string()],
+        );
+
+        let pipeline = Pipeline {
+            name: "test".to_string(),
+            priority: 0,
+            vars,
+            transformations: vec![],
+            finalizers: vec![],
+            sources: vec![],
+            source_refs: vec![],
+        };
+
+        let mut resolved = HashMap::new();
+        resolved.insert(
+            "admin_emails".to_string(),
+            serde_json::json!(["admin@corp.com", "root@corp.com"]),
+        );
+
+        let expanded = TemplateExpander::expand(&pipeline, &resolved);
+        assert_eq!(
+            expanded.vars.get("admin_emails").unwrap(),
+            &vec!["admin@corp.com".to_string(), "root@corp.com".to_string()]
+        );
+    }
+
+    #[test]
+    fn expand_nested_path() {
+        let mut vars = HashMap::new();
+        vars.insert(
+            "log_index".to_string(),
+            vec!["${source.env_config.log_index}".to_string()],
+        );
+
+        let pipeline = Pipeline {
+            name: "test".to_string(),
+            priority: 0,
+            vars,
+            transformations: vec![],
+            finalizers: vec![],
+            sources: vec![],
+            source_refs: vec![],
+        };
+
+        let mut resolved = HashMap::new();
+        resolved.insert(
+            "env_config".to_string(),
+            serde_json::json!({"log_index": "security-events", "retention": "30d"}),
+        );
+
+        let expanded = TemplateExpander::expand(&pipeline, &resolved);
+        assert_eq!(
+            expanded.vars.get("log_index").unwrap(),
+            &vec!["security-events".to_string()]
+        );
+    }
+
+    #[test]
+    fn expand_inline_template() {
+        let mut vars = HashMap::new();
+        vars.insert(
+            "index_pattern".to_string(),
+            vec!["logs-${source.env_config.env}-*".to_string()],
+        );
+
+        let pipeline = Pipeline {
+            name: "test".to_string(),
+            priority: 0,
+            vars,
+            transformations: vec![],
+            finalizers: vec![],
+            sources: vec![],
+            source_refs: vec![],
+        };
+
+        let mut resolved = HashMap::new();
+        resolved.insert(
+            "env_config".to_string(),
+            serde_json::json!({"env": "production"}),
+        );
+
+        let expanded = TemplateExpander::expand(&pipeline, &resolved);
+        assert_eq!(
+            expanded.vars.get("index_pattern").unwrap(),
+            &vec!["logs-production-*".to_string()]
+        );
+    }
+
+    #[test]
+    fn static_vars_unchanged() {
+        let mut vars = HashMap::new();
+        vars.insert("static".to_string(), vec!["no_template_here".to_string()]);
+
+        let pipeline = Pipeline {
+            name: "test".to_string(),
+            priority: 0,
+            vars,
+            transformations: vec![],
+            finalizers: vec![],
+            sources: vec![],
+            source_refs: vec![],
+        };
+
+        let resolved = HashMap::new();
+        let expanded = TemplateExpander::expand(&pipeline, &resolved);
+        assert_eq!(
+            expanded.vars.get("static").unwrap(),
+            &vec!["no_template_here".to_string()]
+        );
+    }
+
+    #[test]
+    fn unresolved_template_kept_as_is() {
+        let mut vars = HashMap::new();
+        vars.insert(
+            "missing".to_string(),
+            vec!["${source.nonexistent}".to_string()],
+        );
+
+        let pipeline = Pipeline {
+            name: "test".to_string(),
+            priority: 0,
+            vars,
+            transformations: vec![],
+            finalizers: vec![],
+            sources: vec![],
+            source_refs: vec![],
+        };
+
+        let resolved = HashMap::new();
+        let expanded = TemplateExpander::expand(&pipeline, &resolved);
+        assert_eq!(
+            expanded.vars.get("missing").unwrap(),
+            &vec!["${source.nonexistent}".to_string()]
+        );
+    }
+}
diff --git a/crates/rsigma-runtime/tests/sources_integration.rs b/crates/rsigma-runtime/tests/sources_integration.rs
new file mode 100644
index 0000000..c5f7da8
--- /dev/null
+++ b/crates/rsigma-runtime/tests/sources_integration.rs
@@ -0,0 +1,475 @@
+//! Integration tests for dynamic source resolution.
+
+use std::collections::HashMap;
+
+use rsigma_eval::Pipeline;
+use rsigma_eval::pipeline::sources::{
+    DataFormat, DynamicSource, ErrorPolicy, RefreshPolicy, SourceType,
+};
+use rsigma_runtime::sources::cache::SourceCache;
+use rsigma_runtime::sources::file::resolve_file;
+use rsigma_runtime::sources::template::TemplateExpander;
+use rsigma_runtime::sources::{DefaultSourceResolver, SourceResolver, resolve_all};
+
+// =============================================================================
+// File source tests
+// =============================================================================
+
+#[tokio::test]
+async fn file_source_json() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("data.json");
+    std::fs::write(&path, r#"{"emails": ["a@b.com", "c@d.com"]}"#).unwrap();
+
+    let result = resolve_file(&path, DataFormat::Json, None).await.unwrap();
+    let expected = serde_json::json!({"emails": ["a@b.com", "c@d.com"]});
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn file_source_json_with_extract() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("data.json");
+    std::fs::write(&path, r#"{"emails": ["a@b.com", "c@d.com"]}"#).unwrap();
+
+    let result = resolve_file(&path, DataFormat::Json, Some(".emails[]"))
+        .await
+        .unwrap();
+    let expected = serde_json::json!(["a@b.com", "c@d.com"]);
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn file_source_lines() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("ips.txt");
+    std::fs::write(&path, "10.0.0.1\n10.0.0.2\n192.168.1.1\n").unwrap();
+
+    let result = resolve_file(&path, DataFormat::Lines, None).await.unwrap();
+    let expected = serde_json::json!(["10.0.0.1", "10.0.0.2", "192.168.1.1"]);
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn file_source_yaml() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("config.yaml");
+    std::fs::write(
+        &path,
+        "field_mapping:\n  EventID: event_id\n  HostName: hostname\n",
+    )
+    .unwrap();
+
+    let result = resolve_file(&path, DataFormat::Yaml, None).await.unwrap();
+    let expected = serde_json::json!({
+        "field_mapping": {
+            "EventID": "event_id",
+            "HostName": "hostname"
+        }
+    });
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn file_source_csv() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("assets.csv");
+    std::fs::write(&path, "hostname,owner\nserver1,alice\nserver2,bob\n").unwrap();
+
+    let result = resolve_file(&path, DataFormat::Csv, None).await.unwrap();
+    let expected = serde_json::json!([
+        {"hostname": "server1", "owner": "alice"},
+        {"hostname": "server2", "owner": "bob"}
+    ]);
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn file_source_missing_file() {
+    let result = resolve_file(
+        std::path::Path::new("/nonexistent/file.json"),
+        DataFormat::Json,
+        None,
+    )
+    .await;
+    assert!(result.is_err());
+    let err = result.unwrap_err();
+    assert!(matches!(
+        err.kind,
+        rsigma_runtime::SourceErrorKind::Fetch(_)
+    ));
+}
+
+// =============================================================================
+// Command source tests
+// =============================================================================
+
+#[tokio::test]
+async fn command_source_echo_json() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("data.json");
+    std::fs::write(&path, r#"{"status": "ok", "count": 42}"#).unwrap();
+
+    #[cfg(unix)]
+    let cmd = vec!["cat".to_string(), path.to_str().unwrap().to_string()];
+    #[cfg(windows)]
+    let cmd = vec![
+        "cmd".to_string(),
+        "/C".to_string(),
+        format!("type {}", path.to_str().unwrap()),
+    ];
+
+    let result = rsigma_runtime::sources::command::resolve_command(&cmd, DataFormat::Json, None)
+        .await
+        .unwrap();
+
+    let expected = serde_json::json!({"status": "ok", "count": 42});
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn command_source_with_extract() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("data.json");
+    std::fs::write(&path, r#"{"items": [1, 2, 3]}"#).unwrap();
+
+    #[cfg(unix)]
+    let cmd = vec!["cat".to_string(), path.to_str().unwrap().to_string()];
+    #[cfg(windows)]
+    let cmd = vec![
+        "cmd".to_string(),
+        "/C".to_string(),
+        format!("type {}", path.to_str().unwrap()),
+    ];
+
+    let result =
+        rsigma_runtime::sources::command::resolve_command(&cmd, DataFormat::Json, Some(".items[]"))
+            .await
+            .unwrap();
+
+    let expected = serde_json::json!([1, 2, 3]);
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn command_source_lines() {
+    // Use a cross-platform approach: write to a temp file and cat it
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("lines.txt");
+    std::fs::write(&path, "line1\nline2\nline3\n").unwrap();
+
+    #[cfg(unix)]
+    let cmd = vec!["cat".to_string(), path.to_str().unwrap().to_string()];
+    #[cfg(windows)]
+    let cmd = vec![
+        "cmd".to_string(),
+        "/C".to_string(),
+        format!("type {}", path.to_str().unwrap()),
+    ];
+
+    let result = rsigma_runtime::sources::command::resolve_command(&cmd, DataFormat::Lines, None)
+        .await
+        .unwrap();
+
+    let expected = serde_json::json!(["line1", "line2", "line3"]);
+    assert_eq!(result.data, expected);
+}
+
+#[tokio::test]
+async fn command_source_failing_command() {
+    #[cfg(unix)]
+    let cmd = vec!["false".to_string()];
+    #[cfg(windows)]
+    let cmd = vec!["cmd".to_string(), "/C".to_string(), "exit 1".to_string()];
+
+    let result =
+        rsigma_runtime::sources::command::resolve_command(&cmd, DataFormat::Json, None).await;
+
+    assert!(result.is_err());
+    let err = result.unwrap_err();
+    assert!(matches!(
+        err.kind,
+        rsigma_runtime::SourceErrorKind::Fetch(_)
+    ));
+}
+
+#[tokio::test]
+async fn command_source_empty_command() {
+    let result =
+        rsigma_runtime::sources::command::resolve_command(&[], DataFormat::Json, None).await;
+
+    assert!(result.is_err());
+}
+
+// =============================================================================
+// DefaultSourceResolver with error policies
+// =============================================================================
+
+#[tokio::test]
+async fn resolver_file_source_end_to_end() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().join("admins.json");
+    std::fs::write(&path, r#"["admin@corp.com", "root@corp.com"]"#).unwrap();
+
+    let source = DynamicSource {
+        id: "admin_emails".to_string(),
+        source_type: SourceType::File {
+            path: path.clone(),
+            format: DataFormat::Json,
+        },
+        refresh: RefreshPolicy::Once,
+        timeout: None,
+        on_error: ErrorPolicy::Fail,
+        required: true,
+        default: None,
+    };
+
+    let resolver = DefaultSourceResolver::new();
+    let result = resolver.resolve(&source).await.unwrap();
+    assert_eq!(
+        result.data,
+        serde_json::json!(["admin@corp.com", "root@corp.com"])
+    );
+}
+
+#[tokio::test]
+async fn resolver_use_cached_on_failure() {
+    let resolver = DefaultSourceResolver::new();
+
+    // Pre-populate cache
+    resolver
+        .cache()
+        .store("missing_source", &serde_json::json!(["cached_value"]));
+
+    let source = DynamicSource {
+        id: "missing_source".to_string(),
+        source_type: SourceType::File {
+            path: "/nonexistent/file.json".into(),
+            format: DataFormat::Json,
+        },
+        refresh: RefreshPolicy::Once,
+        timeout: None,
+        on_error: ErrorPolicy::UseCached,
+        required: true,
+        default: None,
+    };
+
+    let result = resolver.resolve(&source).await.unwrap();
+    assert_eq!(result.data, serde_json::json!(["cached_value"]));
+}
+
+#[tokio::test]
+async fn resolver_use_default_on_failure() {
+    let resolver = DefaultSourceResolver::new();
+
+    let default_val =
+        serde_yaml::Value::Sequence(vec![serde_yaml::Value::String("fallback@corp.com".into())]);
+
+    let source = DynamicSource {
+        id: "missing_source".to_string(),
+        source_type: SourceType::File {
+            path: "/nonexistent/file.json".into(),
+            format: DataFormat::Json,
+        },
+        refresh: RefreshPolicy::Once,
+        timeout: None,
+        on_error: ErrorPolicy::UseDefault,
+        required: true,
+        default: Some(default_val),
+    };
+
+    let result = resolver.resolve(&source).await.unwrap();
+    assert_eq!(result.data, serde_json::json!(["fallback@corp.com"]));
+}
+
+#[tokio::test]
+async fn resolver_fail_policy_returns_error() {
+    let resolver = DefaultSourceResolver::new();
+
+    let source = DynamicSource {
+        id: "bad_source".to_string(),
+        source_type: SourceType::File {
+            path: "/nonexistent/file.json".into(),
+            format: DataFormat::Json,
+        },
+        refresh: RefreshPolicy::Once,
+        timeout: None,
+        on_error: ErrorPolicy::Fail,
+        required: true,
+        default: None,
+    };
+
+    let result = resolver.resolve(&source).await;
+    assert!(result.is_err());
+    assert_eq!(result.unwrap_err().source_id, "bad_source");
+}
+
+// =============================================================================
+// resolve_all + TemplateExpander end-to-end
+// =============================================================================
+
+#[tokio::test]
+async fn end_to_end_dynamic_pipeline_resolution() {
+    let dir = tempfile::tempdir().unwrap();
+
+    // Create source files
+    let emails_path = dir.path().join("admins.json");
+    std::fs::write(&emails_path, r#"["admin@corp.com", "sec@corp.com"]"#).unwrap();
+
+    let config_path = dir.path().join("config.json");
+    std::fs::write(
+        &config_path,
+        r#"{"env": "production", "index": "security-logs"}"#,
+    )
+    .unwrap();
+
+    // Build a dynamic pipeline
+    let mut vars = HashMap::new();
+    vars.insert(
+        "admin_emails".to_string(),
+        vec!["${source.admin_emails}".to_string()],
+    );
+    vars.insert(
+        "log_index".to_string(),
+        vec!["${source.env_config.index}".to_string()],
+    );
+    vars.insert("static_var".to_string(), vec!["unchanged".to_string()]);
+
+    let sources = vec![
+        DynamicSource {
+            id: "admin_emails".to_string(),
+            source_type: SourceType::File {
+                path: emails_path,
+                format: DataFormat::Json,
+            },
+            refresh: RefreshPolicy::Once,
+            timeout: None,
+            on_error: ErrorPolicy::Fail,
+            required: true,
+            default: None,
+        },
+        DynamicSource {
+            id: "env_config".to_string(),
+            source_type: SourceType::File {
+                path: config_path,
+                format: DataFormat::Json,
+            },
+            refresh: RefreshPolicy::Once,
+            timeout: None,
+            on_error: ErrorPolicy::Fail,
+            required: true,
+            default: None,
+        },
+    ];
+
+    let pipeline = Pipeline {
+        name: "dynamic-test".to_string(),
+        priority: 10,
+        vars,
+        transformations: vec![],
+        finalizers: vec![],
+        sources: sources.clone(),
+        source_refs: vec![],
+    };
+
+    assert!(pipeline.is_dynamic());
+
+    // Resolve sources
+    let resolver = DefaultSourceResolver::new();
+    let resolved_data = resolve_all(&resolver, &sources).await.unwrap();
+
+    assert_eq!(resolved_data.len(), 2);
+    assert_eq!(
+        resolved_data["admin_emails"],
+        serde_json::json!(["admin@corp.com", "sec@corp.com"])
+    );
+
+    // Expand templates
+    let expanded = TemplateExpander::expand(&pipeline, &resolved_data);
+
+    assert_eq!(
+        expanded.vars["admin_emails"],
+        vec!["admin@corp.com", "sec@corp.com"]
+    );
+    assert_eq!(expanded.vars["log_index"], vec!["security-logs"]);
+    assert_eq!(expanded.vars["static_var"], vec!["unchanged"]);
+}
+
+// =============================================================================
+// Cache tests
+// =============================================================================
+
+#[test]
+fn cache_store_and_retrieve() {
+    let cache = SourceCache::new();
+    assert!(cache.is_empty());
+
+    cache.store("src1", &serde_json::json!({"key": "value"}));
+    assert_eq!(cache.len(), 1);
+
+    let val = cache.get("src1").unwrap();
+    assert_eq!(val, serde_json::json!({"key": "value"}));
+}
+
+#[test]
+fn cache_invalidate() {
+    let cache = SourceCache::new();
+    cache.store("src1", &serde_json::json!("data"));
+    cache.invalidate("src1");
+    assert!(cache.get("src1").is_none());
+}
+
+#[test]
+fn cache_clear() {
+    let cache = SourceCache::new();
+    cache.store("src1", &serde_json::json!("a"));
+    cache.store("src2", &serde_json::json!("b"));
+    assert_eq!(cache.len(), 2);
+    cache.clear();
+    assert!(cache.is_empty());
+}
+
+#[test]
+fn cache_sqlite_persistence() {
+    let dir = tempfile::tempdir().unwrap();
+    let db_path = dir.path().join("cache.db");
+
+    // Store some data
+    {
+        let cache = SourceCache::with_sqlite(&db_path).unwrap();
+        cache.store("src1", &serde_json::json!({"key": "value1"}));
+        cache.store("src2", &serde_json::json!(["a", "b", "c"]));
+    }
+
+    // Re-open and verify data is still there
+    {
+        let cache = SourceCache::with_sqlite(&db_path).unwrap();
+        assert_eq!(cache.len(), 2);
+        assert_eq!(
+            cache.get("src1").unwrap(),
+            serde_json::json!({"key": "value1"})
+        );
+        assert_eq!(
+            cache.get("src2").unwrap(),
+            serde_json::json!(["a", "b", "c"])
+        );
+    }
+}
+
+#[test]
+fn cache_sqlite_invalidate_persists() {
+    let dir = tempfile::tempdir().unwrap();
+    let db_path = dir.path().join("cache.db");
+
+    {
+        let cache = SourceCache::with_sqlite(&db_path).unwrap();
+        cache.store("src1", &serde_json::json!("data"));
+        cache.invalidate("src1");
+    }
+
+    {
+        let cache = SourceCache::with_sqlite(&db_path).unwrap();
+        assert!(cache.get("src1").is_none());
+        assert!(cache.is_empty());
+    }
+}