Skip to content

fake: add new mode random_single_value #2601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Please refer to the [NEWS](NEWS.md) for a list of changes which have an affect o

#### Experts
- `intelmq.bots.experts.asn_lookup.expert`: Print URLs to stdout only in verbose mode (PR#2591 by Sebastian Wagner).
- `intelmq.bots.experts.fake.expert`: Add new mode `random_single_value` (PR#2601 by Sebastian Wagner).

#### Outputs

Expand Down
33 changes: 27 additions & 6 deletions docs/user/bots.md
Original file line number Diff line number Diff line change
Expand Up @@ -2684,35 +2684,56 @@ is `$portal_url + '/api/1.0/ripe/contact?cidr=%s'`.

### Fake <div id="intelmq.bots.experts.fake.expert" />

Adds fake data to events. Currently supports setting the IP address and network.
Adds fake data to events. It currently supports two operation methods:

For each incoming event, the bots chooses one random IP network range from the configured data file.
It set's the first IP address of the range as `source.ip` and the network itself as `source.network`.
To adapt the `source.asn` field accordingly, use the [ASN Lookup Expert](#asn-lookup).
* Setting the IP address and network
* For any Event event field, set the value to a random item of a used-defined list (mode `random_single_value`)

For a detailed description of the modes, see below.

**Module:** `intelmq.bots.experts.fake.expert`

**Parameters:**

**`database`**

(required, string) Path to a JSON file in the following format:
(required, string) Path to a JSON file in the following format (example):
```
{
"ip_network": [
"10.0.0.0/8",
"192.168.0.0/16",
...
]
],
"event_fields": {
"extra.severity": {
"mode": "random_single_value",
"values": ["critical", "high", "medium", "low", "info", "undefined"]
},
...
}
}
```
Any of the two modes can be ommitted

**`overwrite`**

(optional, boolean) Whether to overwrite existing fields. Defaults to false.

### Modes

#### IP Network
For each incoming event, the bots chooses one random IP network range (IPv4 or IPv6) from the configured data file.
It set's the first IP address of the range as `source.ip` and the network itself as `source.network`.
To adapt the `source.asn` field accordingly, use the [ASN Lookup Expert](#asn-lookup).

For data consistency `source.network` will only be set if `source.ip` was set or overridden.
If overwrite is false, `source.ip` was did not exist before but `source.network` existed before, `source.network` will still be overridden.

#### Event fields
##### Mode `random_single_value`
For any possible event field, the bot chooses a random value of the values in the `values` property.

---

### Field Reducer <div id="intelmq.bots.experts.field_reducer.expert" />
Expand Down
53 changes: 41 additions & 12 deletions intelmq/bots/experts/fake/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from json import load as json_load

from intelmq.lib.bot import ExpertBot
from intelmq.lib.message import Event


class FakeExpertBot(ExpertBot):
Expand All @@ -17,30 +18,58 @@ class FakeExpertBot(ExpertBot):

def init(self):
with open(self.database) as database:
self.networks = json_load(database)['ip_network']
database = json_load(database)
self.ip_networks = database.get('ip_network', [])
self.event_fields = database.get('event_fields', {})

def process(self):
event = self.receive_message()
network = choice(self.networks)
if self.ip_networks:
network = choice(self.ip_networks)

updated = False
try:
updated = event.add('source.ip', ip_network(network)[1], overwrite=self.overwrite)
except IndexError:
updated = event.add('source.ip', ip_network(network)[0], overwrite=self.overwrite)
# For consistency, only set the network if the source.ip was set or overwritten, but then always overwrite it
if updated:
event.add('source.network', network, overwrite=True)
updated = False
try:
updated = event.add('source.ip', ip_network(network)[1], overwrite=self.overwrite)
except IndexError:
updated = event.add('source.ip', ip_network(network)[0], overwrite=self.overwrite)
# For consistency, only set the network if the source.ip was set or overwritten, but then always overwrite it
if updated:
event.add('source.network', network, overwrite=True)

for fieldname, field in self.event_fields.items():
if field['mode'] == 'random_single_value':
event.add(fieldname, choice(field['values']), overwrite=self.overwrite)
else:
raise ValueError(f"Mode {field['mode']} not supported in field {fieldname}.")

self.send_message(event)
self.acknowledge_message()

def check(parameters: dict):
try:
with open(parameters['database']) as database:
json_load(database)['ip_network']
database = json_load(database)
except Exception as exc:
return [['error', exc]]
return [['error', f"Could not load database: {exc}"]]
errors = []
if not isinstance(database.get('ip_network', []), list):
errors.append(['error', 'ip_network is not of type list'])
if not isinstance(database.get('event_fields', {}), dict):
errors.append(['error', 'event_fields is not of type dict'])
else:
test_event = Event()
for fieldname, field in database.get('event_fields', {}).items():
fieldname_check = test_event._Message__is_valid_key(fieldname)
if not fieldname_check[0]:
errors.append(['error', f"Field name {fieldname} is not valid: {fieldname_check[1]}."])
mode = field.get('mode')
if mode not in ('random_single_value', ):
errors.append(['error', f"Mode {mode} not supported in field {fieldname}."])
if 'values' not in field:
errors.append(['error', f"No values defined in field {fieldname}."])
elif not isinstance(field['values'], list):
errors.append(['error', f"Values is not a list in field {fieldname}."])
return errors if errors else None


BOT = FakeExpertBot
8 changes: 8 additions & 0 deletions intelmq/tests/bots/experts/fake/severity.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"event_fields": {
"extra.severity": {
"mode": "random_single_value",
"values": ["critical", "high", "medium", "low", "info", "undefined"]
}
}
}
2 changes: 2 additions & 0 deletions intelmq/tests/bots/experts/fake/severity.json.license
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SPDX-FileCopyrightText: 2025 Institute for Common Good Technology, Sebastian Wagner
SPDX-License-Identifier: AGPL-3.0-or-later
7 changes: 7 additions & 0 deletions intelmq/tests/bots/experts/fake/test_expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from intelmq.bots.experts.fake.expert import FakeExpertBot

FAKE_DB = pkg_resources.resource_filename('intelmq', 'tests/bots/experts/fake/data.json')
SEVERITY_DB = pkg_resources.resource_filename('intelmq', 'tests/bots/experts/fake/severity.json')
EXAMPLE_INPUT = {"__type": "Event",
"source.ip": "93.184.216.34", # example.com
}
Expand Down Expand Up @@ -45,6 +46,12 @@ def test_network_exists(self):
self.assertIn(ip_address(msg['source.ip']), ip_network("10.0.0.0/8"))
self.assertEqual(msg['source.network'], "10.0.0.0/8")

def test_random_single_value(self):
self.input_message = {"__type": "Event"}
self.run_bot(parameters={'database': SEVERITY_DB})
msg = json_loads(self.get_output_queue()[0])
self.assertIn(msg['extra.severity'], ["critical", "high", "medium", "low", "info", "undefined"])


if __name__ == '__main__': # pragma: no cover
unittest.main()
Loading