Skip to content

Commit

Permalink
fixing oring regex architecture and adding fields cleaning in it to s…
Browse files Browse the repository at this point in the history
…upport nested fields
  • Loading branch information
arblade committed Dec 10, 2024
1 parent 5b47835 commit d36eb0e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 23 deletions.
54 changes: 31 additions & 23 deletions sigma/backends/splunk/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,45 @@ class SplunkDeferredORRegularExpression(DeferredTextQueryExpression):
}

def __init__(self, state, field, arg) -> None:
SplunkDeferredORRegularExpression.add_field(field)
index_suffix = SplunkDeferredORRegularExpression.get_index_suffix(field)
self.template = (
'rex field={field} "(?<{field}Match'
+ index_suffix
+ '>{value})"\n| eval {field}Condition'
+ index_suffix
+ "=if(isnotnull({field}Match"
+ index_suffix
+ '), "true", "false")'
self.add_field(field)
field_condition = self.get_field_condition(field)
field_match = self.get_field_match(field)
self.template = 'rex field={{field}} "(?<{field_match}>{{value}})"\n| eval {field_condition}=if(isnotnull({field_match}), "true", "false")'.format(
field_match=field_match, field_condition=field_condition
)
return super().__init__(state, field, arg)

@staticmethod
def clean_field(field):
# splunk does not allow dots in regex group, so we need to clean variables
return re.sub(".*\\.", "", field)

@classmethod
def add_field(cls, field):
cls.field_counts[field] = (
cls.field_counts.get(field, 0) + 1
) # increment the field count

@classmethod
def get_index_suffix(cls, field):

index_suffix = cls.field_counts.get(field, 0)
def get_field_suffix(cls, field):
index_suffix = cls.field_counts.get(field, "")
if index_suffix == 1:
# return nothing for the first field use
return ""
return str(index_suffix)
index_suffix = ""
return index_suffix

@classmethod
def construct_field_variable(cls, field, variable):
cleaned_field = cls.clean_field(field)
index_suffix = cls.get_field_suffix(field)
return f"{cleaned_field}{variable}{index_suffix}"

@classmethod
def get_field_match(cls, field):
return cls.construct_field_variable(field, "Match")

@classmethod
def get_field_condition(cls, field):
return cls.construct_field_variable(field, "Condition")

@classmethod
def reset(cls):
Expand Down Expand Up @@ -248,9 +260,7 @@ def convert_condition_field_eq_val_re(
).postprocess(None, cond)

cond_true = ConditionFieldEqualsValueExpression(
cond.field
+ "Condition"
+ str(SplunkDeferredORRegularExpression.get_index_suffix(cond.field)),
SplunkDeferredORRegularExpression.get_field_condition(cond.field),
SigmaString("true"),
)
# returning fieldX=true
Expand Down Expand Up @@ -381,13 +391,11 @@ def finalize_query_data_model(
cim_fields = " ".join(
splunk_sysmon_process_creation_cim_mapping.values()
)

elif rule.logsource.category == "proxy":
data_model = "Web"
data_set = "Proxy"
cim_fields = " ".join(
splunk_web_proxy_cim_mapping.values()
)
cim_fields = " ".join(splunk_web_proxy_cim_mapping.values())

try:
data_model_set = state.processing_state["data_model_set"]
Expand Down
38 changes: 38 additions & 0 deletions tests/test_backend_splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from sigma.backends.splunk import SplunkBackend
from sigma.collection import SigmaCollection
from sigma.processing.pipeline import ProcessingPipeline
from sigma.pipelines.splunk import splunk_cim_data_model


Expand Down Expand Up @@ -224,6 +225,43 @@ def test_splunk_regex_query_explicit_or(splunk_backend: SplunkBackend):
)


def test_splunk_regex_query_explicit_or_with_nested_fields():

pipeline = ProcessingPipeline.from_yaml(
"""
name: Test
priority: 100
transformations:
- id: field_mapping
type: field_name_mapping
mapping:
fieldA: Event.EventData.fieldA
fieldB: Event.EventData.fieldB
"""
)
splunk_backend = SplunkBackend(pipeline)

collection = SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA|re: foo.*bar
sel2:
fieldB|re: boo.*foo
condition: sel1 or sel2
"""
)

assert splunk_backend.convert(collection) == [
'\n| rex field=Event.EventData.fieldA "(?<fieldAMatch>foo.*bar)"\n| eval fieldACondition=if(isnotnull(fieldAMatch), "true", "false")\n| rex field=Event.EventData.fieldB "(?<fieldBMatch>boo.*foo)"\n| eval fieldBCondition=if(isnotnull(fieldBMatch), "true", "false")\n| search fieldACondition="true" OR fieldBCondition="true"'
]


def test_splunk_single_regex_query(splunk_backend: SplunkBackend):
assert (
splunk_backend.convert(
Expand Down

0 comments on commit d36eb0e

Please sign in to comment.