Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 42 additions & 3 deletions crates/rsigma-cli/src/commands/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use rsigma_eval::parse_pipeline_file;
use rsigma_runtime::DefaultSourceResolver;
use rsigma_runtime::sources::SourceResolver;

pub fn cmd_resolve(pipeline_paths: Vec<PathBuf>, source_filter: Option<String>, pretty: bool) {
pub fn cmd_resolve(
pipeline_paths: Vec<PathBuf>,
source_filter: Option<String>,
pretty: bool,
dry_run: bool,
) {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
Expand All @@ -16,10 +21,15 @@ pub fn cmd_resolve(pipeline_paths: Vec<PathBuf>, source_filter: Option<String>,
std::process::exit(crate::exit_code::CONFIG_ERROR);
});

rt.block_on(async { resolve_async(pipeline_paths, source_filter, pretty).await });
rt.block_on(async { resolve_async(pipeline_paths, source_filter, pretty, dry_run).await });
}

async fn resolve_async(pipeline_paths: Vec<PathBuf>, source_filter: Option<String>, pretty: bool) {
async fn resolve_async(
pipeline_paths: Vec<PathBuf>,
source_filter: Option<String>,
pretty: bool,
dry_run: bool,
) {
let mut all_sources = Vec::new();

for path in &pipeline_paths {
Expand Down Expand Up @@ -58,6 +68,35 @@ async fn resolve_async(pipeline_paths: Vec<PathBuf>, source_filter: Option<Strin
std::process::exit(crate::exit_code::RULE_ERROR);
}

if dry_run {
let items: Vec<_> = all_sources
.iter()
.map(|(pipeline_name, source)| {
serde_json::json!({
"pipeline": pipeline_name,
"source_id": &source.id,
"source_type": format!("{:?}", source.source_type).split('{').next().unwrap_or("unknown").trim(),
"required": source.required,
"refresh": format!("{:?}", source.refresh),
})
})
.collect();

let output = if items.len() == 1 {
items.into_iter().next().unwrap()
} else {
serde_json::Value::Array(items)
};

let json_str = if pretty {
serde_json::to_string_pretty(&output).unwrap()
} else {
serde_json::to_string(&output).unwrap()
};
println!("{json_str}");
return;
}

let resolver = Arc::new(DefaultSourceResolver::new());
let mut results = Vec::new();
let mut had_error = false;
Expand Down
61 changes: 59 additions & 2 deletions crates/rsigma-cli/src/commands/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,65 @@ use std::process;
use rsigma_eval::Engine;
use rsigma_parser::parse_sigma_directory;

pub(crate) fn cmd_validate(path: PathBuf, verbose: bool, pipeline_paths: Vec<PathBuf>) {
let pipelines = crate::load_pipelines(&pipeline_paths);
pub(crate) fn cmd_validate(
path: PathBuf,
verbose: bool,
pipeline_paths: Vec<PathBuf>,
resolve_sources: bool,
) {
let mut pipelines = crate::load_pipelines(&pipeline_paths);

if resolve_sources {
let has_dynamic = pipelines.iter().any(|p| p.is_dynamic());
if has_dynamic {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap_or_else(|e| {
eprintln!("Failed to create async runtime for source resolution: {e}");
process::exit(crate::exit_code::CONFIG_ERROR);
});

let resolver = rsigma_runtime::DefaultSourceResolver::new();
let mut resolved_pipelines = Vec::with_capacity(pipelines.len());
let mut source_errors: Vec<String> = Vec::new();

for pipeline in &pipelines {
if pipeline.is_dynamic() {
match rt.block_on(rsigma_runtime::sources::resolve_all(
&resolver,
&pipeline.sources,
)) {
Ok(resolved_data) => {
let expanded =
rsigma_runtime::sources::template::TemplateExpander::expand(
pipeline,
&resolved_data,
);
resolved_pipelines.push(expanded);
}
Err(e) => {
source_errors.push(format!("pipeline '{}': {e}", pipeline.name));
resolved_pipelines.push(pipeline.clone());
}
}
} else {
resolved_pipelines.push(pipeline.clone());
}
}

if !source_errors.is_empty() {
eprintln!("Source resolution errors:");
for err in &source_errors {
eprintln!(" - {err}");
}
process::exit(crate::exit_code::CONFIG_ERROR);
}

pipelines = resolved_pipelines;
println!(" Sources resolved: OK");
}
}

match parse_sigma_directory(&path) {
Ok(collection) => {
Expand Down
14 changes: 14 additions & 0 deletions crates/rsigma-cli/src/daemon/instrumented_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ impl InstrumentedResolver {
metrics,
}
}

/// Access the underlying cache for invalidation operations.
///
/// Forwards directly to the wrapped resolver's cache; no metrics or other
/// instrumentation are applied to cache access itself — only `resolve` calls
/// are instrumented.
pub fn cache(&self) -> &rsigma_runtime::sources::cache::SourceCache {
self.inner.cache()
}
}

#[async_trait::async_trait]
Expand All @@ -44,6 +49,15 @@ impl SourceResolver for InstrumentedResolver {
if value.from_cache {
self.metrics.source_cache_hits.inc();
}
self.metrics
.source_last_resolved
.with_label_values(&[source.id.as_str()])
.set(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs_f64(),
);
}
Err(e) => {
let error_kind = match &e.kind {
Expand Down
15 changes: 14 additions & 1 deletion crates/rsigma-cli/src/daemon/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use prometheus::{
Gauge, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry,
Gauge, GaugeVec, Histogram, HistogramOpts, IntCounter, IntCounterVec, IntGauge, Opts, Registry,
TextEncoder,
};
use rsigma_runtime::MetricsHook;
Expand Down Expand Up @@ -30,6 +30,7 @@ pub struct Metrics {
pub source_resolve_errors: IntCounterVec,
pub source_resolve_latency: Histogram,
pub source_cache_hits: IntCounter,
pub source_last_resolved: GaugeVec,
#[cfg(feature = "daemon-otlp")]
pub otlp_requests: IntCounterVec,
#[cfg(feature = "daemon-otlp")]
Expand Down Expand Up @@ -235,6 +236,14 @@ impl Metrics {
"Times cached source data was served on resolution failure",
))
.unwrap();
let source_last_resolved = GaugeVec::new(
Opts::new(
"rsigma_source_last_resolved_timestamp",
"Unix timestamp of last successful resolution per source",
),
&["source_id"],
)
.unwrap();

registry
.register(Box::new(source_resolves_total.clone()))
Expand All @@ -248,6 +257,9 @@ impl Metrics {
registry
.register(Box::new(source_cache_hits.clone()))
.unwrap();
registry
.register(Box::new(source_last_resolved.clone()))
.unwrap();

#[cfg(feature = "daemon-otlp")]
let otlp_requests = IntCounterVec::new(
Expand Down Expand Up @@ -305,6 +317,7 @@ impl Metrics {
source_resolve_errors,
source_resolve_latency,
source_cache_hits,
source_last_resolved,
#[cfg(feature = "daemon-otlp")]
otlp_requests,
#[cfg(feature = "daemon-otlp")]
Expand Down
18 changes: 13 additions & 5 deletions crates/rsigma-cli/src/daemon/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ pub fn spawn_file_watcher(
Some(watcher)
}

/// Set up a SIGHUP handler that sends reload signals.
/// Set up a SIGHUP handler that sends reload signals and source re-resolution triggers.
#[cfg(unix)]
pub async fn sighup_listener(reload_tx: mpsc::Sender<()>) {
pub async fn sighup_listener(
reload_tx: mpsc::Sender<()>,
sources_trigger_tx: Option<mpsc::Sender<rsigma_runtime::sources::refresh::RefreshTrigger>>,
) {
use tokio::signal::unix::{SignalKind, signal};

let mut sig = match signal(SignalKind::hangup()) {
Expand All @@ -75,13 +78,18 @@ pub async fn sighup_listener(reload_tx: mpsc::Sender<()>) {

loop {
sig.recv().await;
tracing::info!("SIGHUP received, triggering reload");
tracing::info!("SIGHUP received, triggering reload and source re-resolution");
let _ = reload_tx.try_send(());
if let Some(tx) = &sources_trigger_tx {
let _ = tx.try_send(rsigma_runtime::sources::refresh::RefreshTrigger::All);
}
}
}

#[cfg(not(unix))]
pub async fn sighup_listener(_reload_tx: mpsc::Sender<()>) {
// No-op on non-Unix platforms
/// Non-Unix stub: SIGHUP does not exist on these platforms, so this listener
/// parks forever instead of returning. Returning immediately would complete
/// any `select!`/`join!` a caller races this future against; pending forever
/// keeps the daemon's other branches running undisturbed.
pub async fn sighup_listener(
    _reload_tx: mpsc::Sender<()>,
    _sources_trigger_tx: Option<mpsc::Sender<rsigma_runtime::sources::refresh::RefreshTrigger>>,
) {
    // A future that is never ready — awaiting it suspends this task for good.
    let forever = std::future::pending::<()>();
    forever.await;
}
Loading
Loading