Skip to content

Commit 36a822f

Browse files
committed
Add 'DeqeuedAt' for better monitoring at hooked backends
1 parent 9f989f3 commit 36a822f

File tree

2 files changed

+59
-29
lines changed

2 files changed

+59
-29
lines changed

protobufs/livekit_webhook.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ message WebhookEvent {
4949
// timestamp in seconds
5050
int64 created_at = 7;
5151

52+
int64 dequeued_at = 13;
53+
5254
int32 num_dropped = 11;
5355

54-
// NEXT_ID: 12
56+
// NEXT_ID: 13
5557
}

webhook/webhook_test.go

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -79,28 +79,55 @@ func TestURLNotifierDropped(t *testing.T) {
7979
require.NoError(t, s.Start())
8080
defer s.Stop()
8181

82-
urlNotifier := newTestNotifier()
83-
defer urlNotifier.Stop(true)
84-
totalDropped := atomic.Int32{}
85-
totalReceived := atomic.Int32{}
86-
s.handler = func(r *http.Request) {
87-
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
88-
require.NoError(t, err)
89-
totalReceived.Inc()
90-
totalDropped.Add(decodedEvent.NumDropped)
91-
}
92-
// send multiple notifications
93-
for i := 0; i < 10; i++ {
94-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
95-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
96-
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
97-
}
82+
t.Run("DropWhenFull = true", func(t *testing.T) {
83+
urlNotifier := newTestNotifier(true)
84+
defer urlNotifier.Stop(true)
85+
totalDropped := atomic.Int32{}
86+
totalReceived := atomic.Int32{}
87+
s.handler = func(r *http.Request) {
88+
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
89+
require.NoError(t, err)
90+
totalReceived.Inc()
91+
totalDropped.Add(decodedEvent.NumDropped)
92+
}
93+
// send multiple notifications
94+
for i := 0; i < 10; i++ {
95+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
96+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
97+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
98+
}
99+
100+
time.Sleep(webhookCheckInterval)
98101

99-
time.Sleep(webhookCheckInterval)
102+
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
103+
// at least one request dropped
104+
require.Less(t, int32(0), totalDropped.Load())
105+
})
100106

101-
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
102-
// at least one request dropped
103-
require.Less(t, int32(0), totalDropped.Load())
107+
t.Run("DropWhenFull = false", func(t *testing.T) {
108+
urlNotifier := newTestNotifier(false)
109+
defer urlNotifier.Stop(true)
110+
totalDropped := atomic.Int32{}
111+
totalReceived := atomic.Int32{}
112+
s.handler = func(r *http.Request) {
113+
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
114+
require.NoError(t, err)
115+
totalReceived.Inc()
116+
totalDropped.Add(decodedEvent.NumDropped)
117+
}
118+
// send multiple notifications
119+
for i := 0; i < 10; i++ {
120+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
121+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
122+
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
123+
}
124+
125+
time.Sleep(webhookCheckInterval)
126+
127+
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
128+
// at least one request dropped
129+
require.Equal(t, int32(0), totalDropped.Load())
130+
})
104131
}
105132

106133
func TestURLNotifierLifecycle(t *testing.T) {
@@ -109,12 +136,12 @@ func TestURLNotifierLifecycle(t *testing.T) {
109136
defer s.Stop()
110137

111138
t.Run("start/stop without use", func(t *testing.T) {
112-
urlNotifier := newTestNotifier()
139+
urlNotifier := newTestNotifier(true)
113140
urlNotifier.Stop(false)
114141
})
115142

116143
t.Run("stop allowing to drain", func(t *testing.T) {
117-
urlNotifier := newTestNotifier()
144+
urlNotifier := newTestNotifier(true)
118145
numCalled := atomic.Int32{}
119146
s.handler = func(r *http.Request) {
120147
numCalled.Inc()
@@ -128,7 +155,7 @@ func TestURLNotifierLifecycle(t *testing.T) {
128155
})
129156

130157
t.Run("force stop", func(t *testing.T) {
131-
urlNotifier := newTestNotifier()
158+
urlNotifier := newTestNotifier(true)
132159
numCalled := atomic.Int32{}
133160
s.handler = func(r *http.Request) {
134161
numCalled.Inc()
@@ -143,12 +170,13 @@ func TestURLNotifierLifecycle(t *testing.T) {
143170
})
144171
}
145172

146-
func newTestNotifier() *URLNotifier {
173+
func newTestNotifier(dropWhenFull bool) *URLNotifier {
147174
return NewURLNotifier(URLNotifierParams{
148-
QueueSize: 20,
149-
URL: testUrl,
150-
APIKey: apiKey,
151-
APISecret: apiSecret,
175+
QueueSize: 20,
176+
URL: testUrl,
177+
APIKey: apiKey,
178+
APISecret: apiSecret,
179+
DropWhenFull: dropWhenFull,
152180
})
153181
}
154182

0 commit comments

Comments
 (0)