Skip to content

Commit

Permalink
Action events, pending and failed scans, scans metadata (#1305)
Browse files Browse the repository at this point in the history
* Add action event system, make krr and popeye use it

* Bugfixes

* Fix jsonb passing in requests

* Set krr version to v1.7.0, remove logs, pass errors to metadata

* Set default ScanReportBlock.metadata

* Fix grade, should be a number

* generic events emitter

* class member instead of static

* Update insert_scan_meta with _v2 postfix

* Rework pubsub to be class-based

* Minor fixes & improvements

---------

Co-authored-by: Arik Alon <[email protected]>
  • Loading branch information
LeaveMyYard and arikalon1 authored Mar 4, 2024
1 parent d85bb62 commit 977829f
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 90 deletions.
56 changes: 40 additions & 16 deletions playbooks/robusta_playbooks/krr.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from hikaru.model.rel_1_26 import Container, EnvVar, EnvVarSource, PodSpec, ResourceRequirements, SecretKeySelector
from prometrix import AWSPrometheusConfig, CoralogixPrometheusConfig, PrometheusAuthorization, PrometheusConfig
from pydantic import BaseModel, ValidationError, validator

from robusta.api import (
IMAGE_REGISTRY,
RELEASE_NAME,
Expand All @@ -29,10 +28,12 @@
action,
format_unit,
)
from robusta.core.model.env_vars import INSTALLATION_NAMESPACE
from robusta.core.reporting.consts import ScanState
from robusta.integrations.openshift import IS_OPENSHIFT
from robusta.integrations.prometheus.utils import generate_prometheus_config

IMAGE: str = os.getenv("KRR_IMAGE_OVERRIDE", f"{IMAGE_REGISTRY}/krr:v1.7.0")
IMAGE: str = os.getenv("KRR_IMAGE_OVERRIDE", f"{IMAGE_REGISTRY}/krr:v1.7.1")
KRR_MEMORY_LIMIT: str = os.getenv("KRR_MEMORY_LIMIT", "1Gi")
KRR_MEMORY_REQUEST: str = os.getenv("KRR_MEMORY_REQUEST", "1Gi")

Expand Down Expand Up @@ -95,6 +96,7 @@ class KRRResponse(BaseModel):
resources: List[ResourceType] = ["cpu", "memory"]
description: Optional[str] = None # This field is not returned by KRR < v1.2.0
strategy: Optional[KRRStrategyData] = None # This field is not returned by KRR < v1.3.0
errors: List[Dict[str, Any]] = [] # This field is not returned by KRR < v1.7.1


class KRRParams(PrometheusParams, PodRunningParams):
Expand Down Expand Up @@ -352,19 +354,38 @@ def krr_scan(event: ExecutionBaseEvent, params: KRRParams):
**params.krr_job_spec,
)

start_time = end_time = datetime.now()
krr_scan = krr_response = {}
start_time = datetime.now()
logs = None
job_name = f"krr-job-{scan_id}"
metadata: Dict[str, Any] = {
"job": {
"name": job_name,
"namespace": INSTALLATION_NAMESPACE,
},
}

def update_state(state: ScanState) -> None:
event.emit_event(
"scan_updated",
scan_id=scan_id,
metadata=metadata,
state=state,
type=ScanType.KRR,
start_time=start_time,
)

update_state(ScanState.PENDING)

try:
logs = RobustaJob.run_simple_job_spec(
spec,
"krr_job" + scan_id,
job_name,
params.timeout,
secret,
custom_annotations=params.custom_annotations,
ttl_seconds_after_finished=43200, # 12 hours
delete_job_post_execution=False,
process_name=False,
)

# NOTE: We need to remove the logs before the json result
Expand All @@ -379,31 +400,34 @@ def krr_scan(event: ExecutionBaseEvent, params: KRRParams):
logs = logs[logs.find("{") :]

krr_response = json.loads(logs)
end_time = datetime.now()
krr_scan = KRRResponse(**krr_response)
except json.JSONDecodeError:
logging.exception("*KRR scan job failed. Expecting json result.*")
logging.error(f"Logs: {logs}")
return
except ValidationError:
logging.exception("*KRR scan job failed. Result format issue.*\n\n")
logging.error(f"Logs: {logs}")
return

except Exception as e:
if str(e) == "Failed to reach wait condition":
if isinstance(e, json.JSONDecodeError):
logging.exception("*KRR scan job failed. Expecting json result.*")
elif isinstance(e, ValidationError):
logging.exception("*KRR scan job failed. Result format issue.*")
elif str(e) == "Failed to reach wait condition":
logging.exception(f"*KRR scan job failed. The job wait condition timed out ({params.timeout}s)*")
else:
logging.exception(f"*KRR scan job unexpected error.*\n {e}")

