Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/keystone beholder integration #14510

Merged
merged 24 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ea4990a
added TODO placeholders
patrickhuie19 Sep 20, 2024
562e97c
sketch of adding beholder custom messages
patrickhuie19 Sep 20, 2024
9e8dc39
other options
patrickhuie19 Sep 26, 2024
700a38a
adding more idiomatic option
patrickhuie19 Sep 29, 2024
25800ef
unit testing labeling
patrickhuie19 Sep 30, 2024
3075350
adding a sample beholder metric emission
patrickhuie19 Sep 30, 2024
93c6a17
refactoring to use labeled context utilities
patrickhuie19 Sep 30, 2024
dfab4a2
bumping common to use custom proto for Keystone Custom Message
patrickhuie19 Oct 3, 2024
1fe109d
trying out sendLogAsCustomMessageWithLabels
patrickhuie19 Oct 3, 2024
cf4d2cb
simplifying
patrickhuie19 Oct 3, 2024
35db200
bumping chainlink-common
patrickhuie19 Oct 3, 2024
0495062
adding labels field to Engine + expanding log as custom message API i…
patrickhuie19 Oct 7, 2024
327663f
gomodtidy
patrickhuie19 Oct 7, 2024
7c74a9e
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 8, 2024
5c061ce
Adding customMessageAgent
patrickhuie19 Oct 8, 2024
81d77b5
adding initial engine metrics
patrickhuie19 Oct 10, 2024
b28590c
minor cleanup
patrickhuie19 Oct 10, 2024
be6a82e
%s --> %w. interface{} --> any
patrickhuie19 Oct 12, 2024
e18b99e
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 16, 2024
a6f2182
Feature/keystone beholder alerts syncer 2 (#14786)
patrickhuie19 Oct 16, 2024
cd96528
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 16, 2024
89ff03c
lint
patrickhuie19 Oct 16, 2024
c9ff3c1
adding newline back to .tool-versions
patrickhuie19 Oct 16, 2024
c26973f
Merge branch 'develop' into feature/keystone-beholder-alerts
patrickhuie19 Oct 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/services/registrysyncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s *registrySyncer) syncLoop() {
s.lggr.Debug("starting initial sync with remote registry")
err := s.Sync(ctx, true)
if err != nil {
// TODO ks-461
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
s.lggr.Errorw("failed to sync with remote registry", "error", err)
}

Expand All @@ -168,6 +169,7 @@ func (s *registrySyncer) syncLoop() {
s.lggr.Debug("starting regular sync with the remote registry")
err := s.Sync(ctx, false)
if err != nil {
// TODO ks-461
s.lggr.Errorw("failed to sync with remote registry", "error", err)
}
}
Expand Down Expand Up @@ -318,6 +320,7 @@ func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error {
for _, h := range s.launchers {
lrCopy := deepCopyLocalRegistry(lr)
if err := h.Launch(ctx, &lrCopy); err != nil {
// TODO ks-461
s.lggr.Errorf("error calling launcher: %s", err)
}
}
Expand Down
48 changes: 33 additions & 15 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"sync"
"time"

Expand Down Expand Up @@ -62,11 +63,14 @@ type Engine struct {
clock clockwork.Clock
}

func (e *Engine) Start(ctx context.Context) error {
func (e *Engine) Start(_ context.Context) error {
return e.StartOnce("Engine", func() error {
// create a new context, since the one passed in via Start is short-lived.
ctx, _ := e.stopCh.NewCtx()

// spin up monitoring resources
initMonitoringResources(ctx)

e.wg.Add(e.maxWorkerLimit)
for i := 0; i < e.maxWorkerLimit; i++ {
go e.worker(ctx)
Expand All @@ -89,15 +93,17 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error {
// Step 1. Resolve the underlying capability for each trigger
//
triggersInitialized := true
for _, t := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, t.ID)
for _, tc := range e.workflow.triggers {
tg, err := e.registry.GetTrigger(ctx, tc.ID)
if err != nil {
e.logger.With(cIDKey, t.ID).Errorf("failed to get trigger capability: %s", err)
// TODO ks-463
e.logger.Errorf("failed to get trigger capability %s: %s", tc.ID, err)
sendLogAsCustomMessage(ctx, "failed to get trigger capability %s: %s", tc.ID, err)
// we don't immediately return here, since we want to retry all triggers
// to notify the user of all errors at once.
triggersInitialized = false
} else {
t.trigger = tg
tc.trigger = tg
}
}
if !triggersInitialized {
Expand Down Expand Up @@ -243,6 +249,7 @@ func (e *Engine) init(ctx context.Context) {
})

if retryErr != nil {
// TODO ks-461
e.logger.Errorf("initialization failed: %s", retryErr)
e.afterInit(false)
return
Expand All @@ -256,9 +263,11 @@ func (e *Engine) init(ctx context.Context) {

e.logger.Debug("registering triggers")
for idx, t := range e.workflow.triggers {
err := e.registerTrigger(ctx, t, idx)
if err != nil {
e.logger.With(cIDKey, t.ID).Errorf("failed to register trigger: %s", err)
terr := e.registerTrigger(ctx, t, idx)
if terr != nil {
e.logger.Errorf("failed to register trigger %s: %s", t.ID, terr)
sendLogAsCustomMessage(ctx, "failed to register trigger: %s", t.ID, terr)
incrementRegisterTriggerFailureCounter(ctx, e.logger, attribute.String(cIDKey, t.ID))
}
}

Expand Down Expand Up @@ -434,6 +443,10 @@ func (e *Engine) loop(ctx context.Context) {
err = e.startExecution(ctx, executionID, resp.Event.Outputs)
if err != nil {
e.logger.With(eIDKey, executionID).Errorf("failed to start execution: %v", err)
// TODO: having the consumer need to shepherd the labels like this is clunky
labelsMap := make(map[string]string)
labelsMap[eIDKey] = executionID
sendLogAsCustomMessageWithLabels(ctx, labelsMap, "failed to start execution: %v", err)
}
case stepUpdate := <-e.stepUpdateCh:
// Executed synchronously to ensure we correctly schedule subsequent tasks.
Expand Down Expand Up @@ -548,6 +561,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
// We haven't completed the workflow, but should we continue?
// If we've been executing for too long, let's time the workflow out and stop here.
if state.CreatedAt != nil && e.clock.Since(*state.CreatedAt) > e.maxExecutionDuration {
// TODO ks-461
l.Info("execution timed out")
return e.finishExecution(ctx, state.ExecutionID, store.StatusTimeout)
}
Expand All @@ -558,6 +572,7 @@ func (e *Engine) handleStepUpdate(ctx context.Context, stepUpdate store.Workflow
e.queueIfReady(state, sd)
}
case store.StatusCompletedEarlyExit:
// TODO ks-461
l.Info("execution terminated early")
// NOTE: even though this marks the workflow as completed, any branches of the DAG
// that don't depend on the step that signaled for an early exit will still complete.
Expand Down Expand Up @@ -652,6 +667,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
l.Info("step executed successfully with a termination")
stepStatus = store.StatusCompletedEarlyExit
case err != nil:
// TODO ks-461
l.Errorf("error executing step request: %s", err)
stepStatus = store.StatusErrored
default:
Expand Down Expand Up @@ -962,14 +978,16 @@ func NewEngine(cfg Config) (engine *Engine, err error) {
return engine, nil
}

// Logging keys
// Observability keys
const (
cIDKey = "capabilityID"
tIDKey = "triggerID"
wIDKey = "workflowID"
eIDKey = "executionID"
sIDKey = "stepID"
sRKey = "stepRef"
cIDKey = "capabilityID"
ceIDKey = "capabilityExecutionID"
tIDKey = "triggerID"
wIDKey = "workflowID"
eIDKey = "workflowExecutionID"
woIDKey = "workflowOwner"
sIDKey = "stepID"
sRKey = "stepRef"
)

type workflowError struct {
Expand Down
85 changes: 85 additions & 0 deletions core/services/workflows/monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package workflows

import (
"context"
"fmt"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/protobuf/proto"
"log"
)

// registerTriggerFailureCounter counts failed trigger registrations.
// It is initialized by initMonitoringResources, which must run before
// incrementRegisterTriggerFailureCounter is called.
var registerTriggerFailureCounter metric.Int64Counter

func initMonitoringResources(_ context.Context) (err error) {
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("RegisterTriggerFailure")
if err != nil {
return fmt.Errorf("failed to register trigger failure: %s", err)
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// TODO: tradeoff between having labels be variadic vs values be variadic?
func sendLogAsCustomMessageWithLabels(ctx context.Context, labels map[string]string, format string, values ...interface{}) (err error) {
for labelKey, labelValue := range labels {
ctx, err = KeystoneContextWithLabel(ctx, labelKey, labelValue)
if err != nil {
return err
}
}

return sendLogAsCustomMessage(ctx, format, values...)
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved

}

func sendLogAsCustomMessage(ctx context.Context, format string, values ...interface{}) error {
msg, err := composeLabeledMsg(ctx, format, values...)
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("sendLogAsCustomMessage failed: %w", err)
}

labelsStruct, oerr := GetKeystoneLabelsFromContext(ctx)
if oerr != nil {
return oerr
}

labels := labelsStruct.ToMap()

// Define a custom protobuf payload to emit
payload := &pb.KeystoneCustomMessage{
Msg: msg,
WorkflowID: labels[WorkflowID],
WorkflowExecutionID: labels[WorkflowExecutionID],
}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
log.Fatalf("Failed to marshal protobuf")
}

err = beholder.GetEmitter().Emit(context.Background(), payloadBytes,
"beholder_data_schema", "/keystone-custom-message/versions/1", // required
"beholder_data_type", "custom_message",
)
if err != nil {
log.Printf("Error emitting message: %v", err)
}

return nil
}

// incrementRegisterTriggerFailureCounter bumps the RegisterTriggerFailure
// metric by one, attaching both the explicitly supplied attributes and any
// keystone labels carried on ctx. A missing/invalid context is logged but
// does not prevent the increment.
func incrementRegisterTriggerFailureCounter(ctx context.Context, lggr logger.Logger, labels ...attribute.KeyValue) {
	ctxAttrs, err := getOtelAttributesFromCtx(ctx)
	if err != nil {
		// custom messages require this extracting of values from the context
		// but if i set them in the proto, then could use
		// lggr.With for logs, metric.WithAttributes, and set the labels directly in the proto for custom messages
		lggr.Errorf("failed to get otel attributes from context: %s", err)
	}

	// TODO add duplicate labels check?
	allAttrs := append(labels, ctxAttrs...)
	registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(allAttrs...))
}
114 changes: 114 additions & 0 deletions core/services/workflows/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package workflows

import (
"context"
"fmt"
"go.opentelemetry.io/otel/attribute"
"reflect"
)

// Keys for the workflow-scoped labels attached to keystone telemetry.
const WorkflowID = "WorkflowID"
const WorkflowExecutionID = "WorkflowExecutionID"

// keystoneWorkflowContextKey is an unexported key type so no other package
// can collide with the labels this package stores on a context.
type keystoneWorkflowContextKey struct{}

var keystoneContextKey = keystoneWorkflowContextKey{}

// KeystoneWorkflowLabels holds the workflow-scoped labels propagated via
// context to logs, metrics, and custom messages.
type KeystoneWorkflowLabels struct {
	WorkflowExecutionID string
	WorkflowID          string
}

// OrderedKeystoneLabels fixes the order in which labels are prefixed onto
// custom messages (see composeLabeledMsg).
var OrderedKeystoneLabels = []string{WorkflowID, WorkflowExecutionID}

// OrderedKeystoneLabelsMap allows O(1) validity checks of label keys; entries
// hold a non-nil sentinel so a nil lookup result means "unknown key".
var OrderedKeystoneLabelsMap = make(map[string]any)

func init() {
	for _, label := range OrderedKeystoneLabels {
		OrderedKeystoneLabelsMap[label] = any(0)
	}
}

// ToMap renders the labels as a plain map keyed by the exported label-name
// constants.
func (k *KeystoneWorkflowLabels) ToMap() map[string]string {
	return map[string]string{
		WorkflowID:          k.WorkflowID,
		WorkflowExecutionID: k.WorkflowExecutionID,
	}
}

// ToOtelAttributes converts the labels into OpenTelemetry KeyValue attributes.
func (k *KeystoneWorkflowLabels) ToOtelAttributes() []attribute.KeyValue {
	attrs := make([]attribute.KeyValue, 0, 2)
	attrs = append(attrs, attribute.String(WorkflowID, k.WorkflowID))
	attrs = append(attrs, attribute.String(WorkflowExecutionID, k.WorkflowExecutionID))
	return attrs
}

// GetKeystoneLabelsFromContext extracts the KeystoneWorkflowLabels struct set on the
// unexported keystoneContextKey. Call NewKeystoneContext first before usage -
// if the key is unset or the value is not of the expected type an error is returned.
func GetKeystoneLabelsFromContext(ctx context.Context) (KeystoneWorkflowLabels, error) {
	if labels, ok := ctx.Value(keystoneContextKey).(KeystoneWorkflowLabels); ok {
		return labels, nil
	}
	return KeystoneWorkflowLabels{}, fmt.Errorf("context value with keystoneContextKey is not of type KeystoneWorkflowLabels")
}

// NewKeystoneContext returns a context carrying labels under the unexported
// keystoneContextKey. Call this before GetKeystoneLabelsFromContext or
// KeystoneContextWithLabel; without it, both return an error.
// NOTE(review): labels is a value type and cannot be nil; the zero value is accepted.
func NewKeystoneContext(ctx context.Context, labels KeystoneWorkflowLabels) context.Context {
	return context.WithValue(ctx, keystoneContextKey, labels)
}

// KeystoneContextWithLabel extracts the Keystone labels set on the passed-in
// immutable context, sets the new desired label if valid, and then returns a
// new context with the updated labels. The key must be one of
// OrderedKeystoneLabels and the context must already carry labels
// (see NewKeystoneContext); otherwise an error is returned.
func KeystoneContextWithLabel(ctx context.Context, key string, value string) (context.Context, error) {
	curLabels, err := GetKeystoneLabelsFromContext(ctx)
	if err != nil {
		return nil, err
	}

	// An explicit switch replaces the previous reflection-based
	// FieldByName(key).SetString(value): it is faster, cannot panic if a
	// label constant ever diverges from a struct field name, and is checked
	// at compile time against the struct definition.
	switch key {
	case WorkflowID:
		curLabels.WorkflowID = value
	case WorkflowExecutionID:
		curLabels.WorkflowExecutionID = value
	default:
		return nil, fmt.Errorf("key %v is not a valid keystone label", key)
	}

	return context.WithValue(ctx, keystoneContextKey, curLabels), nil
}

func composeLabeledMsg(ctx context.Context, format string, values ...interface{}) (string, error) {
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
msg := fmt.Sprintf(format, values...)

structLabels, err := GetKeystoneLabelsFromContext(ctx)
if err != nil {
return "", fmt.Errorf("composing labeled message failed: %w", err)
}

labels := structLabels.ToMap()

// Populate labeled message in reverse
numLabels := len(OrderedKeystoneLabels)
for i := range numLabels {
msg = fmt.Sprintf("%v.%v", labels[OrderedKeystoneLabels[numLabels-1-i]], msg)
}

return msg, nil
}

// getOtelAttributesFromCtx converts the keystone labels carried on ctx into
// OpenTelemetry attributes, erroring if the context carries no labels.
func getOtelAttributesFromCtx(ctx context.Context) ([]attribute.KeyValue, error) {
	labels, err := GetKeystoneLabelsFromContext(ctx)
	if err != nil {
		return nil, err
	}
	return labels.ToOtelAttributes(), nil
}
Loading