Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/keystone beholder integration #14510

Merged
merged 24 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ea4990a
added TODO placeholders
patrickhuie19 Sep 20, 2024
562e97c
sketch of adding beholder custom messages
patrickhuie19 Sep 20, 2024
9e8dc39
other options
patrickhuie19 Sep 26, 2024
700a38a
adding more idiomatic option
patrickhuie19 Sep 29, 2024
25800ef
unit testing labeling
patrickhuie19 Sep 30, 2024
3075350
adding a sample beholder metric emission
patrickhuie19 Sep 30, 2024
93c6a17
refactoring to use labeled context utilities
patrickhuie19 Sep 30, 2024
dfab4a2
bumping common to use custom proto for Keystone Custom Message
patrickhuie19 Oct 3, 2024
1fe109d
trying out sendLogAsCustomMessageWithLabels
patrickhuie19 Oct 3, 2024
cf4d2cb
simplifying
patrickhuie19 Oct 3, 2024
35db200
bumping chainlink-common
patrickhuie19 Oct 3, 2024
0495062
adding labels field to Engine + expanding log as custom message API i…
patrickhuie19 Oct 7, 2024
327663f
gomodtidy
patrickhuie19 Oct 7, 2024
7c74a9e
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 8, 2024
5c061ce
Adding customMessageAgent
patrickhuie19 Oct 8, 2024
81d77b5
adding initial engine metrics
patrickhuie19 Oct 10, 2024
b28590c
minor cleanup
patrickhuie19 Oct 10, 2024
be6a82e
%s --> %w. interface{} --> any
patrickhuie19 Oct 12, 2024
e18b99e
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 16, 2024
a6f2182
Feature/keystone beholder alerts syncer 2 (#14786)
patrickhuie19 Oct 16, 2024
cd96528
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 16, 2024
89ff03c
lint
patrickhuie19 Oct 16, 2024
c9ff3c1
adding newline back to .tool-versions
patrickhuie19 Oct 16, 2024
c26973f
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions core/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package monitoring

import (
"context"
"fmt"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
beholderpb "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
valuespb "github.com/smartcontractkit/chainlink-common/pkg/values/pb"
)

type CustomMessageLabeler struct {
labels map[string]string
}

func NewCustomMessageLabeler() CustomMessageLabeler {
return CustomMessageLabeler{labels: make(map[string]string)}
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c CustomMessageLabeler) With(keyValues ...string) CustomMessageLabeler {
newCustomMessageLabeler := NewCustomMessageLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
}

// Copy existing labels from the current agent
for k, v := range c.labels {
newCustomMessageLabeler.labels[k] = v
}

// Add new key-value pairs
for i := 0; i < len(keyValues); i += 2 {
key := keyValues[i]
value := keyValues[i+1]
newCustomMessageLabeler.labels[key] = value
}

return newCustomMessageLabeler
}

// SendLogAsCustomMessage emits a BaseMessage With msg and labels as data.
// any key in labels that is not part of orderedLabelKeys will not be transmitted
func (c CustomMessageLabeler) SendLogAsCustomMessage(msg string) error {
return sendLogAsCustomMessageW(msg, c.labels)
}

type MetricsLabeler struct {
Labels map[string]string
}

func NewMetricsLabeler() MetricsLabeler {
return MetricsLabeler{Labels: make(map[string]string)}
}

// With adds multiple key-value pairs to the CustomMessageLabeler for transmission With SendLogAsCustomMessage
func (c MetricsLabeler) With(keyValues ...string) MetricsLabeler {
newCustomMetricsLabeler := NewMetricsLabeler()

if len(keyValues)%2 != 0 {
// If an odd number of key-value arguments is passed, return the original CustomMessageLabeler unchanged
return c
}

// Copy existing labels from the current agent
for k, v := range c.Labels {
newCustomMetricsLabeler.Labels[k] = v
}

// Add new key-value pairs
for i := 0; i < len(keyValues); i += 2 {
key := keyValues[i]
value := keyValues[i+1]
newCustomMetricsLabeler.Labels[key] = value
}

return newCustomMetricsLabeler
}

func sendLogAsCustomMessageW(msg string, labels map[string]string) error {
protoLabels := make(map[string]*valuespb.Value)
for _, l := range labels {
protoLabels[l] = &valuespb.Value{Value: &valuespb.Value_StringValue{StringValue: labels[l]}}
}
// Define a custom protobuf payload to emit
payload := &beholderpb.BaseMessage{
Msg: msg,
Labels: protoLabels,
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
return fmt.Errorf("sending custom message failed to marshal protobuf: %w", err)
}

err = beholder.GetEmitter().Emit(context.Background(), payloadBytes,
"beholder_data_schema", "/beholder-base-message/versions/1", // required
"beholder_data_type", "custom_message",
)
if err != nil {
return fmt.Errorf("sending custom message failed on emit: %w", err)
}

return nil
}
16 changes: 16 additions & 0 deletions core/monitoring/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package monitoring

import (
"testing"

"github.com/stretchr/testify/assert"
)

