Skip to content

Commit 9b6e5d7

Browse files
Webhook analytics event. (#979)
* Webhook analytics event. NOTE: Need a frostbyte73/core PR to merge and tag a release PR pending: frostbyte73/core#5 * sync submit * generated protobuf --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 82dadcf commit 9b6e5d7

File tree

8 files changed

+620
-181
lines changed

8 files changed

+620
-181
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
buf.build/go/protoyaml v0.3.1
77
github.com/benbjohnson/clock v1.3.5
88
github.com/dennwc/iters v1.0.1
9-
github.com/frostbyte73/core v0.1.0
9+
github.com/frostbyte73/core v0.1.1
1010
github.com/fsnotify/fsnotify v1.8.0
1111
github.com/gammazero/deque v1.0.0
1212
github.com/go-jose/go-jose/v3 v3.0.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6
4949
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
5050
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
5151
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
52-
github.com/frostbyte73/core v0.1.0 h1:KA4klxRjLbEHLv+judmlRtweyjcj1NWOJ+BQHQgNxfw=
53-
github.com/frostbyte73/core v0.1.0/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
52+
github.com/frostbyte73/core v0.1.1 h1:ChhJOR7bAKOCPbA+lqDLE2cGKlCG5JXsDvvQr4YaJIA=
53+
github.com/frostbyte73/core v0.1.1/go.mod h1:mhfOtR+xWAvwXiwor7jnqPMnu4fxbv1F2MwZ0BEpzZo=
5454
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
5555
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
5656
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=

livekit/livekit_analytics.pb.go

Lines changed: 427 additions & 144 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobufs/livekit_analytics.proto

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,9 @@ enum AnalyticsEventType {
124124
SIP_CALL_ENDED = 39;
125125
REPORT = 40;
126126
API_CALL = 41;
127+
WEBHOOK = 42;
127128

128-
// NEXT_ID: 42
129+
// NEXT_ID: 43
129130
}
130131

131132
message AnalyticsClientMeta {
@@ -175,8 +176,9 @@ message AnalyticsEvent {
175176
SIPDispatchRuleInfo sip_dispatch_rule = 32;
176177
ReportInfo report = 33;
177178
APICallInfo api_call = 34;
179+
WebhookInfo webhook = 35;
178180

179-
// NEXT_ID: 35
181+
// NEXT_ID: 36
180182
}
181183

182184
message AnalyticsEvents {
@@ -269,3 +271,28 @@ message APICallInfo {
269271
google.protobuf.Timestamp started_at = 14;
270272
int64 duration_ns = 15;
271273
}
274+
275+
message WebhookInfo {
276+
string event_id = 1;
277+
string event = 2;
278+
string project_id = 3;
279+
string room_name = 4;
280+
string room_id = 5;
281+
string participant_identity = 6;
282+
string participant_id = 7;
283+
string track_id = 8;
284+
string egress_id = 9;
285+
string ingress_id = 10;
286+
google.protobuf.Timestamp created_at = 11;
287+
google.protobuf.Timestamp queued_at = 12;
288+
int64 queue_duration_ns = 13;
289+
google.protobuf.Timestamp sent_at = 14;
290+
int64 send_duration_ns = 15;
291+
string url = 16;
292+
int32 num_dropped = 17;
293+
bool is_dropped = 18;
294+
string service_status = 19;
295+
int32 service_error_code = 20;
296+
string service_error = 21;
297+
string send_error = 22;
298+
}

replay/cloud_replay.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

webhook/notifier.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
)
2424

2525
type QueuedNotifier interface {
26+
RegisterProcessedHook(f func(ctx context.Context, whi *livekit.WebhookInfo))
2627
QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error
2728
}
2829

@@ -56,11 +57,17 @@ func (n *DefaultNotifier) Stop(force bool) {
5657
wg.Wait()
5758
}
5859

59-
func (n *DefaultNotifier) QueueNotify(_ context.Context, event *livekit.WebhookEvent) error {
60+
func (n *DefaultNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
6061
for _, u := range n.urlNotifiers {
61-
if err := u.QueueNotify(event); err != nil {
62+
if err := u.QueueNotify(ctx, event); err != nil {
6263
return err
6364
}
6465
}
6566
return nil
6667
}
68+
69+
func (n *DefaultNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) {
70+
for _, u := range n.urlNotifiers {
71+
u.RegisterProcessedHook(hook)
72+
}
73+
}

webhook/url_notifier.go

Lines changed: 144 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package webhook
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"crypto/sha256"
2021
"encoding/base64"
2122
"sync"
@@ -25,6 +26,7 @@ import (
2526
"github.com/hashicorp/go-retryablehttp"
2627
"go.uber.org/atomic"
2728
"google.golang.org/protobuf/encoding/protojson"
29+
"google.golang.org/protobuf/types/known/timestamppb"
2830

2931
"github.com/livekit/protocol/auth"
3032
"github.com/livekit/protocol/livekit"
@@ -37,11 +39,12 @@ const (
3739

3840
type URLNotifierParams struct {
3941
HTTPClientParams
40-
Logger logger.Logger
41-
QueueSize int
42-
URL string
43-
APIKey string
44-
APISecret string
42+
Logger logger.Logger
43+
QueueSize int
44+
URL string
45+
APIKey string
46+
APISecret string
47+
FieldsHook func(whi *livekit.WebhookInfo)
4548
}
4649

4750
type HTTPClientParams struct {
@@ -56,11 +59,12 @@ const defaultQueueSize = 100
5659
// URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL.
5760
// It will retry on failure, and will drop events if notification fall too far behind
5861
type URLNotifier struct {
59-
mu sync.RWMutex
60-
params URLNotifierParams
61-
client *retryablehttp.Client
62-
dropped atomic.Int32
63-
pool core.QueuePool
62+
mu sync.RWMutex
63+
params URLNotifierParams
64+
client *retryablehttp.Client
65+
dropped atomic.Int32
66+
pool core.QueuePool
67+
processedHook func(ctx context.Context, whi *livekit.WebhookInfo)
6468
}
6569

6670
func NewURLNotifier(params URLNotifierParams) *URLNotifier {
@@ -93,7 +97,6 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
9397
n.pool = core.NewQueuePool(numWorkers, core.QueueWorkerParams{
9498
QueueSize: params.QueueSize,
9599
DropWhenFull: true,
96-
OnDropped: func() { n.dropped.Inc() },
97100
})
98101
return n
99102
}
@@ -105,25 +108,76 @@ func (n *URLNotifier) SetKeys(apiKey, apiSecret string) {
105108
n.params.APISecret = apiSecret
106109
}
107110

108-
func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error {
111+
func (n *URLNotifier) RegisterProcessedHook(hook func(ctx context.Context, whi *livekit.WebhookInfo)) {
112+
n.mu.Lock()
113+
defer n.mu.Unlock()
114+
n.processedHook = hook
115+
}
116+
117+
func (n *URLNotifier) getProcessedHook() func(ctx context.Context, whi *livekit.WebhookInfo) {
118+
n.mu.RLock()
119+
defer n.mu.RUnlock()
120+
return n.processedHook
121+
}
122+
123+
func (n *URLNotifier) QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error {
109124
enqueuedAt := time.Now()
110125

111-
n.pool.Submit(n.eventKey(event), func() {
112-
fields := logFields(event)
113-
fields = append(fields,
114-
"url", n.params.URL,
115-
"queueDuration", time.Since(enqueuedAt),
116-
)
117-
sentStart := time.Now()
126+
if !n.pool.Submit(n.eventKey(event), func() {
127+
fields := logFields(event, n.params.URL)
128+
129+
queueDuration := time.Since(enqueuedAt)
130+
fields = append(fields, "queueDuration", queueDuration)
131+
132+
sendStart := time.Now()
118133
err := n.send(event)
119-
fields = append(fields, "sendDuration", time.Since(sentStart))
134+
sendDuration := time.Since(sendStart)
135+
fields = append(fields, "sendDuration", sendDuration)
120136
if err != nil {
121137
n.params.Logger.Warnw("failed to send webhook", err, fields...)
122138
n.dropped.Add(event.NumDropped + 1)
123139
} else {
124140
n.params.Logger.Infow("sent webhook", fields...)
125141
}
126-
})
142+
if ph := n.getProcessedHook(); ph != nil {
143+
whi := webhookInfo(
144+
event,
145+
enqueuedAt,
146+
queueDuration,
147+
sendStart,
148+
sendDuration,
149+
n.params.URL,
150+
false,
151+
err,
152+
)
153+
if n.params.FieldsHook != nil {
154+
n.params.FieldsHook(whi)
155+
}
156+
ph(ctx, whi)
157+
}
158+
}) {
159+
n.dropped.Inc()
160+
161+
fields := logFields(event, n.params.URL)
162+
n.params.Logger.Infow("dropped webhook", fields...)
163+
164+
if ph := n.getProcessedHook(); ph != nil {
165+
whi := webhookInfo(
166+
event,
167+
time.Time{},
168+
0,
169+
time.Time{},
170+
0,
171+
n.params.URL,
172+
true,
173+
nil,
174+
)
175+
if n.params.FieldsHook != nil {
176+
n.params.FieldsHook(whi)
177+
}
178+
ph(ctx, whi)
179+
}
180+
}
127181
return nil
128182
}
129183

@@ -197,12 +251,13 @@ type logAdapter struct{}
197251

198252
func (l *logAdapter) Printf(string, ...interface{}) {}
199253

200-
func logFields(event *livekit.WebhookEvent) []interface{} {
254+
func logFields(event *livekit.WebhookEvent, url string) []interface{} {
201255
fields := make([]interface{}, 0, 20)
202256
fields = append(fields,
203257
"event", event.Event,
204258
"id", event.Id,
205259
"webhookTime", event.CreatedAt,
260+
"url", url,
206261
)
207262

208263
if event.Room != nil {
@@ -217,6 +272,11 @@ func logFields(event *livekit.WebhookEvent) []interface{} {
217272
"pID", event.Participant.Sid,
218273
)
219274
}
275+
if event.Track != nil {
276+
fields = append(fields,
277+
"trackID", event.Track.Sid,
278+
)
279+
}
220280
if event.EgressInfo != nil {
221281
fields = append(fields,
222282
"egressID", event.EgressInfo.EgressId,
@@ -239,3 +299,65 @@ func logFields(event *livekit.WebhookEvent) []interface{} {
239299
}
240300
return fields
241301
}
302+
303+
func webhookInfo(
304+
event *livekit.WebhookEvent,
305+
queuedAt time.Time,
306+
queueDuration time.Duration,
307+
sentAt time.Time,
308+
sendDuration time.Duration,
309+
url string,
310+
isDropped bool,
311+
sendError error,
312+
) *livekit.WebhookInfo {
313+
whi := &livekit.WebhookInfo{
314+
EventId: event.Id,
315+
Event: event.Event,
316+
CreatedAt: timestamppb.New(time.Unix(event.CreatedAt, 0)),
317+
QueuedAt: timestamppb.New(queuedAt),
318+
QueueDurationNs: queueDuration.Nanoseconds(),
319+
SentAt: timestamppb.New(sentAt),
320+
SendDurationNs: sendDuration.Nanoseconds(),
321+
Url: url,
322+
NumDropped: event.NumDropped,
323+
IsDropped: isDropped,
324+
}
325+
if !queuedAt.IsZero() {
326+
whi.QueuedAt = timestamppb.New(queuedAt)
327+
}
328+
if !sentAt.IsZero() {
329+
whi.SentAt = timestamppb.New(sentAt)
330+
}
331+
if event.Room != nil {
332+
whi.RoomName = event.Room.Name
333+
whi.RoomId = event.Room.Sid
334+
}
335+
if event.Participant != nil {
336+
whi.ParticipantIdentity = event.Participant.Identity
337+
whi.ParticipantId = event.Participant.Sid
338+
}
339+
if event.Track != nil {
340+
whi.TrackId = event.Track.Sid
341+
}
342+
if event.EgressInfo != nil {
343+
whi.EgressId = event.EgressInfo.EgressId
344+
whi.ServiceStatus = event.EgressInfo.Status.String()
345+
if event.EgressInfo.Error != "" {
346+
whi.ServiceErrorCode = event.EgressInfo.ErrorCode
347+
whi.ServiceError = event.EgressInfo.Error
348+
}
349+
}
350+
if event.IngressInfo != nil {
351+
whi.IngressId = event.IngressInfo.IngressId
352+
if event.IngressInfo.State != nil {
353+
whi.ServiceStatus = event.IngressInfo.State.Status.String()
354+
if event.IngressInfo.State.Error != "" {
355+
whi.ServiceError = event.IngressInfo.State.Error
356+
}
357+
}
358+
}
359+
if sendError != nil {
360+
whi.SendError = sendError.Error()
361+
}
362+
return whi
363+
}

webhook/webhook_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ func TestURLNotifierDropped(t *testing.T) {
9191
}
9292
// send multiple notifications
9393
for i := 0; i < 10; i++ {
94-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
95-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
96-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
94+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted})
95+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventParticipantJoined})
96+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished})
9797
}
9898

9999
time.Sleep(webhookCheckInterval)
@@ -120,8 +120,8 @@ func TestURLNotifierLifecycle(t *testing.T) {
120120
numCalled.Inc()
121121
}
122122
for i := 0; i < 10; i++ {
123-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
124-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
123+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted})
124+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished})
125125
}
126126
urlNotifier.Stop(false)
127127
require.Eventually(t, func() bool { return numCalled.Load() == 20 }, 5*time.Second, webhookCheckInterval)
@@ -134,8 +134,8 @@ func TestURLNotifierLifecycle(t *testing.T) {
134134
numCalled.Inc()
135135
}
136136
for i := 0; i < 10; i++ {
137-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
138-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
137+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomStarted})
138+
_ = urlNotifier.QueueNotify(context.Background(), &livekit.WebhookEvent{Event: EventRoomFinished})
139139
}
140140
urlNotifier.Stop(true)
141141
time.Sleep(time.Second)

0 commit comments

Comments
 (0)