feat: dynamic pipeline source resolution engine (Phase 2-3) #87
Merged
Implement the source resolution infrastructure for dynamic Sigma
pipelines (Phase 2a+2b+2c scaffolding):
Source resolvers (crates/rsigma-runtime/src/sources/):
- SourceResolver trait with DefaultSourceResolver
- File source: read + parse (JSON/YAML/lines/CSV) + jq extract
- Command source: tokio::process::Command + stdout parsing
- HTTP source: reqwest with env-var header expansion, configurable
method/timeout, format parsing + extract
- jq extraction via jaq-interpret for post-fetch data shaping
Template expansion:
- ${source.X} and ${source.X.path.to.field} replacement in pipeline vars
- Handles scalar, list, and inline template expansion
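The inline case of the template expansion described above can be sketched as follows. This is a minimal, std-only illustration and not the code from this PR: the `Value` enum, `lookup`, and `expand` are hypothetical names, list expansion is left out, and leaving unresolved placeholders untouched is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for resolved source data (assumption: the real
/// engine works on JSON-like values rather than this toy enum).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Scalar(String),
    Map(HashMap<String, Value>),
}

/// Walk a dotted path such as `X.path.to.field` through nested maps.
fn lookup<'a>(sources: &'a HashMap<String, Value>, path: &str) -> Option<&'a Value> {
    let mut parts = path.split('.');
    let mut current = sources.get(parts.next()?)?;
    for key in parts {
        current = match current {
            Value::Map(map) => map.get(key)?,
            Value::Scalar(_) => return None,
        };
    }
    Some(current)
}

/// Replace every `${source.<path>}` whose path resolves to a scalar.
/// Unresolved or unterminated placeholders are copied through unchanged.
fn expand(template: &str, sources: &HashMap<String, Value>) -> String {
    const OPEN: &str = "${source.";
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find(OPEN) {
        out.push_str(&rest[..start]);
        let after = &rest[start + OPEN.len()..];
        match after.find('}') {
            Some(end) => {
                let path = &after[..end];
                match lookup(sources, path) {
                    Some(Value::Scalar(s)) => out.push_str(s),
                    _ => {
                        // Keep the placeholder text as written.
                        out.push_str(OPEN);
                        out.push_str(path);
                        out.push('}');
                    }
                }
                rest = &after[end + 1..];
            }
            None => {
                // No closing brace: emit the remainder verbatim and stop.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```

With a scalar source `env` and a nested source `cloud`, a var such as `idx-${source.env}-${source.cloud.region}` would expand in a single pass over the string.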
Caching:
- In-memory + SQLite-backed persistence (last-known-good values)
- Serves stale data on failure when on_error: use_cached
Error policies:
- use_cached: serve from cache on failure
- use_default: use declared default value
- fail: propagate error (blocks startup if required: true)
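How the three policies might combine with the cache can be sketched as a single decision function. The names `OnError` and `apply_policy` are hypothetical, values are plain strings for brevity, and returning the original error when no cached or default value exists is an assumption rather than documented behavior.

```rust
/// Hypothetical mirror of the `on_error` setting.
#[derive(Debug, Clone, PartialEq)]
enum OnError {
    UseCached,
    UseDefault,
    Fail,
}

/// Decide what a source resolves to after a fetch attempt.
/// `cached` is the last-known-good value, `default` the declared default.
fn apply_policy(
    fetched: Result<String, String>,
    policy: &OnError,
    cached: Option<&str>,
    default: Option<&str>,
) -> Result<String, String> {
    match fetched {
        // A successful fetch always wins, whatever the policy.
        Ok(value) => Ok(value),
        Err(err) => match policy {
            // Assumption: with nothing to fall back on, the error surfaces.
            OnError::UseCached => cached.map(str::to_owned).ok_or(err),
            OnError::UseDefault => default.map(str::to_owned).ok_or(err),
            OnError::Fail => Err(err),
        },
    }
}
```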
Refresh scheduler:
- RefreshScheduler with per-source interval timers
- On-demand trigger channel for API/SIGHUP integration
- Watch channel for notifying consumers of updated data
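The interplay of interval timers and the on-demand trigger channel can be illustrated with a blocking, std-only sketch. This is a deliberate simplification: `std::sync::mpsc` and `recv_timeout` stand in for whatever async primitives the scheduler actually uses, and `Trigger` and `next_trigger` are hypothetical names.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical reasons for re-resolving a source.
#[derive(Debug, PartialEq)]
enum Trigger {
    /// The per-source interval elapsed with no explicit request.
    Interval,
    /// An API call or signal asked for this source id.
    OnDemand(String),
}

/// Block until the interval elapses or an on-demand request arrives.
/// Returns `None` once every sender has been dropped (shutdown).
fn next_trigger(rx: &mpsc::Receiver<String>, interval: Duration) -> Option<Trigger> {
    match rx.recv_timeout(interval) {
        Ok(source_id) => Some(Trigger::OnDemand(source_id)),
        Err(RecvTimeoutError::Timeout) => Some(Trigger::Interval),
        Err(RecvTimeoutError::Disconnected) => None,
    }
}
```

A scheduler loop would call `next_trigger` repeatedly and re-resolve on every `Some(_)`, which keeps on-demand requests from having to wait for the next tick.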
Daemon integration:
- RuntimeEngine.set_source_resolver() + resolve_dynamic_pipelines()
- load_rules() resolves dynamic sources when resolver is set
- LogProcessor carries resolver across reload cycles
- API endpoints: GET /api/v1/sources, POST /api/v1/sources/resolve,
POST /api/v1/sources/resolve/{source_id}
Dependencies added to rsigma-runtime:
- reqwest 0.12 (HTTP client)
- rusqlite 0.39 (cache persistence)
- jaq-interpret 1.5.0 + jaq-parse 1.0.3 (jq extraction)
- csv 1 (CSV format parsing)
- async-trait, regex
21 integration tests covering file/command sources, template expansion,
error policies, and SQLite cache persistence.
Complete Phase 2c of dynamic pipelines:
- NATS push source: subscribe to subject, parse incoming messages,
forward as RefreshTrigger::NatsPush to the scheduler
- File watch: per-source notify watcher with debouncing for
RefreshPolicy::Watch sources
- RefreshScheduler: spawns interval timers, NATS subscriptions, and
file watchers; coordinates re-resolution on triggers
- futures dependency added to nats feature for StreamExt
Dependencies:
- notify 8.2 (file watching)
- futures 0.3 (optional, nats)
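Debouncing matters for file watches because a single save often produces a burst of filesystem events. The sketch below shows one simple variant, a leading-edge debounce that fires on the first event and swallows the rest of the burst. It is not taken from this PR: `Debouncer` is a hypothetical type, and the real watcher may instead wait for events to settle before firing.

```rust
use std::time::{Duration, Instant};

/// Hypothetical leading-edge debouncer for file-change events.
struct Debouncer {
    window: Duration,
    last_fired: Option<Instant>,
}

impl Debouncer {
    fn new(window: Duration) -> Self {
        Debouncer { window, last_fired: None }
    }

    /// Returns true if an event observed at `now` should trigger
    /// re-resolution, false if it falls inside the quiet window that
    /// started at the last accepted event.
    fn should_fire(&mut self, now: Instant) -> bool {
        match self.last_fired {
            Some(prev) if now.duration_since(prev) < self.window => false,
            _ => {
                self.last_fired = Some(now);
                true
            }
        }
    }
}
```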
Complete Phase 2d of dynamic pipelines:
Include expansion (sources/include.rs):
- Expands Transformation::Include directives by fetching source data
and parsing it as transformation arrays
- Security: blocks remote includes (HTTP/NATS) unless explicitly
allowed via allow_remote_include setting
- Uses rsigma-eval's parse_transformation_items (newly public)
Startup sequencing:
- resolve_all() now differentiates required vs optional sources
- Required sources (required=true) with Fail policy propagate errors
and block daemon startup
- Optional sources that fail log a warning and use Null fallback
Daemon wiring (server.rs):
- Creates DefaultSourceResolver and sets it on RuntimeEngine
- Calls resolve_dynamic_pipelines() at startup (async)
- Spins up RefreshScheduler with trigger sender wired to AppState
- allow_remote_include carried across reload cycles in processor
Exports from rsigma-eval:
- TransformationItem and parse_transformation_items now public
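The remote-include guard amounts to a check on the source kind before any fetched data is parsed as transformations. A minimal sketch, assuming a hypothetical `SourceKind` enum and `check_include` function (only the `allow_remote_include` setting name comes from this PR):

```rust
/// Hypothetical classification of where include data comes from.
#[derive(Debug, Clone, Copy, PartialEq)]
enum SourceKind {
    File,
    Command,
    Http,
    Nats,
}

/// Reject includes backed by remote sources unless explicitly allowed.
fn check_include(kind: SourceKind, allow_remote_include: bool) -> Result<(), String> {
    // HTTP and NATS deliver content from outside the host, so treating it
    // as pipeline transformations requires an explicit opt-in.
    let remote = matches!(kind, SourceKind::Http | SourceKind::Nats);
    if remote && !allow_remote_include {
        Err(format!(
            "include from remote source {:?} is blocked; set allow_remote_include to permit it",
            kind
        ))
    } else {
        Ok(())
    }
}
```

Defaulting to deny means a compromised endpoint cannot inject transformations into a pipeline that never asked for remote includes.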
Phase 3 of dynamic pipelines:
CLI `resolve` command:
- `rsigma resolve -p pipeline.yml` resolves all dynamic sources and
prints their data as JSON
- `--source <id>` filters to a specific source
- `--pretty` for formatted output
- Exits non-zero if any resolution fails
Prometheus metrics (registered, ready for instrumentation):
- rsigma_source_resolves_total{source_id, source_type}
- rsigma_source_resolve_errors_total{source_id, error_kind}
- rsigma_source_resolve_seconds (histogram, 10ms-10s buckets)
- rsigma_source_cache_hits_total
The /api/v1/sources status endpoint was already added in Phase 2c.
Wire the declared source metrics into actual resolution:
- InstrumentedResolver wraps DefaultSourceResolver, recording per-call
metrics: resolves_total, resolve_errors, resolve_latency, cache_hits
- ResolvedValue gains a `from_cache` flag so the wrapper can detect
when the inner resolver served stale data on failure
- Daemon uses InstrumentedResolver instead of DefaultSourceResolver
- Error labels use the SourceErrorKind variant name
(Fetch/Parse/Extract/Timeout) for grouping
- async-trait added to rsigma-cli daemon feature deps
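The wrapper approach is a decorator over the resolver trait: it delegates every call and updates counters from the outcome. The sketch below is synchronous and uses plain atomics in place of Prometheus metrics, so it only illustrates the shape. `Resolver`, `Counters`, `Instrumented`, and `Fake` are hypothetical names, and latency recording is omitted.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Simplified resolved value carrying the `from_cache` flag.
struct ResolvedValue {
    data: String,
    from_cache: bool,
}

/// Hypothetical synchronous stand-in for the resolver trait.
trait Resolver {
    fn resolve(&self, source_id: &str) -> Result<ResolvedValue, String>;
}

/// Plain counters standing in for the Prometheus metrics.
#[derive(Default)]
struct Counters {
    resolves: AtomicU64,
    errors: AtomicU64,
    cache_hits: AtomicU64,
}

/// Decorator: same trait, delegates to `inner`, records the outcome.
struct Instrumented<R> {
    inner: R,
    counters: Counters,
}

impl<R: Resolver> Resolver for Instrumented<R> {
    fn resolve(&self, source_id: &str) -> Result<ResolvedValue, String> {
        self.counters.resolves.fetch_add(1, Ordering::Relaxed);
        let result = self.inner.resolve(source_id);
        match &result {
            // The inner resolver fell back to stale data.
            Ok(value) if value.from_cache => {
                self.counters.cache_hits.fetch_add(1, Ordering::Relaxed);
            }
            Ok(_) => {}
            Err(_) => {
                self.counters.errors.fetch_add(1, Ordering::Relaxed);
            }
        }
        result
    }
}

/// Toy inner resolver used only to exercise the wrapper.
struct Fake;

impl Resolver for Fake {
    fn resolve(&self, source_id: &str) -> Result<ResolvedValue, String> {
        match source_id {
            "stale" => Ok(ResolvedValue { data: "old".into(), from_cache: true }),
            "down" => Err("Fetch".into()),
            _ => Ok(ResolvedValue { data: "fresh".into(), from_cache: false }),
        }
    }
}
```

Because the wrapper implements the same trait, callers that hold a resolver need no changes when instrumentation is switched on.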
Rust 1.88.0 cannot infer the shared element type when mixing &String and &str in a slice passed to `with_label_values`. Use `.as_str()` on the String references to produce a uniform `&[&str]`.
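The fix can be illustrated without the metrics crate. In the sketch below, `join_labels` is a hypothetical stand-in whose generic slice parameter is assumed to resemble the shape of `with_label_values`; `SourceMeta` and `label_key` are likewise invented for the example.

```rust
/// Hypothetical stand-in: generic over the element type of the slice.
fn join_labels<V: AsRef<str>>(values: &[V]) -> String {
    values
        .iter()
        .map(|v| v.as_ref())
        .collect::<Vec<&str>>()
        .join(",")
}

/// Hypothetical source metadata mixing an owned id with a static type name.
struct SourceMeta {
    id: String,
    kind: &'static str,
}

fn label_key(meta: &SourceMeta) -> String {
    let id: &String = &meta.id;
    // Writing `&[id, meta.kind]` would put a `&String` and a `&str` into one
    // array literal, leaving no single element type for `V`.
    // `.as_str()` makes both elements `&str`, so the slice is `&[&str]`.
    join_labels(&[id.as_str(), meta.kind])
}
```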
- Replace `printf` with file-based cat/type for lines test
- Replace `echo` with file-based cat/type for JSON tests (Windows echo
mangles JSON through cmd.exe)
- Replace `false` with `cmd /C exit 1` on Windows for failure test
- All command tests now use #[cfg(unix)]/#[cfg(windows)] branches
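The platform split for the failure test can be expressed with `cfg`-gated helper functions. This is a sketch of the pattern only, and `failing_command` is a hypothetical helper name, not one from the test suite.

```rust
use std::process::Command;

/// Build a command that is expected to exit non-zero on Unix.
#[cfg(unix)]
fn failing_command() -> Command {
    Command::new("false")
}

/// Windows has no `false` binary, so go through cmd.exe instead.
#[cfg(windows)]
fn failing_command() -> Command {
    let mut cmd = Command::new("cmd");
    cmd.args(["/C", "exit", "1"]);
    cmd
}
```

Gating whole functions keeps each test body identical across platforms, with only the command construction differing.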
ed5d714 to 3b37563
Summary
Implements the full dynamic source resolution infrastructure for Sigma pipelines, enabling pipeline variables to be populated from external sources (files, commands, HTTP endpoints, NATS subjects) at startup and refreshed continuously at runtime.
Phase 2a+2b: Core resolution engine
- `SourceResolver` trait with `DefaultSourceResolver` dispatching to file, command, and HTTP resolvers
- jq extraction via `jaq-interpret` for post-fetch data shaping
- `TemplateExpander` replacing `${source.X}` and `${source.X.path}` in pipeline vars
- Error policies: `use_cached`, `use_default`, `fail`
Phase 2c: Refresh scheduling
- `RefreshScheduler` with per-source interval timers
- API endpoints: `GET /api/v1/sources`, `POST /api/v1/sources/resolve`, `POST /api/v1/sources/resolve/{source_id}`
Phase 2d: Include expansion + daemon wiring
- `Transformation::Include` expansion from resolved source data
- `RuntimeEngine` carries resolver across hot-reloads via `LogProcessor`
Phase 3: CLI tooling + observability
- `rsigma resolve -p pipeline.yml` for offline source testing
- Prometheus metrics: `rsigma_source_resolves_total`, `rsigma_source_resolve_errors_total`, `rsigma_source_resolve_seconds`, `rsigma_source_cache_hits_total`
- `InstrumentedResolver` wrapper recording all metrics transparently
Dependencies added
- `reqwest 0.12` (HTTP client), `rusqlite 0.39` (cache persistence)
- `jaq-interpret 1.5.0` + `jaq-parse 1.0.3` (jq extraction)
- `csv 1` (CSV format parsing), `notify 8.2` (file watching)
- `async-trait` (CLI daemon feature)
Test plan
- `crates/rsigma-runtime/tests/sources_integration.rs` covering file/command sources, template expansion, error policies, SQLite cache persistence
- `cargo clippy --workspace --all-features -- -D warnings` clean
- `cargo fmt --all -- --check` clean