// tests CustomMessageAgent does not share state across new instances created by `With`
func Test_CustomMessageAgent(t *testing.T) {
cma := NewCustomMessageLabeler()
cma1 := cma.With("key1", "value1")
cma2 := cma1.With("key2", "value2")

assert.NotEqual(t, cma1.labels, cma2.labels)
}
11 changes: 11 additions & 0 deletions core/monitoring/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package monitoring

import "go.opentelemetry.io/otel/attribute"

func KvMapToOtelAttributes(kvmap map[string]string) []attribute.KeyValue {
otelKVs := make([]attribute.KeyValue, 0, len(kvmap))
for k, v := range kvmap {
otelKVs = append(otelKVs, attribute.String(k, v))
}
return otelKVs
}
49 changes: 49 additions & 0 deletions core/monitoring/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package monitoring

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
)

func TestKvMapToOtelAttributes(t *testing.T) {
tests := []struct {
name string
input map[string]string
expected []attribute.KeyValue
}{
{
name: "empty map",
input: map[string]string{},
expected: []attribute.KeyValue{},
},
{
name: "single key-value pair",
input: map[string]string{
"key1": "value1",
},
expected: []attribute.KeyValue{
attribute.String("key1", "value1"),
},
},
{
name: "multiple key-value pairs",
input: map[string]string{
"key1": "value1",
"key2": "value2",
},
expected: []attribute.KeyValue{
attribute.String("key1", "value1"),
attribute.String("key2", "value2"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := KvMapToOtelAttributes(tt.input)
assert.ElementsMatch(t, tt.expected, result, "unexpected KeyValue slice")
})
}
}
48 changes: 48 additions & 0 deletions core/services/registrysyncer/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package registrysyncer

import (
"context"
"fmt"

"go.opentelemetry.io/otel/metric"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink/v2/core/monitoring"
)

var remoteRegistrySyncFailureCounter metric.Int64Counter
var launcherFailureCounter metric.Int64Counter

func initMonitoringResources() (err error) {
remoteRegistrySyncFailureCounter, err = beholder.GetMeter().Int64Counter("RemoteRegistrySyncFailure")
if err != nil {
return fmt.Errorf("failed to register sync failure counter: %w", err)
}

launcherFailureCounter, err = beholder.GetMeter().Int64Counter("LauncherFailureCounter")
if err != nil {
return fmt.Errorf("failed to register launcher failure counter: %w", err)
}

return nil
}

// syncerMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities
// for monitoring resources
type syncerMetricLabeler struct {
monitoring.MetricsLabeler
}

func (c syncerMetricLabeler) with(keyValues ...string) syncerMetricLabeler {
return syncerMetricLabeler{c.With(keyValues...)}
}

func (c syncerMetricLabeler) incrementRemoteRegistryFailureCounter(ctx context.Context) {
otelLabels := monitoring.KvMapToOtelAttributes(c.Labels)
remoteRegistrySyncFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c syncerMetricLabeler) incrementLauncherFailureCounter(ctx context.Context) {
otelLabels := monitoring.KvMapToOtelAttributes(c.Labels)
launcherFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}
19 changes: 19 additions & 0 deletions core/services/registrysyncer/monitoring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package registrysyncer

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/monitoring"
)

func Test_InitMonitoringResources(t *testing.T) {
require.NoError(t, initMonitoringResources())
}

func Test_SyncerMetricsLabeler(t *testing.T) {
testSyncerMetricLabeler := syncerMetricLabeler{monitoring.NewMetricsLabeler()}
testSyncerMetricLabeler2 := testSyncerMetricLabeler.with("foo", "baz")
require.EqualValues(t, testSyncerMetricLabeler2.Labels["foo"], "baz")
}
11 changes: 10 additions & 1 deletion core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type RegistrySyncer interface {

type registrySyncer struct {
services.StateMachine
metrics syncerMetricLabeler
stopCh services.StopChan
launchers []Launcher
reader types.ContractReader
Expand Down Expand Up @@ -130,6 +131,11 @@ func newReader(ctx context.Context, lggr logger.Logger, relayer ContractReaderFa

func (s *registrySyncer) Start(ctx context.Context) error {
return s.StartOnce("RegistrySyncer", func() error {
err := initMonitoringResources()
if err != nil {
return err
}

s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand All @@ -153,7 +159,8 @@ func (s *registrySyncer) syncLoop() {

// Sync for a first time outside the loop; this means we'll start a remote
// sync immediately once spinning up syncLoop, as by default a ticker will
// fire for the first time at T+N, where N is the interval.
// fire for the first time at T+N, where N is the interval. We do not
// increment RemoteRegistryFailureCounter the first time
s.lggr.Debug("starting initial sync with remote registry")
err := s.Sync(ctx, true)
if err != nil {
Expand All @@ -169,6 +176,7 @@ func (s *registrySyncer) syncLoop() {
err := s.Sync(ctx, false)
if err != nil {
s.lggr.Errorw("failed to sync with remote registry", "error", err)
s.metrics.incrementRemoteRegistryFailureCounter(ctx)
}
}
}
Expand Down Expand Up @@ -319,6 +327,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
lrCopy := deepCopyLocalRegistry(lr)
if err := h.Launch(ctx, &lrCopy); err != nil {
s.lggr.Errorf("error calling launcher: %s", err)
s.metrics.incrementLauncherFailureCounter(ctx)
}
}

Expand Down
Loading
Loading