logging.error(f"Logs: {logs}")
update_state(ScanState.FAILED)
return
else:
metadata["strategy"] = krr_scan.strategy.dict() if krr_scan.strategy else None
metadata["description"] = krr_scan.description
metadata["errors"] = krr_scan.errors

scan_block = ScanReportBlock(
title="KRR scan",
scan_id=scan_id,
type=ScanType.KRR,
start_time=start_time,
end_time=end_time,
end_time=datetime.now(),
score=krr_scan.score,
metadata=metadata,
results=[
ScanReportRow(
scan_id=scan_id,
Expand Down
55 changes: 39 additions & 16 deletions playbooks/robusta_playbooks/popeye.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
from collections import defaultdict
from datetime import datetime
from json import JSONDecodeError
from typing import Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional

from hikaru.model.rel_1_26 import Container, PodSpec, ResourceRequirements
from pydantic import BaseModel, ValidationError

from robusta.api import (
RELEASE_NAME,
EnrichmentAnnotation,
Expand All @@ -26,6 +25,8 @@
action,
to_kubernetes_name,
)
from robusta.core.model.env_vars import INSTALLATION_NAMESPACE
from robusta.core.reporting.consts import ScanState

IMAGE: str = os.getenv("POPEYE_IMAGE_OVERRIDE", "derailed/popeye:v0.11.1")
POPEYE_MEMORY_LIMIT: str = os.getenv("POPEYE_MEMORY_LIMIT", "1Gi")
Expand Down Expand Up @@ -165,40 +166,62 @@ def popeye_scan(event: ExecutionBaseEvent, params: PopeyeParams):
)

start_time = datetime.now()
scan_id = str(uuid.uuid4())
logs = None
job_name = f"popeye-job-{scan_id}"
metadata: Dict[str, Any] = {
"job": {
"name": job_name,
"namespace": INSTALLATION_NAMESPACE,
},
}

def update_state(state: ScanState) -> None:
event.emit_event(
"scan_updated",
scan_id=scan_id,
metadata=metadata,
state=state,
type=ScanType.POPEYE,
start_time=start_time,
)

update_state(ScanState.PENDING)

try:
logs = RobustaJob.run_simple_job_spec(
spec,
"popeye_job",
job_name,
params.timeout,
custom_annotations=params.custom_annotations,
ttl_seconds_after_finished=43200, # 12 hours
delete_job_post_execution=False,
process_name=False,
)
scan = json.loads(logs)
end_time = datetime.now()
popeye_scan = PopeyeReport(**scan["popeye"])
except JSONDecodeError:
logging.error(f"*Popeye scan job failed. Expecting json result.*\n\n Result:\n{logs}")
return
except ValidationError as e:
logging.error(f"*Popeye scan job failed. Result format issue.*\n\n {e}")
logging.error(f"\n {logs}")
return
except Exception as e:
if str(e) == "Failed to reach wait condition":
logging.error(f"*Popeye scan job failed. The job wait condition timed out ({params.timeout}s)*")
if isinstance(e, JSONDecodeError):
logging.exception(f"*Popeye scan job failed. Expecting json result.*\n\n Result:\n{logs}")
elif isinstance(e, ValidationError):
logging.exception(f"*Popeye scan job failed. Result format issue.*\n\n {e}")
elif str(e) == "Failed to reach wait condition":
logging.exception(f"*Popeye scan job failed. The job wait condition timed out ({params.timeout}s)*")
else:
logging.error(f"*Popeye scan job unexpected error.*\n {e}")
logging.exception(f"*Popeye scan job unexpected error.*\n {e}")

logging.error(f"Logs: {logs}")
update_state(ScanState.FAILED)
return

scan_block = ScanReportBlock(
title="Popeye scan",
scan_id=str(uuid.uuid4()),
scan_id=scan_id,
type=ScanType.POPEYE,
start_time=start_time,
end_time=end_time,
end_time=datetime.now(),
score=popeye_scan.score,
metadata=metadata,
results=[],
config=f"{params.args} \n\n {params.spinach}",
pdf_scan_row_content_format=scan_row_content_to_string,
Expand Down
19 changes: 16 additions & 3 deletions src/robusta/core/model/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@

from pydantic import BaseModel

from robusta.core.pubsub.event_emitter import EventEmitter
from robusta.core.reporting.base import (
BaseBlock,
EnrichmentType,
Finding,
FindingSeverity,
FindingSource,
FindingSubject,
FindingSubjectType,
VideoLink, EnrichmentType,
VideoLink,
)
from robusta.core.sinks import SinkBase
from robusta.integrations.scheduled.playbook_scheduler import PlaybooksScheduler
Expand Down Expand Up @@ -55,6 +57,7 @@ class ExecutionBaseEvent:
stop_processing: bool = False
_scheduler: Optional[PlaybooksScheduler] = None
_context: Optional[ExecutionContext] = None
_event_emitter: Optional[EventEmitter] = None

def set_context(self, context: ExecutionContext):
self._context = context
Expand All @@ -68,6 +71,9 @@ def set_scheduler(self, scheduler: PlaybooksScheduler):
def get_scheduler(self) -> PlaybooksScheduler:
return self._scheduler

def set_event_emitter(self, emitter: EventEmitter):
self._event_emitter = emitter

def create_default_finding(self) -> Finding:
"""Create finding default fields according to the event type"""
return Finding(title="Robusta notification", aggregation_key="Generic finding key")
Expand All @@ -94,6 +100,12 @@ def add_video_link(self, video_link: VideoLink):
for sink in self.named_sinks:
self.sink_findings[sink][0].add_video_link(video_link, True)

def emit_event(self, event_name: str, **kwargs):
"""Publish an event to the pubsub. It will be processed by the sinks during the execution of the playbook."""

if self._event_emitter:
self._event_emitter.emit_event(event_name, **kwargs)

def add_enrichment(
self,
enrichment_blocks: List[BaseBlock],
Expand All @@ -103,8 +115,9 @@ def add_enrichment(
):
self.__prepare_sinks_findings()
for sink in self.named_sinks:
self.sink_findings[sink][0].add_enrichment(enrichment_blocks, annotations, True,
enrichment_type=enrichment_type, title=title)
self.sink_findings[sink][0].add_enrichment(
enrichment_blocks, annotations, True, enrichment_type=enrichment_type, title=title
)

def add_finding(self, finding: Finding, suppress_warning: bool = False):
finding.dirty = True # Warn if new enrichments are added to this finding directly
Expand Down
1 change: 1 addition & 0 deletions src/robusta/core/playbooks/playbooks_event_handler_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def __to_finding_json(self, finding: Finding) -> Dict:

def __prepare_execution_event(self, execution_event: ExecutionBaseEvent):
execution_event.set_scheduler(self.registry.get_scheduler())
execution_event.set_event_emitter(self.registry.get_event_emitter())
execution_event.set_all_sinks(self.registry.get_sinks().get_all())
execution_event.set_context(
ExecutionContext(
Expand Down
Empty file.
7 changes: 7 additions & 0 deletions src/robusta/core/pubsub/event_emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import abc


class EventEmitter:
@abc.abstractmethod
def emit_event(self, event_name: str, **kwargs):
pass
17 changes: 17 additions & 0 deletions src/robusta/core/pubsub/event_subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import abc


class EventHandler:
@abc.abstractmethod
def handle_event(self, event_name: str, **kwargs):
pass


class EventSubscriber:
@abc.abstractmethod
def subscribe(self, event_name: str, handler: EventHandler):
pass

@abc.abstractmethod
def unsubscribe(self, event_name: str, handler: EventHandler):
pass
20 changes: 20 additions & 0 deletions src/robusta/core/pubsub/events_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from collections import defaultdict
from weakref import WeakSet

from robusta.core.pubsub.event_emitter import EventEmitter
from robusta.core.pubsub.event_subscriber import EventHandler, EventSubscriber


class EventsPubSub(EventEmitter, EventSubscriber):
def __init__(self) -> None:
self.event_handlers: defaultdict[str, WeakSet[EventHandler]] = defaultdict(WeakSet)

def subscribe(self, event_name: str, handler: EventHandler) -> None:
self.event_handlers[event_name].add(handler)

def unsubscribe(self, event_name: str, handler: EventHandler) -> None:
self.event_handlers[event_name].remove(handler)

def emit_event(self, event_name: str, **kwargs) -> None:
for handler in self.event_handlers[event_name]:
handler.handle_event(event_name, **kwargs)
2 changes: 2 additions & 0 deletions src/robusta/core/reporting/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,11 @@ class ScanReportBlock(BaseBlock):
score: str
results: List[ScanReportRow]
config: str
metadata: Dict[str, Any] = {}
pdf_scan_row_content_format: Callable[[ScanReportRow], str] = lambda row: json.dumps(row.content)
pdf_scan_row_priority_format: Callable[[float], str] = lambda priority: str(priority)

@property
def grade(self):
score = int(self.score)
if score >= 90:
Expand Down
10 changes: 10 additions & 0 deletions src/robusta/core/reporting/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ class SlackAnnotations:
ATTACHMENT = "attachment"


class ActionEvent(str, Enum):
SCAN_UPDATED = "scan_updated"


class ScanState(str, Enum):
PENDING = "pending"
SUCCESS = "success"
FAILED = "failed"


class ScanType(str, Enum):
POPEYE = "popeye"
KRR = "krr"
Loading

0 comments on commit 977829f

Please sign in to comment.