Skip to content

Commit 0c46412

Browse files
committed
discovery: integrate async queue in ProcessRemoteAnnouncement
In this commit, we complete the integration of the asynchronous timestamp range queue by modifying ProcessRemoteAnnouncement to use the new queuing mechanism instead of calling ApplyGossipFilter synchronously. This change ensures that when a peer sends a GossipTimestampRange message, it is queued for asynchronous processing rather than blocking the gossiper's main message processing loop. The modification prevents the peer's readHandler from blocking on potentially slow gossip filter operations, maintaining connection stability during periods of high synchronization activity. If the queue is full when attempting to enqueue a message, we log a warning but return success to prevent peer disconnection. This design choice prioritizes connection stability over guaranteed delivery of every gossip filter request, which is acceptable since peers can always resend timestamp range messages if needed.
1 parent 793db99 commit 0c46412

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

discovery/gossiper.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ type Config struct {
399399
// MsgBurstBytes is the allotted burst amount in bytes. This is the
400400
// number of starting tokens in our token bucket algorithm.
401401
MsgBurstBytes uint64
402+
403+
// FilterConcurrency is the maximum number of concurrent gossip filter
404+
// applications that can be processed.
405+
FilterConcurrency int
402406
}
403407

404408
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
@@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
600604
IsStillZombieChannel: cfg.IsStillZombieChannel,
601605
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
602606
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
607+
FilterConcurrency: cfg.FilterConcurrency,
603608
})
604609

605610
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
907912
return errChan
908913
}
909914

910-
// If we've found the message target, then we'll dispatch the
911-
// message directly to it.
912-
if err := syncer.ApplyGossipFilter(ctx, m); err != nil {
913-
log.Warnf("Unable to apply gossip filter for peer=%x: "+
914-
"%v", peer.PubKey(), err)
915+
// Queue the message for asynchronous processing to prevent
916+
// blocking the gossiper when rate limiting is active.
917+
if !syncer.QueueTimestampRange(m) {
918+
log.Warnf("Unable to queue gossip filter for peer=%x: "+
919+
"queue full", peer.PubKey())
915920

916-
errChan <- err
921+
// Return nil to indicate we've handled the message,
922+
// even though it was dropped. This prevents the peer
923+
// from being disconnected.
924+
errChan <- nil
917925
return errChan
918926
}
919927

0 commit comments

Comments
 (0)