Skip to content

Commit 7c34642

Browse files
authored
fix localdistservice missing result local route (#227)
1 parent 1883873 commit 7c34642

2 files changed

Lines changed: 129 additions & 12 deletions

File tree

bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
105105
Set<MatchInfo> ok = new HashSet<>();
106106
Set<MatchInfo> skip = new HashSet<>();
107107
Set<MatchInfo> noSub = new HashSet<>();
108+
Set<MatchInfo> noReceiver = new HashSet<>();
108109
long totalFanOutBytes = 0L;
109110
for (DeliveryPack writePack : packageEntry.getValue().getPackList()) {
110111
TopicMessagePack topicMsgPack = writePack.getMessagePack();
@@ -124,14 +125,13 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
124125
matchInfo);
125126
}
126127
} else {
127-
// no session found for shared subscription
128-
noSub.add(matchInfo);
128+
noReceiver.add(matchInfo);
129129
}
130130
} else {
131131
Optional<CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
132132
localTopicRouter.getTopicRoutes(tenantId, matchInfo);
133133
if (routesFutureOpt.isEmpty()) {
134-
noSub.add(matchInfo);
134+
noReceiver.add(matchInfo);
135135
continue;
136136
}
137137
CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes> routesFuture =
@@ -143,15 +143,21 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
143143
}
144144
ILocalTopicRouter.ILocalRoutes localRoutes = routesFuture.join();
145145
if (!localRoutes.localReceiverId().equals(matchInfo.getReceiverId())) {
146-
noSub.add(matchInfo);
146+
noReceiver.add(matchInfo);
147147
continue;
148148
}
149+
if (localRoutes.routesInfo().isEmpty()) {
150+
noReceiver.add(matchInfo);
151+
continue;
152+
}
153+
boolean hasUsableSession = false;
149154
for (Map.Entry<String, Long> route : localRoutes.routesInfo().entrySet()) {
150155
String sessionId = route.getKey();
151156
long incarnation = route.getValue();
152157
// at least one session should publish the message
153158
IMQTTSession session = sessionRegistry.get(sessionId);
154159
if (session instanceof IMQTTTransientSession) {
160+
hasUsableSession = true;
155161
if (isFanOutThrottled && !matchedSessions.isEmpty()) {
156162
skip.add(matchInfo);
157163
} else {
@@ -161,6 +167,9 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
161167
}
162168
}
163169
}
170+
if (!hasUsableSession) {
171+
noReceiver.add(matchInfo);
172+
}
164173
}
165174
}
166175
long msgPackSize = SizeUtil.estSizeOf(topicMsgPack);
@@ -185,10 +194,13 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
185194
tenantMeter.recordSummary(MqttTransientFanOutBytes, totalFanOutBytes);
186195
// don't include duplicated matchInfo in the result
187196
// treat skip as ok
188-
Sets.difference(Sets.union(ok, skip), noSub).forEach(matchInfo -> resultsBuilder.addResult(
189-
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
197+
Sets.difference(Sets.union(ok, skip), Sets.union(noSub, noReceiver)).forEach(matchInfo ->
198+
resultsBuilder.addResult(
199+
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
190200
noSub.forEach(matchInfo -> resultsBuilder.addResult(
191201
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_SUB).build()));
202+
noReceiver.forEach(matchInfo -> resultsBuilder.addResult(
203+
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_RECEIVER).build()));
192204
replyBuilder.putResult(tenantId, resultsBuilder.build());
193205
}
194206
return CompletableFuture.completedFuture(replyBuilder.build());
@@ -205,8 +217,7 @@ public CheckReply.Code checkMatchInfo(String tenantId, MatchInfo matchInfo) {
205217
return transientSession.hasSubscribed(matchInfo.getMatcher().getMqttTopicFilter())
206218
? CheckReply.Code.OK : CheckReply.Code.NO_SUB;
207219
} else {
208-
// should not be here
209-
return CheckReply.Code.ERROR;
220+
return CheckReply.Code.NO_RECEIVER;
210221
}
211222
} else {
212223
Optional<CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =

bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java

Lines changed: 110 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.testng.Assert.assertTrue;
3939

4040
import org.apache.bifromq.mqtt.MockableTest;
41+
import org.apache.bifromq.mqtt.session.IMQTTSession;
4142
import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
4243
import org.apache.bifromq.plugin.subbroker.CheckReply;
4344
import org.apache.bifromq.plugin.subbroker.DeliveryPack;
@@ -158,6 +159,7 @@ public void checkMatchInfoForNonSharedSub() {
158159
assertEquals(code, CheckReply.Code.NO_RECEIVER);
159160

160161
when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
162+
when(localSessionRegistry.get(channelId)).thenReturn(session);
161163
code = localDistService.checkMatchInfo(tenantId, MatchInfo.newBuilder()
162164
.setMatcher(TopicUtil.from(topicFilter))
163165
.setReceiverId(ILocalDistService.localize(channelId))
@@ -364,7 +366,7 @@ public void deliverToMismatchedReceiver() {
364366

365367
DeliveryResults results = reply.getResultMap().get(tenantId);
366368
DeliveryResult result = results.getResult(0);
367-
assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
369+
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
368370
}
369371

370372
@Test
@@ -398,7 +400,111 @@ public void deliverToNoLocalRoute() {
398400

399401
DeliveryResults results = reply.getResultMap().get(tenantId);
400402
DeliveryResult result = results.getResult(0);
401-
assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
403+
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
404+
}
405+
406+
@Test
407+
public void deliverToEmptyLocalRoutes() {
408+
String tenantId = "tenant1";
409+
String topic = "testTopic";
410+
String topicFilter = "testTopic/#";
411+
String channelId = "channel0";
412+
MatchInfo matchInfo = MatchInfo.newBuilder()
413+
.setMatcher(TopicUtil.from(topicFilter))
414+
.setReceiverId("receiverId")
415+
.build();
416+
TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
417+
DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
418+
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
419+
.build();
420+
DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
421+
422+
ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
423+
when(localRoutes.localReceiverId()).thenReturn("receiverId");
424+
when(localRoutes.routesInfo()).thenReturn(Map.of());
425+
when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
426+
Optional.of(CompletableFuture.completedFuture(localRoutes)));
427+
428+
LocalDistService localDistService =
429+
new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);
430+
431+
CompletableFuture<DeliveryReply> future = localDistService.dist(request);
432+
DeliveryReply reply = future.join();
433+
434+
DeliveryResults results = reply.getResultMap().get(tenantId);
435+
DeliveryResult result = results.getResult(0);
436+
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
437+
}
438+
439+
@Test
440+
public void deliverToNoLocalSession() {
441+
String tenantId = "tenant1";
442+
String topic = "testTopic";
443+
String topicFilter = "testTopic/#";
444+
String channelId = "channel0";
445+
MatchInfo matchInfo = MatchInfo.newBuilder()
446+
.setMatcher(TopicUtil.from(topicFilter))
447+
.setReceiverId("receiverId")
448+
.build();
449+
TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
450+
DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
451+
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
452+
.build();
453+
DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
454+
455+
ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
456+
when(localRoutes.localReceiverId()).thenReturn("receiverId");
457+
when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
458+
when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
459+
Optional.of(CompletableFuture.completedFuture(localRoutes)));
460+
461+
when(localSessionRegistry.get(channelId)).thenReturn(null);
462+
463+
LocalDistService localDistService =
464+
new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);
465+
466+
CompletableFuture<DeliveryReply> future = localDistService.dist(request);
467+
DeliveryReply reply = future.join();
468+
469+
DeliveryResults results = reply.getResultMap().get(tenantId);
470+
DeliveryResult result = results.getResult(0);
471+
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
472+
}
473+
474+
@Test
475+
public void deliverToNonTransientSession() {
476+
String tenantId = "tenant1";
477+
String topic = "testTopic";
478+
String topicFilter = "testTopic/#";
479+
String channelId = "channel0";
480+
MatchInfo matchInfo = MatchInfo.newBuilder()
481+
.setMatcher(TopicUtil.from(topicFilter))
482+
.setReceiverId("receiverId")
483+
.build();
484+
TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
485+
DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
486+
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
487+
.build();
488+
DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();
489+
490+
ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
491+
when(localRoutes.localReceiverId()).thenReturn("receiverId");
492+
when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
493+
when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
494+
Optional.of(CompletableFuture.completedFuture(localRoutes)));
495+
496+
IMQTTSession nonTransientSession = mock(IMQTTSession.class);
497+
when(localSessionRegistry.get(channelId)).thenReturn(nonTransientSession);
498+
499+
LocalDistService localDistService =
500+
new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);
501+
502+
CompletableFuture<DeliveryReply> future = localDistService.dist(request);
503+
DeliveryReply reply = future.join();
504+
505+
DeliveryResults results = reply.getResultMap().get(tenantId);
506+
DeliveryResult result = results.getResult(0);
507+
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
402508
}
403509

404510
@Test
@@ -432,7 +538,7 @@ public void deliverToNoResolvedRoute() {
432538

433539
DeliveryResults results = reply.getResultMap().get(tenantId);
434540
DeliveryResult result = results.getResult(0);
435-
assertEquals(DeliveryResult.Code.OK, result.getCode());
541+
assertEquals(result.getCode(), DeliveryResult.Code.OK);
436542
}
437543

438544
@Test
@@ -467,7 +573,7 @@ public void deliverWhileRouteResolveException() {
467573

468574
DeliveryResults results = reply.getResultMap().get(tenantId);
469575
DeliveryResult result = results.getResult(0);
470-
assertEquals(DeliveryResult.Code.OK, result.getCode());
576+
assertEquals(result.getCode(), DeliveryResult.Code.OK);
471577
}
472578

473579
@Test

0 commit comments

Comments
 (0)