Skip to content

Commit

Permalink
Feature/keystone beholder integration (#14510)
Browse files Browse the repository at this point in the history
* added TODO placeholders

* sketch of adding beholder custom messages

* other options

* adding more idiomatic option

* unit testing labeling

* adding a sample beholder metric emission

* refactoring to use labeled context utilities

* bumping common to use custom proto for Keystone Custom Message

* trying out sendLogAsCustomMessageWithLabels

* simplifying

* bumping chainlink-common

* adding labels field to Engine + expanding log as custom message API in workflow monitoring

* gomodtidy

* Adding customMessageAgent

* adding initial engine metrics

* minor cleanup

* %s --> %w. interface{} --> any

* Feature/keystone beholder alerts syncer 2 (#14786)

* manually adding edits from closed PR #14744

* cleanup

* lint

* lint

* adding newline back to .tool-versions
  • Loading branch information
patrickhuie19 authored Oct 17, 2024
1 parent 2b67c54 commit dd59dc9
Show file tree
Hide file tree
Showing 13 changed files with 497 additions and 60 deletions.
108 changes: 108 additions & 0 deletions core/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package monitoring

import (
"context"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
beholderpb "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"
)

type CustomMessageLabeler struct {
labels map[string]string
}

func NewCustomMessageLabeler() CustomMessageLabeler {
return CustomMessageLabeler{labels: make(map[string]string)}
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c CustomMessageLabeler) With(keyValues ...string) CustomMessageLabeler {
newCustomMessageLabeler := NewCustomMessageLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
}

// Copy existing labels from the current agent
for k, v := range c.labels {
newCustomMessageLabeler.labels[k] = v
}

// Add new key-value pairs
for i := 0; i < len(keyValues); i += 2 {
key := keyValues[i]
value := keyValues[i+1]
newCustomMessageLabeler.labels[key] = value
}

return newCustomMessageLabeler
}

// SendLogAsCustomMessage emits a BaseMessage With msg and labels as data.
// any key in labels that is not part of orderedLabelKeys will not be transmitted
func (c CustomMessageLabeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, c.labels)
}

type MetricsLabeler struct {
Labels map[string]string
}

func NewMetricsLabeler() MetricsLabeler {
return MetricsLabeler{Labels: make(map[string]string)}
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c MetricsLabeler) With(keyValues ...string) MetricsLabeler {
newCustomMetricsLabeler := NewMetricsLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
}

// Copy existing labels from the current agent
for k, v := range c.Labels {
newCustomMetricsLabeler.Labels[k] = v
}

// Add new key-value pairs
for i := 0; i < len(keyValues); i += 2 {
key := keyValues[i]
value := keyValues[i+1]
newCustomMetricsLabeler.Labels[key] = value
}

return newCustomMetricsLabeler
}

func sendLogAsCustomMessageW(msg string, labels map[string]string) error {
protoLabels := make(map[string]*valuespb.Value)
for _, l := range labels {
protoLabels[l] = &valuespb.Value{Value: &valuespb.Value_StringValue{StringValue: labels[l]}}
}
// Define a custom protobuf payload to emit
payload := &beholderpb.BaseMessage{
Msg: msg,
Labels: protoLabels,
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return fmt.Errorf("sending custom message failed to marshal protobuf: %w", err)
}

err = beholder.GetEmitter().Emit(context.Background(), payloadBytes,
"beholder_data_schema", "/beholder-base-message/versions/1", // required
"beholder_data_type", "custom_message",
)
if err != nil {
return fmt.Errorf("sending custom message failed on emit: %w", err)
}

return nil
}
16 changes: 16 additions & 0 deletions core/monitoring/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package monitoring

import (
"testing"

"github.com/stretchr/testify/assert"
)

// tests CustomMessageAgent does not share state across new instances created by `With`
func Test_CustomMessageAgent(t *testing.T) {
cma := NewCustomMessageLabeler()
cma1 := cma.With("key1", "value1")
cma2 := cma1.With("key2", "value2")

assert.NotEqual(t, cma1.labels, cma2.labels)
}
11 changes: 11 additions & 0 deletions core/monitoring/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package monitoring

import "go.opentelemetry.io/otel/attribute"

func KvMapToOtelAttributes(kvmap map[string]string) []attribute.KeyValue {
otelKVs := make([]attribute.KeyValue, 0, len(kvmap))
for k, v := range kvmap {
otelKVs = append(otelKVs, attribute.String(k, v))
}
return otelKVs
}
49 changes: 49 additions & 0 deletions core/monitoring/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package monitoring

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
)

