Skip to content

Commit d88ca48

Browse files
committed
Introduce batching to MISP feed output bot
1 parent 42a9bfa commit d88ca48

File tree

1 file changed

+41
-8
lines changed

1 file changed

+41
-8
lines changed

intelmq/bots/outputs/misp/output_feed.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pathlib import Path
99
from uuid import uuid4
1010
import re
11+
import redis
1112

1213
from intelmq.lib.bot import OutputBot
1314
from intelmq.lib.exceptions import MissingDependencyError
@@ -35,6 +36,12 @@ class MISPFeedOutputBot(OutputBot):
3536
misp_org_name = None
3637
misp_org_uuid = None
3738
output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path
39+
queue_db: int = 2
40+
queue_host: str = "localhost"
41+
queue_name: str = None
42+
queue_password: str = None
43+
queue_port: int = 6379
44+
batch_size: int = 100
3845
_is_multithreadable: bool = False
3946

4047
@staticmethod
@@ -45,6 +52,13 @@ def check_output_dir(dirname):
4552
return True
4653

4754
def init(self):
55+
# Set up redis connection for length checks
56+
if not self.queue_name:
57+
self.queue_name = self.source_queue
58+
self.redis = self.connect_redis()
59+
60+
self.event_batch = []
61+
4862
if MISPEvent is None and import_fail_reason == 'syntax':
4963
raise MissingDependencyError("pymisp",
5064
version='>=2.4.117.3',
@@ -105,21 +119,40 @@ def process(self):
105119

106120
event = self.receive_message().to_dict(jsondict_as_string=True)
107121

108-
obj = self.current_event.add_object(name='intelmq_event')
109-
for object_relation, value in event.items():
110-
try:
111-
obj.add_attribute(object_relation, value=value)
112-
except NewAttributeError:
113-
# This entry isn't listed in the harmonization file, ignoring.
114-
pass
122+
current_queue_len = self.redis.llen(self.queue_name)
123+
if current_queue_len % self.batch_size == 0:
124+
self.flush_batch()
125+
else:
126+
self.event_batch.append(event)
127+
128+
self.acknowledge_message()
129+
130+
def connect_redis(self):
131+
return redis.Redis(
132+
host=self.queue_host,
133+
port=self.queue_port,
134+
db=self.queue_db,
135+
password=self.queue_password,
136+
)
137+
138+
def flush_batch(self):
139+
for event in self.event_batch:
140+
obj = self.current_event.add_object(name='intelmq_event')
141+
for object_relation, value in event.items():
142+
try:
143+
obj.add_attribute(object_relation, value=value)
144+
except NewAttributeError:
145+
# This entry isn't listed in the harmonization file, ignoring.
146+
pass
115147

116148
feed_output = self.current_event.to_feed(with_meta=False)
117149

118150
with self.current_file.open('w') as f:
119151
json.dump(feed_output, f)
120152

121153
feed_meta_generator(self.output_dir)
122-
self.acknowledge_message()
154+
155+
self.event_batch.clear()
123156

124157
@staticmethod
125158
def check(parameters):

0 commit comments

Comments
 (0)