1 change: 1 addition & 0 deletions config.go
@@ -719,6 +719,7 @@ func DefaultConfig() Config {
AnnouncementConf: discovery.DefaultProofMatureDelta,
MsgRateBytes: discovery.DefaultMsgBytesPerSecond,
MsgBurstBytes: discovery.DefaultMsgBytesBurst,
FilterConcurrency: discovery.DefaultFilterConcurrency,
},
Invoices: &lncfg.Invoices{
HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,
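For reference, the zero-value fallback this wiring relies on can be sketched in isolation. This is an illustrative standalone program, not the actual lnd types; the names simply mirror the diff:

```go
package main

import "fmt"

// DefaultFilterConcurrency mirrors the constant added in this PR.
const DefaultFilterConcurrency = 5

type syncManagerCfg struct {
	// A FilterConcurrency of 0 means "use the default", matching the
	// fallback in newSyncManager below.
	FilterConcurrency int
}

func effectiveFilterConcurrency(cfg syncManagerCfg) int {
	if cfg.FilterConcurrency == 0 {
		return DefaultFilterConcurrency
	}
	return cfg.FilterConcurrency
}

func main() {
	fmt.Println(effectiveFilterConcurrency(syncManagerCfg{}))                      // 5
	fmt.Println(effectiveFilterConcurrency(syncManagerCfg{FilterConcurrency: 10})) // 10
}
```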
20 changes: 14 additions & 6 deletions discovery/gossiper.go
@@ -399,6 +399,10 @@ type Config struct {
// MsgBurstBytes is the allotted burst amount in bytes. This is the
// number of starting tokens in our token bucket algorithm.
MsgBurstBytes uint64

// FilterConcurrency is the maximum number of concurrent gossip filter
// applications that can be processed.
FilterConcurrency int
}

// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
@@ -600,6 +604,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
IsStillZombieChannel: cfg.IsStillZombieChannel,
AllotedMsgBytesPerSecond: cfg.MsgRateBytes,
AllotedMsgBytesBurst: cfg.MsgBurstBytes,
FilterConcurrency: cfg.FilterConcurrency,
})

gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
@@ -907,13 +912,16 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(ctx context.Context,
return errChan
}

-// If we've found the message target, then we'll dispatch the
-// message directly to it.
-if err := syncer.ApplyGossipFilter(ctx, m); err != nil {
-log.Warnf("Unable to apply gossip filter for peer=%x: "+
-"%v", peer.PubKey(), err)
-
-errChan <- err
+// Queue the message for asynchronous processing to prevent
+// blocking the gossiper when rate limiting is active.
+if !syncer.QueueTimestampRange(m) {
+log.Warnf("Unable to queue gossip filter for peer=%x: "+
+"queue full", peer.PubKey())
+
+// Return nil to indicate we've handled the message,
+// even though it was dropped. This prevents the peer
+// from being disconnected.
+errChan <- nil
return errChan
}
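The drop-instead-of-block behaviour is just a non-blocking channel send, and returning nil on errChan afterwards is what keeps the peer connected. A minimal standalone sketch of the pattern (illustrative only; the real QueueTimestampRange also checks noTimestampQueryOption and logs the peer key):

```go
package main

import "fmt"

// tryEnqueue mirrors the non-blocking send used by QueueTimestampRange: if
// the buffered channel is full, the message is dropped rather than blocking
// the caller.
func tryEnqueue(queue chan int, msg int) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan int, 1)
	fmt.Println(tryEnqueue(queue, 1)) // true: buffer has room
	fmt.Println(tryEnqueue(queue, 2)) // false: buffer full, message dropped
}
```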

18 changes: 14 additions & 4 deletions discovery/sync_manager.go
@@ -25,8 +25,9 @@ const (
// network as possible.
DefaultHistoricalSyncInterval = time.Hour

-// filterSemaSize is the capacity of gossipFilterSema.
-filterSemaSize = 5
+// DefaultFilterConcurrency is the default maximum number of concurrent
+// gossip filter applications that can be processed.
+DefaultFilterConcurrency = 5

// DefaultMsgBytesBurst is the allotted burst in bytes we'll permit.
// This is the most that can be sent in a given go. Requests beyond
@@ -136,6 +137,10 @@ type SyncManagerCfg struct {
// AllotedMsgBytesBurst is the amount of burst bytes we'll permit, if
// we've exceeded the hard upper limit.
AllotedMsgBytesBurst uint64

// FilterConcurrency is the maximum number of concurrent gossip filter
// applications that can be processed. If not set, it defaults to
// DefaultFilterConcurrency.
FilterConcurrency int
}

// SyncManager is a subsystem of the gossiper that manages the gossip syncers
@@ -207,8 +212,13 @@ type SyncManager struct {
// newSyncManager constructs a new SyncManager backed by the given config.
func newSyncManager(cfg *SyncManagerCfg) *SyncManager {

-filterSema := make(chan struct{}, filterSemaSize)
-for i := 0; i < filterSemaSize; i++ {
+filterConcurrency := cfg.FilterConcurrency
Member: nit: newline above
+if filterConcurrency == 0 {
+filterConcurrency = DefaultFilterConcurrency
+}
+
+filterSema := make(chan struct{}, filterConcurrency)
+for i := 0; i < filterConcurrency; i++ {
filterSema <- struct{}{}
}
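The pre-filled buffered channel is a counting semaphore: receiving takes a token, sending it back releases one. A runnable sketch of the idiom (a simplification for illustration, not the syncer's actual call sites):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const concurrency = 5

	// Pre-fill the channel with tokens, as newSyncManager does: a worker
	// acquires by receiving a token and releases by sending it back.
	sema := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		sema <- struct{}{}
	}

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			<-sema // acquire a token
			defer func() { sema <- struct{}{} }() // release it

			// At most `concurrency` of these run at once.
			fmt.Println("applying gossip filter", id)
		}(i)
	}
	wg.Wait()
}
```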

117 changes: 115 additions & 2 deletions discovery/syncer.go
@@ -54,6 +54,12 @@ const (
PinnedSync
)

const (
// defaultTimestampQueueSize is the default size of the timestamp range
// queue used by each gossip syncer.
defaultTimestampQueueSize = 1
)

// String returns a human readable string describing the target SyncerType.
func (t SyncerType) String() string {
switch t {
@@ -285,6 +291,10 @@ type gossipSyncerCfg struct {
// updates for a channel and returns true if the channel should be
// considered a zombie based on these timestamps.
isStillZombieChannel func(time.Time, time.Time) bool

// timestampQueueSize is the size of the timestamp range queue. If not
// set, it defaults to defaultTimestampQueueSize.
timestampQueueSize int
}

// GossipSyncer is a struct that handles synchronizing the channel graph state
@@ -381,6 +391,16 @@ type GossipSyncer struct {
// respond to gossip timestamp range messages.
syncerSema chan struct{}

// timestampRangeQueue is a buffered channel for queuing timestamp range
// messages that need to be processed asynchronously. This prevents the
// gossiper from blocking when ApplyGossipFilter is called.
timestampRangeQueue chan *lnwire.GossipTimestampRange

// isSendingBacklog is an atomic flag that indicates whether a goroutine
// is currently sending the backlog of messages. This ensures only one
// goroutine is active at a time.
isSendingBacklog atomic.Bool

sync.Mutex

// cg is a helper that encapsulates a wait group and quit channel and
@@ -392,14 +412,23 @@ type GossipSyncer struct {
// newGossipSyncer returns a new instance of the GossipSyncer populated using
// the passed config.
func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
// Use the configured queue size if set, otherwise use the default.
queueSize := cfg.timestampQueueSize
if queueSize == 0 {
queueSize = defaultTimestampQueueSize
}

return &GossipSyncer{
cfg: cfg,
syncTransitionReqs: make(chan *syncTransitionReq),
historicalSyncReqs: make(chan *historicalSyncReq),
gossipMsgs: make(chan lnwire.Message, syncerBufferSize),
queryMsgs: make(chan lnwire.Message, syncerBufferSize),
-syncerSema: sema,
-cg: fn.NewContextGuard(),
+timestampRangeQueue: make(
+chan *lnwire.GossipTimestampRange, queueSize,
+),
+syncerSema: sema,
+cg: fn.NewContextGuard(),
}
}

@@ -422,6 +451,13 @@ func (g *GossipSyncer) Start() {
g.cg.WgAdd(1)
go g.replyHandler(ctx)
}

// Start the timestamp range queue processor to handle gossip
// filter applications asynchronously.
if !g.cfg.noTimestampQueryOption {
g.cg.WgAdd(1)
go g.processTimestampRangeQueue(ctx)
}
})
}

@@ -672,6 +708,63 @@ func (g *GossipSyncer) replyHandler(ctx context.Context) {
}
}

// processTimestampRangeQueue handles timestamp range messages from the queue
// asynchronously. This prevents blocking the gossiper when rate limiting is
// active and multiple peers are trying to apply gossip filters.
func (g *GossipSyncer) processTimestampRangeQueue(ctx context.Context) {
defer g.cg.WgDone()

for {
select {
case msg := <-g.timestampRangeQueue:
// Process the timestamp range message. If we hit an
// error, log it but continue processing to avoid
// blocking the queue.
err := g.ApplyGossipFilter(ctx, msg)
switch {
case errors.Is(err, ErrGossipSyncerExiting):
return

case errors.Is(err, lnpeer.ErrPeerExiting):
return

case err != nil:
log.Errorf("Unable to apply gossip filter: %v",
err)
}

case <-g.cg.Done():
return

case <-ctx.Done():
return
}
}
}
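The queue processor above is a standard worker loop that drains a channel until shutdown. A self-contained sketch of the shape (simplified: no semaphore, no error classification, hypothetical message type):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// drainQueue is a stripped-down analogue of processTimestampRangeQueue: it
// services one message at a time and exits once the context is cancelled.
func drainQueue(ctx context.Context, queue <-chan string) {
	for {
		select {
		case msg := <-queue:
			// In the syncer this is where ApplyGossipFilter runs;
			// non-fatal errors are logged so one failure cannot
			// wedge the queue.
			fmt.Println("processing", msg)

		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	queue := make(chan string, 1)

	go drainQueue(ctx, queue)

	queue <- "gossip_timestamp_range"
	time.Sleep(10 * time.Millisecond)
	cancel()
}
```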

// QueueTimestampRange attempts to queue a timestamp range message for
// asynchronous processing. If the queue is full, it returns false to indicate
// the message was dropped.
func (g *GossipSyncer) QueueTimestampRange(
msg *lnwire.GossipTimestampRange) bool {

// If timestamp queries are disabled, don't queue the message.
if g.cfg.noTimestampQueryOption {
return false
}

select {
case g.timestampRangeQueue <- msg:
return true

// Queue is full, drop the message to prevent blocking.
default:
log.Warnf("Timestamp range queue full for peer %x, "+
"dropping message", g.cfg.peerPub[:])
return false
}
}

// sendGossipTimestampRange constructs and sets a GossipTimestampRange for the
// syncer and sends it to the remote peer.
func (g *GossipSyncer) sendGossipTimestampRange(ctx context.Context,
@@ -1308,6 +1401,14 @@ func (g *GossipSyncer) ApplyGossipFilter(ctx context.Context,
return nil
}

// Check if a goroutine is already sending the backlog. If so, return
// early without attempting to acquire the semaphore.
if g.isSendingBacklog.Load() {
log.Debugf("GossipSyncer(%x): skipping ApplyGossipFilter, "+
"backlog send already in progress", g.cfg.peerPub[:])
return nil
}

select {
case <-g.syncerSema:
case <-g.cg.Done():
@@ -1342,11 +1443,23 @@
return nil
}

// Set the atomic flag to indicate we're starting to send the backlog.
// If the swap fails, it means another goroutine is already active, so
// we return early.
if !g.isSendingBacklog.CompareAndSwap(false, true) {
returnSema()
log.Debugf("GossipSyncer(%x): another goroutine already "+
"sending backlog, skipping", g.cfg.peerPub[:])

return nil
}

// We'll conclude by launching a goroutine to send out any updates.
g.cg.WgAdd(1)
go func() {
defer g.cg.WgDone()
defer returnSema()
defer g.isSendingBacklog.Store(false)

for _, msg := range newUpdatestoSend {
err := g.cfg.sendToPeerSync(ctx, msg)
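The CompareAndSwap guard is a single-flight pattern on atomic.Bool: at most one goroutine sends the backlog at a time, and the flag is cleared when that send finishes. A runnable sketch (hypothetical types, not the syncer itself):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sender mimics the isSendingBacklog guard in ApplyGossipFilter.
type sender struct {
	isSendingBacklog atomic.Bool
}

// sendBacklog returns false if another goroutine currently holds the flag.
func (s *sender) sendBacklog() bool {
	if !s.isSendingBacklog.CompareAndSwap(false, true) {
		// Another send is already in progress; skip.
		return false
	}
	defer s.isSendingBacklog.Store(false)

	// ... send the backlog of updates here ...
	return true
}

func main() {
	var s sender
	var started, skipped atomic.Int64

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.sendBacklog() {
				started.Add(1)
			} else {
				skipped.Add(1)
			}
		}()
	}
	wg.Wait()

	// Counts vary per run, but sends never overlap in time.
	fmt.Println("started:", started.Load(), "skipped:", skipped.Load())
}
```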