From 35e51a882efcbb97771f1de7a5fbc4228a3e4c92 Mon Sep 17 00:00:00 2001 From: Hyeseong Kim Date: Tue, 4 Feb 2025 16:20:52 +0900 Subject: [PATCH] Add drop_on_abort option to transform config Closes #5664 --- .../quickwit-config/src/source_config/mod.rs | 23 ++++++++++++++++ .../src/actors/doc_processor.rs | 14 +++++++++- .../src/actors/vrl_processing.rs | 27 ++++++++++++------- 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index a3a783ab077..a4868d8518b 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -172,6 +172,7 @@ impl crate::TestableForRegression for SourceConfig { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, } @@ -620,6 +621,10 @@ fn default_consumer_name() -> String { "quickwit".to_string() } +fn default_drop_on_abort() -> bool { + true +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct TransformConfig { @@ -633,6 +638,10 @@ pub struct TransformConfig { /// manipulations. Defaults to `UTC` if not timezone is specified. #[serde(default = "default_timezone")] timezone: String, + + /// Drops any event that is manually aborted during processing. + #[serde(default = "default_drop_on_abort")] + pub drop_on_abort: bool, } fn default_timezone() -> String { @@ -646,6 +655,7 @@ impl TransformConfig { Self { vrl_script, timezone: timezone_opt.unwrap_or_else(default_timezone), + drop_on_abort: default_drop_on_abort(), } } @@ -709,6 +719,7 @@ impl TransformConfig { Self { vrl_script: vrl_script.to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), } } } @@ -754,6 +765,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; @@ -849,6 +861,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; @@ -1353,6 +1366,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; @@ -1366,6 +1380,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }; let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap(); assert_eq!( @@ -1377,6 +1392,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }; let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap(); assert_eq!( @@ -1398,6 +1414,7 @@ mod tests { let expected_transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }; assert_eq!(transform_config, expected_transform_config); } @@ -1412,6 +1429,7 @@ mod tests { let expected_transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "Turkey".to_string(), + drop_on_abort: default_drop_on_abort(), }; assert_eq!(transform_config, expected_transform_config); } @@ -1424,6 +1442,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "Turkey".to_string(), + drop_on_abort: default_drop_on_abort(), }; transform_config.compile_vrl_script().unwrap(); } @@ -1437,6 +1456,7 @@ mod tests { "# .to_string(), timezone: default_timezone(), + drop_on_abort: default_drop_on_abort(), }; transform_config.compile_vrl_script().unwrap(); } @@ -1444,6 +1464,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "foo".to_string(), + drop_on_abort: default_drop_on_abort(), }; let error = transform_config.compile_vrl_script().unwrap_err(); assert!(error.to_string().starts_with("failed to parse timezone")); @@ -1452,6 +1473,7 @@ mod tests { let transform_config = TransformConfig { vrl_script: "foo".to_string(), timezone: "Turkey".to_string(), + drop_on_abort: default_drop_on_abort(), }; let error = transform_config.compile_vrl_script().unwrap_err(); assert!(error.to_string().starts_with("failed to compile")); @@ -1504,6 +1526,7 @@ mod tests { transform_config: Some(TransformConfig { vrl_script: ".message = downcase(string!(.message))".to_string(), timezone: "local".to_string(), + drop_on_abort: default_drop_on_abort(), }), input_format: SourceInputFormat::Json, }; diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index cc60507d7aa..28b3a10b5f0 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -201,7 +201,7 @@ fn parse_raw_doc( }; let json_doc_result = try_into_vrl_doc(input_format, raw_doc, num_bytes) .and_then(|vrl_doc| vrl_program.transform_doc(vrl_doc)) - .and_then(JsonDoc::try_from_vrl_doc); + .and_then(|vrl_doc_opt| vrl_doc_opt.map(JsonDoc::try_from_vrl_doc).transpose()); JsonDocIterator::from(json_doc_result) } @@ -249,6 +249,18 @@ where E: Into } } +impl From, E>> for JsonDocIterator +where E: Into, +{ + fn from(result: Result, E>) -> Self { + match result { + Ok(None) => Self::One(None), + Ok(Some(json_doc)) => Self::One(Some(Ok(json_doc))), + Err(error) => Self::One(Some(Err(error.into()))), + } + } +} + impl From> for JsonDocIterator { fn from(result: Result) -> Self { match result { diff --git a/quickwit/quickwit-indexing/src/actors/vrl_processing.rs b/quickwit/quickwit-indexing/src/actors/vrl_processing.rs index c441be3544a..913eb4ff406 100644 --- a/quickwit/quickwit-indexing/src/actors/vrl_processing.rs +++ b/quickwit/quickwit-indexing/src/actors/vrl_processing.rs @@ -40,14 +40,17 @@ impl VrlDoc { pub(super) struct VrlProgram { program: Program, - timezone: TimeZone, runtime: Runtime, + timezone: TimeZone, + drop_on_abort: bool, metadata: VrlValue, secrets: VrlSecrets, } impl VrlProgram { - pub fn transform_doc(&mut self, vrl_doc: VrlDoc) -> Result { + pub fn transform_doc(&mut self, vrl_doc: VrlDoc) -> Result, DocProcessorError> { + let drop_on_abort = self.drop_on_abort; + let VrlDoc { mut vrl_value, num_bytes, @@ -58,20 +61,25 @@ impl VrlProgram { metadata: &mut self.metadata, secrets: &mut self.secrets, }; - let runtime_res = self + + let runtime_result = self .runtime - .resolve(&mut target, &self.program, &self.timezone) - .map_err(|transform_error| { - warn!(transform_error=?transform_error); - DocProcessorError::Transform(transform_error) - }); + .resolve(&mut target, &self.program, &self.timezone); if let VrlValue::Object(metadata) = target.metadata { metadata.clear(); } self.runtime.clear(); - runtime_res.map(|vrl_value| VrlDoc::new(vrl_value, num_bytes)) + match runtime_result { + Err(VrlTerminate::Abort(_)) if drop_on_abort => Ok(None), + runtime_result => runtime_result + .map(|vrl_value| Some(VrlDoc::new(vrl_value, num_bytes))) + .map_err(|transform_error| { + warn!(transform_error=?transform_error); + DocProcessorError::Transform(transform_error) + }), + } } pub fn try_from_transform_config(transform_config: TransformConfig) -> anyhow::Result { @@ -83,6 +91,7 @@ impl VrlProgram { program, runtime, timezone, + drop_on_abort: transform_config.drop_on_abort, metadata: VrlValue::Object(BTreeMap::new()), secrets: VrlSecrets::default(), })