Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/analyzer/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,31 @@ fn find_header(headers: &[httparse::Header<'_>], name: &str) -> Option<String> {
.map(|h| String::from_utf8_lossy(h.value).trim().to_string())
}

/// Number of consecutive parse errors before poisoning a direction.
///
/// Set > 1 to tolerate mid-stream joins where the first segment(s)
/// are body data from a transfer that started before the capture.
///
/// The count is kept separately for the request and response directions,
/// is reset to zero after a successful parse, and a direction is poisoned
/// once its count is greater than or equal to this value.
const POISON_THRESHOLD: u8 = 3;

/// Parsing state the HTTP analyzer keeps for a single flow.
///
/// Each direction (request / response) has its own buffer, poison flag and
/// consecutive-error counter, so a failure on one side does not affect the other.
struct HttpFlowState {
    /// Buffered client-to-server bytes that have not been parsed yet.
    request_buf: Vec<u8>,
    /// Buffered server-to-client bytes that have not been parsed yet.
    response_buf: Vec<u8>,
    /// When `true`, further client-to-server data is skipped instead of buffered.
    request_poisoned: bool,
    /// When `true`, further server-to-client data is skipped instead of buffered.
    response_poisoned: bool,
    /// Consecutive request parse errors; reset after a successful parse.
    request_error_count: u8,
    /// Consecutive response parse errors; reset after a successful parse.
    response_error_count: u8,
    /// Guards the non-HTTP flow counter so a flow is counted at most once,
    /// even when both directions end up poisoned.
    counted_as_non_http: bool,
}

impl HttpFlowState {
    /// Builds a pristine state: both buffers empty, nothing poisoned,
    /// both error counters at zero and the flow not yet counted as non-HTTP.
    fn new() -> Self {
        Self {
            // Buffers start without any allocation.
            request_buf: Vec::new(),
            response_buf: Vec::new(),
            // No direction is poisoned on a fresh flow.
            request_poisoned: false,
            response_poisoned: false,
            // No parse errors have been observed yet.
            request_error_count: 0,
            response_error_count: 0,
            counted_as_non_http: false,
        }
    }
}
Expand All @@ -93,6 +108,8 @@ pub struct HttpAnalyzer {
transactions: u64,
all_findings: Vec<Finding>,
parse_errors: u64,
non_http_flows: u64,
poisoned_bytes_skipped: u64,
}

impl Default for HttpAnalyzer {
Expand All @@ -113,6 +130,8 @@ impl HttpAnalyzer {
transactions: 0,
all_findings: Vec::new(),
parse_errors: 0,
non_http_flows: 0,
poisoned_bytes_skipped: 0,
}
}

Expand Down Expand Up @@ -144,6 +163,10 @@ impl HttpAnalyzer {
self.parse_errors
}

/// Returns the running total of payload bytes that were dropped because
/// they arrived on a direction that had already been poisoned.
pub fn poisoned_bytes_skipped(&self) -> u64 {
self.poisoned_bytes_skipped
}

fn check_request_detections(&mut self, parsed: &ParsedRequest, _flow_key: &FlowKey) {
let uri_lower = parsed.uri.to_lowercase();

Expand Down Expand Up @@ -307,12 +330,23 @@ impl HttpAnalyzer {

if let Some(state) = self.flows.get_mut(flow_key) {
state.request_buf.drain(..parsed.bytes_consumed);
state.request_error_count = 0;
}
}
Some(Ok(None)) => return, // Partial — wait for more data
Some(Err(e)) => {
if !had_success {
self.parse_errors += 1;
if let Some(state) = self.flows.get_mut(flow_key) {
state.request_error_count = state.request_error_count.saturating_add(1);
if state.request_error_count >= POISON_THRESHOLD {
state.request_poisoned = true;
if !state.counted_as_non_http {
state.counted_as_non_http = true;
self.non_http_flows += 1;
}
}
}
if e == httparse::Error::TooManyHeaders {
self.all_findings.push(Finding {
category: ThreatCategory::Anomaly,
Expand Down Expand Up @@ -353,12 +387,24 @@ impl HttpAnalyzer {

if let Some(state) = self.flows.get_mut(flow_key) {
state.response_buf.drain(..parsed.bytes_consumed);
state.response_error_count = 0;
}
}
Some(Ok(None)) => return,
Some(Err(e)) => {
if !had_success {
self.parse_errors += 1;
if let Some(state) = self.flows.get_mut(flow_key) {
state.response_error_count =
state.response_error_count.saturating_add(1);
if state.response_error_count >= POISON_THRESHOLD {
state.response_poisoned = true;
if !state.counted_as_non_http {
state.counted_as_non_http = true;
self.non_http_flows += 1;
}
}
}
if e == httparse::Error::TooManyHeaders {
self.all_findings.push(Finding {
category: ThreatCategory::Anomaly,
Expand Down Expand Up @@ -392,6 +438,10 @@ impl StreamHandler for HttpAnalyzer {
.or_insert_with(HttpFlowState::new);
match direction {
Direction::ClientToServer => {
if state.request_poisoned {
self.poisoned_bytes_skipped += data.len() as u64;
return;
}
let remaining = MAX_HEADER_BUF.saturating_sub(state.request_buf.len());
if remaining > 0 {
state
Expand All @@ -400,6 +450,10 @@ impl StreamHandler for HttpAnalyzer {
}
}
Direction::ServerToClient => {
if state.response_poisoned {
self.poisoned_bytes_skipped += data.len() as u64;
return;
}
let remaining = MAX_HEADER_BUF.saturating_sub(state.response_buf.len());
if remaining > 0 {
state
Expand Down Expand Up @@ -459,6 +513,14 @@ impl StreamAnalyzer for HttpAnalyzer {
"parse_errors".to_string(),
serde_json::json!(self.parse_errors),
);
detail.insert(
"non_http_flows".to_string(),
serde_json::json!(self.non_http_flows),
);
detail.insert(
"poisoned_bytes_skipped".to_string(),
serde_json::json!(self.poisoned_bytes_skipped),
);

AnalysisSummary {
analyzer_name: self.name().to_string(),
Expand Down
90 changes: 86 additions & 4 deletions tests/http_analyzer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,101 @@ fn test_parse_error_in_response() {
}

// Verifies that a direction stops being parsed once it has produced
// POISON_THRESHOLD consecutive parse errors, and that data arriving
// afterwards is only accounted for as skipped bytes.
//
// NOTE(review): two `fn` headers follow `#[test]` and two leading comments
// follow the setup; this looks like the removed and added lines of a diff
// rendered together — confirm which lines are current before compiling.
#[test]
fn test_parse_error_clears_buffer_and_continues() {
fn test_parse_error_poisons_direction_after_threshold() {
let mut analyzer = HttpAnalyzer::new();
let fk = test_flow_key();

// First: malformed request (triggers error, clears buffer)
// Send 3 consecutive errors to reach POISON_THRESHOLD
analyzer.on_data(&fk, Direction::ClientToServer, b"GARBAGE1\r\n\r\n", 0);
analyzer.on_data(&fk, Direction::ClientToServer, b"GARBAGE2\r\n\r\n", 0);
analyzer.on_data(&fk, Direction::ClientToServer, b"GARBAGE3\r\n\r\n", 0);
// Every malformed segment is expected to register exactly one parse error.
assert_eq!(analyzer.parse_error_count(), 3);

// Fourth: valid request — skipped because direction is now poisoned
let valid = b"GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n";
// Snapshot the counter so the assertion below checks the delta only.
let skipped_before = analyzer.poisoned_bytes_skipped();
analyzer.on_data(&fk, Direction::ClientToServer, valid, 0);

assert_eq!(analyzer.parse_error_count(), 3); // no new errors (poisoned, not retried)
assert!(analyzer.method_counts().get("GET").is_none()); // never parsed
// The skipped-byte counter must grow by exactly the size of the dropped payload.
assert_eq!(
analyzer.poisoned_bytes_skipped(),
skipped_before + valid.len() as u64
);
}

// Verifies that a single parse error stays below the poison threshold, so a
// valid request sent afterwards on the same direction is still parsed.
//
// NOTE(review): the paired comments before `valid` and the repeated
// `parse_error_count` assertion look like removed and added diff lines
// rendered together — confirm which lines are current.
#[test]
fn test_single_error_does_not_poison() {
let mut analyzer = HttpAnalyzer::new();
let fk = test_flow_key();

// One error is below threshold — should NOT poison
analyzer.on_data(&fk, Direction::ClientToServer, b"GARBAGE\r\n\r\n", 0);
assert_eq!(analyzer.parse_error_count(), 1);

// Second: valid request (should parse successfully on fresh buffer)
// Valid request should still parse (direction not poisoned yet)
let valid = b"GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n";
analyzer.on_data(&fk, Direction::ClientToServer, valid, 0);

// The error total must be unchanged and the GET must have been counted once.
assert_eq!(analyzer.parse_error_count(), 1); // no new errors
assert_eq!(analyzer.parse_error_count(), 1);
assert_eq!(*analyzer.method_counts().get("GET").unwrap(), 1);
}

/// Poisoning the request direction must not stop the response direction of
/// the same flow from being parsed.
#[test]
fn test_poison_request_does_not_affect_response() {
    let mut http = HttpAnalyzer::new();
    let flow = test_flow_key();

    // Three malformed client segments in a row poison the request direction.
    let junk_segments: [&[u8]; 3] = [
        b"GARBAGE1\r\n\r\n",
        b"GARBAGE2\r\n\r\n",
        b"GARBAGE3\r\n\r\n",
    ];
    for &segment in junk_segments.iter() {
        http.on_data(&flow, Direction::ClientToServer, segment, 0);
    }
    assert_eq!(http.parse_error_count(), 3);

    // A well-formed response on the opposite direction is still analyzed.
    let ok_response = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n";
    http.on_data(&flow, Direction::ServerToClient, ok_response, 0);

    assert_eq!(http.transaction_count(), 1);
    let seen_200 = *http.status_code_counts().get(&200).unwrap();
    assert_eq!(seen_200, 1);
}

/// A flow whose two directions are both poisoned must contribute exactly one
/// to the `non_http_flows` figure in the summary, not one per direction.
#[test]
fn test_non_http_flows_counts_per_flow_not_direction() {
    const JUNK: &[u8] = b"GARBAGE\r\n\r\n";

    let mut http = HttpAnalyzer::new();
    let flow = test_flow_key();

    // Three errors on the request side poison it...
    (0..3).for_each(|_| {
        http.on_data(&flow, Direction::ClientToServer, JUNK, 0);
    });
    // ...and three more on the response side poison that one too.
    (0..3).for_each(|_| {
        http.on_data(&flow, Direction::ServerToClient, JUNK, 0);
    });

    // Two poisoned directions, but still a single flow.
    let report = http.summarize();
    let flows_reported = &report.detail["non_http_flows"];
    assert_eq!(*flows_reported, serde_json::json!(1));
}

/// Closing a flow must discard its poison state, so a later connection on
/// the same flow key is parsed from scratch.
#[test]
fn test_poison_cleared_after_flow_close() {
    const JUNK: &[u8] = b"GARBAGE\r\n\r\n";

    let mut http = HttpAnalyzer::new();
    let flow = test_flow_key();

    // Drive the request direction to the poison threshold (three errors).
    (0..3).for_each(|_| {
        http.on_data(&flow, Direction::ClientToServer, JUNK, 0);
    });

    // Tear the flow down with a FIN close.
    http.on_flow_close(&flow, wirerust::reassembly::handler::CloseReason::Fin);

    // The same 4-tuple shows up again: the request must parse normally.
    let get_request = b"GET /index.html HTTP/1.1\r\nHost: example.com\r\n\r\n";
    http.on_data(&flow, Direction::ClientToServer, get_request, 0);

    let get_total = *http.method_counts().get("GET").unwrap();
    assert_eq!(get_total, 1);
}

Expand Down