func TestKvMapToOtelAttributes(t *testing.T) {
tests := []struct {
name string
input map[string]string
expected []attribute.KeyValue
}{
{
name: "empty map",
input: map[string]string{},
expected: []attribute.KeyValue{},
},
{
name: "single key-value pair",
input: map[string]string{
"key1": "value1",
},
expected: []attribute.KeyValue{
attribute.String("key1", "value1"),
},
},
{
name: "multiple key-value pairs",
input: map[string]string{
"key1": "value1",
"key2": "value2",
},
expected: []attribute.KeyValue{
attribute.String("key1", "value1"),
attribute.String("key2", "value2"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := KvMapToOtelAttributes(tt.input)
assert.ElementsMatch(t, tt.expected, result, "unexpected KeyValue slice")
})
}
}
48 changes: 48 additions & 0 deletions core/services/registrysyncer/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package registrysyncer

import (
"context"
"fmt"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink/v2/core/monitoring"
)

var remoteRegistrySyncFailureCounter metric.Int64Counter
var launcherFailureCounter metric.Int64Counter

func initMonitoringResources() (err error) {
remoteRegistrySyncFailureCounter, err = beholder.GetMeter().Int64Counter("RemoteRegistrySyncFailure")
if err != nil {
return fmt.Errorf("failed to register sync failure counter: %w", err)
}

launcherFailureCounter, err = beholder.GetMeter().Int64Counter("LauncherFailureCounter")
if err != nil {
return fmt.Errorf("failed to register launcher failure counter: %w", err)
}

return nil
}

// syncerMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities
// for monitoring resources
type syncerMetricLabeler struct {
monitoring.MetricsLabeler
}

func (c syncerMetricLabeler) with(keyValues ...string) syncerMetricLabeler {
return syncerMetricLabeler{c.With(keyValues...)}
}

func (c syncerMetricLabeler) incrementRemoteRegistryFailureCounter(ctx context.Context) {
otelLabels := monitoring.KvMapToOtelAttributes(c.Labels)
remoteRegistrySyncFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c syncerMetricLabeler) incrementLauncherFailureCounter(ctx context.Context) {
otelLabels := monitoring.KvMapToOtelAttributes(c.Labels)
launcherFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}
19 changes: 19 additions & 0 deletions core/services/registrysyncer/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package registrysyncer

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/monitoring"
)

func Test_InitMonitoringResources(t *testing.T) {
require.NoError(t, initMonitoringResources())
}

func Test_SyncerMetricsLabeler(t *testing.T) {
testSyncerMetricLabeler := syncerMetricLabeler{monitoring.NewMetricsLabeler()}
testSyncerMetricLabeler2 := testSyncerMetricLabeler.with("foo", "baz")
require.EqualValues(t, testSyncerMetricLabeler2.Labels["foo"], "baz")
}
11 changes: 10 additions & 1 deletion core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type RegistrySyncer interface {

type registrySyncer struct {
services.StateMachine
metrics syncerMetricLabeler
stopCh services.StopChan
launchers []Launcher
reader types.ContractReader
Expand Down Expand Up @@ -130,6 +131,11 @@ func newReader(ctx context.Context, lggr logger.Logger, relayer ContractReaderFa

func (s *registrySyncer) Start(ctx context.Context) error {
return s.StartOnce("RegistrySyncer", func() error {
err := initMonitoringResources()
if err != nil {
return err
}

s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand All @@ -153,7 +159,8 @@ func (s *registrySyncer) syncLoop() {

// Sync for a first time outside the loop; this means we'll start a remote
// sync immediately once spinning up syncLoop, as by default a ticker will
// fire for the first time at T+N, where N is the interval.
// fire for the first time at T+N, where N is the interval. We do not
// increment RemoteRegistryFailureCounter the first time
s.lggr.Debug("starting initial sync with remote registry")
err := s.Sync(ctx, true)
if err != nil {
Expand All @@ -169,6 +176,7 @@ func (s *registrySyncer) syncLoop() {
err := s.Sync(ctx, false)
if err != nil {
s.lggr.Errorw("failed to sync with remote registry", "error", err)
s.metrics.incrementRemoteRegistryFailureCounter(ctx)
}
}
}
Expand Down Expand Up @@ -319,6 +327,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
lrCopy := deepCopyLocalRegistry(lr)
if err := h.Launch(ctx, &lrCopy); err != nil {
s.lggr.Errorf("error calling launcher: %s", err)
s.metrics.incrementLauncherFailureCounter(ctx)
}
}

Expand Down
Loading

0 comments on commit dd59dc9

Please sign in to comment.