Skip to content

Commit

Permalink
feat(job-distributor): add exp. backoff retry to `feeds.SyncNodeInfo(…
Browse files Browse the repository at this point in the history
…)` (#15752)

* feat(job-distributor): add exp. backoff retry to feeds.SyncNodeInfo()

There’s a behavior that we’ve observed for some time on the NOP side
where they will add/update a chain configuration of the Job
Distributor panel but the change is not reflected on the service itself.
This leads to inefficiencies as NOPs are unaware of this and thus need
to be notified so that they may "reapply" the configuration.

After some investigation, we suspect that this is due to connectivity
issues between the nodes and the job distributor instance, which causes
the message with the update to be lost.

This PR attempts to solve this by adding a "retry" wrapper on top of the
existing `SyncNodeInfo` method. We rely on `avast/retry-go` to implement
the bulk of the retry logic. It's configured with a minimal delay of
10 seconds, maximum delay of 30 minutes and retry a total of 56 times --
which adds up to a bit more than 24 hours.

Ticket Number: DPA-1371

* review: protect cancel func access with a mutex to avoid race conditions

* review: trigger retry on partial failures and support multiple job distributors

* review: clear contexts before closing the connection manager
  • Loading branch information
gustavogama-cll authored Feb 26, 2025
1 parent 3efa849 commit 39d0909
Show file tree
Hide file tree
Showing 17 changed files with 366 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/neat-penguins-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added add exponential backoff retry to feeds.SyncNodeInfo()
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ require (
github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250207205350-420ccacab78a // indirect
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20250211162441-3d6cea220efb // indirect
github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 // indirect
github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 // indirect
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 // indirect
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20250213203720-e15b1333a14a // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1107,8 +1107,8 @@ github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d
github.com/smartcontractkit/chainlink-integrations/evm v0.0.0-20250213145514-41d874782c02/go.mod h1:7DbPnG0E39eZaX1CXKxRiJ1NOWHwTZYDWR9ys3kZZuU=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0 h1:hfMRj2ny6oNHd8w1rhJHdoX3YkoWJtCkBK6wTlCE4+c=
github.com/smartcontractkit/chainlink-protos/job-distributor v0.9.0/go.mod h1:/dVVLXrsp+V0AbcYGJo3XMzKg3CkELsweA/TTopCsKE=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0 h1:ZBat8EBvE2LpSQR9U1gEbRV6PfAkiFdINmQ8nVnXIAQ=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.4.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0 h1:xRgu/kMkxcY4LeDKMBhaXU4khgya7v2wyb4Sa5Nzb+Y=
github.com/smartcontractkit/chainlink-protos/orchestrator v0.5.0/go.mod h1:m/A3lqD7ms/RsQ9BT5P2uceYY0QX5mIt4KQxT2G6qEo=
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 h1:L6KJ4kGv/yNNoCk8affk7Y1vAY0qglPMXC/hevV/IsA=
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6/go.mod h1:FRwzI3hGj4CJclNS733gfcffmqQ62ONCkbGi49s658w=
github.com/smartcontractkit/chainlink-protos/svr v0.0.0-20250123084029-58cce9b32112 h1:c77Gi/APraqwbBO8fbd/5JY2wW+MSIpYg8Uma9MEZFE=
Expand Down
166 changes: 144 additions & 22 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"strings"
"sync"
"time"

"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/lib/pq"
Expand Down Expand Up @@ -77,6 +81,10 @@ var (
// Job Proposal status
"status",
})

defaultSyncMinDelay = 10 * time.Second
defaultSyncMaxDelay = 30 * time.Minute
defaultSyncMaxAttempts = uint(48 + 8) // 30m * 48 =~ 24h; plus the initial 8 shorter retries
)

// Service represents a behavior of the feeds service
Expand Down Expand Up @@ -142,6 +150,10 @@ type service struct {
lggr logger.Logger
version string
loopRegistrarConfig plugins.RegistrarConfig
syncNodeInfoCancel atomicCancelFns
syncMinDelay time.Duration
syncMaxDelay time.Duration
syncMaxAttempts uint
}

