diff --git a/CHANGELOG.md b/CHANGELOG.md index 20b65929..37974277 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,48 @@ Each entry corresponds to a [GitHub Release](https://github.com/timescale/rsigma ## [Unreleased] +### Unknown-field discovery API (#149) + +The `engine daemon` learns to surface two halves of detection coverage live from inside the process: which event fields are not referenced by any loaded rule (gap signal) and which rule fields have never appeared in an event (broken-coverage signal). RSigma owns both rule parsing and event ingestion end-to-end, so this view does not need an external pipeline. + +**Two new flags on `rsigma engine daemon`** (off by default; zero overhead when not set): + +| Flag | Default | Purpose | +|------|---------|---------| +| `--observe-fields` | off | Enable the field observer. When enabled, every event evaluated by the engine task has its dotted field paths recorded. | +| `--observe-fields-max-keys ` | `10000` | Hard ceiling on distinct field names. Existing keys keep counting once the cap is hit; new keys are dropped and counted as overflow. | + +**Four new HTTP endpoints.** + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/api/v1/fields` | Snapshot bundling `summary` + `unknown` + `missing` for a one-shot dashboard read. | +| `GET` | `/api/v1/fields/unknown` | Event fields not referenced by any rule. Sorted by descending count. | +| `GET` | `/api/v1/fields/missing` | Rule fields never seen in events. Each entry includes up to 10 rule titles with a `truncated` flag for fields that span more rules. | +| `DELETE` | `/api/v1/fields/observer` | Clear the observer's counters and return `{previous_keys, previous_events}`. | + +Each list endpoint accepts `?limit=N&offset=M` (default `limit=100`, cap `1000`) and returns `total` + `next_offset` for deterministic pagination. All four return `503 Service Unavailable` with `{"error":"field observation disabled","hint":"..."}` when `--observe-fields` is not set. + +**Three new Prometheus surfaces.** + +| Metric | Type | Description | +|--------|------|-------------| +| `rsigma_fields_observed_total` | counter | Total events scanned by the opt-in field observer. | +| `rsigma_fields_observer_unique_keys` | gauge | Distinct field names currently tracked. | +| `rsigma_fields_observer_overflow_dropped_total` | counter | New-key insert attempts dropped because the observer was at capacity. | + +The gauges refresh on every `/metrics` scrape and after every successful `/api/v1/fields/*` call, so a Prometheus alert on `rsigma_fields_observer_overflow_dropped_total` fires the moment an operator's `--observe-fields-max-keys` choice is too low for the deployment. + +**Shared extraction with `rsigma rule fields`.** The rule-field side of the join lives in a new `rsigma_eval::fields` module (`RuleFieldSet`) that both the CLI subcommand and the daemon import. The daemon caches the post-pipeline set on `RuntimeEngine` via `ArcSwap` and refreshes it on every successful `load_rules()`, so the HTTP handlers run lock-free against a stable view even during hot reloads. + +**Shared join primitive.** `FieldObservation::coverage(&RuleFieldSet) -> FieldCoverage` lives in `rsigma-eval` and partitions an observation snapshot into the unknown / intersection / missing buckets in one pass. Both the daemon's HTTP handlers and the eval report consume this, so the partition semantics cannot drift across runtimes. + +**Implementation cost.** Default-off; the engine task takes a single `ArcSwap` load per batch when no observer is attached and skips field iteration entirely. With `--observe-fields` set, the only added work is one `Event::field_keys()` walk per parsed event (one `String` allocation per leaf path, depth-capped at 64; flat formats like `KvEvent` return `Cow::Borrowed`) plus a short `std::sync::Mutex` lock to update counters. Memory is bounded by `--observe-fields-max-keys` (10k default ≈ a few hundred KB; keys stored as `Arc` so snapshots refcount-bump rather than copy). + +**Offline coverage report.** `rsigma engine eval` mirrors the daemon's field-observability surface with three new flags: `--observe-fields` enables observation; `--observe-fields-max-keys ` (default 10000, validated as `NonZeroUsize` so 0 is rejected at parse time); `--observe-fields-report ` writes the JSON report to a file (defaults to stderr if omitted so detections on stdout stay machine-consumable; clap-`requires` `--observe-fields` so the typo case fails fast). The report has the same shape as `GET /api/v1/fields`, so the same `jq` queries work against either runtime. To make this possible without coupling `engine eval` to the `daemon` Cargo feature, `FieldObserver` lives in `rsigma-eval` (which every consumer already links) and uses `std::sync::Mutex` to keep `rsigma-eval` dependency-light. `rsigma-runtime` keeps a `pub use rsigma_eval::{FieldObserver, FieldObservation, FieldObservationEntry, FieldCoverage}` re-export so existing imports continue to compile unchanged. + +**Docs.** Endpoint reference under "Field observability" in `docs/reference/http-api.md`; flag rows in `docs/cli/engine/daemon.md` and `docs/cli/engine/eval.md`; metric rows in `docs/reference/metrics.md`; combined daemon/eval workflow in `docs/guide/observability.md`. + ### Server-side TLS for the daemon API listener (#128) The `engine daemon` API listener now terminates TLS in-process for every protocol that already shares `--api-addr`: the Axum HTTP REST API (`/healthz`, `/readyz`, `/metrics`, `/api/v1/*`), OTLP/HTTP on `POST /v1/logs`, and OTLP/gRPC via `LogsService/Export`. Operators can drop the sidecar reverse proxy they previously needed for confidentiality, integrity, and agent-to-daemon pinning. diff --git a/README.md b/README.md index d7a8e57c..9b06068e 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ For rule quality and editor integration, a built-in linter validates rules again * **Post-evaluation enrichment:** Inject contextual data (asset info, IP reputation, identity, GeoIP, runbook URLs, ...) into detection and correlation results via four primitives (`template`, `lookup`, `http`, `command`) with kind-aware template namespaces, response cache, scope filtering, and hot-reload * **Rule conversion:** Convert rules into backend-native query strings via a pluggable backend trait (PostgreSQL/TimescaleDB SQL, LynxDB) * **Eval prefilters:** Use optional prefilters for large rule sets, including a bloom filter for substring matchers (`--bloom-prefilter`) and cross-rule Aho-Corasick index for whole-rule pruning (`--cross-rule-ac`, requires `daachorse-index` feature) +* **Field observability:** Opt-in `--observe-fields` mode on both `engine daemon` (live, exposed over `GET /api/v1/fields*` with Prometheus counters) and `engine eval` (one-shot JSON report at end-of-run, ideal for CI gap analysis) surfaces which event fields no rule references (gap signal) and which rule fields have never appeared in an event (broken-coverage signal); same JSON shape across runtimes * **TLS termination:** Use in-process TLS termination for the daemon API listener (HTTP REST, `/metrics`, OTLP/HTTP, OTLP/gRPC) with optional mutual TLS, `aws-lc-rs` crypto, and cross-platform certificate hot-reload * **NATS JetStream:** Use NATS JetStream support with authentication (credentials, mTLS), replay, consumer groups, and dead-letter queues * **OTLP ingestion:** Use OTLP support for any OpenTelemetry-compatible agent (Grafana Alloy, Vector, Fluent Bit, OTel Collector) via HTTP or gRPC diff --git a/crates/rsigma-cli/README.md b/crates/rsigma-cli/README.md index a920803c..59f41c90 100644 --- a/crates/rsigma-cli/README.md +++ b/crates/rsigma-cli/README.md @@ -173,6 +173,8 @@ Unlike `engine eval`, the daemon stays alive after stdin reaches EOF and support | `--bloom-prefilter` | flag | `false` | Enable bloom-filter pre-filtering of positive substring matchers (workload-dependent; see `crates/rsigma-eval/README.md`) | | `--bloom-max-bytes` | integer | **1048576** | Memory budget for the bloom index (no effect without `--bloom-prefilter`) | | `--cross-rule-ac` | flag | `false` | Enable cross-rule Aho-Corasick pre-filter (requires `--features daachorse-index`; see `crates/rsigma-eval/README.md`) | +| `--observe-fields` | flag | `false` | Record the field keys of every event evaluated by the engine so `/api/v1/fields*` can report gap and broken-coverage signals. Off by default; when off the engine task does not iterate event fields at all | +| `--observe-fields-max-keys` | integer | **10000** | Hard ceiling on distinct field names tracked by the observer. Overflow drops are counted via `rsigma_fields_observer_overflow_dropped_total`. No effect without `--observe-fields` | | `--buffer-size` | integer | **10000** | Bounded channel capacity for source-to-engine and engine-to-sink queues | | `--batch-size` | integer | **1** | Maximum events per engine lock acquisition (reduces mutex overhead under load) | | `--drain-timeout` | integer | **5** | Seconds to wait for in-flight events to drain on shutdown | @@ -311,6 +313,10 @@ rsigma engine daemon \ | `/api/v1/sources` | GET | List dynamic sources and their resolution status | | `/api/v1/sources/resolve` | POST | Trigger re-resolution of all dynamic sources (or specific ones via request body) | | `/api/v1/sources/cache/{source_id}` | DELETE | Invalidate the cached value for a specific source | +| `/api/v1/fields` | GET | Combined snapshot with summary, unknown (gap signal), and missing (broken coverage). Returns 503 unless `--observe-fields` is set. Paginated via `?limit=&offset=` | +| `/api/v1/fields/unknown` | GET | Event fields no rule references, sorted by descending count. Requires `--observe-fields`. Paginated | +| `/api/v1/fields/missing` | GET | Rule fields never observed in events, with sample rule titles. Requires `--observe-fields`. Paginated | +| `/api/v1/fields/observer` | DELETE | Clear the observer's counters and return `{previous_keys, previous_events}`. Requires `--observe-fields` | | `/v1/logs` | POST | OTLP log ingestion (`application/x-protobuf` or `application/json`, gzip supported). Requires `daemon-otlp` feature | **OTLP log ingestion** (requires `daemon-otlp` feature): @@ -441,6 +447,9 @@ Evaluate JSON events against Sigma detection and correlation rules. | `--bloom-prefilter` | flag | `false` | Enable bloom-filter pre-filtering of positive substring matchers (see `crates/rsigma-eval/README.md` for the trade-off) | | `--bloom-max-bytes` | integer | **1048576** | Memory budget for the bloom index (no effect without `--bloom-prefilter`) | | `--cross-rule-ac` | flag | `false` | Enable cross-rule Aho-Corasick pre-filter (requires `--features daachorse-index`; see `crates/rsigma-eval/README.md`) | +| `--observe-fields` | flag | `false` | Record the field keys of every evaluated event and emit a coverage report at end-of-run (gap signal + broken-coverage signal). Same JSON shape as the daemon's `GET /api/v1/fields` endpoint | +| `--observe-fields-max-keys` | integer | **10000** | Hard ceiling on distinct field names tracked. Overflow is counted via `overflow_dropped` in the report. No effect without `--observe-fields` | +| `--observe-fields-report` | path | none | Path to write the report. Defaults to stderr when omitted so detections on stdout stay machine-consumable. No effect without `--observe-fields` | \* Feature-gated: `logfmt` requires the `logfmt` feature, `cef` requires the `cef` feature, `evtx` requires the `evtx` feature. diff --git a/crates/rsigma-cli/src/commands/daemon.rs b/crates/rsigma-cli/src/commands/daemon.rs index e8aa7552..981cef6a 100644 --- a/crates/rsigma-cli/src/commands/daemon.rs +++ b/crates/rsigma-cli/src/commands/daemon.rs @@ -227,6 +227,30 @@ pub(crate) struct DaemonArgs { #[arg(long = "bloom-max-bytes")] pub bloom_max_bytes: Option, + /// Enable opt-in observation of every event's field keys so the + /// daemon can answer two coverage questions over its admin API: + /// which fields appear in events but are never referenced by any + /// loaded rule (gap signal), and which fields are referenced by + /// rules but have never appeared in an event (broken coverage). + /// + /// Off by default. When set, an in-memory counter records the field + /// keys of every event evaluated by the engine task; the counter is + /// hard-capped by `--observe-fields-max-keys` and surfaced via the + /// `/api/v1/fields`, `/api/v1/fields/unknown`, and + /// `/api/v1/fields/missing` endpoints (plus + /// `DELETE /api/v1/fields/observer` to reset). + #[arg(long = "observe-fields")] + pub observe_fields: bool, + + /// Hard ceiling on the number of distinct field names tracked by + /// the field observer. Once the ceiling is reached, new keys are + /// dropped (and counted via + /// `rsigma_fields_observer_overflow_dropped_total`); existing keys + /// keep incrementing. Default: 10000. Has no effect unless + /// `--observe-fields` is set. + #[arg(long = "observe-fields-max-keys", default_value_t = 10_000)] + pub observe_fields_max_keys: usize, + /// Enable the cross-rule Aho-Corasick pre-filter (daachorse-index). /// /// Off by default. When enabled, the engine builds a single @@ -408,6 +432,8 @@ pub(crate) fn cmd_daemon(args: DaemonArgs) { allow_remote_include, bloom_prefilter, bloom_max_bytes, + observe_fields, + observe_fields_max_keys, #[cfg(feature = "daachorse-index")] cross_rule_ac, enrichers, @@ -507,6 +533,8 @@ pub(crate) fn cmd_daemon(args: DaemonArgs) { allow_remote_include, bloom_prefilter, bloom_max_bytes, + observe_fields, + observe_fields_max_keys, #[cfg(feature = "daachorse-index")] cross_rule_ac, enrichers, @@ -560,6 +588,8 @@ fn run_daemon( allow_remote_include: bool, bloom_prefilter: bool, bloom_max_bytes: Option, + observe_fields: bool, + observe_fields_max_keys: usize, #[cfg(feature = "daachorse-index")] cross_rule_ac: bool, enrichers_path: Option, source_paths: Vec, @@ -685,6 +715,8 @@ fn run_daemon( allow_remote_include, bloom_prefilter, bloom_max_bytes, + observe_fields, + observe_fields_max_keys, #[cfg(feature = "daachorse-index")] cross_rule_ac, enrichers_path, diff --git a/crates/rsigma-cli/src/commands/eval.rs b/crates/rsigma-cli/src/commands/eval.rs index 6f2c76d8..df3a2163 100644 --- a/crates/rsigma-cli/src/commands/eval.rs +++ b/crates/rsigma-cli/src/commands/eval.rs @@ -1,10 +1,13 @@ use std::fs::File; -use std::io::{self, BufRead, BufReader}; -use std::path::PathBuf; +use std::io::{self, BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; use std::process; +use std::sync::Arc; use clap::Args; -use rsigma_eval::{CorrelationEngine, Engine, JsonEvent, Pipeline}; +use rsigma_eval::{ + CorrelationEngine, Engine, Event, FieldObserver, JsonEvent, Pipeline, RuleFieldSet, +}; use rsigma_parser::SigmaCollection; use crate::EventFilter; @@ -116,6 +119,44 @@ pub(crate) struct EvalArgs { #[cfg(feature = "daachorse-index")] #[arg(long = "cross-rule-ac")] pub cross_rule_ac: bool, + + /// Record the field keys of every evaluated event and emit a + /// coverage report at the end of the run. + /// + /// The report joins observed event fields against the field names + /// referenced by the loaded rules and surfaces two halves of + /// detection coverage: + /// + /// - **gap signal:** event fields no rule references. + /// - **broken-coverage signal:** rule fields that never appeared + /// in an event during the run. + /// + /// Off by default. Same JSON shape as the daemon's + /// `GET /api/v1/fields` endpoint so the same `jq` query works + /// against either runtime (e.g. for a CI gate). + #[arg(long = "observe-fields")] + pub observe_fields: bool, + + /// Hard ceiling on the number of distinct field names tracked. + /// Once the ceiling is reached, new keys are dropped (and counted + /// via `overflow_dropped` in the report); existing keys keep + /// incrementing. Default: 10000. Has no effect unless + /// `--observe-fields` is set. + #[arg( + long = "observe-fields-max-keys", + default_value_t = std::num::NonZeroUsize::new(10_000).unwrap(), + )] + pub observe_fields_max_keys: std::num::NonZeroUsize, + + /// Path to write the field-observation JSON report to. When + /// omitted (and `--observe-fields` is set) the report is written + /// to stderr so detections on stdout stay machine-consumable. + #[arg( + long = "observe-fields-report", + value_name = "PATH", + requires = "observe_fields" + )] + pub observe_fields_report: Option, } /// Resolved event source from the `--event` flag. @@ -179,6 +220,9 @@ pub(crate) fn cmd_eval(args: EvalArgs) -> bool { bloom_max_bytes, #[cfg(feature = "daachorse-index")] cross_rule_ac, + observe_fields, + observe_fields_max_keys, + observe_fields_report, } = args; let collection = crate::load_collection(&rules_path); @@ -208,7 +252,24 @@ pub(crate) fn cmd_eval(args: EvalArgs) -> bool { "wallclock", ); - if has_correlations { + // Field observability context, built once before evaluation and + // shared across the eval helpers. None unless `--observe-fields` + // is set. The rule field set is computed from the collection + + // pipelines so the report matches what the engine evaluates + // against; ownership of the collection is preserved because + // `add_collection` borrows it. + let observe_ctx: Option = if observe_fields { + Some(ObserveContext { + observer: Arc::new(FieldObserver::new(observe_fields_max_keys.get())), + rule_field_set: RuleFieldSet::collect(&collection, &pipelines, true), + report_path: observe_fields_report, + }) + } else { + None + }; + let observe_ref = observe_ctx.as_ref(); + + let had_matches = if has_correlations { cmd_eval_with_correlations( collection, &rules_path, @@ -224,6 +285,7 @@ pub(crate) fn cmd_eval(args: EvalArgs) -> bool { bloom_max_bytes, #[cfg(feature = "daachorse-index")] cross_rule_ac, + observe_ref, ) } else { cmd_eval_detection_only( @@ -240,8 +302,15 @@ pub(crate) fn cmd_eval(args: EvalArgs) -> bool { bloom_max_bytes, #[cfg(feature = "daachorse-index")] cross_rule_ac, + observe_ref, ) + }; + + if let Some(ctx) = observe_ref { + render_field_report(ctx); } + + had_matches } /// Evaluation with correlations (stateful). Returns `true` if any match fired. @@ -260,6 +329,7 @@ fn cmd_eval_with_correlations( bloom_prefilter: bool, bloom_max_bytes: Option, #[cfg(feature = "daachorse-index")] cross_rule_ac: bool, + observe: Option<&ObserveContext>, ) -> bool { let mut engine = CorrelationEngine::new(config); engine.set_include_event(include_event); @@ -303,6 +373,7 @@ fn cmd_eval_with_correlations( let mut had_matches = false; for payload in crate::apply_event_filter(&value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); let result = engine.process_event(&event); if result.is_empty() { @@ -329,6 +400,7 @@ fn cmd_eval_with_correlations( pretty, input_format_str, syslog_tz_str, + observe, ); eprintln!( "Processed {line_num} events, {det_count} detection matches, {corr_count} correlation matches." @@ -338,7 +410,7 @@ fn cmd_eval_with_correlations( #[cfg(feature = "evtx")] EventSource::EvtxFile(path) => { let (det_count, corr_count, rec_count) = - eval_evtx_corr(&mut engine, &path, event_filter, pretty); + eval_evtx_corr(&mut engine, &path, event_filter, pretty, observe); eprintln!( "Processed {rec_count} EVTX records, {det_count} detection matches, {corr_count} correlation matches." ); @@ -353,6 +425,7 @@ fn cmd_eval_with_correlations( pretty, input_format_str, syslog_tz_str, + observe, ); eprintln!( "Processed {line_num} events, {det_count} detection matches, {corr_count} correlation matches." @@ -372,6 +445,7 @@ fn eval_stream_corr( pretty: bool, input_format_str: &str, syslog_tz_str: &str, + observe: Option<&ObserveContext>, ) -> (u64, u64, u64) { let mut line_num = 0u64; let mut det_count = 0u64; @@ -407,6 +481,7 @@ fn eval_stream_corr( pretty, &mut det_count, &mut corr_count, + observe, ); } #[cfg(not(feature = "daemon"))] @@ -418,6 +493,7 @@ fn eval_stream_corr( pretty, &mut det_count, &mut corr_count, + observe, ); } } @@ -427,6 +503,7 @@ fn eval_stream_corr( /// Evaluate a single line through the correlation engine using format-aware parsing. #[cfg(feature = "daemon")] +#[allow(clippy::too_many_arguments)] fn eval_line_corr( engine: &mut CorrelationEngine, line: &str, @@ -435,15 +512,15 @@ fn eval_line_corr( pretty: bool, det_count: &mut u64, corr_count: &mut u64, + observe: Option<&ObserveContext>, ) { - use rsigma_eval::Event; - if let Some(decoded) = rsigma_runtime::parse_line(line, format) { // For JSON events, apply the event filter (which may produce multiple payloads). if matches!(decoded, rsigma_runtime::EventInputDecoded::Json(_)) { let json_value = decoded.to_json(); for payload in crate::apply_event_filter(&json_value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); let result = engine.process_event(&event); for m in &result { if m.is_detection() { @@ -456,6 +533,7 @@ fn eval_line_corr( } } else { // Non-JSON events: evaluate directly (no event filter). + observe_event(observe, &decoded); let result = engine.process_event(&decoded); for m in &result { if m.is_detection() { @@ -471,6 +549,7 @@ fn eval_line_corr( /// Evaluate a single line through the correlation engine (JSON-only fallback). #[cfg(not(feature = "daemon"))] +#[allow(clippy::too_many_arguments)] fn eval_line_corr_json( engine: &mut CorrelationEngine, line: &str, @@ -478,6 +557,7 @@ fn eval_line_corr_json( pretty: bool, det_count: &mut u64, corr_count: &mut u64, + observe: Option<&ObserveContext>, ) { let value: serde_json::Value = match serde_json::from_str(line) { Ok(v) => v, @@ -489,6 +569,7 @@ fn eval_line_corr_json( for payload in crate::apply_event_filter(&value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); let result = engine.process_event(&event); for m in &result { if m.is_detection() { @@ -516,6 +597,7 @@ fn cmd_eval_detection_only( bloom_prefilter: bool, bloom_max_bytes: Option, #[cfg(feature = "daachorse-index")] cross_rule_ac: bool, + observe: Option<&ObserveContext>, ) -> bool { let mut engine = Engine::new(); engine.set_include_event(include_event); @@ -562,6 +644,7 @@ fn cmd_eval_detection_only( } else { for payload in &payloads { let event = JsonEvent::borrow(payload); + observe_event(observe, &event); let matches = engine.evaluate(&event); if matches.is_empty() { @@ -589,13 +672,15 @@ fn cmd_eval_detection_only( pretty, input_format_str, syslog_tz_str, + observe, ); eprintln!("Processed {line_num} events, {match_count} matches."); match_count > 0 } #[cfg(feature = "evtx")] EventSource::EvtxFile(path) => { - let (match_count, rec_count) = eval_evtx_detect(&engine, &path, event_filter, pretty); + let (match_count, rec_count) = + eval_evtx_detect(&engine, &path, event_filter, pretty, observe); eprintln!("Processed {rec_count} EVTX records, {match_count} matches."); match_count > 0 } @@ -608,6 +693,7 @@ fn cmd_eval_detection_only( pretty, input_format_str, syslog_tz_str, + observe, ); eprintln!("Processed {line_num} events, {match_count} matches."); match_count > 0 @@ -625,6 +711,7 @@ fn eval_stream_detect( pretty: bool, input_format_str: &str, syslog_tz_str: &str, + observe: Option<&ObserveContext>, ) -> (u64, u64) { let mut line_num = 0u64; let mut match_count = 0u64; @@ -657,11 +744,19 @@ fn eval_stream_detect( event_filter, pretty, &mut match_count, + observe, ); } #[cfg(not(feature = "daemon"))] { - eval_line_detect_json(engine, &line, event_filter, pretty, &mut match_count); + eval_line_detect_json( + engine, + &line, + event_filter, + pretty, + &mut match_count, + observe, + ); } } @@ -670,6 +765,7 @@ fn eval_stream_detect( /// Evaluate a single line through the detection engine using format-aware parsing. #[cfg(feature = "daemon")] +#[allow(clippy::too_many_arguments)] fn eval_line_detect( engine: &Engine, line: &str, @@ -677,20 +773,21 @@ fn eval_line_detect( event_filter: &EventFilter, pretty: bool, match_count: &mut u64, + observe: Option<&ObserveContext>, ) { - use rsigma_eval::Event; - if let Some(decoded) = rsigma_runtime::parse_line(line, format) { if matches!(decoded, rsigma_runtime::EventInputDecoded::Json(_)) { let json_value = decoded.to_json(); for payload in crate::apply_event_filter(&json_value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); for m in &engine.evaluate(&event) { *match_count += 1; crate::print_json(m, pretty); } } } else { + observe_event(observe, &decoded); for m in &engine.evaluate(&decoded) { *match_count += 1; crate::print_json(m, pretty); @@ -707,6 +804,7 @@ fn eval_line_detect_json( event_filter: &EventFilter, pretty: bool, match_count: &mut u64, + observe: Option<&ObserveContext>, ) { let value: serde_json::Value = match serde_json::from_str(line) { Ok(v) => v, @@ -718,6 +816,7 @@ fn eval_line_detect_json( for payload in crate::apply_event_filter(&value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); for m in &engine.evaluate(&event) { *match_count += 1; crate::print_json(m, pretty); @@ -737,6 +836,7 @@ fn eval_evtx_corr( path: &std::path::Path, event_filter: &EventFilter, pretty: bool, + observe: Option<&ObserveContext>, ) -> (u64, u64, u64) { let mut reader = rsigma_runtime::EvtxFileReader::open(path).unwrap_or_else(|e| { eprintln!("Error opening EVTX file '{}': {e}", path.display()); @@ -759,6 +859,7 @@ fn eval_evtx_corr( for payload in crate::apply_event_filter(&value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); let result = engine.process_event(&event); for m in &result { if m.is_detection() { @@ -782,6 +883,7 @@ fn eval_evtx_detect( path: &std::path::Path, event_filter: &EventFilter, pretty: bool, + observe: Option<&ObserveContext>, ) -> (u64, u64) { let mut reader = rsigma_runtime::EvtxFileReader::open(path).unwrap_or_else(|e| { eprintln!("Error opening EVTX file '{}': {e}", path.display()); @@ -803,6 +905,7 @@ fn eval_evtx_detect( for payload in crate::apply_event_filter(&value, event_filter) { let event = JsonEvent::borrow(&payload); + observe_event(observe, &event); for m in &engine.evaluate(&event) { match_count += 1; crate::print_json(m, pretty); @@ -812,3 +915,119 @@ fn eval_evtx_detect( (match_count, rec_count) } + +// --------------------------------------------------------------------------- +// Field observability for `engine eval` +// --------------------------------------------------------------------------- + +/// Shared context for the eval-time field observer. Built once before +/// the event loop and threaded through the helpers via +/// `Option<&ObserveContext>` so the no-op (`None`) case stays a single +/// pointer-comparison branch. +struct ObserveContext { + observer: Arc, + rule_field_set: RuleFieldSet, + /// When `None`, the report is written to stderr at end-of-run. + /// When `Some(path)`, it is written to that file (created or + /// truncated). Stdout is intentionally not a destination: the + /// detection NDJSON stream lives there. + report_path: Option, +} + +/// Observe one event if the context is set; no-op otherwise. Inlined +/// to keep the disabled path a single null pointer check. +#[inline] +fn observe_event(ctx: Option<&ObserveContext>, event: &E) { + if let Some(ctx) = ctx { + ctx.observer.observe(event); + } +} + +/// Maximum number of rule titles surfaced per missing-field entry in +/// the eval report (matches the daemon's `/api/v1/fields/missing` +/// behaviour). A `truncated: true` flag accompanies any field that +/// touches more rules than this cap. +const EVAL_MISSING_RULE_TITLES_CAP: usize = 10; + +/// Render the end-of-run field coverage report and write it to the +/// configured destination (file or stderr). The JSON shape matches +/// the daemon's `GET /api/v1/fields` payload so CI pipelines can +/// share a single `jq` query across runtimes. +fn render_field_report(ctx: &ObserveContext) { + let snapshot = ctx.observer.snapshot(); + let coverage = snapshot.coverage(&ctx.rule_field_set); + + let unknown_entries: Vec = coverage + .unknown + .iter() + .map(|e| { + let field: &str = &e.field; + serde_json::json!({ "field": field, "count": e.count }) + }) + .collect(); + let missing_entries: Vec = coverage + .missing + .iter() + .map(|(name, origin)| { + let total = origin.rule_titles.len(); + let truncated = total > EVAL_MISSING_RULE_TITLES_CAP; + let rule_titles: Vec<&str> = origin + .rule_titles + .iter() + .map(String::as_str) + .take(EVAL_MISSING_RULE_TITLES_CAP) + .collect(); + let sources: Vec<&str> = origin.sources.iter().map(|s| s.as_str()).collect(); + serde_json::json!({ + "field": name, + "rule_count": total, + "sources": sources, + "rule_titles": rule_titles, + "truncated": truncated, + }) + }) + .collect(); + + let report = serde_json::json!({ + "summary": { + "events_observed": snapshot.events_observed, + "unique_keys_observed": snapshot.unique_keys, + "rule_fields_loaded": ctx.rule_field_set.len(), + "overflow_dropped": snapshot.overflow_dropped, + "max_keys": snapshot.max_keys, + "uptime_seconds": snapshot.uptime_seconds, + "intersection_count": coverage.intersection_count, + "unknown_count": unknown_entries.len(), + "missing_count": missing_entries.len(), + }, + "unknown": unknown_entries, + "missing": missing_entries, + }); + + let serialized = serde_json::to_string_pretty(&report).unwrap_or_else(|_| report.to_string()); + + match ctx.report_path.as_deref() { + Some(path) => { + if let Err(e) = write_report_to_file(path, &serialized) { + eprintln!( + "Failed to write field observation report to {}: {e}", + path.display() + ); + } + } + None => { + // Write to stderr so detections on stdout stay + // machine-consumable. A best-effort write: failures here + // do not change the exit code because the run already + // produced the NDJSON results the operator cares about. + let _ = writeln!(io::stderr(), "{serialized}"); + } + } +} + +fn write_report_to_file(path: &Path, serialized: &str) -> std::io::Result<()> { + let mut file = File::create(path)?; + file.write_all(serialized.as_bytes())?; + file.write_all(b"\n")?; + file.flush() +} diff --git a/crates/rsigma-cli/src/commands/fields.rs b/crates/rsigma-cli/src/commands/fields.rs index 7375c8d2..1cd2e17b 100644 --- a/crates/rsigma-cli/src/commands/fields.rs +++ b/crates/rsigma-cli/src/commands/fields.rs @@ -1,18 +1,10 @@ -use std::collections::{BTreeMap, BTreeSet}; use std::path::PathBuf; use clap::Args; -use rsigma_eval::{Pipeline, apply_pipelines}; -use rsigma_parser::{ - CorrelationCondition, CorrelationRule, Detection, DetectionItem, Detections, FilterRule, - SigmaCollection, SigmaRule, -}; +use rsigma_eval::{FieldOrigin, FieldSource, Pipeline, RuleFieldSet}; +use rsigma_parser::SigmaCollection; use serde::Serialize; -// --------------------------------------------------------------------------- -// Public entry point -// --------------------------------------------------------------------------- - /// Arguments for `rsigma rule fields` (and the deprecated `rsigma fields`). #[derive(Args, Debug)] pub(crate) struct FieldsArgs { @@ -96,127 +88,6 @@ struct PipelineMapping { pipeline: String, } -// --------------------------------------------------------------------------- -// Field collection -// --------------------------------------------------------------------------- - -/// Tracks where a field was seen. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -enum FieldSource { - Detection, - Correlation, - Filter, - Metadata, -} - -impl FieldSource { - fn as_str(self) -> &'static str { - match self { - FieldSource::Detection => "detection", - FieldSource::Correlation => "correlation", - FieldSource::Filter => "filter", - FieldSource::Metadata => "metadata", - } - } -} - -struct FieldCollector { - /// field_name -> (set of rule titles that reference it, set of source types) - fields: BTreeMap, BTreeSet)>, -} - -impl FieldCollector { - fn new() -> Self { - Self { - fields: BTreeMap::new(), - } - } - - fn add(&mut self, field: &str, rule_title: &str, source: FieldSource) { - let entry = self - .fields - .entry(field.to_string()) - .or_insert_with(|| (BTreeSet::new(), BTreeSet::new())); - entry.0.insert(rule_title.to_string()); - entry.1.insert(source); - } - - fn collect_detection_items( - &mut self, - detection: &Detection, - rule_title: &str, - source: FieldSource, - ) { - match detection { - Detection::AllOf(items) => { - for item in items { - self.collect_item(item, rule_title, source); - } - } - Detection::AnyOf(subs) => { - for sub in subs { - self.collect_detection_items(sub, rule_title, source); - } - } - Detection::Keywords(_) => {} - } - } - - fn collect_item(&mut self, item: &DetectionItem, rule_title: &str, source: FieldSource) { - if let Some(ref name) = item.field.name { - self.add(name, rule_title, source); - } - } - - fn collect_detections( - &mut self, - detections: &Detections, - rule_title: &str, - source: FieldSource, - ) { - for det in detections.named.values() { - self.collect_detection_items(det, rule_title, source); - } - } - - fn collect_rule(&mut self, rule: &SigmaRule) { - self.collect_detections(&rule.detection, &rule.title, FieldSource::Detection); - for f in &rule.fields { - self.add(f, &rule.title, FieldSource::Metadata); - } - } - - fn collect_correlation(&mut self, corr: &CorrelationRule) { - for f in &corr.group_by { - self.add(f, &corr.title, FieldSource::Correlation); - } - if let CorrelationCondition::Threshold { - field: Some(ref fields), - .. - } = corr.condition - { - for f in fields { - self.add(f, &corr.title, FieldSource::Correlation); - } - } - for alias in &corr.aliases { - for mapped_field in alias.mapping.values() { - self.add(mapped_field, &corr.title, FieldSource::Correlation); - } - } - for f in &corr.fields { - self.add(f, &corr.title, FieldSource::Metadata); - } - } - - fn collect_filter(&mut self, filter: &FilterRule) { - self.collect_detections(&filter.detection, &filter.title, FieldSource::Filter); - for f in &filter.fields { - self.add(f, &filter.title, FieldSource::Metadata); - } - } -} - // --------------------------------------------------------------------------- // Pipeline mapping extraction // --------------------------------------------------------------------------- @@ -280,53 +151,32 @@ fn extract_pipeline_mappings(pipelines: &[Pipeline]) -> Vec { // Report building // --------------------------------------------------------------------------- +fn entry_from_origin(name: &str, origin: &FieldOrigin) -> FieldEntry { + let mut sources: Vec<&FieldSource> = origin.sources.iter().collect(); + sources.sort(); + FieldEntry { + field: name.to_string(), + rule_count: origin.rule_titles.len(), + sources: sources + .into_iter() + .map(|s| s.as_str().to_string()) + .collect(), + } +} + fn build_report( collection: &SigmaCollection, pipelines: &[Pipeline], no_filters: bool, ) -> FieldsReport { - let mut collector = FieldCollector::new(); + let set = RuleFieldSet::collect(collection, pipelines, !no_filters); - if pipelines.is_empty() { - for rule in &collection.rules { - collector.collect_rule(rule); - } - for corr in &collection.correlations { - collector.collect_correlation(corr); - } - } else { - for rule in &collection.rules { - let mut transformed = rule.clone(); - if let Err(e) = apply_pipelines(pipelines, &mut transformed) { - eprintln!("Warning: pipeline error for '{}': {e}", rule.title); - collector.collect_rule(rule); - continue; - } - collector.collect_rule(&transformed); - } - for corr in &collection.correlations { - collector.collect_correlation(corr); - } - } - - if !no_filters { - for filter in &collection.filters { - collector.collect_filter(filter); - } - } - - let pipeline_mappings = extract_pipeline_mappings(pipelines); - - let fields: Vec = collector - .fields - .into_iter() - .map(|(name, (rules, sources))| FieldEntry { - field: name, - rule_count: rules.len(), - sources: sources.iter().map(|s| s.as_str().to_string()).collect(), - }) + let fields: Vec = set + .iter() + .map(|(name, origin)| entry_from_origin(name, origin)) .collect(); + let pipeline_mappings = extract_pipeline_mappings(pipelines); let unique_fields = fields.len(); FieldsReport { diff --git a/crates/rsigma-cli/src/daemon/metrics.rs b/crates/rsigma-cli/src/daemon/metrics.rs index ae023be5..66f430ef 100644 --- a/crates/rsigma-cli/src/daemon/metrics.rs +++ b/crates/rsigma-cli/src/daemon/metrics.rs @@ -47,6 +47,9 @@ pub struct Metrics { pub tls_certificate_expiry_seconds: Gauge, #[cfg(feature = "daemon-tls")] pub tls_active_connections: std::sync::Arc, + pub fields_observed_total: IntCounter, + pub fields_observer_unique_keys: IntGauge, + pub fields_observer_overflow_dropped_total: IntCounter, } impl Metrics { @@ -391,6 +394,31 @@ impl Metrics { .unwrap(); } + let fields_observed_total = IntCounter::with_opts(Opts::new( + "rsigma_fields_observed_total", + "Total events scanned by the opt-in field observer (--observe-fields)", + )) + .unwrap(); + let fields_observer_unique_keys = IntGauge::with_opts(Opts::new( + "rsigma_fields_observer_unique_keys", + "Distinct field names currently tracked by the field observer", + )) + .unwrap(); + let fields_observer_overflow_dropped_total = IntCounter::with_opts(Opts::new( + "rsigma_fields_observer_overflow_dropped_total", + "New-key insert attempts dropped because the field observer was at capacity", + )) + .unwrap(); + registry + .register(Box::new(fields_observed_total.clone())) + .unwrap(); + registry + .register(Box::new(fields_observer_unique_keys.clone())) + .unwrap(); + registry + .register(Box::new(fields_observer_overflow_dropped_total.clone())) + .unwrap(); + Metrics { registry, events_processed, @@ -433,6 +461,39 @@ impl Metrics { tls_certificate_expiry_seconds, #[cfg(feature = "daemon-tls")] tls_active_connections, + fields_observed_total, + fields_observer_unique_keys, + fields_observer_overflow_dropped_total, + } + } + + /// Refresh the field-observer Prometheus gauges from a snapshot. + /// Called both by the periodic uptime tick path inside the + /// `/metrics` handler and by the dedicated `/api/v1/fields*` + /// endpoints so a scrape immediately after a reset reflects the + /// new state. + /// + /// Prometheus counters must be monotonic, so this bridges from the + /// observer's *lifetime* totals (which never reset) rather than from + /// `events_observed` / `overflow_dropped` (which reset on + /// `DELETE /api/v1/fields/observer`). Without that, a reset between + /// scrapes silently drops every event observed before the + /// lifetime-total bridge re-overtook the Prometheus counter's + /// last-known value. + pub fn update_field_observer_metrics(&self, snapshot: &rsigma_runtime::FieldObservation) { + self.fields_observer_unique_keys + .set(snapshot.unique_keys as i64); + let observed_now = snapshot.lifetime_events_observed; + let observed_prev = self.fields_observed_total.get(); + if observed_now > observed_prev { + self.fields_observed_total + .inc_by(observed_now - observed_prev); + } + let overflow_now = snapshot.lifetime_overflow_dropped; + let overflow_prev = self.fields_observer_overflow_dropped_total.get(); + if overflow_now > overflow_prev { + self.fields_observer_overflow_dropped_total + .inc_by(overflow_now - overflow_prev); } } diff --git a/crates/rsigma-cli/src/daemon/server.rs b/crates/rsigma-cli/src/daemon/server.rs index d4f80743..4fb75846 100644 --- a/crates/rsigma-cli/src/daemon/server.rs +++ b/crates/rsigma-cli/src/daemon/server.rs @@ -12,8 +12,8 @@ use axum::routing::{delete, get, post}; use axum::{Json, Router}; use rsigma_eval::{CorrelationConfig, Pipeline, ProcessResult}; use rsigma_runtime::{ - AckToken, EnrichmentPipeline, FileSink, InputFormat, LogProcessor, MetricsHook, RawEvent, - RuntimeEngine, Sink, StdinSource, StdoutSink, spawn_source, + AckToken, EnrichmentPipeline, FieldObserver, FileSink, InputFormat, LogProcessor, MetricsHook, + RawEvent, RuntimeEngine, Sink, StdinSource, StdoutSink, spawn_source, }; use serde::Serialize; use tokio::sync::mpsc; @@ -66,6 +66,10 @@ struct AppState { otlp_event_tx: mpsc::Sender, /// The daemon-wide source registry for API endpoints. source_registry: Arc, + /// Opt-in field observer. `Some` when the daemon was started with + /// `--observe-fields`; the engine task records observed field keys + /// and the `/api/v1/fields/*` handlers (Phase 4) consume snapshots. + field_observer: Option>, } #[derive(Clone)] @@ -101,6 +105,17 @@ pub struct DaemonConfig { /// Optional override for the bloom memory budget (bytes). `None` means /// the crate default (1 MB). pub bloom_max_bytes: Option, + /// Enable the opt-in field observer that counts every event's field + /// keys so the `/api/v1/fields/*` endpoints can surface gap and + /// broken-coverage signals. Off by default; when off the engine + /// task does not iterate `Event::field_keys` at all. + pub observe_fields: bool, + /// Hard ceiling on the number of distinct field names tracked by + /// the field observer. Once the ceiling is reached, new keys are + /// dropped (and counted via + /// `rsigma_fields_observer_overflow_dropped_total`); existing keys + /// keep incrementing. + pub observe_fields_max_keys: usize, /// Enable the cross-rule Aho-Corasick pre-filter. Off by default; /// benefit is workload-dependent (large rule sets with shared /// substring patterns). Available behind the `daachorse-index` @@ -402,6 +417,18 @@ pub async fn run_daemon(config: DaemonConfig) { #[cfg(feature = "daemon-otlp")] let otlp_event_tx = event_tx.clone(); + let field_observer = if config.observe_fields { + let observer = Arc::new(FieldObserver::new(config.observe_fields_max_keys)); + processor.set_field_observer(Some(observer.clone())); + tracing::info!( + max_keys = config.observe_fields_max_keys, + "Field observer enabled" + ); + Some(observer) + } else { + None + }; + let app_state = AppState { processor: processor.clone(), metrics: metrics.clone(), @@ -414,6 +441,7 @@ pub async fn run_daemon(config: DaemonConfig) { #[cfg(feature = "daemon-otlp")] otlp_event_tx, source_registry: Arc::new(config.source_registry.clone()), + field_observer: field_observer.clone(), }; let app = Router::new() @@ -433,7 +461,11 @@ pub async fn run_daemon(config: DaemonConfig) { .route( "/api/v1/sources/cache/{source_id}", delete(invalidate_source_cache), - ); + ) + .route("/api/v1/fields", get(fields_full)) + .route("/api/v1/fields/unknown", get(fields_unknown)) + .route("/api/v1/fields/missing", get(fields_missing)) + .route("/api/v1/fields/observer", delete(fields_observer_reset)); #[cfg(feature = "daemon-otlp")] let app = app.route("/v1/logs", post(otlp_http_logs)); @@ -1367,6 +1399,11 @@ async fn metrics_handler(State(state): State) -> impl IntoResponse { .uptime_seconds .set(state.start_time.elapsed().as_secs_f64()); + if let Some(observer) = state.field_observer.as_ref() { + let snapshot = observer.snapshot(); + state.metrics.update_field_observer_metrics(&snapshot); + } + ( [( axum::http::header::CONTENT_TYPE, @@ -1564,6 +1601,231 @@ async fn invalidate_source_cache( ) } +// --------------------------------------------------------------------------- +// Field observability handlers +// --------------------------------------------------------------------------- + +/// Default `?limit=` value for the paginated `/api/v1/fields/*` endpoints. +const FIELDS_DEFAULT_LIMIT: usize = 100; +/// Hard upper bound on `?limit=` to keep response payloads bounded even +/// when an operator asks for everything. +const FIELDS_MAX_LIMIT: usize = 1000; +/// Maximum number of rule titles surfaced per missing-field entry. The +/// API also reports `truncated: true` when a field carries more. +const MISSING_RULE_TITLES_CAP: usize = 10; + +#[derive(serde::Deserialize, Default)] +struct FieldsQuery { + limit: Option, + offset: Option, +} + +impl FieldsQuery { + fn limit(&self) -> usize { + self.limit + .unwrap_or(FIELDS_DEFAULT_LIMIT) + .min(FIELDS_MAX_LIMIT) + } + fn offset(&self) -> usize { + self.offset.unwrap_or(0) + } +} + +fn observation_disabled_response() -> Response { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(serde_json::json!({ + "error": "field observation disabled", + "hint": "restart the daemon with --observe-fields to enable /api/v1/fields/*", + })), + ) + .into_response() +} + +fn missing_field_payload(field: &str, origin: &rsigma_eval::FieldOrigin) -> serde_json::Value { + let mut rule_titles: Vec<&str> = origin.rule_titles.iter().map(String::as_str).collect(); + let total = rule_titles.len(); + let truncated = total > MISSING_RULE_TITLES_CAP; + rule_titles.truncate(MISSING_RULE_TITLES_CAP); + let sources: Vec<&str> = origin.sources.iter().map(|s| s.as_str()).collect(); + serde_json::json!({ + "field": field, + "rule_count": total, + "sources": sources, + "rule_titles": rule_titles, + "truncated": truncated, + }) +} + +/// Slice a window out of `items` by moving the page elements out of the +/// source `Vec` rather than cloning. Returns the page, the original +/// total (preserved before the move), and the next offset (if any). +/// +/// Consumes `items` because all four field endpoints construct the +/// list fresh from a snapshot and then discard the rest; cloning each +/// `serde_json::Value` just to throw the leftovers away wastes work +/// proportional to `total - limit`. +fn paginate(mut items: Vec, offset: usize, limit: usize) -> (Vec, usize, Option) { + let total = items.len(); + if offset >= total { + return (Vec::new(), total, None); + } + let end = offset.saturating_add(limit).min(total); + items.truncate(end); + let page: Vec = items.drain(offset..).collect(); + let next_offset = if end < total { Some(end) } else { None }; + (page, total, next_offset) +} + +async fn fields_full( + State(state): State, + axum::extract::Query(query): axum::extract::Query, +) -> Response { + let Some(observer) = state.field_observer.as_ref() else { + return observation_disabled_response(); + }; + let snapshot = observer.snapshot(); + state.metrics.update_field_observer_metrics(&snapshot); + + let rule_field_set = state.processor.rule_field_set(); + let coverage = snapshot.coverage(&rule_field_set); + + let unknown_entries: Vec = coverage + .unknown + .iter() + .map(|e| { + let field: &str = &e.field; + serde_json::json!({ "field": field, "count": e.count }) + }) + .collect(); + let missing_entries: Vec = coverage + .missing + .iter() + .map(|(name, origin)| missing_field_payload(name, origin)) + .collect(); + let intersection_count = coverage.intersection_count; + + let (unknown_page, unknown_total, unknown_next) = + paginate(unknown_entries, query.offset(), query.limit()); + let (missing_page, missing_total, missing_next) = + paginate(missing_entries, query.offset(), query.limit()); + + let body = serde_json::json!({ + "summary": { + "events_observed": snapshot.events_observed, + "unique_keys_observed": snapshot.unique_keys, + "rule_fields_loaded": rule_field_set.len(), + "overflow_dropped": snapshot.overflow_dropped, + "max_keys": snapshot.max_keys, + "uptime_seconds": snapshot.uptime_seconds, + "intersection_count": intersection_count, + "unknown_count": unknown_total, + "missing_count": missing_total, + }, + "unknown": { + "items": unknown_page, + "total": unknown_total, + "offset": query.offset(), + "limit": query.limit(), + "next_offset": unknown_next, + }, + "missing": { + "items": missing_page, + "total": missing_total, + "offset": query.offset(), + "limit": query.limit(), + "next_offset": missing_next, + }, + }); + + (StatusCode::OK, Json(body)).into_response() +} + +async fn fields_unknown( + State(state): State, + axum::extract::Query(query): axum::extract::Query, +) -> Response { + let Some(observer) = state.field_observer.as_ref() else { + return observation_disabled_response(); + }; + let snapshot = observer.snapshot(); + state.metrics.update_field_observer_metrics(&snapshot); + + let rule_field_set = state.processor.rule_field_set(); + + let coverage = snapshot.coverage(&rule_field_set); + let entries: Vec = coverage + .unknown + .iter() + .map(|e| { + let field: &str = &e.field; + serde_json::json!({ "field": field, "count": e.count }) + }) + .collect(); + let (page, total, next_offset) = paginate(entries, query.offset(), query.limit()); + ( + StatusCode::OK, + Json(serde_json::json!({ + "items": page, + "total": total, + "offset": query.offset(), + "limit": query.limit(), + "next_offset": next_offset, + })), + ) + .into_response() +} + +async fn fields_missing( + State(state): State, + axum::extract::Query(query): axum::extract::Query, +) -> Response { + let Some(observer) = state.field_observer.as_ref() else { + return observation_disabled_response(); + }; + let snapshot = observer.snapshot(); + state.metrics.update_field_observer_metrics(&snapshot); + + let rule_field_set = state.processor.rule_field_set(); + + let coverage = snapshot.coverage(&rule_field_set); + let entries: Vec = coverage + .missing + .iter() + .map(|(name, origin)| missing_field_payload(name, origin)) + .collect(); + let (page, total, next_offset) = paginate(entries, query.offset(), query.limit()); + ( + StatusCode::OK, + Json(serde_json::json!({ + "items": page, + "total": total, + "offset": query.offset(), + "limit": query.limit(), + "next_offset": next_offset, + })), + ) + .into_response() +} + +async fn fields_observer_reset(State(state): State) -> Response { + let Some(observer) = state.field_observer.as_ref() else { + return observation_disabled_response(); + }; + let (previous_keys, previous_events) = observer.reset(); + let snapshot = observer.snapshot(); + state.metrics.update_field_observer_metrics(&snapshot); + ( + StatusCode::OK, + Json(serde_json::json!({ + "status": "reset", + "previous_keys": previous_keys, + "previous_events": previous_events, + })), + ) + .into_response() +} + /// Accept events via HTTP POST for processing. /// Each non-empty line in the request body is parsed using the configured /// `--input-format` and forwarded to the engine. diff --git a/crates/rsigma-cli/tests/cli_daemon_fields_observer.rs b/crates/rsigma-cli/tests/cli_daemon_fields_observer.rs new file mode 100644 index 00000000..c5bbe98f --- /dev/null +++ b/crates/rsigma-cli/tests/cli_daemon_fields_observer.rs @@ -0,0 +1,236 @@ +//! E2E tests for the daemon's field-observability surface. +//! +//! Each test spawns `rsigma engine daemon --observe-fields ...` against a +//! tiny rule, posts a few events through `/api/v1/events`, and asserts +//! what the `/api/v1/fields/*` endpoints report. + +#![cfg(feature = "daemon")] + +mod common; + +use common::{DaemonProcess, http_delete, http_get, http_post, poll_until, temp_file}; +use serde_json::Value; +use std::time::Duration; + +const RULE: &str = r#" +title: Whoami Detector +id: 00000000-0000-0000-0000-000000000055 +status: test +logsource: + category: test + product: test +detection: + selection: + CommandLine|contains: "whoami" + condition: selection +level: high +"#; + +fn wait_for_events_observed(daemon: &DaemonProcess, expected: u64) -> Value { + poll_until(Duration::from_secs(5), || { + let (status, body) = http_get(&daemon.url("/api/v1/fields")); + if status != 200 { + return None; + } + let v: Value = serde_json::from_str(&body).ok()?; + let observed = v["summary"]["events_observed"].as_u64()?; + if observed >= expected { Some(v) } else { None } + }) + .expect("events not observed within 5s") +} + +#[test] +fn fields_endpoints_return_503_when_observer_disabled() { + let rule = temp_file(".yml", RULE); + let daemon = DaemonProcess::spawn_http(rule.path().to_str().unwrap()); + + for path in [ + "/api/v1/fields", + "/api/v1/fields/unknown", + "/api/v1/fields/missing", + ] { + let (status, body) = http_get(&daemon.url(path)); + assert_eq!(status, 503, "{path} should be 503 when observer disabled"); + let v: Value = serde_json::from_str(&body).unwrap(); + assert_eq!(v["error"], "field observation disabled"); + } + + let (status, _body) = http_delete(&daemon.url("/api/v1/fields/observer")); + assert_eq!(status, 503); +} + +#[test] +fn unknown_endpoint_lists_event_fields_no_rule_references() { + let rule = temp_file(".yml", RULE); + let daemon = + DaemonProcess::spawn_http_with_args(rule.path().to_str().unwrap(), &["--observe-fields"]); + + // Two events: one matching the rule, one with extra unknown fields. + let payload = "{\"CommandLine\":\"whoami\",\"User\":\"alice\"}\n\ + {\"CommandLine\":\"id\",\"src_ip\":\"10.0.0.1\",\"User\":\"bob\"}"; + let (status, _body) = http_post(&daemon.url("/api/v1/events"), payload); + assert_eq!(status, 200); + + let full = wait_for_events_observed(&daemon, 2); + assert_eq!(full["summary"]["events_observed"].as_u64().unwrap(), 2); + + let (status, body) = http_get(&daemon.url("/api/v1/fields/unknown")); + assert_eq!(status, 200); + let v: Value = serde_json::from_str(&body).unwrap(); + let names: Vec<&str> = v["items"] + .as_array() + .unwrap() + .iter() + .filter_map(|e| e["field"].as_str()) + .collect(); + // User and src_ip are observed but not referenced by the rule. CommandLine is referenced. + assert!(names.contains(&"User"), "User should be flagged unknown"); + assert!( + names.contains(&"src_ip"), + "src_ip should be flagged unknown" + ); + assert!( + !names.contains(&"CommandLine"), + "CommandLine is rule-referenced, must not appear in unknown" + ); +} + +#[test] +fn missing_endpoint_lists_rule_fields_never_observed() { + let rule = temp_file(".yml", RULE); + let daemon = + DaemonProcess::spawn_http_with_args(rule.path().to_str().unwrap(), &["--observe-fields"]); + + // Post an event that does NOT contain CommandLine, so the rule field + // is unobserved. + let (status, _body) = http_post(&daemon.url("/api/v1/events"), r#"{"User":"alice"}"#); + assert_eq!(status, 200); + wait_for_events_observed(&daemon, 1); + + let (status, body) = http_get(&daemon.url("/api/v1/fields/missing")); + assert_eq!(status, 200); + let v: Value = serde_json::from_str(&body).unwrap(); + let items = v["items"].as_array().unwrap(); + let cmd_entry = items + .iter() + .find(|e| e["field"] == "CommandLine") + .expect("CommandLine should be flagged missing"); + assert!(cmd_entry["rule_count"].as_u64().unwrap() >= 1); + let sources: Vec<&str> = cmd_entry["sources"] + .as_array() + .unwrap() + .iter() + .filter_map(|s| s.as_str()) + .collect(); + assert!(sources.contains(&"detection")); +} + +#[test] +fn full_endpoint_reports_summary_unknown_and_missing() { + let rule = temp_file(".yml", RULE); + let daemon = + DaemonProcess::spawn_http_with_args(rule.path().to_str().unwrap(), &["--observe-fields"]); + + let payload = r#"{"CommandLine":"whoami","User":"alice"}"#; + let (status, _body) = http_post(&daemon.url("/api/v1/events"), payload); + assert_eq!(status, 200); + let v = wait_for_events_observed(&daemon, 1); + + let summary = &v["summary"]; + assert_eq!(summary["events_observed"].as_u64().unwrap(), 1); + assert!(summary["unique_keys_observed"].as_u64().unwrap() >= 2); + assert_eq!(summary["overflow_dropped"].as_u64().unwrap(), 0); + assert!(summary["rule_fields_loaded"].as_u64().unwrap() >= 1); + // CommandLine is in both the rule and the event -> intersection_count > 0. + assert!(summary["intersection_count"].as_u64().unwrap() >= 1); + + assert!(v["unknown"]["items"].is_array()); + assert!(v["missing"]["items"].is_array()); +} + +#[test] +fn delete_observer_resets_counters() { + let rule = temp_file(".yml", RULE); + let daemon = + DaemonProcess::spawn_http_with_args(rule.path().to_str().unwrap(), &["--observe-fields"]); + + let (status, _body) = http_post( + &daemon.url("/api/v1/events"), + r#"{"CommandLine":"whoami","User":"alice"}"#, + ); + assert_eq!(status, 200); + wait_for_events_observed(&daemon, 1); + + let (status, body) = http_delete(&daemon.url("/api/v1/fields/observer")); + assert_eq!(status, 200); + let v: Value = serde_json::from_str(&body).unwrap(); + assert_eq!(v["status"], "reset"); + assert!(v["previous_events"].as_u64().unwrap() >= 1); + assert!(v["previous_keys"].as_u64().unwrap() >= 2); + + // Right after reset the observer should report a clean slate. + let (status, body) = http_get(&daemon.url("/api/v1/fields")); + assert_eq!(status, 200); + let v: Value = serde_json::from_str(&body).unwrap(); + assert_eq!(v["summary"]["events_observed"].as_u64().unwrap(), 0); + assert_eq!(v["summary"]["unique_keys_observed"].as_u64().unwrap(), 0); + assert_eq!(v["summary"]["overflow_dropped"].as_u64().unwrap(), 0); +} + +#[test] +fn overflow_cap_drops_new_keys_after_capacity_reached() { + let rule = temp_file(".yml", RULE); + let daemon = DaemonProcess::spawn_http_with_args( + rule.path().to_str().unwrap(), + &["--observe-fields", "--observe-fields-max-keys", "2"], + ); + + let payload = r#"{"a":1,"b":2,"c":3,"d":4}"#; + let (status, _body) = http_post(&daemon.url("/api/v1/events"), payload); + assert_eq!(status, 200); + let v = wait_for_events_observed(&daemon, 1); + + let summary = &v["summary"]; + assert_eq!(summary["unique_keys_observed"].as_u64().unwrap(), 2); + assert_eq!(summary["overflow_dropped"].as_u64().unwrap(), 2); + assert_eq!(summary["max_keys"].as_u64().unwrap(), 2); +} + +#[test] +fn fields_unknown_pagination_returns_next_offset() { + let rule = temp_file(".yml", RULE); + let daemon = + DaemonProcess::spawn_http_with_args(rule.path().to_str().unwrap(), &["--observe-fields"]); + + // Five unknown fields plus the rule's CommandLine. + let payload = r#"{"CommandLine":"whoami","a":1,"b":2,"c":3,"d":4,"e":5}"#; + let (status, _body) = http_post(&daemon.url("/api/v1/events"), payload); + assert_eq!(status, 200); + wait_for_events_observed(&daemon, 1); + + let (status, body) = http_get(&daemon.url("/api/v1/fields/unknown?limit=2&offset=0")); + assert_eq!(status, 200); + let v: Value = serde_json::from_str(&body).unwrap(); + assert_eq!(v["items"].as_array().unwrap().len(), 2); + assert_eq!(v["limit"].as_u64().unwrap(), 2); + assert_eq!(v["offset"].as_u64().unwrap(), 0); + assert_eq!(v["total"].as_u64().unwrap(), 5); + assert_eq!(v["next_offset"].as_u64().unwrap(), 2); +} + +#[test] +fn metrics_includes_field_observer_counters_when_enabled() { + let rule = temp_file(".yml", RULE); + let daemon = + DaemonProcess::spawn_http_with_args(rule.path().to_str().unwrap(), &["--observe-fields"]); + + let (status, _body) = http_post(&daemon.url("/api/v1/events"), r#"{"CommandLine":"whoami"}"#); + assert_eq!(status, 200); + wait_for_events_observed(&daemon, 1); + + let (status, body) = http_get(&daemon.url("/metrics")); + assert_eq!(status, 200); + assert!(body.contains("rsigma_fields_observed_total")); + assert!(body.contains("rsigma_fields_observer_unique_keys")); + assert!(body.contains("rsigma_fields_observer_overflow_dropped_total")); +} diff --git a/crates/rsigma-cli/tests/cli_eval.rs b/crates/rsigma-cli/tests/cli_eval.rs index 735a6496..f816f47c 100644 --- a/crates/rsigma-cli/tests/cli_eval.rs +++ b/crates/rsigma-cli/tests/cli_eval.rs @@ -722,3 +722,189 @@ fn help_flag() { .success() .stdout(predicate::str::contains("Parse, validate, and evaluate")); } + +// --------------------------------------------------------------------------- +// eval subcommand — field observability (--observe-fields) +// --------------------------------------------------------------------------- + +/// Helper: run `engine eval` with `--observe-fields --observe-fields-report +/// ` against an inline event, and return the parsed report JSON. +fn run_eval_with_observation( + rule_yaml: &str, + event_json: &str, + extra_flags: &[&str], +) -> serde_json::Value { + let rule = temp_file(".yml", rule_yaml); + let report_file = tempfile::Builder::new().suffix(".json").tempfile().unwrap(); + let mut args: Vec<&str> = vec![ + "engine", + "eval", + "--rules", + rule.path().to_str().unwrap(), + "--event", + event_json, + "--observe-fields", + "--observe-fields-report", + report_file.path().to_str().unwrap(), + ]; + args.extend_from_slice(extra_flags); + rsigma().args(&args).assert().success(); + let body = std::fs::read_to_string(report_file.path()).unwrap(); + serde_json::from_str(&body).expect("report file should be valid JSON") +} + +#[test] +fn observe_fields_emits_full_report_to_file() { + let report = run_eval_with_observation( + SIMPLE_RULE, + r#"{"CommandLine":"malware","User":"alice","src_ip":"10.0.0.1"}"#, + &[], + ); + let summary = &report["summary"]; + assert_eq!(summary["events_observed"].as_u64().unwrap(), 1); + assert_eq!(summary["unique_keys_observed"].as_u64().unwrap(), 3); + assert_eq!(summary["overflow_dropped"].as_u64().unwrap(), 0); + assert!(summary["rule_fields_loaded"].as_u64().unwrap() >= 1); + assert_eq!(summary["intersection_count"].as_u64().unwrap(), 1); // CommandLine + assert_eq!(summary["unknown_count"].as_u64().unwrap(), 2); // User + src_ip +} + +#[test] +fn observe_fields_unknown_lists_event_fields_no_rule_references() { + let report = run_eval_with_observation( + SIMPLE_RULE, + r#"{"CommandLine":"malware","User":"alice","src_ip":"10.0.0.1"}"#, + &[], + ); + let names: Vec<&str> = report["unknown"] + .as_array() + .unwrap() + .iter() + .filter_map(|e| e["field"].as_str()) + .collect(); + assert!(names.contains(&"User")); + assert!(names.contains(&"src_ip")); + assert!(!names.contains(&"CommandLine")); +} + +#[test] +fn observe_fields_missing_lists_rule_fields_never_observed() { + // No CommandLine in the event => rule's CommandLine field is "missing". + let report = run_eval_with_observation(SIMPLE_RULE, r#"{"User":"alice"}"#, &[]); + let items = report["missing"].as_array().unwrap(); + let cmd_entry = items + .iter() + .find(|e| e["field"] == "CommandLine") + .expect("CommandLine should be flagged missing"); + assert!(cmd_entry["rule_count"].as_u64().unwrap() >= 1); + let sources: Vec<&str> = cmd_entry["sources"] + .as_array() + .unwrap() + .iter() + .filter_map(|s| s.as_str()) + .collect(); + assert!(sources.contains(&"detection")); +} + +#[test] +fn observe_fields_max_keys_caps_distinct_fields() { + let report = run_eval_with_observation( + SIMPLE_RULE, + r#"{"a":1,"b":2,"c":3,"d":4,"e":5}"#, + &["--observe-fields-max-keys", "2"], + ); + let summary = &report["summary"]; + assert_eq!(summary["unique_keys_observed"].as_u64().unwrap(), 2); + assert_eq!(summary["overflow_dropped"].as_u64().unwrap(), 3); + assert_eq!(summary["max_keys"].as_u64().unwrap(), 2); +} + +#[test] +fn observe_fields_writes_to_stderr_when_no_report_path() { + let rule = temp_file(".yml", SIMPLE_RULE); + let output = rsigma() + .args([ + "engine", + "eval", + "--rules", + rule.path().to_str().unwrap(), + "--event", + r#"{"CommandLine":"malware","extra":"hello"}"#, + "--observe-fields", + ]) + .output() + .unwrap(); + assert!(output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + // The report is JSON on stderr; check for stable summary fields. + assert!(stderr.contains("\"events_observed\"")); + assert!(stderr.contains("\"unknown\"")); + assert!(stderr.contains("\"missing\"")); + // And detections stay on stdout (rule fires on the matching event). + let stdout = String::from_utf8_lossy(&output.stdout); + assert!(stdout.contains("Test Rule")); +} + +#[test] +fn observe_fields_report_without_observe_flag_is_rejected() { + // clap `requires` should refuse `--observe-fields-report` when + // `--observe-fields` is not also supplied, so a typo at the CLI + // surface fails fast instead of silently producing no report. + let rule = temp_file(".yml", SIMPLE_RULE); + let report_file = tempfile::Builder::new().suffix(".json").tempfile().unwrap(); + rsigma() + .args([ + "engine", + "eval", + "--rules", + rule.path().to_str().unwrap(), + "--event", + r#"{"CommandLine":"malware"}"#, + "--observe-fields-report", + report_file.path().to_str().unwrap(), + ]) + .assert() + .failure() + .stderr(predicate::str::contains("--observe-fields")); +} + +#[test] +fn observe_fields_max_keys_zero_is_rejected() { + // NonZeroUsize value parser refuses 0; otherwise every observation + // would count as overflow with no useful tracking. + let rule = temp_file(".yml", SIMPLE_RULE); + rsigma() + .args([ + "engine", + "eval", + "--rules", + rule.path().to_str().unwrap(), + "--event", + r#"{"CommandLine":"malware"}"#, + "--observe-fields", + "--observe-fields-max-keys", + "0", + ]) + .assert() + .failure(); +} + +#[test] +fn observe_fields_off_by_default_emits_no_report() { + let rule = temp_file(".yml", SIMPLE_RULE); + let output = rsigma() + .args([ + "engine", + "eval", + "--rules", + rule.path().to_str().unwrap(), + "--event", + r#"{"CommandLine":"malware","extra":"hello"}"#, + ]) + .output() + .unwrap(); + assert!(output.status.success()); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(!stderr.contains("\"events_observed\"")); + assert!(!stderr.contains("\"unknown\"")); +} diff --git a/crates/rsigma-cli/tests/common/mod.rs b/crates/rsigma-cli/tests/common/mod.rs index a236849e..eac85652 100644 --- a/crates/rsigma-cli/tests/common/mod.rs +++ b/crates/rsigma-cli/tests/common/mod.rs @@ -179,6 +179,25 @@ impl DaemonProcess { ]) } + /// Spawn the daemon in HTTP-input mode with extra CLI flags appended + /// after the standard scaffolding (`-r`, `--input http`, + /// `--api-addr 127.0.0.1:0`). Useful for opt-in flags like + /// `--observe-fields` that integration tests need to exercise. + pub fn spawn_http_with_args(rule_path: &str, extra_args: &[&str]) -> Self { + let mut args = vec![ + "engine", + "daemon", + "-r", + rule_path, + "--input", + "http", + "--api-addr", + "127.0.0.1:0", + ]; + args.extend_from_slice(extra_args); + Self::spawn(&args) + } + pub fn url(&self, path: &str) -> String { format!("http://{}{path}", self.api_addr) } @@ -279,10 +298,15 @@ fn rewrite_wildcard_to_loopback(addr: String) -> String { // HTTP and polling helpers // --------------------------------------------------------------------------- -/// GET `url`, panicking on transport errors. Returns the status code and -/// body. Status codes other than 2xx are returned, not panicked. +/// GET `url`. Returns (status, body) for any HTTP response code +/// (including 4xx/5xx with their JSON error bodies). Panics on transport +/// errors only. pub fn http_get(url: &str) -> (u16, String) { - let resp = ureq::get(url).call().expect("HTTP GET failed"); + let agent: ureq::Agent = ureq::Agent::config_builder() + .http_status_as_error(false) + .build() + .into(); + let resp = agent.get(url).call().expect("HTTP GET failed"); let status = resp.status().as_u16(); let body = resp.into_body().read_to_string().unwrap(); (status, body) @@ -302,6 +326,20 @@ pub fn http_post(url: &str, body: &str) -> (u16, String) { } } +/// DELETE `url`. Returns (status, body) for any HTTP response code +/// (including 4xx/5xx with their JSON error bodies). Panics on transport +/// errors only. +pub fn http_delete(url: &str) -> (u16, String) { + let agent: ureq::Agent = ureq::Agent::config_builder() + .http_status_as_error(false) + .build() + .into(); + let resp = agent.delete(url).call().expect("HTTP DELETE failed"); + let status = resp.status().as_u16(); + let body = resp.into_body().read_to_string().unwrap(); + (status, body) +} + /// Poll `check` every 50ms until it returns `Some(value)` or `deadline` /// elapses. Use this in place of fixed sleeps when you want to wait for a /// specific observable condition. diff --git a/crates/rsigma-eval/README.md b/crates/rsigma-eval/README.md index 5bade806..3996b057 100644 --- a/crates/rsigma-eval/README.md +++ b/crates/rsigma-eval/README.md @@ -79,6 +79,29 @@ This library is part of [rsigma]. | `apply_pipelines_with_state(pipelines, rule)` | Apply pipelines and return the merged `PipelineState` (for backends) | | `merge_pipelines(pipelines)` | Merge multiple pipelines into one (sorted by priority) | +### Rule field extraction (`fields` module) + +Shared between `rsigma rule fields` and the field-observability surfaces in both `engine eval` and `engine daemon`, so the offline view matches what the engine references at runtime. + +| Type / function | Description | +|-----------------|-------------| +| `RuleFieldSet::collect(collection, pipelines, include_filters)` | Walk a `SigmaCollection` (after optional pipeline transformations) and return every field name referenced by detection items, correlation `group-by`/threshold/alias fields, filter detections, and rule-level `fields:` metadata | +| `RuleFieldSet::contains(name)` / `origin(name)` / `iter()` / `names()` / `len()` | Query the resulting set; entries are sorted lexicographically | +| `FieldOrigin { rule_titles, sources }` | Per-field provenance: which rules touched it and from which kind | +| `FieldSource::{Detection, Correlation, Filter, Metadata}` | Tagged source kind, with `as_str()` for stable JSON serialization | + +### Field observability (`field_observer` module) + +Opt-in counter that records every observed field name and surfaces gap and broken-coverage signals. Consumed by both `engine eval` (one-shot report at end-of-run) and `engine daemon` (live `/api/v1/fields*` endpoints). + +| Type / function | Description | +|-----------------|-------------| +| `FieldObserver::new(max_keys)` | Construct an observer with a hard cap on distinct keys (overflow drops are counted, existing keys keep updating) | +| `FieldObserver::observe(event: &impl Event)` | Walk `event.field_keys()` and bump per-field counters; `&self` so callers can share an `Arc` | +| `FieldObserver::snapshot()` | `FieldObservation { entries, events_observed, unique_keys, overflow_dropped, lifetime_events_observed, lifetime_overflow_dropped, max_keys, uptime_seconds }`. Entries sorted by descending count then name. Keys held as `Arc` so the snapshot is refcount-cheap | +| `FieldObservation::coverage(&RuleFieldSet)` | Partition a snapshot into `FieldCoverage { unknown, intersection_count, missing }` against a rule field set in one pass. Both the daemon HTTP handlers and the eval report consume this, so the partition semantics cannot drift | +| `FieldObserver::reset()` | Clear counters; lifetime totals survive so Prometheus counter bridges stay monotonic | + ## Detection Engine - **Compiled matchers**: optimized matching for all 30 modifier combinations — exact, contains, startswith, endswith, regex, CIDR, numeric comparison, base64 offset (3 alignment variants), windash expansion (5 replacement characters), field references, placeholder expansion, timestamp part extraction @@ -144,6 +167,7 @@ The `Event` wrapper provides flexible field access over `serde_json::Value`: - **Dot-notation**: if no flat key matches and the path contains `.`, split and traverse nested objects. - **Array traversal**: arrays are searched with OR semantics (first matching element wins). - **Keyword detection**: `matches_keyword` searches all string values across all fields recursively. +- **Field enumeration**: `field_keys()` returns the leaf field paths in dot-notation (e.g. `actor.id`), with intermediate object names included so callers can inspect coverage at any nesting level. Used by the daemon's opt-in field observer; not on the detection hot path. Ships with a default impl that walks `to_json()`; `JsonEvent` overrides with a zero-copy recursive walk. - **Max nesting depth**: recursive traversal stops at depth **64** (`MAX_NESTING_DEPTH`). ## Correlation Engine diff --git a/crates/rsigma-eval/src/event/json.rs b/crates/rsigma-eval/src/event/json.rs index dadaabea..73093217 100644 --- a/crates/rsigma-eval/src/event/json.rs +++ b/crates/rsigma-eval/src/event/json.rs @@ -92,6 +92,22 @@ impl<'a> Event for JsonEvent<'a> { fn to_json(&self) -> Value { self.inner.as_ref().clone() } + + /// Walk every leaf field in the event and yield dot-joined paths. + /// Intermediate object names (`actor` for `{"actor":{"id":"x"}}`) + /// are NOT emitted; only the leaves (`actor.id`) appear. This + /// matches typical Sigma rules, which reference nested values via + /// dot-notation; emitting the intermediate name would falsely flag + /// every parent object as "unknown" in the gap signal even when + /// the rule references a child path. Top-level scalar fields + /// (`{"actor":"alice"}`) emit `actor` because they ARE leaves. + /// Arrays contribute their parent path once; per-index suffixes + /// are not emitted. + fn field_keys(&self) -> Vec> { + let mut out = Vec::new(); + collect_field_keys(&self.inner, "", &mut out, MAX_NESTING_DEPTH); + out + } } /// Recursively traverse a JSON value following dot-notation path segments. @@ -138,6 +154,31 @@ fn any_string_value_json(v: &Value, pred: &dyn Fn(&str) -> bool, depth: usize) - } } +fn collect_field_keys<'a>(v: &'a Value, prefix: &str, out: &mut Vec>, depth: usize) { + if depth == 0 { + return; + } + if let Value::Object(map) = v { + for (k, child) in map { + let path = if prefix.is_empty() { + k.clone() + } else { + format!("{prefix}.{k}") + }; + match child { + // Recurse into nested objects but do NOT emit the + // intermediate path; only the leaf descendants count. + // Sigma rules normally reference leaves via + // dot-notation, so emitting `actor` alongside + // `actor.id` would falsely flag the parent as + // "unknown" in the gap signal. + Value::Object(_) => collect_field_keys(child, &path, out, depth - 1), + _ => out.push(Cow::Owned(path)), + } + } + } +} + fn collect_string_values_json<'a>(v: &'a Value, out: &mut Vec>, depth: usize) { if depth == 0 { return; @@ -302,4 +343,50 @@ mod tests { ); assert_eq!(event.to_json(), v); } + + #[test] + fn json_field_keys_flat() { + let v = json!({"CommandLine": "x", "User": "y"}); + let event = JsonEvent::borrow(&v); + let mut keys: Vec = event.field_keys().iter().map(|c| c.to_string()).collect(); + keys.sort(); + assert_eq!(keys, vec!["CommandLine", "User"]); + } + + #[test] + fn json_field_keys_nested_leaves_only() { + // Intermediate object names like `actor` are NOT emitted; only + // leaves (`actor.id`, `actor.type`) and top-level scalars + // (`verb`) appear. + let v = json!({"actor": {"id": "u1", "type": "User"}, "verb": "login"}); + let event = JsonEvent::borrow(&v); + let mut keys: Vec = event.field_keys().iter().map(|c| c.to_string()).collect(); + keys.sort(); + assert_eq!(keys, vec!["actor.id", "actor.type", "verb"]); + } + + #[test] + fn json_field_keys_deeply_nested_leaves_only() { + let v = json!({"a": {"b": {"c": 1}}, "flat": "x"}); + let event = JsonEvent::borrow(&v); + let mut keys: Vec = event.field_keys().iter().map(|c| c.to_string()).collect(); + keys.sort(); + assert_eq!(keys, vec!["a.b.c", "flat"]); + } + + #[test] + fn json_field_keys_array_parent_only() { + let v = json!({"events": [{"id": 1}, {"id": 2}]}); + let event = JsonEvent::borrow(&v); + let keys: Vec = event.field_keys().iter().map(|c| c.to_string()).collect(); + // Arrays contribute their parent key only; array indices are not enumerated. + assert_eq!(keys, vec!["events"]); + } + + #[test] + fn json_field_keys_top_level_non_object_empty() { + let v = json!("just a string"); + let event = JsonEvent::owned(v); + assert!(event.field_keys().is_empty()); + } } diff --git a/crates/rsigma-eval/src/event/kv.rs b/crates/rsigma-eval/src/event/kv.rs index cfb4a4b9..83907fed 100644 --- a/crates/rsigma-eval/src/event/kv.rs +++ b/crates/rsigma-eval/src/event/kv.rs @@ -50,6 +50,13 @@ impl Event for KvEvent { .collect(); Value::Object(map) } + + fn field_keys(&self) -> Vec> { + self.fields + .iter() + .map(|(k, _)| Cow::Borrowed(k.as_str())) + .collect() + } } #[cfg(test)] @@ -83,4 +90,14 @@ mod tests { let j = event.to_json(); assert_eq!(j, json!({"key": "val"})); } + + #[test] + fn kv_field_keys() { + let event = KvEvent::new(vec![ + ("host".into(), "web01".into()), + ("status".into(), "200".into()), + ]); + let keys: Vec = event.field_keys().iter().map(|c| c.to_string()).collect(); + assert_eq!(keys, vec!["host", "status"]); + } } diff --git a/crates/rsigma-eval/src/event/map.rs b/crates/rsigma-eval/src/event/map.rs index 416b039e..67e15f45 100644 --- a/crates/rsigma-eval/src/event/map.rs +++ b/crates/rsigma-eval/src/event/map.rs @@ -63,6 +63,13 @@ where .collect(); Value::Object(map) } + + fn field_keys(&self) -> Vec> { + self.inner + .keys() + .map(|k| Cow::Borrowed(k.as_ref())) + .collect() + } } #[cfg(test)] @@ -89,4 +96,15 @@ mod tests { let event = MapEvent::new(m); assert_eq!(event.to_json(), json!({"k": "v"})); } + + #[test] + fn map_event_field_keys() { + let mut m = HashMap::new(); + m.insert("user".to_string(), "admin".to_string()); + m.insert("host".to_string(), "web01".to_string()); + let event = MapEvent::new(m); + let mut keys: Vec = event.field_keys().iter().map(|c| c.to_string()).collect(); + keys.sort(); + assert_eq!(keys, vec!["host", "user"]); + } } diff --git a/crates/rsigma-eval/src/event/mod.rs b/crates/rsigma-eval/src/event/mod.rs index 988261b9..c763a033 100644 --- a/crates/rsigma-eval/src/event/mod.rs +++ b/crates/rsigma-eval/src/event/mod.rs @@ -171,6 +171,59 @@ pub trait Event { /// Materialize the event as a `serde_json::Value`. fn to_json(&self) -> Value; + + /// Collect the names of every leaf field in the event, with nested + /// objects flattened to dot-separated paths (e.g. `actor.id`). + /// Intermediate object names are not emitted; only leaves count. + /// Arrays contribute their parent path once; per-index suffixes are + /// not emitted. + /// + /// Used by the daemon's opt-in field-observability surface; not on the + /// detection hot path. The default implementation walks `to_json()`, + /// which clones the event and allocates one `String` per leaf path; + /// concrete event types override to skip the `to_json()` clone. The + /// per-leaf `String` allocation is unavoidable for nested objects + /// (the dot-joined path doesn't exist anywhere in the source) but + /// flat formats like `KvEvent` can return `Cow::Borrowed`. + fn field_keys(&self) -> Vec> { + let mut paths: Vec = Vec::new(); + collect_field_keys_json(&self.to_json(), "", &mut paths); + paths.into_iter().map(Cow::Owned).collect() + } +} + +/// Maximum nesting depth honoured by `field_keys` default + JsonEvent +/// overrides. Matches the existing 64-level cap used elsewhere in the +/// crate for recursive JSON traversal. +pub(crate) const FIELD_KEYS_MAX_DEPTH: usize = 64; + +/// Walk a JSON value and push every leaf field path into `out`. The +/// helper threads owned `String`s rather than `Cow` because every path +/// is constructed by `format!`-joining (or copying the top-level key), +/// so there are no borrowed shortcuts to capture. +pub(crate) fn collect_field_keys_json(value: &Value, prefix: &str, out: &mut Vec) { + collect_field_keys_json_depth(value, prefix, out, FIELD_KEYS_MAX_DEPTH); +} + +fn collect_field_keys_json_depth(value: &Value, prefix: &str, out: &mut Vec, depth: usize) { + if depth == 0 { + return; + } + if let Value::Object(map) = value { + for (k, v) in map { + let path = if prefix.is_empty() { + k.clone() + } else { + format!("{prefix}.{k}") + }; + match v { + // Recurse into nested objects without emitting the + // intermediate path; only leaf descendants count. + Value::Object(_) => collect_field_keys_json_depth(v, &path, out, depth - 1), + _ => out.push(path), + } + } + } } impl Event for &T { @@ -189,6 +242,10 @@ impl Event for &T { fn to_json(&self) -> Value { (**self).to_json() } + + fn field_keys(&self) -> Vec> { + (**self).field_keys() + } } #[cfg(test)] diff --git a/crates/rsigma-eval/src/event/plain.rs b/crates/rsigma-eval/src/event/plain.rs index eeb8cbc1..897b6598 100644 --- a/crates/rsigma-eval/src/event/plain.rs +++ b/crates/rsigma-eval/src/event/plain.rs @@ -38,6 +38,13 @@ impl Event for PlainEvent { fn to_json(&self) -> Value { serde_json::json!({ "_raw": self.raw }) } + + /// Plain log lines have no structured field surface; the synthetic + /// `_raw` envelope from `to_json` is not a field operators care about + /// for coverage analysis. + fn field_keys(&self) -> Vec> { + Vec::new() + } } #[cfg(test)] @@ -58,6 +65,12 @@ mod tests { assert!(!event.any_string_value(&|s| s.contains("memory"))); } + #[test] + fn plain_field_keys_is_empty() { + let event = PlainEvent::new("error: disk full".into()); + assert!(event.field_keys().is_empty()); + } + #[test] fn plain_to_json() { let event = PlainEvent::new("hello".into()); diff --git a/crates/rsigma-eval/src/field_observer.rs b/crates/rsigma-eval/src/field_observer.rs new file mode 100644 index 00000000..81b52153 --- /dev/null +++ b/crates/rsigma-eval/src/field_observer.rs @@ -0,0 +1,558 @@ +//! Opt-in observer that records every field name seen at evaluation +//! time so consumers can report which event fields are not referenced +//! by any loaded rule (gap signal) and which rule fields have never +//! been seen in events (broken-coverage signal). +//! +//! Lives in `rsigma-eval` because the observer only depends on the +//! [`Event`] trait. The daemon (`rsigma-runtime` + `rsigma-cli`'s +//! `engine daemon`) and the one-shot evaluator (`rsigma-cli`'s +//! `engine eval`) both consume the same type so the report shape is +//! consistent across runtimes. +//! +//! # Design +//! +//! - Backed by a `std::sync::Mutex, u64>>`. The mutex +//! is only held long enough to bump or insert a counter; the lock +//! never wraps user code that could panic, so poisoning is +//! effectively impossible in practice and the `lock().unwrap()` +//! calls below treat poisoning as a programmer bug. +//! - A hard cap (`max_keys`) bounds memory. Once the cap is reached +//! new keys are dropped and the `overflow_dropped` counter is +//! incremented; existing counters keep updating so the observer +//! keeps surfacing high-frequency keys even on a saturated set. +//! - The observer is opt-in: callers construct an `Arc` +//! only when their `--observe-fields` flag is set. When unset the +//! observation call sites stay unwired and the hot path is +//! untouched. +//! - Keys are stored as `Arc` so a snapshot only refcount-bumps +//! each key rather than copying the string. Trade: one extra +//! allocation per first-time-insert (Arc header) in exchange for +//! near-free repeated snapshots. +//! - Lifetime counters (`lifetime_events_observed`, +//! `lifetime_overflow_dropped`) are monotonic across resets so +//! Prometheus counter bridges don't desync when the daemon resets +//! the observer via `DELETE /api/v1/fields/observer`. +//! +//! # Coordinates +//! +//! - Iterate [`Event::field_keys`](crate::Event::field_keys) once per +//! event before evaluation. For JSON events this is a recursive +//! walk that allocates one `String` per leaf path (dot-joined paths +//! are not substrings of the source value); for flat formats like +//! `KvEvent` the override returns `Cow::Borrowed`. The cost is +//! acceptable in the opt-in diagnostic mode but is not free. +//! - Render the gap and broken-coverage signals by joining a +//! [`snapshot`](FieldObserver::snapshot) against a +//! [`RuleFieldSet`](crate::RuleFieldSet). + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; + +use crate::event::Event; +use crate::fields::{FieldOrigin, RuleFieldSet}; + +/// Single field-name counter as exposed via the snapshot API. +/// +/// The field name is held as `Arc` so snapshotting only bumps a +/// refcount rather than copying every key out of the observer's +/// internal map. Treat as a string slice for read access: +/// `entry.field.as_ref()` or `&*entry.field`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FieldObservationEntry { + /// Dot-joined field path (matches what `Event::field_keys` returns). + pub field: Arc, + /// Number of events that contained this field since the last reset. + pub count: u64, +} + +/// Immutable snapshot of an observer's state at one moment in time. +/// +/// Returned by [`FieldObserver::snapshot`]; consumers (the daemon's +/// HTTP handlers, the `engine eval` report writer) render coverage +/// reports from this against a [`RuleFieldSet`](crate::RuleFieldSet). +#[derive(Debug, Clone, Default)] +pub struct FieldObservation { + /// Per-field counters, sorted by descending count then ascending name. + pub entries: Vec, + /// Number of events evaluated by the observer since construction or + /// the last reset. + pub events_observed: u64, + /// Distinct field names tracked (saturates at `max_keys`). + pub unique_keys: usize, + /// Number of insert attempts dropped because the observer was at + /// capacity since the last reset. + pub overflow_dropped: u64, + /// Lifetime total of events evaluated since the observer was + /// constructed, ignoring resets. Drives Prometheus counters, which + /// must be monotonic. + pub lifetime_events_observed: u64, + /// Lifetime total of insert attempts dropped because the observer + /// was at capacity, ignoring resets. Drives Prometheus counters. + pub lifetime_overflow_dropped: u64, + /// Configured ceiling for distinct keys. + pub max_keys: usize, + /// Seconds since the observer was created (or last reset). + pub uptime_seconds: f64, +} + +/// Capped, opt-in field-name counter shared across producers (the +/// daemon's event task, the eval streaming loop) and consumers (the +/// daemon's HTTP handlers, the eval report writer). +pub struct FieldObserver { + inner: Mutex, u64>>, + max_keys: usize, + /// Resets to 0 on [`reset`](Self::reset). Drives the "since-last-reset" + /// view exposed in [`FieldObservation::overflow_dropped`]. + overflow_dropped: AtomicU64, + /// Resets to 0 on [`reset`](Self::reset). Drives the "since-last-reset" + /// view exposed in [`FieldObservation::events_observed`]. + events_observed: AtomicU64, + /// Monotonic. Never reset. Drives the Prometheus counter bridge so + /// the lifetime metric stays consistent across observer resets. + lifetime_events_observed: AtomicU64, + /// Monotonic. Never reset. Drives the Prometheus counter bridge. + lifetime_overflow_dropped: AtomicU64, + start: Mutex, +} + +impl FieldObservation { + /// Join the snapshot against a [`RuleFieldSet`] and return the + /// partitioned coverage view in a single pass. + /// + /// Returned references borrow from `self` (the entries) and the + /// supplied `rule_field_set` (the missing entries), so this is + /// allocation-light: one `Vec` for the unknown borrows, one `Vec` + /// for the missing borrows, one `HashSet` for the seen lookup. + /// + /// Centralises the logic shared between the daemon's + /// `GET /api/v1/fields*` handlers and the `engine eval` end-of-run + /// report so the two surfaces cannot drift on field semantics. + pub fn coverage<'a>(&'a self, rule_field_set: &'a RuleFieldSet) -> FieldCoverage<'a> { + let mut unknown: Vec<&'a FieldObservationEntry> = Vec::new(); + let mut intersection_count: usize = 0; + let mut seen: HashSet<&'a str> = HashSet::with_capacity(self.entries.len()); + for entry in &self.entries { + let field: &str = &entry.field; + seen.insert(field); + if rule_field_set.contains(field) { + intersection_count += 1; + } else { + unknown.push(entry); + } + } + let missing: Vec<(&'a str, &'a FieldOrigin)> = rule_field_set + .iter() + .filter(|(name, _)| !seen.contains(name)) + .collect(); + FieldCoverage { + unknown, + intersection_count, + missing, + } + } +} + +/// Borrowed view over a [`FieldObservation`] joined against a +/// [`RuleFieldSet`]. Produced by [`FieldObservation::coverage`]. +/// +/// Consumers (the daemon HTTP handlers, the eval report writer) own +/// the JSON shape; this struct only provides the partitioned data. +pub struct FieldCoverage<'a> { + /// Observed fields not referenced by any loaded rule (gap signal). + /// Ordered the same way as [`FieldObservation::entries`]: by + /// descending count, then ascending name. + pub unknown: Vec<&'a FieldObservationEntry>, + /// Count of observed fields that *are* rule-referenced. + pub intersection_count: usize, + /// Rule field names that have not appeared in any observed event + /// (broken-coverage signal), paired with their [`FieldOrigin`] so + /// consumers can render rule titles and source kinds. + pub missing: Vec<(&'a str, &'a FieldOrigin)>, +} + +impl FieldObserver { + /// Create a new observer with the given upper bound on distinct keys. + /// + /// A `max_keys` of 0 is allowed and disables tracking entirely; every + /// observed field counts as overflow. Callers wanting "no cap" should + /// pick a large finite number (e.g. `usize::MAX / 2`). + pub fn new(max_keys: usize) -> Self { + Self { + inner: Mutex::new(HashMap::new()), + max_keys, + overflow_dropped: AtomicU64::new(0), + events_observed: AtomicU64::new(0), + lifetime_events_observed: AtomicU64::new(0), + lifetime_overflow_dropped: AtomicU64::new(0), + start: Mutex::new(Instant::now()), + } + } + + /// Walk the event's field keys and update the per-field counters. + /// + /// Insertion of a new key is skipped once the observer is at capacity; + /// already-tracked keys keep counting. The method takes `&self`, so + /// the observer can be shared behind an `Arc` without locking from + /// the caller's side. + pub fn observe(&self, event: &E) { + self.events_observed.fetch_add(1, Ordering::Relaxed); + self.lifetime_events_observed + .fetch_add(1, Ordering::Relaxed); + let keys = event.field_keys(); + if keys.is_empty() { + return; + } + let mut overflow_local = 0u64; + let mut counts = self.inner.lock().expect("field observer mutex poisoned"); + for key in keys { + if let Some(slot) = counts.get_mut(key.as_ref()) { + *slot = slot.saturating_add(1); + } else if counts.len() < self.max_keys { + counts.insert(Arc::::from(key.as_ref()), 1); + } else { + overflow_local = overflow_local.saturating_add(1); + } + } + drop(counts); + if overflow_local > 0 { + self.overflow_dropped + .fetch_add(overflow_local, Ordering::Relaxed); + self.lifetime_overflow_dropped + .fetch_add(overflow_local, Ordering::Relaxed); + } + } + + /// Snapshot the current counts. Entries are sorted by descending + /// count, then by ascending name for deterministic output. + /// + /// Cheap relative to the cardinality of the observer: each entry + /// only refcount-clones the `Arc` key rather than copying the + /// key bytes, so a 10 000-key snapshot costs ~10 000 atomic + /// increments plus one `Vec` allocation, not 10 000 `String` + /// allocations. + pub fn snapshot(&self) -> FieldObservation { + let counts = self.inner.lock().expect("field observer mutex poisoned"); + let mut entries: Vec = counts + .iter() + .map(|(k, v)| FieldObservationEntry { + field: Arc::clone(k), + count: *v, + }) + .collect(); + let unique_keys = entries.len(); + drop(counts); + entries.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.field.cmp(&b.field))); + FieldObservation { + entries, + events_observed: self.events_observed.load(Ordering::Relaxed), + unique_keys, + overflow_dropped: self.overflow_dropped.load(Ordering::Relaxed), + lifetime_events_observed: self.lifetime_events_observed.load(Ordering::Relaxed), + lifetime_overflow_dropped: self.lifetime_overflow_dropped.load(Ordering::Relaxed), + max_keys: self.max_keys, + uptime_seconds: self + .start + .lock() + .expect("field observer start mutex poisoned") + .elapsed() + .as_secs_f64(), + } + } + + /// Reset every counter and the overflow tally. Returns the previous + /// `(unique_keys, events_observed)` pair so the API endpoint can + /// report what was cleared. + pub fn reset(&self) -> (usize, u64) { + let mut counts = self.inner.lock().expect("field observer mutex poisoned"); + let previous_keys = counts.len(); + counts.clear(); + drop(counts); + let previous_events = self.events_observed.swap(0, Ordering::Relaxed); + self.overflow_dropped.store(0, Ordering::Relaxed); + *self + .start + .lock() + .expect("field observer start mutex poisoned") = Instant::now(); + (previous_keys, previous_events) + } + + /// Total events observed since the observer was created or last reset. + pub fn events_observed(&self) -> u64 { + self.events_observed.load(Ordering::Relaxed) + } + + /// Lifetime total of events observed since the observer was + /// constructed, ignoring resets. Monotonic; suitable for driving + /// Prometheus counters. + pub fn lifetime_events_observed(&self) -> u64 { + self.lifetime_events_observed.load(Ordering::Relaxed) + } + + /// Distinct keys currently tracked (does not include overflow drops). + pub fn unique_keys(&self) -> usize { + self.inner + .lock() + .expect("field observer mutex poisoned") + .len() + } + + /// Insert attempts dropped because the observer was at capacity + /// since the last reset. + pub fn overflow_dropped(&self) -> u64 { + self.overflow_dropped.load(Ordering::Relaxed) + } + + /// Lifetime total of insert attempts dropped because the observer + /// was at capacity, ignoring resets. Monotonic. + pub fn lifetime_overflow_dropped(&self) -> u64 { + self.lifetime_overflow_dropped.load(Ordering::Relaxed) + } + + /// Configured per-observer ceiling for distinct keys. + pub fn max_keys(&self) -> usize { + self.max_keys + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::event::JsonEvent; + use serde_json::json; + + #[test] + fn observes_flat_json_fields() { + let observer = FieldObserver::new(100); + let v = json!({"CommandLine": "whoami", "User": "admin"}); + observer.observe(&JsonEvent::borrow(&v)); + let snap = observer.snapshot(); + assert_eq!(snap.events_observed, 1); + assert_eq!(snap.unique_keys, 2); + assert_eq!(snap.overflow_dropped, 0); + let names: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect(); + assert!(names.contains(&"CommandLine")); + assert!(names.contains(&"User")); + } + + #[test] + fn observes_nested_json_with_dotted_leaves() { + let observer = FieldObserver::new(100); + let v = json!({"actor": {"id": "u1"}}); + observer.observe(&JsonEvent::borrow(&v)); + let snap = observer.snapshot(); + let names: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect(); + // Only the leaf is observed; the intermediate `actor` is not. + // This keeps the gap signal free of false positives on objects + // whose children are rule-referenced. + assert!(names.contains(&"actor.id")); + assert!(!names.contains(&"actor")); + } + + #[test] + fn counts_accumulate_across_observations() { + let observer = FieldObserver::new(100); + for _ in 0..5 { + let v = json!({"CommandLine": "whoami"}); + observer.observe(&JsonEvent::borrow(&v)); + } + let snap = observer.snapshot(); + assert_eq!(snap.events_observed, 5); + let entry = snap + .entries + .iter() + .find(|e| &*e.field == "CommandLine") + .expect("CommandLine tracked"); + assert_eq!(entry.count, 5); + } + + #[test] + fn cap_enforced_and_overflow_recorded() { + let observer = FieldObserver::new(2); + let v = json!({"a": 1, "b": 2, "c": 3, "d": 4}); + observer.observe(&JsonEvent::borrow(&v)); + let snap = observer.snapshot(); + assert_eq!(snap.unique_keys, 2); + assert_eq!(snap.overflow_dropped, 2); + // Existing keys keep counting after cap is hit: + observer.observe(&JsonEvent::borrow(&v)); + let snap2 = observer.snapshot(); + assert_eq!(snap2.unique_keys, 2); + assert_eq!(snap2.overflow_dropped, 4); + for entry in &snap2.entries { + assert_eq!(entry.count, 2, "tracked key counter advanced"); + } + } + + #[test] + fn snapshot_sorts_by_count_desc_then_name() { + let observer = FieldObserver::new(100); + for _ in 0..3 { + observer.observe(&JsonEvent::borrow(&json!({"hot": 1}))); + } + observer.observe(&JsonEvent::borrow(&json!({"warm": 1}))); + observer.observe(&JsonEvent::borrow(&json!({"chill": 1}))); + let snap = observer.snapshot(); + let order: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect(); + assert_eq!(order, vec!["hot", "chill", "warm"]); + } + + #[test] + fn reset_clears_state_and_returns_previous_counts() { + let observer = FieldObserver::new(100); + observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2}))); + observer.observe(&JsonEvent::borrow(&json!({"a": 1}))); + let (prev_keys, prev_events) = observer.reset(); + assert_eq!(prev_keys, 2); + assert_eq!(prev_events, 2); + let snap = observer.snapshot(); + assert_eq!(snap.events_observed, 0); + assert_eq!(snap.unique_keys, 0); + assert_eq!(snap.overflow_dropped, 0); + assert!(snap.entries.is_empty()); + } + + #[test] + fn lifetime_counters_survive_reset() { + // Regression: the Prometheus counter bridge relies on monotonic + // lifetime totals. Resetting the observer must not lose data + // points that the next /metrics scrape needs to see. + let observer = FieldObserver::new(2); + // 3 events, 4 unique fields => 2 fit, 2 overflow per event. + for _ in 0..3 { + observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2, "c": 3, "d": 4}))); + } + let before = observer.snapshot(); + assert_eq!(before.events_observed, 3); + assert_eq!(before.lifetime_events_observed, 3); + assert_eq!(before.overflow_dropped, 6); + assert_eq!(before.lifetime_overflow_dropped, 6); + + observer.reset(); + let after_reset = observer.snapshot(); + assert_eq!(after_reset.events_observed, 0); + assert_eq!(after_reset.overflow_dropped, 0); + // Lifetime totals MUST NOT reset: + assert_eq!(after_reset.lifetime_events_observed, 3); + assert_eq!(after_reset.lifetime_overflow_dropped, 6); + + // Continue observing; lifetime keeps climbing from where it was. + observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2, "c": 3, "d": 4}))); + let after = observer.snapshot(); + assert_eq!(after.events_observed, 1); + assert_eq!(after.lifetime_events_observed, 4); + assert_eq!(after.overflow_dropped, 2); + assert_eq!(after.lifetime_overflow_dropped, 8); + } + + #[test] + fn plain_event_observation_is_a_noop_for_counters() { + let observer = FieldObserver::new(100); + let plain = crate::event::PlainEvent::new("disk full".into()); + observer.observe(&plain); + let snap = observer.snapshot(); + // events_observed still ticks: the observer saw the event but it had + // no structured fields to record. + assert_eq!(snap.events_observed, 1); + assert_eq!(snap.unique_keys, 0); + assert_eq!(snap.overflow_dropped, 0); + } + + #[test] + fn coverage_partitions_observed_against_rule_set() { + // Two rules: one references CommandLine (also matches the event), + // the other references ProcessGuid (never appears in the event). + // Event carries CommandLine plus two unrelated fields. + let yaml = r#" +title: Whoami +status: test +logsource: + category: test +detection: + selection: + CommandLine|contains: whoami + condition: selection +--- +title: Process Tampering +status: test +logsource: + category: test +detection: + selection: + ProcessGuid: "{abc}" + condition: selection +"#; + let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse"); + let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true); + let observer = FieldObserver::new(100); + observer.observe(&JsonEvent::borrow( + &json!({"CommandLine":"whoami","User":"alice","src_ip":"10.0.0.1"}), + )); + + let snap = observer.snapshot(); + let coverage = snap.coverage(&rule_field_set); + + assert_eq!(coverage.intersection_count, 1, "CommandLine intersects"); + let unknown: Vec<&str> = coverage + .unknown + .iter() + .map(|e| -> &str { &e.field }) + .collect(); + assert!(unknown.contains(&"User")); + assert!(unknown.contains(&"src_ip")); + assert!(!unknown.contains(&"CommandLine")); + let missing: Vec<&str> = coverage.missing.iter().map(|(n, _)| *n).collect(); + assert!( + missing.contains(&"ProcessGuid"), + "ProcessGuid was rule-referenced but never observed" + ); + assert!(!missing.contains(&"CommandLine")); + } + + #[test] + fn coverage_empty_observer_yields_only_missing() { + let yaml = r#" +title: A +status: test +logsource: + category: test +detection: + selection: + FieldA: x + condition: selection +"#; + let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse"); + let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true); + let observer = FieldObserver::new(100); + + let snap = observer.snapshot(); + let coverage = snap.coverage(&rule_field_set); + assert_eq!(coverage.intersection_count, 0); + assert!(coverage.unknown.is_empty()); + assert_eq!(coverage.missing.len(), 1); + assert_eq!(coverage.missing[0].0, "FieldA"); + } + + #[test] + fn coverage_unknown_preserves_snapshot_ordering() { + let observer = FieldObserver::new(100); + for _ in 0..3 { + observer.observe(&JsonEvent::borrow(&json!({"hot": 1}))); + } + observer.observe(&JsonEvent::borrow(&json!({"warm": 1}))); + let empty_rule_set = crate::fields::RuleFieldSet::default(); + + let snap = observer.snapshot(); + let coverage = snap.coverage(&empty_rule_set); + let order: Vec<&str> = coverage + .unknown + .iter() + .map(|e| -> &str { &e.field }) + .collect(); + // Snapshot is already sorted by descending count then ascending + // name; coverage's filter-only pass must preserve that order. + assert_eq!(order, vec!["hot", "warm"]); + } +} diff --git a/crates/rsigma-eval/src/fields.rs b/crates/rsigma-eval/src/fields.rs new file mode 100644 index 00000000..fe8431d5 --- /dev/null +++ b/crates/rsigma-eval/src/fields.rs @@ -0,0 +1,345 @@ +//! Rule-field extraction shared between `rsigma rule fields` and the daemon's +//! field-observability endpoints. +//! +//! [`RuleFieldSet::collect`] walks a [`SigmaCollection`] (after optional +//! pipeline transformations are applied) and records every field name +//! referenced by detection items, correlation `group-by` / threshold / alias +//! fields, filter detections, and rule-level `fields:` metadata. The result +//! tracks per-field provenance (rule titles + source kinds) so callers can +//! decide whether to surface a finding as a gap signal, a broken-coverage +//! signal, or a coverage summary. +//! +//! The CLI command `rsigma rule fields` and the daemon's +//! `GET /api/v1/fields/*` endpoints share this implementation so the +//! field set the operator inspects offline matches exactly what the engine +//! references at runtime. + +use std::collections::{BTreeMap, BTreeSet}; + +use rsigma_parser::{ + CorrelationCondition, CorrelationRule, Detection, DetectionItem, Detections, FilterRule, + SigmaCollection, SigmaRule, +}; +use serde::Serialize; + +use crate::pipeline::{Pipeline, apply_pipelines}; + +/// Where in a rule a field reference came from. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] +#[serde(rename_all = "lowercase")] +pub enum FieldSource { + /// Field used in a detection condition (`selection`, etc.). + Detection, + /// Field used by a correlation rule (group-by, threshold field, alias mapping). + Correlation, + /// Field used in a filter rule's detection block. + Filter, + /// Field listed in rule-level `fields:` metadata. + Metadata, +} + +impl FieldSource { + /// Stable string identifier used in JSON serialization and human output. + pub fn as_str(self) -> &'static str { + match self { + FieldSource::Detection => "detection", + FieldSource::Correlation => "correlation", + FieldSource::Filter => "filter", + FieldSource::Metadata => "metadata", + } + } +} + +/// Provenance for a single field name across the loaded rule set. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct FieldOrigin { + /// Rule titles that reference this field. + pub rule_titles: BTreeSet, + /// Source kinds (detection, correlation, filter, metadata) where this + /// field was seen. + pub sources: BTreeSet, +} + +/// Set of field names referenced by a loaded `SigmaCollection`, optionally +/// after applying processing pipelines. +/// +/// Built via [`RuleFieldSet::collect`] and queried via [`contains`](Self::contains), +/// [`iter`](Self::iter), and [`len`](Self::len). Cheap to clone for sharing +/// across threads behind an `Arc`. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct RuleFieldSet { + fields: BTreeMap, +} + +impl RuleFieldSet { + /// Walk a rule collection (and any pipelines) and return the resulting + /// field set. When `pipelines` is non-empty, each rule is cloned and + /// transformed before its fields are collected so the recorded names + /// match what the engine evaluates against. Rules whose pipeline + /// application fails fall back to the untransformed names so the set + /// stays observable even when a pipeline misfires on one rule. + /// + /// `include_filters` controls whether filter-rule detection blocks + /// contribute to the set; mirrors the existing `--no-filters` flag on + /// `rsigma rule fields`. + pub fn collect( + collection: &SigmaCollection, + pipelines: &[Pipeline], + include_filters: bool, + ) -> Self { + let mut collector = Collector::default(); + + if pipelines.is_empty() { + for rule in &collection.rules { + collector.collect_rule(rule); + } + } else { + for rule in &collection.rules { + let mut transformed = rule.clone(); + if apply_pipelines(pipelines, &mut transformed).is_err() { + collector.collect_rule(rule); + continue; + } + collector.collect_rule(&transformed); + } + } + + for corr in &collection.correlations { + collector.collect_correlation(corr); + } + + if include_filters { + for filter in &collection.filters { + collector.collect_filter(filter); + } + } + + Self { + fields: collector.fields, + } + } + + /// True if any rule references this field name. + pub fn contains(&self, field: &str) -> bool { + self.fields.contains_key(field) + } + + /// Look up provenance for a single field name. + pub fn origin(&self, field: &str) -> Option<&FieldOrigin> { + self.fields.get(field) + } + + /// Iterate field names and their provenance in sorted order. + pub fn iter(&self) -> impl Iterator { + self.fields.iter().map(|(k, v)| (k.as_str(), v)) + } + + /// Iterate just the field names in sorted order. + pub fn names(&self) -> impl Iterator { + self.fields.keys().map(String::as_str) + } + + /// Number of distinct fields in the set. + pub fn len(&self) -> usize { + self.fields.len() + } + + /// True when no fields were collected. + pub fn is_empty(&self) -> bool { + self.fields.is_empty() + } +} + +#[derive(Default)] +struct Collector { + fields: BTreeMap, +} + +impl Collector { + fn add(&mut self, field: &str, rule_title: &str, source: FieldSource) { + let entry = self.fields.entry(field.to_string()).or_default(); + entry.rule_titles.insert(rule_title.to_string()); + entry.sources.insert(source); + } + + fn collect_detection_items( + &mut self, + detection: &Detection, + rule_title: &str, + source: FieldSource, + ) { + match detection { + Detection::AllOf(items) => { + for item in items { + self.collect_item(item, rule_title, source); + } + } + Detection::AnyOf(subs) => { + for sub in subs { + self.collect_detection_items(sub, rule_title, source); + } + } + Detection::Keywords(_) => {} + } + } + + fn collect_item(&mut self, item: &DetectionItem, rule_title: &str, source: FieldSource) { + if let Some(ref name) = item.field.name { + self.add(name, rule_title, source); + } + } + + fn collect_detections( + &mut self, + detections: &Detections, + rule_title: &str, + source: FieldSource, + ) { + for det in detections.named.values() { + self.collect_detection_items(det, rule_title, source); + } + } + + fn collect_rule(&mut self, rule: &SigmaRule) { + self.collect_detections(&rule.detection, &rule.title, FieldSource::Detection); + for f in &rule.fields { + self.add(f, &rule.title, FieldSource::Metadata); + } + } + + fn collect_correlation(&mut self, corr: &CorrelationRule) { + for f in &corr.group_by { + self.add(f, &corr.title, FieldSource::Correlation); + } + if let CorrelationCondition::Threshold { + field: Some(ref fields), + .. + } = corr.condition + { + for f in fields { + self.add(f, &corr.title, FieldSource::Correlation); + } + } + for alias in &corr.aliases { + for mapped_field in alias.mapping.values() { + self.add(mapped_field, &corr.title, FieldSource::Correlation); + } + } + for f in &corr.fields { + self.add(f, &corr.title, FieldSource::Metadata); + } + } + + fn collect_filter(&mut self, filter: &FilterRule) { + self.collect_detections(&filter.detection, &filter.title, FieldSource::Filter); + for f in &filter.fields { + self.add(f, &filter.title, FieldSource::Metadata); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rsigma_parser::parse_sigma_yaml; + + fn build(yaml: &str) -> SigmaCollection { + parse_sigma_yaml(yaml).expect("parse") + } + + #[test] + fn collects_detection_fields() { + let collection = build( + r#" +title: Test +status: test +logsource: + category: test +detection: + selection: + CommandLine|contains: whoami + EventID: 1 + condition: selection +"#, + ); + let set = RuleFieldSet::collect(&collection, &[], true); + assert!(set.contains("CommandLine")); + assert!(set.contains("EventID")); + assert!( + set.origin("CommandLine") + .unwrap() + .sources + .contains(&FieldSource::Detection) + ); + } + + #[test] + fn collects_correlation_group_by() { + let collection = build( + r#" +title: Login +id: login-rule +logsource: + category: auth +detection: + selection: + EventType: login + condition: selection +--- +title: Many Logins +correlation: + type: event_count + rules: + - login-rule + group-by: + - User + timespan: 60s + condition: + gte: 3 +"#, + ); + let set = RuleFieldSet::collect(&collection, &[], true); + assert!(set.contains("EventType")); + assert!(set.contains("User")); + let user_origin = set.origin("User").unwrap(); + assert!(user_origin.sources.contains(&FieldSource::Correlation)); + } + + #[test] + fn include_filters_toggle() { + let collection = build( + r#" +title: Detection +status: test +logsource: + category: test +detection: + selection: + DetField: x + condition: selection +--- +title: Filter +filter: + rules: + - non-existent + selection: + FilterField: y + condition: selection +"#, + ); + let with_filters = RuleFieldSet::collect(&collection, &[], true); + let without_filters = RuleFieldSet::collect(&collection, &[], false); + assert!(with_filters.contains("FilterField")); + assert!(!without_filters.contains("FilterField")); + assert!(with_filters.contains("DetField")); + assert!(without_filters.contains("DetField")); + } + + #[test] + fn empty_collection_is_empty_set() { + let collection = SigmaCollection::default(); + let set = RuleFieldSet::collect(&collection, &[], true); + assert!(set.is_empty()); + assert_eq!(set.len(), 0); + } +} diff --git a/crates/rsigma-eval/src/lib.rs b/crates/rsigma-eval/src/lib.rs index a912a3e7..164ce083 100644 --- a/crates/rsigma-eval/src/lib.rs +++ b/crates/rsigma-eval/src/lib.rs @@ -96,6 +96,8 @@ pub mod correlation_engine; pub mod engine; pub mod error; pub mod event; +pub mod field_observer; +pub mod fields; pub mod matcher; pub mod pipeline; pub mod result; @@ -116,6 +118,8 @@ pub use correlation_engine::{ pub use engine::Engine; pub use error::{EvalError, Result}; pub use event::{Event, EventValue, JsonEvent, KvEvent, MapEvent, PlainEvent}; +pub use field_observer::{FieldCoverage, FieldObservation, FieldObservationEntry, FieldObserver}; +pub use fields::{FieldOrigin, FieldSource, RuleFieldSet}; pub use matcher::CompiledMatcher; pub use pipeline::{ Pipeline, TransformationItem, apply_pipelines, apply_pipelines_with_state, diff --git a/crates/rsigma-runtime/README.md b/crates/rsigma-runtime/README.md index d2af39a7..a9dbc53e 100644 --- a/crates/rsigma-runtime/README.md +++ b/crates/rsigma-runtime/README.md @@ -10,6 +10,7 @@ Streaming runtime for [rsigma](https://github.com/timescale/rsigma) — input fo - **Dynamic source resolution**: `SourceResolver` trait with `DefaultSourceResolver` implementation fetching data from files, commands, HTTP APIs, and NATS subjects. Includes template expansion, extraction (jq/JSONPath/CEL), caching with TTL, and scheduled refresh. - **`DaemonSourceRegistry`**: unified registry that merges sources from external files (`--source` flag) and pipeline-embedded `sources:` blocks with collision-error semantics. Used by the daemon to manage all dynamic sources from a single point. - **I/O**: `EventSource` trait (stdin, HTTP, NATS) and `Sink` enum (stdout, file, NATS) with fan-out support. +- **Field observability**: re-exports `FieldObserver` / `FieldObservation` / `FieldObservationEntry` / `FieldCoverage` from `rsigma-eval` (the canonical home, since they only need the `Event` trait) so downstream consumers can keep importing from `rsigma_runtime`. Attach via `LogProcessor::set_field_observer(Some(observer))`; inspect via `snapshot()` / `reset()`; join against a `RuleFieldSet` via `FieldObservation::coverage()`. - **OTLP**: `LogRecord`-to-JSON conversion for OpenTelemetry log ingestion (feature-gated under `otlp`). Resource and log attributes are flattened for direct Sigma rule matching. ## Usage diff --git a/crates/rsigma-runtime/src/engine.rs b/crates/rsigma-runtime/src/engine.rs index 6a1954bc..c51145c5 100644 --- a/crates/rsigma-runtime/src/engine.rs +++ b/crates/rsigma-runtime/src/engine.rs @@ -1,10 +1,11 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; +use arc_swap::ArcSwap; use rsigma_eval::event::Event; use rsigma_eval::{ CorrelationConfig, CorrelationEngine, CorrelationSnapshot, Engine, Pipeline, ProcessResult, - parse_pipeline_file, + RuleFieldSet, parse_pipeline_file, }; use rsigma_parser::SigmaCollection; @@ -32,6 +33,11 @@ pub struct RuntimeEngine { /// `daachorse-index` Cargo feature. #[cfg(feature = "daachorse-index")] cross_rule_ac: bool, + /// Post-pipeline rule field set, refreshed at the end of every + /// `load_rules()`. Wrapped in `ArcSwap` so readers (e.g. the daemon's + /// `/api/v1/fields/*` endpoints) can snapshot a stable view without + /// blocking the hot path during a reload. + rule_field_set: Arc>, } enum EngineVariant { @@ -66,9 +72,21 @@ impl RuntimeEngine { bloom_max_bytes: None, #[cfg(feature = "daachorse-index")] cross_rule_ac: false, + rule_field_set: Arc::new(ArcSwap::from_pointee(RuleFieldSet::default())), } } + /// Return an immutable snapshot of the post-pipeline rule field set. + /// + /// Cheap to call: returns a refcounted handle that stays valid even if + /// `load_rules()` runs concurrently. The daemon's field-observability + /// endpoints use this to compute the intersection between observed + /// event keys and rule-referenced fields without coordinating with the + /// engine lock. + pub fn rule_field_set(&self) -> Arc { + self.rule_field_set.load_full() + } + /// Enable or disable bloom-filter pre-filtering on the inner detection /// engine. Off by default. Applies on the next `load_rules()`; pre-load /// callers should set this before calling `load_rules()`. @@ -243,6 +261,7 @@ impl RuntimeEngine { state_entries: engine.state_count(), }; self.engine = EngineVariant::WithCorrelations(Box::new(engine)); + self.refresh_rule_field_set(&collection); tracing::debug!( detection_rules = stats.detection_rules, correlation_rules = stats.correlation_rules, @@ -272,6 +291,7 @@ impl RuntimeEngine { state_entries: 0, }; self.engine = EngineVariant::DetectionOnly(Box::new(engine)); + self.refresh_rule_field_set(&collection); tracing::debug!( detection_rules = stats.detection_rules, correlation_rules = stats.correlation_rules, @@ -282,6 +302,13 @@ impl RuntimeEngine { } } + /// Recompute the post-pipeline rule field set and publish it. Called at + /// the end of every successful `load_rules()` branch. + fn refresh_rule_field_set(&self, collection: &SigmaCollection) { + let field_set = RuleFieldSet::collect(collection, &self.pipelines, true); + self.rule_field_set.store(Arc::new(field_set)); + } + /// Process a batch of events using parallel detection + sequential correlation. /// /// Delegates to `Engine::evaluate_batch` or `CorrelationEngine::process_batch` diff --git a/crates/rsigma-runtime/src/input/mod.rs b/crates/rsigma-runtime/src/input/mod.rs index 793d2396..fada3a47 100644 --- a/crates/rsigma-runtime/src/input/mod.rs +++ b/crates/rsigma-runtime/src/input/mod.rs @@ -101,6 +101,14 @@ impl Event for EventInputDecoded { EventInputDecoded::Plain(e) => e.to_json(), } } + + fn field_keys(&self) -> Vec> { + match self { + EventInputDecoded::Json(e) => e.field_keys(), + EventInputDecoded::Kv(e) => e.field_keys(), + EventInputDecoded::Plain(e) => e.field_keys(), + } + } } /// Parse a raw log line using the specified format. diff --git a/crates/rsigma-runtime/src/lib.rs b/crates/rsigma-runtime/src/lib.rs index 50dd6c9e..3d52b1df 100644 --- a/crates/rsigma-runtime/src/lib.rs +++ b/crates/rsigma-runtime/src/lib.rs @@ -68,7 +68,10 @@ pub use io::{ pub use metrics::{MetricsHook, NoopMetrics}; pub use processor::{EventFilter, LogProcessor}; -pub use rsigma_eval::{ProcessResult, ProcessResultExt}; +pub use rsigma_eval::{ + FieldCoverage, FieldObservation, FieldObservationEntry, FieldObserver, ProcessResult, + ProcessResultExt, +}; pub use sources::refresh::{RefreshResult, RefreshScheduler, RefreshTrigger}; pub use sources::{ DefaultSourceResolver, ResolvedValue, SourceCache, SourceError, SourceErrorKind, diff --git a/crates/rsigma-runtime/src/processor.rs b/crates/rsigma-runtime/src/processor.rs index 1a11bc1b..52fdf4d6 100644 --- a/crates/rsigma-runtime/src/processor.rs +++ b/crates/rsigma-runtime/src/processor.rs @@ -4,7 +4,7 @@ use parking_lot::Mutex; use std::time::Instant; use arc_swap::ArcSwap; -use rsigma_eval::{Event, JsonEvent, ProcessResult, ProcessResultExt}; +use rsigma_eval::{Event, FieldObserver, JsonEvent, ProcessResult, ProcessResultExt, RuleFieldSet}; use crate::engine::RuntimeEngine; use crate::input::{EventInputDecoded, InputFormat, parse_line}; @@ -26,6 +26,11 @@ pub type EventFilter = dyn Fn(&serde_json::Value) -> Vec; pub struct LogProcessor { engine: Arc>>, metrics: Arc, + /// Optional opt-in field observer. When `Some`, every parsed event + /// flowing through `process_batch_with_format` has its field keys + /// recorded. When `None`, the batch path skips iteration entirely + /// so the hot path stays untouched. + field_observer: ArcSwap>>, } impl LogProcessor { @@ -34,9 +39,26 @@ impl LogProcessor { LogProcessor { engine: Arc::new(ArcSwap::from_pointee(Mutex::new(engine))), metrics, + field_observer: ArcSwap::new(Arc::new(None)), } } + /// Attach (or detach) the opt-in field observer. + /// + /// When set, [`process_batch_with_format`](Self::process_batch_with_format) + /// records each parsed event's field keys before evaluation. Pass + /// `None` to disable observation; the hot path then performs zero + /// extra work. Safe to call at runtime: the swap is wait-free, and + /// in-flight batches finish against whichever observer they read. + pub fn set_field_observer(&self, observer: Option>) { + self.field_observer.store(Arc::new(observer)); + } + + /// Return the currently-attached field observer, if any. + pub fn field_observer(&self) -> Option> { + self.field_observer.load_full().as_ref().clone() + } + /// Atomically replace the engine with a new one. /// /// In-flight batches continue against the old engine (they hold an `Arc` @@ -200,6 +222,20 @@ impl LogProcessor { return empty_results(batch.len()); } + // Optional opt-in field observation. Cheap when disabled: one + // hazard-pointer `Guard` (no Arc clone) plus an `Option` check. + // When enabled, walks each decoded event's field keys before + // evaluation. The Guard's lifetime extends through the loop so + // the observer cannot be dropped mid-batch even if the daemon + // detaches it concurrently. + let observer_guard = self.field_observer.load(); + if let Some(observer) = observer_guard.as_ref() { + for (_, decoded) in &decoded_events { + observer.observe(decoded); + } + } + drop(observer_guard); + // Phase 2: Batch evaluation — parallel detection + sequential correlation let event_refs: Vec<&EventInputDecoded> = decoded_events.iter().map(|(_, e)| e).collect(); @@ -332,6 +368,15 @@ impl LogProcessor { let engine = snapshot.lock(); engine.stats() } + + /// Return an immutable snapshot of the current rule field set + /// (post-pipeline). The lock is held only long enough to clone the + /// `Arc`; the returned value remains valid across reloads. + pub fn rule_field_set(&self) -> Arc { + let snapshot = self.engine.load(); + let engine = snapshot.lock(); + engine.rule_field_set() + } } /// Produce a vec of empty `ProcessResult`, one per input line. diff --git a/docs/cli/engine/daemon.md b/docs/cli/engine/daemon.md index 9396450b..6b9ecade 100644 --- a/docs/cli/engine/daemon.md +++ b/docs/cli/engine/daemon.md @@ -140,6 +140,15 @@ The auth methods are mutually exclusive. See [NATS Streaming](../../guide/nats-s | `--bloom-max-bytes ` | `1048576` | Memory budget for the bloom index (1 MiB default). No effect without `--bloom-prefilter`. | | `--cross-rule-ac` | off | Enable cross-rule Aho-Corasick. Available with the `daachorse-index` build feature. See [Performance Tuning](../../guide/performance-tuning.md#cross-rule-aho-corasick-pre-filter). | +### Field observability (advanced) + +| Flag | Default | Description | +|------|---------|-------------| +| `--observe-fields` | off | Record the field keys of every event evaluated by the engine task so the `/api/v1/fields/*` endpoints can report which event fields no rule references (gap signal) and which rule fields have never appeared in an event (broken-coverage signal). Off by default; when off the engine task does not iterate event fields at all. | +| `--observe-fields-max-keys ` | `10000` | Hard ceiling on distinct field names tracked. Existing keys keep counting after the cap is hit; new keys are dropped and surfaced via `rsigma_fields_observer_overflow_dropped_total`. No effect without `--observe-fields`. | + +See [Observability: detection coverage](../../guide/observability.md#detection-coverage-with-observe-fields) for the operator workflow, and [HTTP API](../../reference/http-api.md#field-observability) for the endpoint payloads. + ## Examples ### Minimal daemon: stdin → stdout diff --git a/docs/cli/engine/eval.md b/docs/cli/engine/eval.md index b32dd14b..c6fc4a0c 100644 --- a/docs/cli/engine/eval.md +++ b/docs/cli/engine/eval.md @@ -66,6 +66,29 @@ For a narrative tutorial see [Evaluating Rules](../../guide/evaluating-rules.md) | `--bloom-max-bytes ` | `1048576` | Memory budget for the bloom index (1 MiB default). No effect without `--bloom-prefilter`. | | `--cross-rule-ac` | off | Enable the cross-rule Aho-Corasick pre-filter. Available when compiled with the `daachorse-index` Cargo feature. See [Performance Tuning](../../guide/performance-tuning.md#cross-rule-aho-corasick-pre-filter). | +### Field observability (offline coverage report) + +The same gap / broken-coverage signals exposed by the daemon's `/api/v1/fields*` endpoints are available offline as a one-shot report: + +| Flag | Default | Description | +|------|---------|-------------| +| `--observe-fields` | off | Record the field keys of every evaluated event and emit a coverage report at end-of-run. The report has the same JSON shape as `GET /api/v1/fields`, so the same `jq` queries work against either runtime (suited for CI gap analysis). | +| `--observe-fields-max-keys ` | `10000` | Hard ceiling on distinct field names tracked. New keys are dropped after the cap (and counted via `overflow_dropped` in the report). | +| `--observe-fields-report ` | unset | Write the report to a file. When omitted (and `--observe-fields` is set), the report goes to stderr so detections on stdout stay machine-consumable. | + +```bash +# CI: keep stdout for detection NDJSON, stderr for logs, report in its own file +rsigma engine eval -r rules/ -e @events.ndjson \ + --observe-fields \ + --observe-fields-report coverage.json + +# Quick interactive run: the report shows up on stderr alongside the +# "Processed N events, M matches." line +rsigma engine eval -r rules/ -e @events.ndjson --observe-fields +``` + +See [Observability: detection coverage](../../guide/observability.md#detection-coverage-with-observe-fields) for the operator workflow shared with the daemon path. + ### CI gating | Flag | Description | diff --git a/docs/guide/observability.md b/docs/guide/observability.md index c5913461..3cb067a0 100644 --- a/docs/guide/observability.md +++ b/docs/guide/observability.md @@ -159,6 +159,52 @@ groups: summary: rsigma rule reload is failing ``` +## Detection coverage with `--observe-fields` + +The daemon can answer two coverage questions live from inside the process: + +- **Gap signal:** "which event fields am I receiving that no loaded rule references?" An answer of "src_ip is the most-frequent unknown field" is a strong hint that an enricher should drop the field before ingestion, or that a new rule should consume it. +- **Broken-coverage signal:** "which rule fields have never appeared in an event since the daemon started?" An answer of "ProcessGuid is referenced by 3 rules and was never seen" usually means a pipeline mapping is wrong or the upstream agent dropped the field. + +Field observation is **off by default** because it adds a per-event field iteration that operators should opt into. Enable it with `--observe-fields` and (optionally) cap memory with `--observe-fields-max-keys ` (default `10000`). + +```bash +rsigma engine daemon -r /etc/rsigma/rules/ \ + --pipeline ecs_windows \ + --observe-fields \ + --observe-fields-max-keys 10000 +``` + +Once enabled, four endpoints are live on `--api-addr`: + +```bash +# Compact one-shot view bundled with summary, unknown, missing. +curl -sS http://127.0.0.1:9090/api/v1/fields | jq + +# Just the gap signal, sorted by hottest unknown first. +curl -sS http://127.0.0.1:9090/api/v1/fields/unknown | jq '.items[:5]' + +# Just the broken-coverage signal, with sample rule titles. +curl -sS http://127.0.0.1:9090/api/v1/fields/missing | jq '.items[:5]' + +# Start a fresh observation window after rolling out a new rule pack. +curl -sS -X DELETE http://127.0.0.1:9090/api/v1/fields/observer +``` + +Three Prometheus surfaces refresh on every `/metrics` scrape (`rsigma_fields_observed_total`, `rsigma_fields_observer_unique_keys`, `rsigma_fields_observer_overflow_dropped_total`). A persistent positive rate on `rsigma_fields_observer_overflow_dropped_total` means `--observe-fields-max-keys` is too low for the deployment; bump it or accept that long-tail keys will be invisible. + +The same surface works offline via `rsigma engine eval --observe-fields` for CI gap analysis. The end-of-run report has the same JSON shape as `GET /api/v1/fields`, so a single `jq` query works against either runtime: + +```bash +rsigma engine eval -r rules/ -e @events.ndjson \ + --observe-fields \ + --observe-fields-report coverage.json + +jq '.summary | {events_observed, unknown_count, missing_count}' coverage.json +``` + +See [HTTP API: Field observability](../reference/http-api.md#field-observability) for the daemon endpoint payloads and pagination, [`engine daemon`](../cli/engine/daemon.md#field-observability-advanced) for the daemon flags, and [`engine eval`](../cli/engine/eval.md#field-observability-offline-coverage-report) for the offline equivalent. + ## Health probes For Kubernetes-style orchestrators: @@ -193,6 +239,6 @@ The first line of `/metrics` should be a `# HELP rsigma_back_pressure_events_tot - [Streaming Detection](streaming-detection.md) for the daemon's HTTP API surface that complements the metrics endpoint. - [Performance Tuning](performance-tuning.md) for which metric to watch when sizing `--buffer-size`, `--batch-size`, or correlation `max_state_entries`. - [NATS Streaming](nats-streaming.md) for the NATS-specific log targets (`async_nats::connector`). -- [Prometheus metrics reference](../reference/metrics.md) for the full 27-metric catalog. +- [Prometheus metrics reference](../reference/metrics.md) for the full 30-metric catalog. - [HTTP API reference](../reference/http-api.md) for every endpoint exposed alongside `/metrics`. - [`tracing` filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives) for the exact `RUST_LOG` directive grammar. diff --git a/docs/reference/http-api.md b/docs/reference/http-api.md index 88842782..5885f56e 100644 --- a/docs/reference/http-api.md +++ b/docs/reference/http-api.md @@ -18,6 +18,10 @@ All bodies are JSON unless otherwise noted. All responses include a `Content-Typ | `/api/v1/sources` | GET | none | Dynamic pipeline sources currently registered. | | `/api/v1/sources/resolve` | POST | none | Force re-resolution of all dynamic sources (with no body) or one specific source (with `{"source_id":"..."}`). | | `/api/v1/sources/cache/{source_id}` | DELETE | none | Invalidate one source's cache so the next read fetches fresh. | +| `/api/v1/fields` | GET | none | Combined gap + broken-coverage report. Requires `--observe-fields`. | +| `/api/v1/fields/unknown` | GET | none | Fields seen in events that no rule references. Requires `--observe-fields`. | +| `/api/v1/fields/missing` | GET | none | Fields referenced by rules that have never appeared in an event. Requires `--observe-fields`. | +| `/api/v1/fields/observer` | DELETE | none | Reset the field observer's counters. Requires `--observe-fields`. | | `/v1/logs` | POST | none | OTLP/HTTP log ingestion (`application/x-protobuf` or `application/json`, optionally gzip-encoded). Requires `daemon-otlp`. | | OTLP/gRPC `LogsService/Export` | gRPC | none | OTLP over gRPC on the same `--api-addr`. Requires `daemon-otlp`. | @@ -212,6 +216,122 @@ curl -sS -X DELETE http://127.0.0.1:9090/api/v1/sources/cache/ip_blocklist The endpoint returns `200 OK` for any source ID regardless of whether that ID is currently configured; nonexistent IDs are a no-op. If you need a strict check, list `/api/v1/sources` first and confirm the source is registered before invalidating. +## Field observability + +The daemon can record the field keys of every event it evaluates and join that against the field names referenced by loaded rules. This surfaces two halves of detection coverage from inside the process: + +- **Gap signal:** fields in events that no rule references. Likely candidates for new detections, or a sign that an enricher should drop the field before ingestion. +- **Broken-coverage signal:** fields referenced by rules that have never appeared in an event. Either the rule is dead-lettered (wrong pipeline mapping, wrong logsource) or the event source has stopped emitting that field. + +Field observation is **off by default**. Start the daemon with `--observe-fields` (and optionally `--observe-fields-max-keys `, default `10000`) to enable the surface. When disabled, all four endpoints below return `503 Service Unavailable` with `{"error":"field observation disabled","hint":"..."}`. + +Three Prometheus surfaces refresh on every `/metrics` scrape (and after every successful `/api/v1/fields/*` call): `rsigma_fields_observed_total`, `rsigma_fields_observer_unique_keys`, and `rsigma_fields_observer_overflow_dropped_total`. See [Prometheus metrics](metrics.md) for the catalog entries. + +### `GET /api/v1/fields` + +One-shot snapshot bundling `summary`, `unknown`, and `missing` sections. Useful for dashboards that want all three views in a single round-trip. Each list section is paginated via `?limit=N&offset=M`. + +```bash +curl -sS 'http://127.0.0.1:9090/api/v1/fields?limit=10' +``` + +```json +{ + "summary": { + "events_observed": 1248, + "unique_keys_observed": 18, + "rule_fields_loaded": 22, + "overflow_dropped": 0, + "max_keys": 10000, + "uptime_seconds": 312.4, + "intersection_count": 12, + "unknown_count": 6, + "missing_count": 10 + }, + "unknown": { + "items": [{"field": "src_ip", "count": 1187}], + "total": 6, + "offset": 0, + "limit": 10, + "next_offset": null + }, + "missing": { + "items": [{ + "field": "ProcessGuid", + "rule_count": 3, + "sources": ["detection"], + "rule_titles": ["Sysmon Process Tampering", "..."], + "truncated": false + }], + "total": 10, + "offset": 0, + "limit": 10, + "next_offset": null + } +} +``` + +### `GET /api/v1/fields/unknown` + +Event field paths that the observer has seen but no loaded rule references. Sorted by descending count, then ascending name. Paginated with `?limit=N&offset=M`. + +```bash +curl -sS 'http://127.0.0.1:9090/api/v1/fields/unknown?limit=5' +``` + +```json +{ + "items": [ + {"field": "src_ip", "count": 1187}, + {"field": "User", "count": 1183} + ], + "total": 6, + "offset": 0, + "limit": 5, + "next_offset": null +} +``` + +### `GET /api/v1/fields/missing` + +Field names referenced by loaded rules that have never appeared in an event since the observer was started (or last reset). Each entry includes `rule_count` (total rules touching the field), `sources` (the kinds the field originated in: `detection`, `correlation`, `filter`, `metadata`), and `rule_titles` (up to 10 sample titles, with `truncated: true` when more exist). + +```bash +curl -sS 'http://127.0.0.1:9090/api/v1/fields/missing?limit=5' +``` + +```json +{ + "items": [ + { + "field": "ProcessGuid", + "rule_count": 3, + "sources": ["detection"], + "rule_titles": ["Sysmon Process Tampering"], + "truncated": false + } + ], + "total": 10, + "offset": 0, + "limit": 5, + "next_offset": null +} +``` + +### `DELETE /api/v1/fields/observer` + +Clear the observer's counters and overflow tally, and reset the per-observer uptime clock. Returns what was cleared so dashboards can subtract baselines. + +```bash +curl -sS -X DELETE http://127.0.0.1:9090/api/v1/fields/observer +``` + +```json +{"status":"reset","previous_keys":18,"previous_events":1248} +``` + +A `DELETE` does not affect rule loading or any other daemon state. Use it after a rule reload to start a clean coverage window against the updated rule set. + ## OTLP ingest ### `POST /v1/logs` (HTTP) diff --git a/docs/reference/metrics.md b/docs/reference/metrics.md index 5d832ed7..bc2a0760 100644 --- a/docs/reference/metrics.md +++ b/docs/reference/metrics.md @@ -1,6 +1,6 @@ # Prometheus Metrics -The `engine daemon` exposes Prometheus metrics on `GET /metrics` on the same `--api-addr` as the REST API. The full definition catalogue is 27 metric names across three concerns; the runtime exposes the ones that have ever fired in a given process. A startup scrape shows 21 names by default (one of the per-rule counters surfaces immediately because the registry pre-creates it for documentation); the remaining six lazy metrics register on first use of dynamic pipelines or OTLP. +The `engine daemon` exposes Prometheus metrics on `GET /metrics` on the same `--api-addr` as the REST API. The full definition catalogue is 30 metric names across four concerns; the runtime exposes the ones that have ever fired in a given process. The three field-observer surfaces always render their `# HELP`/`# TYPE` lines (and stay at zero unless `--observe-fields` was passed); the others follow the lazy-registration pattern documented per section below. The exact source of truth is the [`daemon/metrics`](https://github.com/timescale/rsigma/blob/main/crates/rsigma-cli/src/daemon/metrics.rs) module. @@ -87,6 +87,16 @@ Exposed when the daemon is built with `daemon-tls`. Both metrics render with the | `rsigma_tls_certificate_expiry_seconds` | gauge | — | Seconds until the active TLS server certificate's `not_after`. Signed: negative once expired. Updated at startup and after every successful SIGHUP-triggered reload. | | `rsigma_tls_active_connections` | gauge | — | Currently active TLS-terminated connections on the API listener. Decrements on connection close (including handshake failure). | +## Field observability (3 metrics) + +Exposed unconditionally; values stay at zero unless the daemon was started with `--observe-fields`. All three refresh on every `/metrics` scrape and after every successful `/api/v1/fields/*` call. See [HTTP API: Field observability](http-api.md#field-observability) for the matching endpoints. + +| Metric | Type | Labels | Description | +|--------|------|--------|-------------| +| `rsigma_fields_observed_total` | counter | — | Total events scanned by the opt-in field observer. Advances regardless of whether the event had structured fields. | +| `rsigma_fields_observer_unique_keys` | gauge | — | Distinct field names currently tracked. Saturates at `--observe-fields-max-keys` (default `10000`). | +| `rsigma_fields_observer_overflow_dropped_total` | counter | — | New-key insert attempts dropped because the observer was at capacity. A persistent positive rate signals that `--observe-fields-max-keys` is too low for the deployment. | + ## Scrape configuration Minimum Prometheus scrape config: