Skip to content

Add drop_on_abort option to transform config #5665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl crate::TestableForRegression for SourceConfig {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
}
Expand Down Expand Up @@ -620,6 +621,10 @@ fn default_consumer_name() -> String {
"quickwit".to_string()
}

fn default_drop_on_abort() -> bool {
true
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct TransformConfig {
Expand All @@ -633,6 +638,10 @@ pub struct TransformConfig {
/// manipulations. Defaults to `UTC` if not timezone is specified.
#[serde(default = "default_timezone")]
timezone: String,

/// Drops any event that is manually aborted during processing.
#[serde(default = "default_drop_on_abort")]
pub drop_on_abort: bool,
}

fn default_timezone() -> String {
Expand All @@ -646,6 +655,7 @@ impl TransformConfig {
Self {
vrl_script,
timezone: timezone_opt.unwrap_or_else(default_timezone),
drop_on_abort: default_drop_on_abort(),
}
}

Expand Down Expand Up @@ -709,6 +719,7 @@ impl TransformConfig {
Self {
vrl_script: vrl_script.to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
}
}
}
Expand Down Expand Up @@ -754,6 +765,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -849,6 +861,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -1353,6 +1366,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand All @@ -1366,6 +1380,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
Expand All @@ -1377,6 +1392,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
Expand All @@ -1398,6 +1414,7 @@ mod tests {
let expected_transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
};
assert_eq!(transform_config, expected_transform_config);
}
Expand All @@ -1412,6 +1429,7 @@ mod tests {
let expected_transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "Turkey".to_string(),
drop_on_abort: default_drop_on_abort(),
};
assert_eq!(transform_config, expected_transform_config);
}
Expand All @@ -1424,6 +1442,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "Turkey".to_string(),
drop_on_abort: default_drop_on_abort(),
};
transform_config.compile_vrl_script().unwrap();
}
Expand All @@ -1437,13 +1456,15 @@ mod tests {
"#
.to_string(),
timezone: default_timezone(),
drop_on_abort: default_drop_on_abort(),
};
transform_config.compile_vrl_script().unwrap();
}
{
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "foo".to_string(),
drop_on_abort: default_drop_on_abort(),
};
let error = transform_config.compile_vrl_script().unwrap_err();
assert!(error.to_string().starts_with("failed to parse timezone"));
Expand All @@ -1452,6 +1473,7 @@ mod tests {
let transform_config = TransformConfig {
vrl_script: "foo".to_string(),
timezone: "Turkey".to_string(),
drop_on_abort: default_drop_on_abort(),
};
let error = transform_config.compile_vrl_script().unwrap_err();
assert!(error.to_string().starts_with("failed to compile"));
Expand Down Expand Up @@ -1504,6 +1526,7 @@ mod tests {
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone: "local".to_string(),
drop_on_abort: default_drop_on_abort(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down
14 changes: 13 additions & 1 deletion quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ fn parse_raw_doc(
};
let json_doc_result = try_into_vrl_doc(input_format, raw_doc, num_bytes)
.and_then(|vrl_doc| vrl_program.transform_doc(vrl_doc))
.and_then(JsonDoc::try_from_vrl_doc);
.and_then(|vrl_doc_opt| vrl_doc_opt.map(JsonDoc::try_from_vrl_doc).transpose());

JsonDocIterator::from(json_doc_result)
}
Expand Down Expand Up @@ -249,6 +249,18 @@ where E: Into<DocProcessorError>
}
}

impl<E> From<Result<Option<JsonDoc>, E>> for JsonDocIterator
where E: Into<DocProcessorError>,
{
fn from(result: Result<Option<JsonDoc>, E>) -> Self {
match result {
Ok(None) => Self::One(None),
Ok(Some(json_doc)) => Self::One(Some(Ok(json_doc))),
Err(error) => Self::One(Some(Err(error.into()))),
}
}
}

impl From<Result<JsonLogIterator, OtlpLogsError>> for JsonDocIterator {
fn from(result: Result<JsonLogIterator, OtlpLogsError>) -> Self {
match result {
Expand Down
27 changes: 18 additions & 9 deletions quickwit/quickwit-indexing/src/actors/vrl_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ impl VrlDoc {

pub(super) struct VrlProgram {
program: Program,
timezone: TimeZone,
runtime: Runtime,
timezone: TimeZone,
drop_on_abort: bool,
metadata: VrlValue,
secrets: VrlSecrets,
}

impl VrlProgram {
pub fn transform_doc(&mut self, vrl_doc: VrlDoc) -> Result<VrlDoc, DocProcessorError> {
pub fn transform_doc(&mut self, vrl_doc: VrlDoc) -> Result<Option<VrlDoc>, DocProcessorError> {
let drop_on_abort = self.drop_on_abort;

let VrlDoc {
mut vrl_value,
num_bytes,
Expand All @@ -58,20 +61,25 @@ impl VrlProgram {
metadata: &mut self.metadata,
secrets: &mut self.secrets,
};
let runtime_res = self

let runtime_result = self
.runtime
.resolve(&mut target, &self.program, &self.timezone)
.map_err(|transform_error| {
warn!(transform_error=?transform_error);
DocProcessorError::Transform(transform_error)
});
.resolve(&mut target, &self.program, &self.timezone);

if let VrlValue::Object(metadata) = target.metadata {
metadata.clear();
}
self.runtime.clear();

runtime_res.map(|vrl_value| VrlDoc::new(vrl_value, num_bytes))
match runtime_result {
Err(VrlTerminate::Abort(_)) if drop_on_abort => Ok(None),
runtime_result => runtime_result
.map(|vrl_value| Some(VrlDoc::new(vrl_value, num_bytes)))
.map_err(|transform_error| {
warn!(transform_error=?transform_error);
DocProcessorError::Transform(transform_error)
}),
}
}

pub fn try_from_transform_config(transform_config: TransformConfig) -> anyhow::Result<Self> {
Expand All @@ -83,6 +91,7 @@ impl VrlProgram {
program,
runtime,
timezone,
drop_on_abort: transform_config.drop_on_abort,
metadata: VrlValue::Object(BTreeMap::new()),
secrets: VrlSecrets::default(),
})
Expand Down