Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat): Cron Trigger Service Capability #715

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/dominikbraun/graph v0.23.0
github.com/fxamacker/cbor/v2 v2.5.0
github.com/go-co-op/gocron/v2 v2.11.0
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -76,6 +77,7 @@ require (
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.2.0
github.com/stretchr/objx v0.5.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE=
github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-co-op/gocron/v2 v2.11.0 h1:IOowNA6SzwdRFnD4/Ol3Kj6G2xKfsoiiGq2Jhhm9bvE=
github.com/go-co-op/gocron/v2 v2.11.0/go.mod h1:xY7bJxGazKam1cz04EebrlP4S9q4iWdiAylMGP3jY9w=
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f35nQbASLnvxEde4XOBL+Sn7rFuV+FOJqkljg=
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down Expand Up @@ -204,6 +206,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/riferrei/srclient v0.5.4 h1:dfwyR5u23QF7beuVl2WemUY2KXh5+Sc4DHKyPXBNYuc=
github.com/riferrei/srclient v0.5.4/go.mod h1:vbkLmWcgYa7JgfPvuy/+K8fTS0p1bApqadxrxi/S1MI=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/sanity-io/litter v1.5.5 h1:iE+sBxPBzoK6uaEP5Lt3fHNgpKcHXc/A2HGETy0uJQo=
Expand Down
28 changes: 21 additions & 7 deletions pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ type CapabilityRequest struct {
}

type TriggerEvent struct {
// The ID of the trigger capability
TriggerType string
ID string
Timestamp string
// Trigger-specific payload+metadata
Metadata values.Value
Payload values.Value
// The ID of the trigger event
ID string
// Trigger-specific payload
Outputs *values.Map
}

type RegisterToWorkflowRequest struct {
Expand Down Expand Up @@ -128,9 +128,23 @@ type BaseCapability interface {
Info(ctx context.Context) (CapabilityInfo, error)
}

type TriggerRegistrationRequest struct {
// TriggerID uniquely identifies the trigger by concatenating
// the workflow ID and the trigger's index in the spec.
TriggerID string

Metadata RequestMetadata
Config *values.Map
}

type TriggerResponse struct {
Event TriggerEvent
Err error
}

type TriggerExecutable interface {
RegisterTrigger(ctx context.Context, request CapabilityRequest) (<-chan CapabilityResponse, error)
UnregisterTrigger(ctx context.Context, request CapabilityRequest) error
RegisterTrigger(ctx context.Context, request TriggerRegistrationRequest) (<-chan TriggerResponse, error)
UnregisterTrigger(ctx context.Context, request TriggerRegistrationRequest) error
}

// TriggerCapability interface needs to be implemented by all trigger capabilities.
Expand Down
11 changes: 3 additions & 8 deletions pkg/capabilities/consensus/ocr3/datafeeds/feeds_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

ocrcommon "github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -233,16 +232,12 @@ func (a *dataFeedsAggregator) extractSignersAndPayloads(observations map[ocrcomm
a.lggr.Warnf("node %d contributed with more than one observation", nodeID)
continue
}
triggerEvent := &capabilities.TriggerEvent{}
triggerEvent := &datastreams.StreamsTriggerPayload{}
if err := nodeObservations[0].UnwrapTo(triggerEvent); err != nil {
a.lggr.Warnf("could not parse observations from node %d: %v", nodeID, err)
continue
}
meta := &datastreams.SignersMetadata{}
if err := triggerEvent.Metadata.UnwrapTo(meta); err != nil {
a.lggr.Warnf("could not parse trigger metadata from node %d: %v", nodeID, err)
continue
}
meta := triggerEvent.Metadata
currentNodeSigners, err := extractUniqueSigners(meta.Signers)
if err != nil {
a.lggr.Warnf("could not extract signers from node %d: %v", nodeID, err)
Expand All @@ -252,7 +247,7 @@ func (a *dataFeedsAggregator) extractSignersAndPayloads(observations map[ocrcomm
signers[signer]++
}
mins[meta.MinRequiredSignatures]++
payloads[nodeID] = triggerEvent.Payload
payloads[nodeID] = nodeObservations[0]
}
// Agree on signers list and min-required. It's technically possible to have F+1 valid values from one trigger DON and F+1 from another trigger DON.
// In that case both values are legitimate and signers list will contain nodes from both DONs. However, min-required value will be the higher one (if different).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/datafeeds"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams/mocks"
Expand All @@ -34,14 +33,11 @@ var (
)

func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
metaVal, err := values.Wrap(datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
})
require.NoError(t, err)
mockTriggerEvent, err := values.Wrap(capabilities.TriggerEvent{
Metadata: metaVal,
Payload: &values.Map{},
mockTriggerEvent, err := values.Wrap(datastreams.StreamsTriggerPayload{
Metadata: datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
},
})
require.NoError(t, err)
config := getConfig(t, feedIDA.String(), "0.1", heartbeatA)
Expand Down Expand Up @@ -112,14 +108,11 @@ func TestDataFeedsAggregator_Aggregate_TwoRounds(t *testing.T) {
}

func TestDataFeedsAggregator_Aggregate_AllowedPartialStaleness(t *testing.T) {
metaVal, err := values.Wrap(datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
})
require.NoError(t, err)
mockTriggerEvent, err := values.Wrap(capabilities.TriggerEvent{
Metadata: metaVal,
Payload: &values.Map{},
mockTriggerEvent, err := values.Wrap(datastreams.StreamsTriggerPayload{
Metadata: datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
},
})
require.NoError(t, err)
config := getConfig(t, feedIDA.String(), "0.1", heartbeatA)
Expand Down Expand Up @@ -189,15 +182,11 @@ func TestDataFeedsAggregator_Aggregate_AllowedPartialStaleness(t *testing.T) {
}

func TestDataFeedsAggregator_Aggregate_Failures(t *testing.T) {
meta := datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
}
metaVal, err := values.Wrap(meta)
require.NoError(t, err)
mockTriggerEvent, err := values.Wrap(capabilities.TriggerEvent{
Metadata: metaVal,
Payload: &values.Map{},
mockTriggerEvent, err := values.Wrap(datastreams.StreamsTriggerPayload{
Metadata: datastreams.SignersMetadata{
Signers: [][]byte{newSigner(t), newSigner(t)},
MinRequiredSignatures: 1,
},
})
require.NoError(t, err)

Expand Down
29 changes: 23 additions & 6 deletions pkg/capabilities/datastreams/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type SignersMetadata struct {
MinRequiredSignatures int
}

type StreamsTriggerPayload struct {
Payload []FeedReport
Metadata SignersMetadata
Timestamp int64
}

type ReportCodec interface {
// unwrap reports and convert to a list of FeedReport
Unwrap(wrapped values.Value) ([]FeedReport, error)
Expand All @@ -84,18 +90,28 @@ type ReportCodec interface {
Validate(feedReport FeedReport, allowedSigners [][]byte, minRequiredSignatures int) error
}

// Helpers for unwrapping a list of FeedReports - more efficient than using mapstructure/reflection
func UnwrapFeedReportList(wrapped values.Value) ([]FeedReport, error) {
// Helpers for unwrapping a list of StreamsTriggerPayload - more efficient than using mapstructure/reflection
func UnwrapStreamsTriggerPayloadToFeedReportList(wrapped values.Value) ([]FeedReport, error) {
result := []FeedReport{}
lst, ok := wrapped.(*values.List)
triggerEvent, ok := wrapped.(*values.Map)
if !ok {
return nil, fmt.Errorf("unexpected value %+v for trigger payload: expected map, got %T", wrapped, wrapped)
}

p, ok := triggerEvent.Underlying["Payload"]
if !ok {
return nil, errors.New("expected list")
return nil, errors.New("expected map to have Payload field")
}
for _, v := range lst.Underlying {

plst, ok := p.(*values.List)
if !ok {
return nil, errors.New("expected Payload to be a list")
}
for _, v := range plst.Underlying {
report := FeedReport{}
mp, ok := v.(*values.Map)
if !ok {
return nil, errors.New("expected map")
return nil, fmt.Errorf("unexpected value %+v for feed report: expected map, got %T", v, v)
}
var err error
report.FeedID, err = getStringField(mp, "FeedID")
Expand Down Expand Up @@ -127,6 +143,7 @@ func UnwrapFeedReportList(wrapped values.Value) ([]FeedReport, error) {
}
result = append(result, report)
}

return result, nil
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/capabilities/datastreams/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestFeedID_Validate(t *testing.T) {
require.NoError(t, err)
}

func Test_UnwrapFeedReportList(t *testing.T) {
func Test_UnwrapStreamsTriggerPayloadToFeedReportList(t *testing.T) {
feedReports := []datastreams.FeedReport{
{
FeedID: feedIDAStr,
Expand All @@ -50,10 +50,13 @@ func Test_UnwrapFeedReportList(t *testing.T) {
},
}

wrapped, err := values.Wrap(feedReports)
payload := datastreams.StreamsTriggerPayload{
Payload: feedReports,
}
wrapped, err := values.Wrap(payload)
require.NoError(t, err)

unwrapped, err := datastreams.UnwrapFeedReportList(wrapped)
unwrapped, err := datastreams.UnwrapStreamsTriggerPayloadToFeedReportList(wrapped)
require.NoError(t, err)
require.Equal(t, feedReports, unwrapped)
}
Expand Down
Loading
Loading