Skip to content

Commit 2ea9686

Browse files
committed
adds the alerts events consumer to the data service
It uses the new asyncevents from go-common, as alerts processing requires different retry semantics than the existing solution. The Pusher interface is moved out of data/service into data/events to avoid a circular dependency. BACK-2554
1 parent ce43010 commit 2ea9686

File tree

3 files changed

+1074
-9
lines changed

3 files changed

+1074
-9
lines changed

data/events/alerts.go

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
package events
2+
3+
import (
4+
"cmp"
5+
"context"
6+
"os"
7+
"slices"
8+
"strings"
9+
"time"
10+
11+
"github.com/Shopify/sarama"
12+
"go.mongodb.org/mongo-driver/bson"
13+
14+
"github.com/tidepool-org/platform/alerts"
15+
"github.com/tidepool-org/platform/auth"
16+
"github.com/tidepool-org/platform/data/store"
17+
"github.com/tidepool-org/platform/data/types/blood/glucose"
18+
"github.com/tidepool-org/platform/data/types/dosingdecision"
19+
"github.com/tidepool-org/platform/devicetokens"
20+
"github.com/tidepool-org/platform/errors"
21+
"github.com/tidepool-org/platform/log"
22+
logjson "github.com/tidepool-org/platform/log/json"
23+
lognull "github.com/tidepool-org/platform/log/null"
24+
"github.com/tidepool-org/platform/permission"
25+
"github.com/tidepool-org/platform/push"
26+
)
27+
28+
type Consumer struct {
29+
Alerts AlertsClient
30+
Data store.DataRepository
31+
DeviceTokens auth.DeviceTokensClient
32+
Evaluator AlertsEvaluator
33+
Permissions permission.Client
34+
Pusher Pusher
35+
Tokens alerts.TokenProvider
36+
37+
Logger log.Logger
38+
}
39+
40+
// DosingDecision removes a stutter to improve readability.
41+
type DosingDecision = dosingdecision.DosingDecision
42+
43+
// Glucose removes a stutter to improve readability.
44+
type Glucose = glucose.Glucose
45+
46+
func (c *Consumer) Consume(ctx context.Context,
47+
session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) (err error) {
48+
49+
if msg == nil {
50+
c.logger(ctx).Info("UNEXPECTED: nil message; ignoring")
51+
return nil
52+
}
53+
54+
switch {
55+
case strings.HasSuffix(msg.Topic, ".data.alerts"):
56+
return c.consumeAlertsConfigs(ctx, session, msg)
57+
case strings.HasSuffix(msg.Topic, ".data.deviceData.alerts"):
58+
return c.consumeDeviceData(ctx, session, msg)
59+
default:
60+
c.logger(ctx).WithField("topic", msg.Topic).
61+
Infof("UNEXPECTED: topic; ignoring")
62+
}
63+
64+
return nil
65+
}
66+
67+
func (c *Consumer) consumeAlertsConfigs(ctx context.Context,
68+
session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
69+
70+
cfg := &alerts.Config{}
71+
if err := unmarshalMessageValue(msg.Value, cfg); err != nil {
72+
return err
73+
}
74+
lgr := c.logger(ctx)
75+
lgr.WithField("cfg", cfg).Info("consuming an alerts config message")
76+
77+
ctxLog := c.logger(ctx).WithField("followedUserID", cfg.FollowedUserID)
78+
ctx = log.NewContextWithLogger(ctx, ctxLog)
79+
80+
notes, err := c.Evaluator.Evaluate(ctx, cfg.FollowedUserID)
81+
if err != nil {
82+
format := "Unable to evalaute alerts configs triggered event for user %s"
83+
return errors.Wrapf(err, format, cfg.UserID)
84+
}
85+
ctxLog.WithField("notes", notes).Debug("notes generated from alerts config")
86+
87+
c.pushNotes(ctx, notes)
88+
89+
session.MarkMessage(msg, "")
90+
lgr.WithField("message", msg).Debug("marked")
91+
return nil
92+
}
93+
94+
func (c *Consumer) consumeDeviceData(ctx context.Context,
95+
session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) (err error) {
96+
97+
datum := &Glucose{}
98+
if err := unmarshalMessageValue(msg.Value, datum); err != nil {
99+
return err
100+
}
101+
lgr := c.logger(ctx)
102+
lgr.WithField("data", datum).Info("consuming a device data message")
103+
104+
if datum.UserID == nil {
105+
return errors.New("Unable to retrieve alerts configs: userID is nil")
106+
}
107+
ctx = log.NewContextWithLogger(ctx, lgr.WithField("followedUserID", *datum.UserID))
108+
notes, err := c.Evaluator.Evaluate(ctx, *datum.UserID)
109+
if err != nil {
110+
format := "Unable to evalaute device data triggered event for user %s"
111+
return errors.Wrapf(err, format, *datum.UserID)
112+
}
113+
for idx, note := range notes {
114+
lgr.WithField("idx", idx).WithField("note", note).Debug("notes")
115+
}
116+
117+
c.pushNotes(ctx, notes)
118+
119+
session.MarkMessage(msg, "")
120+
lgr.WithField("message", msg).Debug("marked")
121+
return nil
122+
}
123+
124+
func (c *Consumer) pushNotes(ctx context.Context, notes []*alerts.Note) {
125+
lgr := c.logger(ctx)
126+
127+
// Notes could be pushed into a Kafka topic to have a more durable retry,
128+
// but that can be added later.
129+
for _, note := range notes {
130+
lgr := lgr.WithField("recipientUserID", note.RecipientUserID)
131+
tokens, err := c.DeviceTokens.GetDeviceTokens(ctx, note.RecipientUserID)
132+
if err != nil {
133+
lgr.WithError(err).Info("Unable to retrieve device tokens")
134+
}
135+
if len(tokens) == 0 {
136+
lgr.Debug("no device tokens found, won't push any notifications")
137+
}
138+
pushNote := push.FromNote(note)
139+
for _, token := range tokens {
140+
if err := c.Pusher.Push(ctx, token, pushNote); err != nil {
141+
lgr.WithError(err).Info("Unable to push notification")
142+
}
143+
}
144+
}
145+
}
146+
147+
// logger produces a log.Logger.
148+
//
149+
// It tries a number of options before falling back to a null Logger.
150+
func (c *Consumer) logger(ctx context.Context) log.Logger {
151+
// A context's Logger is preferred, as it has the most... context.
152+
if ctxLgr := log.LoggerFromContext(ctx); ctxLgr != nil {
153+
return ctxLgr
154+
}
155+
if c.Logger != nil {
156+
return c.Logger
157+
}
158+
fallback, err := logjson.NewLogger(os.Stderr, log.DefaultLevelRanks(), log.DefaultLevel())
159+
if err != nil {
160+
fallback = lognull.NewLogger()
161+
}
162+
return fallback
163+
}
164+
165+
type AlertsEvaluator interface {
166+
Evaluate(ctx context.Context, followedUserID string) ([]*alerts.Note, error)
167+
}
168+
169+
func NewAlertsEvaluator(alerts AlertsClient, data store.DataRepository,
170+
perms permission.Client, tokens alerts.TokenProvider) *evaluator {
171+
172+
return &evaluator{
173+
Alerts: alerts,
174+
Data: data,
175+
Permissions: perms,
176+
Tokens: tokens,
177+
}
178+
}
179+
180+
// evaluator implements AlertsEvaluator.
181+
type evaluator struct {
182+
Alerts AlertsClient
183+
Data store.DataRepository
184+
Permissions permission.Client
185+
Tokens alerts.TokenProvider
186+
}
187+
188+
// logger produces a log.Logger.
189+
//
190+
// It tries a number of options before falling back to a null Logger.
191+
func (e *evaluator) logger(ctx context.Context) log.Logger {
192+
// A context's Logger is preferred, as it has the most... context.
193+
if ctxLgr := log.LoggerFromContext(ctx); ctxLgr != nil {
194+
return ctxLgr
195+
}
196+
fallback, err := logjson.NewLogger(os.Stderr, log.DefaultLevelRanks(), log.DefaultLevel())
197+
if err != nil {
198+
fallback = lognull.NewLogger()
199+
}
200+
return fallback
201+
}
202+
203+
// Evaluate followers' alerts.Configs to generate alert notifications.
204+
func (e *evaluator) Evaluate(ctx context.Context, followedUserID string) (
205+
[]*alerts.Note, error) {
206+
207+
alertsConfigs, err := e.gatherAlertsConfigs(ctx, followedUserID)
208+
if err != nil {
209+
return nil, err
210+
}
211+
e.logger(ctx).Debugf("%d alerts configs found", len(alertsConfigs))
212+
213+
alertsConfigsByUploadID := e.mapAlertsConfigsByUploadID(alertsConfigs)
214+
215+
notes := []*alerts.Note{}
216+
for uploadID, cfgs := range alertsConfigsByUploadID {
217+
resp, err := e.gatherData(ctx, followedUserID, uploadID, cfgs)
218+
if err != nil {
219+
return nil, err
220+
}
221+
notes = slices.Concat(notes, e.generateNotes(ctx, cfgs, resp))
222+
}
223+
224+
return notes, nil
225+
}
226+
227+
func (e *evaluator) mapAlertsConfigsByUploadID(cfgs []*alerts.Config) map[string][]*alerts.Config {
228+
mapped := map[string][]*alerts.Config{}
229+
for _, cfg := range cfgs {
230+
if _, found := mapped[cfg.UploadID]; !found {
231+
mapped[cfg.UploadID] = []*alerts.Config{}
232+
}
233+
mapped[cfg.UploadID] = append(mapped[cfg.UploadID], cfg)
234+
}
235+
return mapped
236+
}
237+
238+
func (e *evaluator) gatherAlertsConfigs(ctx context.Context,
239+
followedUserID string) ([]*alerts.Config, error) {
240+
241+
alertsConfigs, err := e.Alerts.List(ctx, followedUserID)
242+
if err != nil {
243+
return nil, err
244+
}
245+
e.logger(ctx).Debugf("after List, %d alerts configs", len(alertsConfigs))
246+
alertsConfigs = slices.DeleteFunc(alertsConfigs, e.authDenied(ctx))
247+
e.logger(ctx).Debugf("after perms check, %d alerts configs", len(alertsConfigs))
248+
return alertsConfigs, nil
249+
}
250+
251+
// authDenied builds functions that enable slices.DeleteFunc to remove
252+
// unauthorized users' alerts.Configs.
253+
//
254+
// Via a closure it's able to inject information from the Context and the
255+
// evaluator itself into the resulting function.
256+
func (e *evaluator) authDenied(ctx context.Context) func(ac *alerts.Config) bool {
257+
lgr := e.logger(ctx)
258+
return func(ac *alerts.Config) bool {
259+
if ac == nil {
260+
return true
261+
}
262+
lgr = lgr.WithFields(log.Fields{
263+
"userID": ac.UserID,
264+
"followedUserID": ac.FollowedUserID,
265+
})
266+
token, err := e.Tokens.ServerSessionToken()
267+
if err != nil {
268+
lgr.WithError(err).Warn("Unable to confirm permissions; skipping")
269+
return false
270+
}
271+
ctx = auth.NewContextWithServerSessionToken(ctx, token)
272+
perms, err := e.Permissions.GetUserPermissions(ctx, ac.UserID, ac.FollowedUserID)
273+
if err != nil {
274+
lgr.WithError(err).Warn("Unable to confirm permissions; skipping")
275+
return true
276+
}
277+
if _, found := perms[permission.Follow]; !found {
278+
lgr.Debug("permission denied: skipping")
279+
return true
280+
}
281+
return false
282+
}
283+
}
284+
285+
func (e *evaluator) gatherData(ctx context.Context, followedUserID, uploadID string,
286+
alertsConfigs []*alerts.Config) (*store.AlertableResponse, error) {
287+
288+
if len(alertsConfigs) == 0 {
289+
return nil, nil
290+
}
291+
292+
longestDelay := slices.MaxFunc(alertsConfigs, func(i, j *alerts.Config) int {
293+
return cmp.Compare(i.LongestDelay(), j.LongestDelay())
294+
}).LongestDelay()
295+
longestDelay = max(5*time.Minute, longestDelay)
296+
e.logger(ctx).WithField("longestDelay", longestDelay).Debug("here it is")
297+
params := store.AlertableParams{
298+
UserID: followedUserID,
299+
UploadID: uploadID,
300+
Start: time.Now().Add(-longestDelay),
301+
}
302+
resp, err := e.Data.GetAlertableData(ctx, params)
303+
if err != nil {
304+
return nil, err
305+
}
306+
307+
return resp, nil
308+
}
309+
310+
func (e *evaluator) generateNotes(ctx context.Context,
311+
alertsConfigs []*alerts.Config, resp *store.AlertableResponse) []*alerts.Note {
312+
313+
if len(alertsConfigs) == 0 {
314+
return nil
315+
}
316+
317+
lgr := e.logger(ctx)
318+
notes := []*alerts.Note{}
319+
for _, alertsConfig := range alertsConfigs {
320+
l := lgr.WithFields(log.Fields{
321+
"userID": alertsConfig.UserID,
322+
"followedUserID": alertsConfig.FollowedUserID,
323+
"uploadID": alertsConfig.UploadID,
324+
})
325+
c := log.NewContextWithLogger(ctx, l)
326+
note := alertsConfig.Evaluate(c, resp.Glucose, resp.DosingDecisions)
327+
if note != nil {
328+
notes = append(notes, note)
329+
continue
330+
}
331+
}
332+
333+
return notes
334+
}
335+
336+
func unmarshalMessageValue[A any](b []byte, payload *A) error {
337+
wrapper := &struct {
338+
FullDocument A `json:"fullDocument"`
339+
}{}
340+
if err := bson.UnmarshalExtJSON(b, false, wrapper); err != nil {
341+
return errors.Wrap(err, "Unable to unmarshal ExtJSON")
342+
}
343+
*payload = wrapper.FullDocument
344+
return nil
345+
}
346+
347+
type AlertsClient interface {
348+
Delete(context.Context, *alerts.Config) error
349+
Get(context.Context, *alerts.Config) (*alerts.Config, error)
350+
List(_ context.Context, userID string) ([]*alerts.Config, error)
351+
Upsert(context.Context, *alerts.Config) error
352+
}
353+
354+
// Pusher is a service-agnostic interface for sending push notifications.
355+
type Pusher interface {
356+
// Push a notification to a device.
357+
Push(context.Context, *devicetokens.DeviceToken, *push.Notification) error
358+
}

0 commit comments

Comments
 (0)