Commit 6c39b9a

Merge pull request #10097 from Roasbeef/gossip-block-fix
multi: make gossip filter sends non-blocking, only allow a single backlog catch up goroutine per peer
2 parents 149a819 + 8dcb7a8

12 files changed, +1,097 −12 lines changed

config.go

Lines changed: 1 addition & 0 deletions
@@ -719,6 +719,7 @@ func DefaultConfig() Config {
 		AnnouncementConf: discovery.DefaultProofMatureDelta,
 		MsgRateBytes: discovery.DefaultMsgBytesPerSecond,
 		MsgBurstBytes: discovery.DefaultMsgBytesBurst,
+		FilterConcurrency: discovery.DefaultFilterConcurrency,
 	},
 	Invoices: &lncfg.Invoices{
 		HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,

discovery/gossiper.go

Lines changed: 14 additions & 6 deletions
@@ -399,6 +399,10 @@ type Config struct {
 	// MsgBurstBytes is the allotted burst amount in bytes. This is the
 	// number of starting tokens in our token bucket algorithm.
 	MsgBurstBytes uint64
+
+	// FilterConcurrency is the maximum number of concurrent gossip filter
+	// applications that can be processed.
+	FilterConcurrency int
 }

 // processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
@@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
 		IsStillZombieChannel:     cfg.IsStillZombieChannel,
 		AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
 		AllotedMsgBytesBurst:     cfg.MsgBurstBytes,
+		FilterConcurrency:        cfg.FilterConcurrency,
 	})

 	gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
 		return errChan
 	}

-	// If we've found the message target, then we'll dispatch the
-	// message directly to it.
-	if err := syncer.ApplyGossipFilter(ctx, m); err != nil {
-		log.Warnf("Unable to apply gossip filter for peer=%x: "+
-			"%v", peer.PubKey(), err)
+	// Queue the message for asynchronous processing to prevent
+	// blocking the gossiper when rate limiting is active.
+	if !syncer.QueueTimestampRange(m) {
+		log.Warnf("Unable to queue gossip filter for peer=%x: "+
+			"queue full", peer.PubKey())

-		errChan <- err
+		// Return nil to indicate we've handled the message,
+		// even though it was dropped. This prevents the peer
+		// from being disconnected.
+		errChan <- nil
 		return errChan
 	}
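
Why send nil on errChan for a dropped message? The sketch below shows the assumed caller-side contract (a hypothetical helper, not lnd's actual peer code): a non-nil error read from the channel escalates to a peer disconnect, so reporting a merely-dropped filter as an error would tear down an otherwise healthy peer.

// drainGossipErr is a hypothetical illustration of the contract assumed
// above: any non-nil error on errChan is treated as fatal for the peer,
// which is exactly what a dropped-but-handled filter must not cause.
func drainGossipErr(errChan <-chan error, disconnect func(error)) {
	if err := <-errChan; err != nil {
		disconnect(err)
	}
}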

discovery/sync_manager.go

Lines changed: 14 additions & 4 deletions
@@ -25,8 +25,9 @@ const (
 	// network as possible.
 	DefaultHistoricalSyncInterval = time.Hour

-	// filterSemaSize is the capacity of gossipFilterSema.
-	filterSemaSize = 5
+	// DefaultFilterConcurrency is the default maximum number of concurrent
+	// gossip filter applications that can be processed.
+	DefaultFilterConcurrency = 5

 	// DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
 	// This is the most that can be sent in a given go. Requests beyond
@@ -136,6 +137,10 @@ type SyncManagerCfg struct {
 	// AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
 	// we've exceeded the hard upper limit.
 	AllotedMsgBytesBurst uint64
+
+	// FilterConcurrency is the maximum number of concurrent gossip filter
+	// applications that can be processed. If not set, defaults to 5.
+	FilterConcurrency int
 }

 // SyncManager is a subsystem of the gossiper that manages the gossip syncers
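
The "defaults to 5" behavior is implemented as a zero-value fallback in newSyncManager (next hunk). A runnable sketch of that fallback in isolation; pickFilterConcurrency is a hypothetical helper mirroring the diff's logic, and the constant mirrors the one added above:

package main

import "fmt"

const DefaultFilterConcurrency = 5 // mirrors the constant added in this diff

// pickFilterConcurrency mirrors the zero-value fallback that newSyncManager
// performs on cfg.FilterConcurrency.
func pickFilterConcurrency(configured int) int {
	if configured == 0 {
		return DefaultFilterConcurrency
	}
	return configured
}

func main() {
	fmt.Println(pickFilterConcurrency(0))  // 5: unset falls back to default
	fmt.Println(pickFilterConcurrency(10)) // 10: explicit value wins
}
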
@@ -207,8 +212,13 @@ type SyncManager struct {
 // newSyncManager constructs a new SyncManager backed by the given config.
 func newSyncManager(cfg *SyncManagerCfg) *SyncManager {

-	filterSema := make(chan struct{}, filterSemaSize)
-	for i := 0; i < filterSemaSize; i++ {
+	filterConcurrency := cfg.FilterConcurrency
+	if filterConcurrency == 0 {
+		filterConcurrency = DefaultFilterConcurrency
+	}
+
+	filterSema := make(chan struct{}, filterConcurrency)
+	for i := 0; i < filterConcurrency; i++ {
 		filterSema <- struct{}{}
 	}
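
The filter semaphore is the classic buffered-channel counting semaphore: the channel is pre-filled with N tokens, ApplyGossipFilter acquires one by receiving from it (the `<-g.syncerSema` in the syncer.go hunks below), and returnSema releases it by sending the token back. A minimal standalone sketch of the pattern, not lnd code:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const concurrency = 5 // mirrors DefaultFilterConcurrency

	// Pre-fill the channel with tokens, exactly as newSyncManager does.
	sema := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		sema <- struct{}{}
	}

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// Acquire: at most 5 goroutines get past this receive.
			<-sema
			// Release the token when done.
			defer func() { sema <- struct{}{} }()

			fmt.Println("applying gossip filter", id)
		}(i)
	}
	wg.Wait()
}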

discovery/syncer.go

Lines changed: 115 additions & 2 deletions
@@ -54,6 +54,12 @@ const (
 	PinnedSync
 )

+const (
+	// defaultTimestampQueueSize is the size of the timestamp range queue
+	// used.
+	defaultTimestampQueueSize = 1
+)
+
 // String returns a human readable string describing the target SyncerType.
 func (t SyncerType) String() string {
 	switch t {
@@ -285,6 +291,10 @@ type gossipSyncerCfg struct {
 	// updates for a channel and returns true if the channel should be
 	// considered a zombie based on these timestamps.
 	isStillZombieChannel func(time.Time, time.Time) bool
+
+	// timestampQueueSize is the size of the timestamp range queue. If not
+	// set, defaults to the defaultTimestampQueueSize constant.
+	timestampQueueSize int
 }

 // GossipSyncer is a struct that handles synchronizing the channel graph state
@@ -381,6 +391,16 @@ type GossipSyncer struct {
 	// respond to gossip timestamp range messages.
 	syncerSema chan struct{}

+	// timestampRangeQueue is a buffered channel for queuing timestamp range
+	// messages that need to be processed asynchronously. This prevents the
+	// gossiper from blocking when ApplyGossipFilter is called.
+	timestampRangeQueue chan *lnwire.GossipTimestampRange
+
+	// isSendingBacklog is an atomic flag that indicates whether a goroutine
+	// is currently sending the backlog of messages. This ensures only one
+	// goroutine is active at a time.
+	isSendingBacklog atomic.Bool
+
 	sync.Mutex

 	// cg is a helper that encapsulates a wait group and quit channel and
@@ -392,14 +412,23 @@ type GossipSyncer struct {
 // newGossipSyncer returns a new instance of the GossipSyncer populated using
 // the passed config.
 func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
+	// Use the configured queue size if set, otherwise use the default.
+	queueSize := cfg.timestampQueueSize
+	if queueSize == 0 {
+		queueSize = defaultTimestampQueueSize
+	}
+
 	return &GossipSyncer{
 		cfg:                cfg,
 		syncTransitionReqs: make(chan *syncTransitionReq),
 		historicalSyncReqs: make(chan *historicalSyncReq),
 		gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
 		queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
-		syncerSema:         sema,
-		cg:                 fn.NewContextGuard(),
+		timestampRangeQueue: make(
+			chan *lnwire.GossipTimestampRange, queueSize,
+		),
+		syncerSema: sema,
+		cg:         fn.NewContextGuard(),
 	}
 }

@@ -422,6 +451,13 @@ func (g *GossipSyncer) Start() {
 			g.cg.WgAdd(1)
 			go g.replyHandler(ctx)
 		}
+
+		// Start the timestamp range queue processor to handle gossip
+		// filter applications asynchronously.
+		if !g.cfg.noTimestampQueryOption {
+			g.cg.WgAdd(1)
+			go g.processTimestampRangeQueue(ctx)
+		}
 	})
 }

@@ -672,6 +708,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) {
 	}
 }

+// processTimestampRangeQueue handles timestamp range messages from the queue
+// asynchronously. This prevents blocking the gossiper when rate limiting is
+// active and multiple peers are trying to apply gossip filters.
+func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
+	defer g.cg.WgDone()
+
+	for {
+		select {
+		case msg := <-g.timestampRangeQueue:
+			// Process the timestamp range message. If we hit an
+			// error, log it but continue processing to avoid
+			// blocking the queue.
+			err := g.ApplyGossipFilter(ctx, msg)
+			switch {
+			case errors.Is(err, ErrGossipSyncerExiting):
+				return
+
+			case errors.Is(err, lnpeer.ErrPeerExiting):
+				return
+
+			case err != nil:
+				log.Errorf("Unable to apply gossip filter: %v",
+					err)
+			}
+
+		case <-g.cg.Done():
+			return
+
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// QueueTimestampRange attempts to queue a timestamp range message for
+// asynchronous processing. If the queue is full, it returns false to indicate
+// the message was dropped.
+func (g *GossipSyncer) QueueTimestampRange(
+	msg *lnwire.GossipTimestampRange) bool {
+
+	// If timestamp queries are disabled, don't queue the message.
+	if g.cfg.noTimestampQueryOption {
+		return false
+	}
+
+	select {
+	case g.timestampRangeQueue <- msg:
+		return true
+
+	// Queue is full, drop the message to prevent blocking.
+	default:
+		log.Warnf("Timestamp range queue full for peer %x, "+
+			"dropping message", g.cfg.peerPub[:])
+		return false
+	}
+}
+
 // sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
 // syncer and sends it to the remote peer.
 func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
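
Taken together, QueueTimestampRange and processTimestampRangeQueue form a drop-on-overflow producer/consumer pair. Below is a standalone model of their interaction (hypothetical names and timings, not lnd code): with a buffer of one, a single message can wait while an earlier one is still being applied, and anything beyond that is shed.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffer of 1 mirrors defaultTimestampQueueSize: one message can wait
	// while an earlier one is still being applied.
	queue := make(chan string, 1)

	// Consumer, modeled on processTimestampRangeQueue: drain until
	// shutdown.
	go func() {
		for {
			select {
			case msg := <-queue:
				// Simulate a slow, rate-limited filter apply.
				time.Sleep(50 * time.Millisecond)
				fmt.Println("applied", msg)
			case <-ctx.Done():
				return
			}
		}
	}()

	// Producer, modeled on QueueTimestampRange: never blocks the caller.
	for _, msg := range []string{"range-1", "range-2", "range-3"} {
		select {
		case queue <- msg:
			fmt.Println("queued", msg)
		default:
			fmt.Println("dropped", msg) // full queue sheds load
		}
	}

	time.Sleep(200 * time.Millisecond)
}
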
@@ -1308,6 +1401,14 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
 		return nil
 	}

+	// Check if a goroutine is already sending the backlog. If so, return
+	// early without attempting to acquire the semaphore.
+	if g.isSendingBacklog.Load() {
+		log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
+			"backlog send already in progress", g.cfg.peerPub[:])
+		return nil
+	}
+
 	select {
 	case <-g.syncerSema:
 	case <-g.cg.Done():
@@ -1342,11 +1443,23 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
 		return nil
 	}

+	// Set the atomic flag to indicate we're starting to send the backlog.
+	// If the swap fails, it means another goroutine is already active, so
+	// we return early.
+	if !g.isSendingBacklog.CompareAndSwap(false, true) {
+		returnSema()
+		log.Debugf("GossipSyncer(%x): another goroutine already "+
+			"sending backlog, skipping", g.cfg.peerPub[:])
+
+		return nil
+	}
+
 	// We'll conclude by launching a goroutine to send out any updates.
 	g.cg.WgAdd(1)
 	go func() {
 		defer g.cg.WgDone()
 		defer returnSema()
+		defer g.isSendingBacklog.Store(false)

 		for _, msg := range newUpdatestoSend {
 			err := g.cfg.sendToPeerSync(ctx, msg)
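
The CompareAndSwap plus the deferred Store(false) form a single-flight guard: at most one backlog-sending goroutine can be live per syncer at any moment, while a later filter application can still start one once it finishes. A standalone sketch of the guard (illustrative counters, not lnd code):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var (
		sending  atomic.Bool  // mirrors isSendingBacklog
		inFlight atomic.Int32 // how many senders run concurrently
		maxSeen  atomic.Int32
		wg       sync.WaitGroup
	)

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Only one goroutine may send the backlog at a time;
			// losers return immediately instead of piling up.
			if !sending.CompareAndSwap(false, true) {
				return
			}
			defer sending.Store(false)

			n := inFlight.Add(1)
			if n > maxSeen.Load() {
				maxSeen.Store(n)
			}
			time.Sleep(10 * time.Millisecond) // simulate the send
			inFlight.Add(-1)
		}()
	}
	wg.Wait()

	// The guard caps concurrency at one, however many callers raced.
	fmt.Println("max concurrent backlog senders:", maxSeen.Load())
}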
