diff --git a/pkg/app/flags.go b/pkg/app/flags.go index c1825b5..a38162d 100644 --- a/pkg/app/flags.go +++ b/pkg/app/flags.go @@ -77,6 +77,10 @@ var Flags = []cli.Flag{ Name: "webhook-url", Usage: "endpoint where to send upgrade webhooks (experimental)", }, + &cli.StringSliceFlag{ + Name: "webhook-custom-block", + Usage: "trigger a custom webhook at a given block number (experimental)", + }, &cli.StringFlag{ Name: "x-gov", Usage: "version of the gov module to use (v1|v1beta1)", diff --git a/pkg/app/run.go b/pkg/app/run.go index 52fe8de..d0fd715 100644 --- a/pkg/app/run.go +++ b/pkg/app/run.go @@ -6,6 +6,7 @@ import ( "net/url" "os" "os/signal" + "strconv" "syscall" "github.com/cometbft/cometbft/rpc/client/http" @@ -31,23 +32,24 @@ func RunFunc(cCtx *cli.Context) error { ctx = cCtx.Context // Config flags - chainID = cCtx.String("chain-id") - httpAddr = cCtx.String("http-addr") - logLevel = cCtx.String("log-level") - namespace = cCtx.String("namespace") - noColor = cCtx.Bool("no-color") - nodes = cCtx.StringSlice("node") - noGov = cCtx.Bool("no-gov") - noStaking = cCtx.Bool("no-staking") - noUpgrade = cCtx.Bool("no-upgrade") - noCommission = cCtx.Bool("no-commission") - denom = cCtx.String("denom") - denomExpon = cCtx.Uint("denom-exponent") - startTimeout = cCtx.Duration("start-timeout") - stopTimeout = cCtx.Duration("stop-timeout") - validators = cCtx.StringSlice("validator") - webhookURL = cCtx.String("webhook-url") - xGov = cCtx.String("x-gov") + chainID = cCtx.String("chain-id") + httpAddr = cCtx.String("http-addr") + logLevel = cCtx.String("log-level") + namespace = cCtx.String("namespace") + noColor = cCtx.Bool("no-color") + nodes = cCtx.StringSlice("node") + noGov = cCtx.Bool("no-gov") + noStaking = cCtx.Bool("no-staking") + noUpgrade = cCtx.Bool("no-upgrade") + noCommission = cCtx.Bool("no-commission") + denom = cCtx.String("denom") + denomExpon = cCtx.Uint("denom-exponent") + startTimeout = cCtx.Duration("start-timeout") + stopTimeout = cCtx.Duration("stop-timeout") + validators = cCtx.StringSlice("validator") + webhookURL = cCtx.String("webhook-url") + webhookCustomBlocks = cCtx.StringSlice("webhook-custom-block") + xGov = cCtx.String("x-gov") ) // @@ -84,12 +86,34 @@ func RunFunc(cCtx *cli.Context) error { return err } + var wh *webhook.Webhook + if webhookURL != "" { + whURL, err := url.Parse(webhookURL) + if err != nil { + return fmt.Errorf("failed to parse webhook endpoint: %w", err) + } + wh = webhook.New(*whURL) + } + + // Custom block webhooks + blockWebhooks := []watcher.BlockWebhook{} + for _, block := range webhookCustomBlocks { + blockHeight, err := strconv.ParseInt(block, 10, 64) + if err != nil { + return fmt.Errorf("failed to parse block height for custom webhook (%s): %w", block, err) + } + blockWebhooks = append(blockWebhooks, watcher.BlockWebhook{ + Height: blockHeight, + Metadata: map[string]string{}, + }) + } + // // Node Watchers // metrics := metrics.New(namespace) metrics.Register() - blockWatcher := watcher.NewBlockWatcher(trackedValidators, metrics, os.Stdout) + blockWatcher := watcher.NewBlockWatcher(trackedValidators, metrics, os.Stdout, wh, blockWebhooks) errg.Go(func() error { return blockWatcher.Start(ctx) }) @@ -128,14 +152,6 @@ func RunFunc(cCtx *cli.Context) error { return votesWatcher.Start(ctx) }) } - var wh *webhook.Webhook - if webhookURL != "" { - whURL, err := url.Parse(webhookURL) - if err != nil { - return fmt.Errorf("failed to parse webhook endpoint: %w", err) - } - wh = webhook.New(*whURL) - } var upgradeWatcher *watcher.UpgradeWatcher if !noUpgrade { diff --git a/pkg/watcher/block.go b/pkg/watcher/block.go index 69a52c9..3d05b75 100644 --- a/pkg/watcher/block.go +++ b/pkg/watcher/block.go @@ -13,10 +13,16 @@ import ( "github.com/fatih/color" "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" "github.com/kilnfi/cosmos-validator-watcher/pkg/rpc" + "github.com/kilnfi/cosmos-validator-watcher/pkg/webhook" "github.com/rs/zerolog/log" "github.com/shopspring/decimal" ) +type BlockWebhook struct { + Height int64 `json:"height"` + Metadata map[string]string `json:"metadata"` +} + type BlockWatcher struct { trackedValidators []TrackedValidator metrics *metrics.Metrics @@ -25,14 +31,18 @@ type BlockWatcher struct { validatorSet atomic.Value // []*types.Validator latestBlockHeight int64 latestBlockProposer string + webhook *webhook.Webhook + customWebhooks []BlockWebhook } -func NewBlockWatcher(validators []TrackedValidator, metrics *metrics.Metrics, writer io.Writer) *BlockWatcher { +func NewBlockWatcher(validators []TrackedValidator, metrics *metrics.Metrics, writer io.Writer, webhook *webhook.Webhook, customWebhooks []BlockWebhook) *BlockWatcher { return &BlockWatcher{ trackedValidators: validators, metrics: metrics, writer: writer, blockChan: make(chan *BlockInfo), + webhook: webhook, + customWebhooks: customWebhooks, } } @@ -42,7 +52,7 @@ func (w *BlockWatcher) Start(ctx context.Context) error { case <-ctx.Done(): return nil case block := <-w.blockChan: - w.handleBlockInfo(block) + w.handleBlockInfo(ctx, block) } } } @@ -159,7 +169,7 @@ func (w *BlockWatcher) syncValidatorSet(ctx context.Context, n *rpc.Node) error return nil } -func (w *BlockWatcher) handleBlockInfo(block *BlockInfo) { +func (w *BlockWatcher) handleBlockInfo(ctx context.Context, block *BlockInfo) { chainId := block.ChainID if w.latestBlockHeight >= block.Height { @@ -220,6 +230,9 @@ func (w *BlockWatcher) handleBlockInfo(block *BlockInfo) { strings.Join(validatorStatus, " "), ) + // Handle webhooks + w.handleWebhooks(ctx, block) + w.latestBlockHeight = block.Height w.latestBlockProposer = block.ProposerAddress } @@ -261,3 +274,38 @@ func (w *BlockWatcher) isValidatorActive(address string) bool { } return false } + +func (w *BlockWatcher) handleWebhooks(ctx context.Context, block *BlockInfo) { + if len(w.customWebhooks) == 0 { + return + } + + newWebhooks := []BlockWebhook{} + + for _, webhook := range w.customWebhooks { + // If webhook block height is passed + if webhook.Height <= block.Height { + w.triggerWebhook(ctx, block.ChainID, webhook) + } else { + newWebhooks = append(newWebhooks, webhook) + } + } + + w.customWebhooks = newWebhooks +} + +func (w *BlockWatcher) triggerWebhook(ctx context.Context, chainID string, wh BlockWebhook) { + msg := make(map[string]string) + msg["type"] = "custom" + msg["block"] = fmt.Sprintf("%d", wh.Height) + msg["chain_id"] = chainID + for k, v := range wh.Metadata { + msg[k] = v + } + + go func() { + if err := w.webhook.Send(context.Background(), msg); err != nil { + log.Error().Err(err).Msg("failed to send upgrade webhook") + } + }() +} diff --git a/pkg/watcher/block_test.go b/pkg/watcher/block_test.go index 521cb29..214f2a3 100644 --- a/pkg/watcher/block_test.go +++ b/pkg/watcher/block_test.go @@ -2,10 +2,13 @@ package watcher import ( "bytes" + "context" + "net/url" "strings" "testing" "github.com/kilnfi/cosmos-validator-watcher/pkg/metrics" + "github.com/kilnfi/cosmos-validator-watcher/pkg/webhook" "github.com/prometheus/client_golang/prometheus/testutil" "gotest.tools/assert" ) @@ -26,6 +29,8 @@ func TestBlockWatcher(t *testing.T) { }, metrics.New("cosmos_validator_watcher"), &bytes.Buffer{}, + webhook.New(url.URL{}), + []BlockWebhook{}, ) t.Run("Handle BlockInfo", func(t *testing.T) { @@ -114,7 +119,7 @@ func TestBlockWatcher(t *testing.T) { } for _, block := range blocks { - blockWatcher.handleBlockInfo(&block) + blockWatcher.handleBlockInfo(context.Background(), &block) } assert.Equal(t, diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index bfc77e0..8ab5769 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -54,7 +54,7 @@ func (w *Webhook) Send(ctx context.Context, message interface{}) error { }, retryOpts...) } -func (w *Webhook) postRequest(ctx context.Context, req *http.Request) error { +func (w *Webhook) postRequest(_ context.Context, req *http.Request) error { resp, err := w.client.Do(req) if err != nil { return fmt.Errorf("failed to send request: %w", err)