Skip to content

Commit 6f98580

Browse files
committed
Ensure sessions are processed with fifo even if concurrent
Signed-off-by: Joni Collinge <[email protected]>
1 parent 920ad8a commit 6f98580

File tree

3 files changed

+496
-2
lines changed

3 files changed

+496
-2
lines changed

common/component/azure/servicebus/subscription.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,13 @@ func (s *Subscription) ReceiveBlocking(parentCtx context.Context, handler Handle
273273
continue
274274
}
275275

276-
// Handle the messages in background
277-
go s.handleAsync(ctx, msgs, handler, receiver)
276+
// If we require sessions then we must process the message
277+
// synchronously to ensure the FIFO order is maintained.
278+
if s.requireSessions {
279+
s.handleAsync(ctx, msgs, handler, receiver)
280+
} else {
281+
go s.handleAsync(ctx, msgs, handler, receiver)
282+
}
278283
}
279284
}
280285

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package topics
15+
16+
import (
17+
"context"
18+
"errors"
19+
"fmt"
20+
"strings"
21+
"sync"
22+
"sync/atomic"
23+
"testing"
24+
"time"
25+
26+
azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
27+
"github.com/stretchr/testify/assert"
28+
"github.com/stretchr/testify/require"
29+
30+
impl "github.com/dapr/components-contrib/common/component/azure/servicebus"
31+
"github.com/dapr/kit/logger"
32+
"github.com/dapr/kit/ptr"
33+
)
34+
35+
type mockReceiver struct {
36+
messages []*azservicebus.ReceivedMessage
37+
messageIndex int
38+
sessionID string
39+
mu sync.Mutex
40+
closed bool
41+
}
42+
43+
func newMockReceiver(sessionID string, messages []*azservicebus.ReceivedMessage) *mockReceiver {
44+
return &mockReceiver{
45+
sessionID: sessionID,
46+
messages: messages,
47+
}
48+
}
49+
50+
func (m *mockReceiver) ReceiveMessages(ctx context.Context, count int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) {
51+
m.mu.Lock()
52+
defer m.mu.Unlock()
53+
54+
if m.closed {
55+
return nil, errors.New("receiver closed")
56+
}
57+
58+
if ctx.Err() != nil {
59+
return nil, ctx.Err()
60+
}
61+
62+
if m.messageIndex >= len(m.messages) {
63+
select {
64+
case <-ctx.Done():
65+
return nil, ctx.Err()
66+
case <-time.After(100 * time.Millisecond):
67+
return nil, errors.New("no more messages")
68+
}
69+
}
70+
71+
end := m.messageIndex + count
72+
if end > len(m.messages) {
73+
end = len(m.messages)
74+
}
75+
76+
result := m.messages[m.messageIndex:end]
77+
m.messageIndex = end
78+
return result, nil
79+
}
80+
81+
func (m *mockReceiver) CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error {
82+
return nil
83+
}
84+
85+
func (m *mockReceiver) AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error {
86+
return nil
87+
}
88+
89+
func (m *mockReceiver) Close(ctx context.Context) error {
90+
m.mu.Lock()
91+
defer m.mu.Unlock()
92+
m.closed = true
93+
return nil
94+
}
95+
96+
func TestSessionOrderingWithSingleHandler(t *testing.T) {
97+
const numMessages = 10
98+
sessionID := "test-session-1"
99+
100+
messages := make([]*azservicebus.ReceivedMessage, numMessages)
101+
for i := 0; i < numMessages; i++ {
102+
seqNum := int64(i + 1)
103+
messages[i] = &azservicebus.ReceivedMessage{
104+
MessageID: fmt.Sprintf("msg-%d", i),
105+
SessionID: &sessionID,
106+
SequenceNumber: &seqNum,
107+
Body: []byte(fmt.Sprintf("message-%d", i)),
108+
}
109+
}
110+
111+
sub := impl.NewSubscription(
112+
impl.SubscriptionOptions{
113+
MaxActiveMessages: 100,
114+
TimeoutInSec: 5,
115+
MaxBulkSubCount: ptr.Of(1),
116+
MaxConcurrentHandlers: 1,
117+
Entity: "test-topic",
118+
LockRenewalInSec: 30,
119+
RequireSessions: true,
120+
SessionIdleTimeout: time.Second * 5,
121+
},
122+
logger.NewLogger("test"),
123+
)
124+
125+
var (
126+
processedOrder []int
127+
orderMu sync.Mutex
128+
)
129+
130+
handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) {
131+
var msgIndex int
132+
_, err := fmt.Sscanf(string(msgs[0].Body), "message-%d", &msgIndex)
133+
require.NoError(t, err)
134+
135+
time.Sleep(10 * time.Millisecond)
136+
137+
orderMu.Lock()
138+
processedOrder = append(processedOrder, msgIndex)
139+
orderMu.Unlock()
140+
141+
return nil, nil
142+
}
143+
144+
receiver := newMockReceiver(sessionID, messages)
145+
146+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
147+
defer cancel()
148+
149+
done := make(chan struct{})
150+
go func() {
151+
defer close(done)
152+
_ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, "test-session")
153+
}()
154+
155+
<-done
156+
157+
expectedOrder := make([]int, numMessages)
158+
for i := range expectedOrder {
159+
expectedOrder[i] = i
160+
}
161+
162+
assert.Equal(t, expectedOrder, processedOrder, "messages must be processed in order")
163+
}
164+
165+
func TestMultipleSessionsConcurrentHandler(t *testing.T) {
166+
const (
167+
numSessions = 5
168+
messagesPerSession = 10
169+
maxConcurrentLimit = 3
170+
)
171+
172+
sessionIDs := make([]string, numSessions)
173+
for i := 0; i < numSessions; i++ {
174+
sessionIDs[i] = fmt.Sprintf("session-%d", i)
175+
}
176+
177+
allMessages := make(map[string][]*azservicebus.ReceivedMessage)
178+
for _, sessionID := range sessionIDs {
179+
messages := make([]*azservicebus.ReceivedMessage, messagesPerSession)
180+
for i := 0; i < messagesPerSession; i++ {
181+
seqNum := int64(i + 1)
182+
sessID := sessionID
183+
messages[i] = &azservicebus.ReceivedMessage{
184+
MessageID: fmt.Sprintf("%s-msg-%d", sessionID, i),
185+
SessionID: &sessID,
186+
SequenceNumber: &seqNum,
187+
Body: []byte(fmt.Sprintf("%s:%d", sessionID, i)),
188+
}
189+
}
190+
allMessages[sessionID] = messages
191+
}
192+
193+
sub := impl.NewSubscription(
194+
impl.SubscriptionOptions{
195+
MaxActiveMessages: 100,
196+
TimeoutInSec: 5,
197+
MaxBulkSubCount: ptr.Of(1),
198+
MaxConcurrentHandlers: maxConcurrentLimit,
199+
Entity: "test-topic",
200+
LockRenewalInSec: 30,
201+
RequireSessions: true,
202+
SessionIdleTimeout: time.Second * 5,
203+
},
204+
logger.NewLogger("test"),
205+
)
206+
207+
var (
208+
mu sync.Mutex
209+
globalOrder []string // tracks session IDs in the order messages were received
210+
sessionOrders = make(map[string][]int)
211+
concurrentHandlers atomic.Int32
212+
maxConcurrentHandlers atomic.Int32
213+
)
214+
215+
handlerFunc := func(ctx context.Context, msgs []*azservicebus.ReceivedMessage) ([]impl.HandlerResponseItem, error) {
216+
msg := msgs[0]
217+
sessionID := *msg.SessionID
218+
219+
current := concurrentHandlers.Add(1)
220+
defer concurrentHandlers.Add(-1)
221+
222+
for {
223+
max := maxConcurrentHandlers.Load()
224+
if current <= max || maxConcurrentHandlers.CompareAndSwap(max, current) {
225+
break
226+
}
227+
}
228+
229+
var msgIndex int
230+
parts := strings.Split(string(msg.Body), ":")
231+
require.Len(t, parts, 2)
232+
_, err := fmt.Sscanf(parts[1], "%d", &msgIndex)
233+
require.NoError(t, err)
234+
235+
time.Sleep(50 * time.Millisecond)
236+
237+
mu.Lock()
238+
globalOrder = append(globalOrder, sessionID)
239+
sessionOrders[sessionID] = append(sessionOrders[sessionID], msgIndex)
240+
mu.Unlock()
241+
242+
return nil, nil
243+
}
244+
245+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
246+
defer cancel()
247+
248+
var wg sync.WaitGroup
249+
for _, sessionID := range sessionIDs {
250+
wg.Add(1)
251+
go func() {
252+
defer wg.Done()
253+
receiver := newMockReceiver(sessionID, allMessages[sessionID])
254+
done := make(chan struct{})
255+
go func() {
256+
defer close(done)
257+
_ = sub.ReceiveBlocking(ctx, handlerFunc, receiver, func() {}, fmt.Sprintf("session-%s", sessionID))
258+
}()
259+
<-done
260+
}()
261+
}
262+
263+
wg.Wait()
264+
265+
// Verify FIFO ordering per session
266+
for _, sessionID := range sessionIDs {
267+
order := sessionOrders[sessionID]
268+
require.Len(t, order, messagesPerSession, "session %s should process all messages", sessionID)
269+
270+
for i := 0; i < messagesPerSession; i++ {
271+
assert.Equal(t, i, order[i], "session %s message %d out of order", sessionID, i)
272+
}
273+
}
274+
275+
// Verify concurrent handler limits
276+
assert.LessOrEqual(t, maxConcurrentHandlers.Load(), int32(maxConcurrentLimit),
277+
"concurrent handlers should not exceed configured maximum")
278+
279+
assert.Greater(t, maxConcurrentHandlers.Load(), int32(1),
280+
"multiple handlers should run concurrently across sessions")
281+
282+
// Check global order to prove concurrent processing
283+
// If processed sequentially, all messages from one session would come before the next
284+
// If processed concurrently, session IDs will be interleaved
285+
hasInterleaving := false
286+
seenSessions := make(map[string]bool)
287+
lastSession := ""
288+
289+
for _, sessionID := range globalOrder {
290+
if sessionID != lastSession && seenSessions[sessionID] {
291+
// We've seen this session before but with a different session in between
292+
hasInterleaving = true
293+
break
294+
}
295+
seenSessions[sessionID] = true
296+
lastSession = sessionID
297+
}
298+
299+
assert.True(t, hasInterleaving,
300+
"global order must show session interleaving, proving concurrent processing")
301+
}

0 commit comments

Comments
 (0)