Skip to content

Commit 386f87f

Browse files
authored
Chore/remove extractor (#320)
* removing extractor * removing extractor * removing extractor from file event search * search tests * removing extractor * removing extractor from alerts search * removing extractor * removing extractor * making security-data tests compatible with python 3.7 * making alert test compatible with python 3.7 * returning event responses as generator * reverting securitydata changes * moving checkpoint timestamp check * style check * documentation changes * converting to event generator * using generator to return file events * indent * adding dedupe logic to alert checkpointing * black * return none of checkpoint file empty * additional integration test for security-data cmds * decrease default alert batch to 25 * style * removing filter check * removing 'get_alert_details' method * PR Feedback * use get_all_alert_details py42 method * PR feedback * delete unused test constant * implement py42 bugfix * ALERT_PAGE_SIZE constant * adjust unit test * ? * cleaning up code
1 parent 93d2ef1 commit 386f87f

18 files changed

+969
-634
lines changed

setup.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@
3535
"click>=7.1.1, <8",
3636
"click_plugins>=1.1.1",
3737
"colorama>=0.4.3",
38-
"c42eventextractor==0.4.1",
3938
"keyring==18.0.1",
4039
"keyrings.alt==3.2.0",
4140
"ipython>=7.16.1",
4241
"pandas>=1.1.3",
43-
"py42>=1.19.0",
42+
"py42>=1.19.1",
4443
],
4544
extras_require={
4645
"dev": [

src/code42cli/cmds/alerts.py

+98-38
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import click
22
import py42.sdk.queries.alerts.filters as f
3-
from c42eventextractor.extractors import AlertExtractor
43
from py42.exceptions import Py42NotFoundError
4+
from py42.sdk.queries.alerts.alert_query import AlertQuery
55
from py42.sdk.queries.alerts.filters import AlertState
66
from py42.sdk.queries.alerts.filters import RuleType
77
from py42.sdk.queries.alerts.filters import Severity
88
from py42.util import format_dict
99

10-
import code42cli.cmds.search.extraction as ext
1110
import code42cli.cmds.search.options as searchopt
1211
import code42cli.errors as errors
1312
import code42cli.options as opt
@@ -16,18 +15,24 @@
1615
from code42cli.click_ext.groups import OrderedGroup
1716
from code42cli.cmds.search import SendToCommand
1817
from code42cli.cmds.search.cursor_store import AlertCursorStore
19-
from code42cli.cmds.search.extraction import handle_no_events
2018
from code42cli.cmds.search.options import server_options
19+
from code42cli.cmds.util import convert_to_or_query
20+
from code42cli.cmds.util import create_time_range_filter
21+
from code42cli.cmds.util import try_get_default_header
2122
from code42cli.date_helper import convert_datetime_to_timestamp
2223
from code42cli.date_helper import limit_date_range
2324
from code42cli.file_readers import read_csv_arg
2425
from code42cli.options import format_option
2526
from code42cli.output_formats import JsonOutputFormat
2627
from code42cli.output_formats import OutputFormat
2728
from code42cli.output_formats import OutputFormatter
28-
29+
from code42cli.util import hash_event
30+
from code42cli.util import parse_timestamp
31+
from code42cli.util import warn_interrupt
2932

3033
ALERTS_KEYWORD = "alerts"
34+
ALERT_PAGE_SIZE = 25
35+
3136
begin = opt.begin_option(
3237
ALERTS_KEYWORD,
3338
callback=lambda ctx, param, arg: convert_datetime_to_timestamp(
@@ -202,20 +207,6 @@ def clear_checkpoint(state, checkpoint_name):
202207
_get_alert_cursor_store(state.profile.name).delete(checkpoint_name)
203208

204209

205-
def _call_extractor(
206-
cli_state, handlers, begin, end, or_query, advanced_query, **kwargs
207-
):
208-
extractor = _get_alert_extractor(cli_state.sdk, handlers)
209-
extractor.use_or_query = or_query
210-
if advanced_query:
211-
cli_state.search_filters = advanced_query
212-
if begin or end:
213-
cli_state.search_filters.append(
214-
ext.create_time_range_filter(f.DateObserved, begin, end)
215-
)
216-
extractor.extract(*cli_state.search_filters)
217-
218-
219210
@alerts.command()
220211
@filter_options
221212
@search_options
@@ -242,21 +233,78 @@ def search(
242233
**kwargs,
243234
):
244235
"""Search for alerts."""
245-
output_header = ext.try_get_default_header(
236+
output_header = try_get_default_header(
246237
include_all, _get_default_output_header(), format
247238
)
248239
formatter = OutputFormatter(format, output_header)
249240
cursor = _get_alert_cursor_store(cli_state.profile.name) if use_checkpoint else None
250-
handlers = ext.create_handlers(
251-
cli_state.sdk,
252-
AlertExtractor,
253-
cursor,
254-
use_checkpoint,
255-
formatter=formatter,
256-
force_pager=include_all,
257-
)
258-
_call_extractor(cli_state, handlers, begin, end, or_query, advanced_query, **kwargs)
259-
handle_no_events(not handlers.TOTAL_EVENTS and not errors.ERRORED)
241+
if use_checkpoint:
242+
checkpoint_name = use_checkpoint
243+
checkpoint = cursor.get(checkpoint_name)
244+
if checkpoint is not None:
245+
begin = checkpoint
246+
247+
query = _construct_query(cli_state, begin, end, advanced_query, or_query)
248+
alerts_gen = cli_state.sdk.alerts.get_all_alert_details(query)
249+
250+
if use_checkpoint:
251+
checkpoint_name = use_checkpoint
252+
# update checkpoint to alertId of last event retrieved
253+
alerts_gen = _dedupe_checkpointed_events_and_store_updated_checkpoint(
254+
cursor, checkpoint_name, alerts_gen
255+
)
256+
alerts_list = []
257+
for alert in alerts_gen:
258+
alerts_list.append(alert)
259+
if not alerts_list:
260+
click.echo("No results found.")
261+
return
262+
formatter.echo_formatted_list(alerts_list)
263+
264+
265+
def _construct_query(state, begin, end, advanced_query, or_query):
266+
267+
if advanced_query:
268+
state.search_filters = advanced_query
269+
else:
270+
if begin or end:
271+
state.search_filters.append(
272+
create_time_range_filter(f.DateObserved, begin, end)
273+
)
274+
if or_query:
275+
state.search_filters = convert_to_or_query(state.search_filters)
276+
query = AlertQuery(*state.search_filters)
277+
query.page_size = ALERT_PAGE_SIZE
278+
query.sort_direction = "asc"
279+
query.sort_key = "CreatedAt"
280+
return query
281+
282+
283+
def _dedupe_checkpointed_events_and_store_updated_checkpoint(
284+
cursor, checkpoint_name, alerts_gen
285+
):
286+
"""De-duplicates events across checkpointed runs. Since using the timestamp of the last event
287+
processed as the `--begin` time of the next run causes the last event to show up again in the
288+
next results, we hash the last event(s) of each run and store those hashes in the cursor to
289+
filter out on the next run. It's also possible that two events have the exact same timestamp, so
290+
`checkpoint_events` needs to be a list of hashes so we can filter out everything that's actually
291+
been processed.
292+
"""
293+
294+
checkpoint_alerts = cursor.get_alerts(checkpoint_name)
295+
new_timestamp = None
296+
new_alerts = []
297+
for alert in alerts_gen:
298+
event_hash = hash_event(alert)
299+
if event_hash not in checkpoint_alerts:
300+
if alert[f.DateObserved._term] != new_timestamp:
301+
new_timestamp = alert[f.DateObserved._term]
302+
new_alerts.clear()
303+
new_alerts.append(event_hash)
304+
yield alert
305+
ts = parse_timestamp(new_timestamp)
306+
cursor.replace(checkpoint_name, ts)
307+
cursor.replace_alerts(checkpoint_name, new_alerts)
260308

261309

262310
@alerts.command(cls=SendToCommand)
@@ -280,19 +328,31 @@ def send_to(cli_state, begin, end, advanced_query, use_checkpoint, or_query, **k
280328
HOSTNAME format: address:port where port is optional and defaults to 514.
281329
"""
282330
cursor = _get_cursor(cli_state, use_checkpoint)
283-
handlers = ext.create_send_to_handlers(
284-
cli_state.sdk, AlertExtractor, cursor, use_checkpoint, cli_state.logger,
285-
)
286-
_call_extractor(cli_state, handlers, begin, end, or_query, advanced_query, **kwargs)
287-
handle_no_events(not handlers.TOTAL_EVENTS and not errors.ERRORED)
288331

332+
if use_checkpoint:
333+
checkpoint_name = use_checkpoint
334+
checkpoint = cursor.get(checkpoint_name)
335+
if checkpoint is not None:
336+
begin = checkpoint
289337

290-
def _get_cursor(state, use_checkpoint):
291-
return _get_alert_cursor_store(state.profile.name) if use_checkpoint else None
338+
query = _construct_query(cli_state, begin, end, advanced_query, or_query)
339+
alerts_gen = cli_state.sdk.alerts.get_all_alert_details(query)
340+
341+
if use_checkpoint:
342+
checkpoint_name = use_checkpoint
343+
alerts_gen = _dedupe_checkpointed_events_and_store_updated_checkpoint(
344+
cursor, checkpoint_name, alerts_gen
345+
)
346+
with warn_interrupt():
347+
alert = None
348+
for alert in alerts_gen:
349+
cli_state.logger.info(alert)
350+
if alert is None: # generator was empty
351+
click.echo("No results found.")
292352

293353

294-
def _get_alert_extractor(sdk, handlers):
295-
return AlertExtractor(sdk, handlers)
354+
def _get_cursor(state, use_checkpoint):
355+
return _get_alert_cursor_store(state.profile.name) if use_checkpoint else None
296356

297357

298358
def _get_alert_cursor_store(profile_name):

src/code42cli/cmds/search/cursor_store.py

+39-3
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,23 @@ def get(self, cursor_name):
3030
try:
3131
location = path.join(self._dir_path, cursor_name)
3232
with open(location) as checkpoint:
33-
return float(checkpoint.read())
33+
checkpoint_value = checkpoint.read()
34+
if not checkpoint_value:
35+
return None
36+
try:
37+
return float(checkpoint_value)
38+
except ValueError:
39+
raise Code42CLIError(
40+
f"Unable to parse checkpoint from {location}, expected a unix-epoch timestamp, got '{checkpoint_value}'."
41+
)
3442
except FileNotFoundError:
3543
return None
3644

37-
def replace(self, cursor_name, new_timestamp):
45+
def replace(self, cursor_name, new_checkpoint):
3846
"""Replaces the last stored date observed timestamp with the given one."""
3947
location = path.join(self._dir_path, cursor_name)
4048
with open(location, "w") as checkpoint:
41-
checkpoint.write(str(new_timestamp))
49+
checkpoint.write(str(new_checkpoint))
4250

4351
def delete(self, cursor_name):
4452
"""Removes a single cursor from the store."""
@@ -69,12 +77,40 @@ def __init__(self, profile_name):
6977
dir_path = get_user_project_path("file_event_checkpoints", profile_name)
7078
super().__init__(dir_path)
7179

80+
def get(self, cursor_name):
81+
"""Gets the last stored date observed timestamp."""
82+
try:
83+
location = path.join(self._dir_path, cursor_name)
84+
with open(location) as checkpoint:
85+
checkpoint_value = checkpoint.read()
86+
if not checkpoint_value:
87+
return None
88+
return str(checkpoint_value)
89+
except FileNotFoundError:
90+
return None
91+
7292

7393
class AlertCursorStore(BaseCursorStore):
7494
def __init__(self, profile_name):
7595
dir_path = get_user_project_path("alert_checkpoints", profile_name)
7696
super().__init__(dir_path)
7797

98+
def get_alerts(self, cursor_name):
99+
try:
100+
location = path.join(self._dir_path, cursor_name) + "_alerts"
101+
with open(location) as checkpoint:
102+
try:
103+
return json.loads(checkpoint.read())
104+
except json.JSONDecodeError:
105+
return []
106+
except FileNotFoundError:
107+
return []
108+
109+
def replace_alerts(self, cursor_name, new_alerts):
110+
location = path.join(self._dir_path, cursor_name) + "_alerts"
111+
with open(location, "w") as checkpoint:
112+
checkpoint.write(json.dumps(new_alerts))
113+
78114

79115
class AuditLogCursorStore(BaseCursorStore):
80116
def __init__(self, profile_name):

0 commit comments

Comments
 (0)