|
14 | 14 | from intelmq.lib.exceptions import MissingDependencyError
|
15 | 15 | from hashlib import sha256
|
16 | 16 |
|
| 17 | + |
| 18 | +############## |
| 19 | +# BASIC TEST # |
| 20 | +############## |
| 21 | + |
17 | 22 | BOT_ID = "test-bot"
|
| 23 | + |
18 | 24 |
|
19 | 25 | IDENTITY1_HASH = sha256(sha256(IDENTITY1.encode()).digest()).hexdigest()
|
20 | 26 | KEY1 = f"{BOT_ID}:{IDENTITY1_HASH}".encode()
|
|
23 | 29 | 'source.url': 'http://example.com/',
|
24 | 30 | 'source.abuse_contact': IDENTITY1
|
25 | 31 | }
|
| 32 | + |
26 | 33 |
|
27 | 34 | IDENTITY2_HASH = sha256(sha256(IDENTITY2.encode()).digest()).hexdigest()
|
28 | 35 | KEY2 = f"{BOT_ID}:{IDENTITY2_HASH}".encode()
|
|
36 | 43 | FROM_IDENTITY = "[email protected]"
|
37 | 44 |
|
38 | 45 |
|
| 46 | +################# |
| 47 | +# ADVANCED TEST # |
| 48 | +################# |
| 49 | + |
| 50 | +def calc_key(identity, tmpl_data): |
| 51 | + h = sha256() |
| 52 | + h.update(sha256(identity.encode()).digest()) |
| 53 | + h.update(sha256(tmpl_data.encode()).digest()) |
| 54 | + return h.hexdigest() |
| 55 | + |
| 56 | +TESTB_BOT_ID = "test-bot" |
| 57 | + |
| 58 | +TESTB_IDENTITY1 = '[email protected]' |
| 59 | +TESTB_KEY1 = f"{TESTB_BOT_ID}:{calc_key(TESTB_IDENTITY1, 'valueA')}".encode() |
| 60 | +TESTB_EVENT1 = {'__type': 'Event', |
| 61 | + 'source.ip': '127.0.0.1', |
| 62 | + 'source.url': 'http://example.com/', |
| 63 | + 'extra.template_value': 'valueA', |
| 64 | + 'source.abuse_contact': TESTB_IDENTITY1 |
| 65 | + } |
| 66 | + |
| 67 | +TESTB_IDENTITY1B = '[email protected]' |
| 68 | +TESTB_KEY1B = f"{TESTB_BOT_ID}:{calc_key(TESTB_IDENTITY1B, 'valueB')}".encode() |
| 69 | +TESTB_EVENT1B = {'__type': 'Event', |
| 70 | + 'source.ip': '127.0.0.1', |
| 71 | + 'source.url': 'http://example.com/', |
| 72 | + 'extra.template_value': 'valueB', |
| 73 | + 'source.abuse_contact': TESTB_IDENTITY1B |
| 74 | + } |
| 75 | + |
| 76 | +TESTB_IDENTITY2 = '[email protected]' |
| 77 | +TESTB_KEY2 = f"{TESTB_BOT_ID}:{calc_key(TESTB_IDENTITY2, 'valueC')}".encode() |
| 78 | +TESTB_EVENT2 = {'__type': 'Event', |
| 79 | + 'source.ip': '127.0.0.2', |
| 80 | + 'source.url': 'http://example2.com/', |
| 81 | + 'extra.template_value': 'valueC', |
| 82 | + 'source.abuse_contact': TESTB_IDENTITY2 |
| 83 | + } |
| 84 | + |
| 85 | +TESTB_MAIL_TEMPLATE = Path(__file__).parent / "mail_template.txt" |
| 86 | +TESTB_FROM_IDENTITY = "[email protected]" |
| 87 | + |
| 88 | + |
39 | 89 | @test.skip_exotic()
|
40 | 90 | @test.skip_redis()
|
41 | 91 | class TestSMTPBatchOutputBot(test.BotTestCase, unittest.TestCase):
|
@@ -69,7 +119,6 @@ def compare_envelope(self, envelope: Envelope, subject, message, from_, to):
|
69 | 119 | self.assertEqual(message, envelope.message())
|
70 | 120 | self.assertEqual(from_, envelope.from_())
|
71 | 121 | self.assertEqual(to, envelope.to())
|
72 |
| - # TODO template_data |
73 | 122 |
|
74 | 123 | def send_message(self):
|
75 | 124 | def _(envelope):
|
@@ -117,5 +166,89 @@ def test_processing(self):
|
117 | 166 | self.assertCountEqual([], redis.keys(f"{self.bot.key}*"))
|
118 | 167 |
|
119 | 168 |
|
| 169 | +@test.skip_exotic() |
| 170 | +@test.skip_redis() |
| 171 | +class TestSMTPBatchTemplatedOutputBot(test.BotTestCase, unittest.TestCase): |
| 172 | + |
| 173 | + def setUp(self): |
| 174 | + self.sent_messages = [] # here we collect e-mail messages there were to be sent |
| 175 | + |
| 176 | + @classmethod |
| 177 | + def set_bot(cls): |
| 178 | + cls.bot_reference = SMTPBatchOutputBot |
| 179 | + cls.use_cache = True |
| 180 | + cls.sysconfig = {"alternative_mails": "", |
| 181 | + "attachment_name": "events_%Y-%m-%d", |
| 182 | + "bcc": [], |
| 183 | + "email_from": TESTB_FROM_IDENTITY, |
| 184 | + "gpg_key": "", |
| 185 | + "gpg_pass": "", |
| 186 | + "mail_template": str(TESTB_MAIL_TEMPLATE.resolve()), |
| 187 | + "ignore_older_than_days": 0, |
| 188 | + "limit_results": 10, |
| 189 | + "smtp_server": "localhost", |
| 190 | + "subject": "Testing subject {{ extra_template_value }} -- {{ current_time.strftime('%Y-%m-%d') }}", |
| 191 | + "additional_grouping_keys": ['extra.template_value'], |
| 192 | + "templating": {'subject': True}, |
| 193 | + "testing_to": "" |
| 194 | + } |
| 195 | + |
| 196 | + if not Envelope: |
| 197 | + raise MissingDependencyError('envelope', '>=2.0.0') |
| 198 | + |
| 199 | + def compare_envelope(self, envelope: Envelope, subject, message, from_, to): |
| 200 | + self.assertEqual(subject, envelope.subject()) |
| 201 | + self.assertEqual(message, envelope.message()) |
| 202 | + self.assertEqual(from_, envelope.from_()) |
| 203 | + self.assertEqual(to, envelope.to()) |
| 204 | + |
| 205 | + def send_message(self): |
| 206 | + def _(envelope): |
| 207 | + self.sent_messages.append(envelope) |
| 208 | + return True # let's pretend the message sending succeeded |
| 209 | + return _ |
| 210 | + |
| 211 | + def test_processing(self): |
| 212 | + redis = self.cache |
| 213 | + time_string = time.strftime("%Y-%m-%d") |
| 214 | + message = TESTB_MAIL_TEMPLATE.read_text() |
| 215 | + self.input_message = (TESTB_EVENT1, TESTB_EVENT1B, TESTB_EVENT1B, TESTB_EVENT1, TESTB_EVENT2, TESTB_EVENT1) |
| 216 | + |
| 217 | + # if tests failed before, there might be left records from the last time |
| 218 | + [redis.delete(k) for k in (TESTB_KEY1, TESTB_KEY1B, TESTB_KEY2)] |
| 219 | + |
| 220 | + # process messages |
| 221 | + self.run_bot(iterations=6) |
| 222 | + |
| 223 | + # event should be in the DB |
| 224 | + self.assertCountEqual([TESTB_KEY1, TESTB_KEY1B, TESTB_KEY2], redis.keys(f"{self.bot.key}*")) |
| 225 | + self.assertEqual(3, len(redis.lrange(TESTB_KEY1, 0, -1))) |
| 226 | + self.assertEqual(2, len(redis.lrange(TESTB_KEY1B, 0, -1))) |
| 227 | + self.assertEqual(1, len(redis.lrange(TESTB_KEY2, 0, -1))) |
| 228 | + |
| 229 | + # run the CLI interface with the --send parameter, it should send the messages and exit |
| 230 | + with unittest.mock.patch('envelope.Envelope.send', new=self.send_message()): |
| 231 | + self.bot.send = True |
| 232 | + self.assertRaises(SystemExit, self.bot.cli_run) |
| 233 | + |
| 234 | + # bring the messages in a consistent order to make the comparison/assertion easier |
| 235 | + msg1, msg1b, msg2 = sorted(self.sent_messages, key=lambda x: x.to()[0]+x.subject()) |
| 236 | + # msg1, msg1b, msg2, TESTB_IDENTITY1, TESTB_IDENTITY1b, TESTB_IDENTITY2 |
| 237 | + |
| 238 | + self.compare_envelope( |
| 239 | + msg1, f"Testing subject valueA -- {time_string} ({TESTB_IDENTITY1})", message, TESTB_FROM_IDENTITY, [TESTB_IDENTITY1]) |
| 240 | + self.compare_envelope( |
| 241 | + msg1b, f"Testing subject valueB -- {time_string} ({TESTB_IDENTITY1B})", message, TESTB_FROM_IDENTITY, [TESTB_IDENTITY1B]) |
| 242 | + self.compare_envelope( |
| 243 | + msg2, f"Testing subject valueC -- {time_string} ({TESTB_IDENTITY2})", message, TESTB_FROM_IDENTITY, [TESTB_IDENTITY2]) |
| 244 | + |
| 245 | + # we expect this ZIP attachment |
| 246 | + self.assertTrue(self.sent_messages[1] |
| 247 | + .attachments(f"events_{time_string}.zip")) |
| 248 | + |
| 249 | + # messages should have disappeared from the redis |
| 250 | + self.assertCountEqual([], redis.keys(f"{self.bot.key}*")) |
| 251 | + |
| 252 | + |
120 | 253 | if __name__ == '__main__': # pragma: no cover
|
121 | 254 | unittest.main()
|
0 commit comments