// NewService constructs a new feeds service
Expand All @@ -161,6 +173,7 @@ func NewService(
lggr logger.Logger,
version string,
rc plugins.RegistrarConfig,
opts ...ServiceOption,
) *service {
lggr = lggr.Named("Feeds")
svc := &service{
Expand All @@ -184,6 +197,14 @@ func NewService(
lggr: lggr,
version: version,
loopRegistrarConfig: rc,
syncNodeInfoCancel: atomicCancelFns{fns: map[int64]context.CancelFunc{}},
syncMinDelay: defaultSyncMinDelay,
syncMaxDelay: defaultSyncMaxDelay,
syncMaxAttempts: defaultSyncMaxAttempts,
}

for _, opt := range opts {
opt(svc)
}

return svc
Expand Down Expand Up @@ -255,8 +276,43 @@ func (s *service) RegisterManager(ctx context.Context, params RegisterManagerPar
return id, nil
}

// SyncNodeInfo syncs the node's information with FMS
// syncNodeInfoWithRetry syncs the node's information with FMS. In case of failures,
// it retries with an exponential backoff for up to 24h.
func (s *service) syncNodeInfoWithRetry(id int64) {
ctx, cancel := context.WithCancel(context.Background())

// cancel the previous context -- and, by extension, the existing goroutine --
// so that we can start anew
s.syncNodeInfoCancel.callAndSwap(id, cancel)

retryOpts := []retry.Option{
retry.Context(ctx),
retry.DelayType(retry.BackOffDelay),
retry.Delay(s.syncMinDelay),
retry.MaxDelay(s.syncMaxDelay),
retry.Attempts(s.syncMaxAttempts),
retry.LastErrorOnly(true),
retry.OnRetry(func(attempt uint, err error) {
s.lggr.Infow("failed to sync node info", "attempt", attempt, "err", err.Error())
}),
}

go func() {
err := retry.Do(func() error { return s.SyncNodeInfo(ctx, id) }, retryOpts...)
if err != nil {
s.lggr.Errorw("failed to sync node info; aborting", "err", err)
} else {
s.lggr.Info("successfully synced node info")
}

s.syncNodeInfoCancel.callAndSwap(id, nil)
}()
}

func (s *service) SyncNodeInfo(ctx context.Context, id int64) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

// Get the FMS RPC client
fmsClient, err := s.connMgr.GetClient(id)
if err != nil {
Expand All @@ -281,12 +337,22 @@ func (s *service) SyncNodeInfo(ctx context.Context, id int64) error {
}

workflowKey := s.getWorkflowPublicKey()
if _, err = fmsClient.UpdateNode(ctx, &pb.UpdateNodeRequest{

resp, err := fmsClient.UpdateNode(ctx, &pb.UpdateNodeRequest{
Version: s.version,
ChainConfigs: cfgMsgs,
WorkflowKey: &workflowKey,
}); err != nil {
return err
})
if err != nil {
return errors.Wrap(err, "SyncNodeInfo.UpdateNode call failed")
}
if len(resp.ChainConfigErrors) > 0 {
errMsgs := make([]string, 0, len(resp.ChainConfigErrors))
for _, ccErr := range resp.ChainConfigErrors {
errMsgs = append(errMsgs, ccErr.Message)
}

return errors.Errorf("SyncNodeInfo.UpdateNode call partially failed: %s", strings.Join(errMsgs, "; "))
}

return nil
Expand Down Expand Up @@ -402,9 +468,7 @@ func (s *service) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "CreateChainConfig: failed to fetch manager")
}

if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
Expand All @@ -426,9 +490,7 @@ func (s *service) DeleteChainConfig(ctx context.Context, id int64) (int64, error
return 0, errors.Wrap(err, "DeleteChainConfig: failed to fetch manager")
}

if err := s.SyncNodeInfo(ctx, mgr.ID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)

return id, nil
}
Expand Down Expand Up @@ -467,9 +529,7 @@ func (s *service) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64
return 0, errors.Wrap(err, "UpdateChainConfig failed: could not get chain config")
}

if err := s.SyncNodeInfo(ctx, ccfg.FeedsManagerID); err != nil {
s.lggr.Infof("FMS: Unable to sync node info: %v", err)
}
s.syncNodeInfoWithRetry(ccfg.FeedsManagerID)

return id, nil
}
Expand Down Expand Up @@ -1031,9 +1091,7 @@ func (s *service) CancelSpec(ctx context.Context, id int64) error {
)

