 from typing import Any, NamedTuple, TypeAlias
 
 from sentry import features, tsdb
-from sentry.digests.types import Notification, Record, RecordWithRuleObjects
+from sentry.digests.types import IdentifierKey, Notification, Record, RecordWithRuleObjects
 from sentry.eventstore.models import Event
 from sentry.models.group import Group, GroupStatus
 from sentry.models.project import Project
 from sentry.models.rule import Rule
 from sentry.notifications.types import ActionTargetType, FallthroughChoiceType
 from sentry.notifications.utils.rules import get_key_from_rule_data
 from sentry.tsdb.base import TSDBModel
+from sentry.workflow_engine.models import Workflow
+from sentry.workflow_engine.models.alertrule_workflow import AlertRuleWorkflow
 
 logger = logging.getLogger("sentry.digests")
 
@@ -70,16 +72,20 @@ def event_to_record(
         logger.warning("Creating record for %s that does not contain any rules!", event)
 
     rule_ids = []
-    # TODO(iamrajjoshi): This will only work during the dual write period of the rollout!
-    if features.has("organizations:workflow-engine-trigger-actions", event.organization):
+    identifier_key = IdentifierKey.RULE
+    if features.has("organizations:workflow-engine-ui-links", event.organization):
+        identifier_key = IdentifierKey.WORKFLOW
+        for rule in rules:
+            rule_ids.append(int(get_key_from_rule_data(rule, "workflow_id")))
+    elif features.has("organizations:workflow-engine-trigger-actions", event.organization):
         for rule in rules:
             rule_ids.append(int(get_key_from_rule_data(rule, "legacy_rule_id")))
     else:
         for rule in rules:
             rule_ids.append(rule.id)
     return Record(
         event.event_id,
-        Notification(event, rule_ids, notification_uuid),
+        Notification(event, rule_ids, notification_uuid, identifier_key),
         event.datetime.timestamp(),
     )
 
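The hunk above makes event_to_record tag each Notification with an identifier key: workflow ids when organizations:workflow-engine-ui-links is enabled, legacy rule ids when only organizations:workflow-engine-trigger-actions is enabled, and plain rule ids otherwise. A minimal sketch of that precedence, with placeholder enum values and stand-in rule payloads (only the IdentifierKey member names come from the diff; everything else here is illustrative):

from enum import Enum


class IdentifierKey(Enum):
    # Member names mirror sentry.digests.types; the values are placeholders.
    RULE = "rule"
    WORKFLOW = "workflow"


def pick_ids(rules: list[dict], ui_links: bool, trigger_actions: bool) -> tuple[IdentifierKey, list[int]]:
    # Mirrors the precedence in event_to_record: ui-links wins, then
    # trigger-actions, then the plain rule-id fallback.
    if ui_links:
        return IdentifierKey.WORKFLOW, [r["workflow_id"] for r in rules]
    if trigger_actions:
        return IdentifierKey.RULE, [r["legacy_rule_id"] for r in rules]
    return IdentifierKey.RULE, [r["id"] for r in rules]


# Stand-in rule payloads; the real code reads these ids out of rule data via get_key_from_rule_data.
rules = [{"id": 1, "legacy_rule_id": 10, "workflow_id": 100}]
assert pick_ids(rules, ui_links=True, trigger_actions=True) == (IdentifierKey.WORKFLOW, [100])
assert pick_ids(rules, ui_links=False, trigger_actions=True) == (IdentifierKey.RULE, [10])
assert pick_ids(rules, ui_links=False, trigger_actions=False) == (IdentifierKey.RULE, [1])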
@@ -159,6 +165,67 @@ def _build_digest_impl(
     return _sort_digest(grouped, event_counts=event_counts, user_counts=user_counts)
 
 
+def get_rules_from_workflows(project: Project, workflow_ids: set[int]) -> dict[int, Rule]:
+    rules: dict[int, Rule] = {}
+    if not workflow_ids:
+        return rules
+
+    # Fetch all workflows in bulk
+    workflows = Workflow.objects.filter(organization_id=project.organization_id).in_bulk(
+        workflow_ids
+    )
+
+    # We only process the workflows in the digest when the new flag is enabled.
+    # This should be ok since workflow_ids are only added to redis under this flag.
+    if features.has("organizations:workflow-engine-ui-links", project.organization):
+        for workflow_id, workflow in workflows.items():
+            assert (
+                workflow.organization_id == project.organization_id
+            ), "Workflow must belong to Organization"
+            rules[workflow_id] = Rule(
+                label=workflow.name,
+                id=workflow_id,
+                project_id=project.id,
+                # We need to do this so that the links are built correctly downstream
+                data={"actions": [{"workflow_id": workflow_id}]},
+            )
+    # This branch handles workflows already in the digest while the flag is not enabled.
+    # This can happen if we roll back the flag but the records in the digest aren't flushed yet.
+    else:
+        alert_rule_workflows = AlertRuleWorkflow.objects.filter(workflow_id__in=workflow_ids)
+        alert_rule_workflows_map = {awf.workflow_id: awf for awf in alert_rule_workflows}
+
+        rule_ids_to_fetch = {awf.rule_id for awf in alert_rule_workflows}
+
+        bulk_rules = Rule.objects.filter(project_id=project.id).in_bulk(rule_ids_to_fetch)
+
+        for workflow_id in workflow_ids:
+            alert_workflow = alert_rule_workflows_map.get(workflow_id)
+            if not alert_workflow:
+                logger.warning(
+                    "Workflow %s does not have a corresponding AlertRuleWorkflow entry", workflow_id
+                )
+                continue
+
+            rule = bulk_rules.get(alert_workflow.rule_id)
+            if not rule:
+                logger.warning(
+                    "Rule %s linked to Workflow %s not found or does not belong to project %s",
+                    alert_workflow.rule_id,
+                    workflow_id,
+                    project.id,
+                )
+                continue
+
+            assert rule.project_id == project.id, "Rule must belong to Project"
+
+            if features.has("organizations:workflow-engine-trigger-actions", project.organization):
+                rule.data["actions"][0]["legacy_rule_id"] = rule.id
+
+            rules[workflow_id] = rule
+    return rules
+
+
 def build_digest(project: Project, records: Sequence[Record]) -> DigestInfo:
     if not records:
         return DigestInfo({}, {}, {})
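get_rules_from_workflows resolves workflow ids to Rule objects along two paths: with organizations:workflow-engine-ui-links enabled it fabricates an in-memory Rule per workflow whose data["actions"] carries the workflow_id so downstream link building works, and with the flag off it joins through AlertRuleWorkflow back to the legacy Rule rows, skipping anything that no longer resolves. A rough sketch of that fallback join, with plain dicts standing in for the ORM rows (all names below are illustrative, not the Sentry API):

def resolve_workflows_to_rules(
    workflow_ids: set[int],
    workflow_to_rule: dict[int, int],
    rules_by_id: dict[int, dict],
) -> dict[int, dict]:
    """Map workflow_id -> legacy rule, skipping broken links.

    workflow_to_rule stands in for AlertRuleWorkflow rows, rules_by_id for Rule rows
    already scoped to the project.
    """
    resolved: dict[int, dict] = {}
    for workflow_id in workflow_ids:
        rule_id = workflow_to_rule.get(workflow_id)
        if rule_id is None:
            continue  # no AlertRuleWorkflow entry for this workflow
        rule = rules_by_id.get(rule_id)
        if rule is None:
            continue  # rule deleted or belongs to another project
        resolved[workflow_id] = rule
    return resolved


links = {100: 10}  # workflow 100 was dual-written from legacy rule 10
rules = {10: {"id": 10, "label": "Send a notification"}}
assert resolve_workflows_to_rules({100, 200}, links, rules) == {100: rules[10]}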
@@ -170,15 +237,28 @@ def build_digest(project: Project, records: Sequence[Record]) -> DigestInfo:
     start = records[-1].datetime
     end = records[0].datetime
 
+    rule_ids: set[int] = set()
+    workflow_ids: set[int] = set()
+
+    for record in records:
+        identifier_key = getattr(record.value, "identifier_key", IdentifierKey.RULE)
+        # record.value is Notification, record.value.rules is Sequence[int]
+        ids_to_add = record.value.rules
+        if identifier_key == IdentifierKey.RULE:
+            rule_ids.update(ids_to_add)
+        elif identifier_key == IdentifierKey.WORKFLOW:
+            workflow_ids.update(ids_to_add)
+
     groups = Group.objects.in_bulk(record.value.event.group_id for record in records)
     group_ids = list(groups)
-    rules = Rule.objects.in_bulk(rule_id for record in records for rule_id in record.value.rules)
+    rules = Rule.objects.in_bulk(rule_ids)
 
-    # TODO(iamrajjoshi): This will only work during the dual write period of the rollout!
     if features.has("organizations:workflow-engine-trigger-actions", project.organization):
         for rule in rules.values():
             rule.data["actions"][0]["legacy_rule_id"] = rule.id
 
+    rules.update(get_rules_from_workflows(project, workflow_ids))
+
     for group_id, g in groups.items():
         assert g.project_id == project.id, "Group must belong to Project"
     for rule_id, rule in rules.items():
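In build_digest, the ids stored on each record are now partitioned by the record's identifier_key before the bulk fetches, with getattr defaulting older records (written before identifier_key existed) back to rule ids. A small standalone sketch of that partitioning, using namedtuples and plain strings in place of the Notification type and the IdentifierKey enum:

from collections import namedtuple

# New-style notifications carry identifier_key; records written before the rollout do not.
NewNotification = namedtuple("NewNotification", ["rules", "identifier_key"])
OldNotification = namedtuple("OldNotification", ["rules"])


def partition_record_ids(values) -> tuple[set[int], set[int]]:
    rule_ids: set[int] = set()
    workflow_ids: set[int] = set()
    for value in values:
        # Default a missing identifier_key to "rule", like the getattr(...) above.
        key = getattr(value, "identifier_key", "rule")
        if key == "workflow":
            workflow_ids.update(value.rules)
        else:
            rule_ids.update(value.rules)
    return rule_ids, workflow_ids


values = [OldNotification([1, 2]), NewNotification([100], "workflow")]
assert partition_record_ids(values) == ({1, 2}, {100})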
@@ -199,7 +279,6 @@ def build_digest(project: Project, records: Sequence[Record]) -> DigestInfo:
         end,
         tenant_ids=tenant_ids,
     )
-
     digest = _build_digest_impl(records, groups, rules, event_counts, user_counts)
 
     return DigestInfo(digest, event_counts, user_counts)