err = s.transact(ctx, func(tx datasources) error {
var (
txerr error
)
var txerr error

if txerr = tx.orm.CancelSpec(ctx, id); txerr != nil {
return txerr
Expand Down Expand Up @@ -1153,6 +1211,8 @@ func (s *service) Start(ctx context.Context) error {
// Close shuts down the service
func (s *service) Close() error {
return s.StopOnce("FeedsService", func() error {
s.syncNodeInfoCancel.callAllAndClear()

// This blocks until it finishes
s.connMgr.Close()

Expand All @@ -1173,10 +1233,7 @@ func (s *service) connectFeedManager(ctx context.Context, mgr FeedsManager, priv
},
OnConnect: func(pb.FeedsManagerClient) {
// Sync the node's information with FMS once connected
err := s.SyncNodeInfo(ctx, mgr.ID)
if err != nil {
s.lggr.Infof("Error syncing node info: %v", err)
}
s.syncNodeInfoWithRetry(mgr.ID)
},
})
}
Expand Down Expand Up @@ -1220,8 +1277,10 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
metrics := counts.toMetrics()

// Set the prometheus gauge metrics.
for _, status := range []JobProposalStatus{JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked} {
for _, status := range []JobProposalStatus{
JobProposalStatusPending, JobProposalStatusApproved,
JobProposalStatusCancelled, JobProposalStatusRejected, JobProposalStatusDeleted, JobProposalStatusRevoked,
} {
status := status

promJobProposalCounts.With(prometheus.Labels{"status": string(status)}).Set(metrics[status])
Expand Down Expand Up @@ -1565,6 +1624,49 @@ func (s *service) isRevokable(propStatus JobProposalStatus, specStatus SpecStatu
return propStatus != JobProposalStatusDeleted && (specStatus == SpecStatusPending || specStatus == SpecStatusCancelled)
}

type atomicCancelFns struct {
fns map[int64]context.CancelFunc
mutex sync.Mutex
}

func (f *atomicCancelFns) callAndSwap(id int64, other func()) {
f.mutex.Lock()
defer f.mutex.Unlock()

fn, found := f.fns[id]
if found && fn != nil {
fn()
}

f.fns[id] = other
}

func (f *atomicCancelFns) callAllAndClear() {
f.mutex.Lock()
defer f.mutex.Unlock()

for _, fn := range f.fns {
if fn != nil {
fn()
}
}
clear(f.fns)
}

type ServiceOption func(*service)

func WithSyncMinDelay(delay time.Duration) ServiceOption {
return func(s *service) { s.syncMinDelay = delay }
}

func WithSyncMaxDelay(delay time.Duration) ServiceOption {
return func(s *service) { s.syncMaxDelay = delay }
}

func WithSyncMaxAttempts(attempts uint) ServiceOption {
return func(s *service) { s.syncMaxAttempts = attempts }
}

var _ Service = &NullService{}

// NullService defines an implementation of the Feeds Service that is used
Expand All @@ -1577,75 +1679,95 @@ func (ns NullService) Close() error { return nil }
func (ns NullService) ApproveSpec(ctx context.Context, id int64, force bool) error {
return ErrFeedsManagerDisabled
}

func (ns NullService) CountJobProposalsByStatus(ctx context.Context) (*JobProposalCounts, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) CancelSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}

func (ns NullService) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) ListSpecsByJobProposalIDs(ctx context.Context, ids []int64) ([]JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) GetManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) ListManagersByIDs(ctx context.Context, ids []int64) ([]FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) GetSpec(ctx context.Context, id int64) (*JobProposalSpec, error) {
return nil, ErrFeedsManagerDisabled
}
func (ns NullService) ListManagers(ctx context.Context) ([]FeedsManager, error) { return nil, nil }
func (ns NullService) CreateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) GetChainConfig(ctx context.Context, id int64) (*ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) DeleteChainConfig(ctx context.Context, id int64) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) ListChainConfigsByManagerIDs(ctx context.Context, mgrIDs []int64) ([]ChainConfig, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) UpdateChainConfig(ctx context.Context, cfg ChainConfig) (int64, error) {
return 0, ErrFeedsManagerDisabled
}
func (ns NullService) ListJobProposals(ctx context.Context) ([]JobProposal, error) { return nil, nil }
func (ns NullService) ListJobProposalsByManagersIDs(ctx context.Context, ids []int64) ([]JobProposal, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) RevokeJob(ctx context.Context, args *RevokeJobArgs) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) RegisterManager(ctx context.Context, params RegisterManagerParams) (int64, error) {
return 0, ErrFeedsManagerDisabled
}

func (ns NullService) RejectSpec(ctx context.Context, id int64) error {
return ErrFeedsManagerDisabled
}
func (ns NullService) SyncNodeInfo(ctx context.Context, id int64) error { return nil }
func (ns NullService) UpdateManager(ctx context.Context, mgr FeedsManager) error {
return ErrFeedsManagerDisabled
}

func (ns NullService) EnableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) DisableManager(ctx context.Context, id int64) (*FeedsManager, error) {
return nil, ErrFeedsManagerDisabled
}

func (ns NullService) IsJobManaged(ctx context.Context, jobID int64) (bool, error) {
return false, nil
}

func (ns NullService) UpdateSpecDefinition(ctx context.Context, id int64, spec string) error {
return ErrFeedsManagerDisabled
}
Expand Down
Loading

0 comments on commit 39d0909

Please sign in to comment.