Skip to content

Commit

Permalink
some PR comments, updated register unregister trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidOrchard committed Sep 18, 2024
1 parent de300e0 commit fdc4510
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 63 deletions.
1 change: 1 addition & 0 deletions core/capabilities/gateway_connector/service_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (e *ServiceWrapper) GetGatewayConnector() connector.GatewayConnector {
return e.connector
}

// TODO: Remove once Jin's PR gets in.
func (e *ServiceWrapper) GetSignerKey() *ecdsa.PrivateKey {
return e.signerKey
}
160 changes: 107 additions & 53 deletions core/capabilities/webapi/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
Expand All @@ -21,24 +22,29 @@ import (

const defaultSendChannelBufferSize = 1000

const triggerType = "[email protected]"

type Response struct {
// TODO: what is the format for ACCEPTED, PENDING, COMPLETED status?
// Status string `json:"status"`?
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
}

type workflowConnectorHandler struct {
// Handles connections to the webapi trigger
type triggerConnectorHandler struct {
services.StateMachine

capabilities.CapabilityInfo
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
triggers map[string]chan capabilities.TriggerResponse
signerKey *ecdsa.PrivateKey
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
registeredWorkflows map[string]chan capabilities.TriggerResponse
signerKey *ecdsa.PrivateKey
}

var _ capabilities.TriggerCapability = (*workflowConnectorHandler)(nil)
var _ services.Service = &workflowConnectorHandler{}
var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil)
var _ services.Service = &triggerConnectorHandler{}

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, signerKey *ecdsa.PrivateKey, lggr logger.Logger) (job.ServiceCtx, error) {
// TODO (CAPPL-22, CAPPL-24):
Expand All @@ -48,18 +54,48 @@ func NewTrigger(config string, registry core.CapabilitiesRegistry, connector con
// - manage trigger subscriptions
// - process incoming trigger events and related metadata

handler := &workflowConnectorHandler{
handler := &triggerConnectorHandler{
connector: connector,
signerKey: signerKey,
lggr: lggr.Named("WorkflowConnectorHandler"),
}

// is this the right way to register with gateway connector? Cron trigger doesn't do this.
err := connector.AddHandler([]string{"add_workflow"}, handler)

return handler, err
return handler, nil
}

// https://gateway-us-1.chain.link/web-trigger
// {
// jsonrpc: "2.0",
// id: "...",
// method: "web-trigger",
// params: {
// signature: "...",
// body: {
// don_id: "workflow_123",
// payload: {
// trigger_id: "[email protected]",
// trigger_event_id: "action_1234567890",
// timestamp: 1234567890,
// sub-events: [
// {
// topics: ["daily_price_update"],
// params: {
// bid: "101",
// ask: "102"
// }
// },
// {
// topics: ["daily_message", "summary"],
// params: {
// message: "all good!",
// }
// },
// ]
// }
// }
// }
// }

// from Web API Trigger Doc
// trigger_id - ID of the trigger corresponding to the capability ID
// trigger_event_id - uniquely identifies generated event (scoped to trigger_id and sender)xx
Expand All @@ -69,95 +105,112 @@ func NewTrigger(config string, registry core.CapabilitiesRegistry, connector con
// workflow_owners - [OPTIONAL] list of workflow owners allowed to receive this event (affects all workflows if empty)
// params - key-value pairs that will be used as trigger output in the workflow Engine (translated to values.Map)

// The sample script does
// workflowSpec := flag.String("workflow_spec", "[my spec abcd]", "Workflow Spec")
// payloadJSON := []byte("{\"spec\": \"" + *workflowSpec + "\"}")
//
// how do these reconcile?
// How do we get the TriggerID to look up in the map of TriggerIDs to connection?
type TriggerRequestPayload struct {
TriggerId string `json:"trigger_id"`
TriggerEventId string `json:"trigger_event_id"`
// how are timestamps defined? ISO-8601 or UTC seconds or UTC ms?
Timestamp uint `json:"timestamp"`
SubEvents []SubEvents `json:"sub_events"`
}

func (h *workflowConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
type SubEvents struct {
Topics []string `json:"topics"`
Params values.Map `json:"params"`
}

func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
fromAddr := ethCommon.HexToAddress(body.Sender)
// TODO: apply allowlist and rate-limiting
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr)
// ERR: payload is error, how to unmarshall?
payload := body.Payload.UnmarshalJSON(body.Payload)
triggerID := payload.trigger_id
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "sender", fromAddr)
var payload TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
return
}

// TODO: how to convert payload to *values.Map. Parse directly to that instead of the structs?

// TODO: How/where to check timestamp for freshness

switch body.Method {
case workflow.MethodAddWorkflow:
// TODO: add a new workflow spec and return success/failure
// we need access to Job ORM or whatever CLO uses to fully launch a new spec
h.lggr.Debugw("added workflow spec", "payload", string(body.Payload))

// Question: should this call the registerTrigger and then handleNodeResponse call unregister?
tr := capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: "__builtin_web-api-trigger",
ID: triggerID,
Outputs: payload,
},
case workflow.MethodWebAPITrigger:
h.lggr.Debugw("added MethodWebAPITrigger message", "payload", string(body.Payload))

for triggerID, trigger := range h.registeredWorkflows {

// TODO: CAPPL-24 extract the topic and then match the subscriber to this messages triggers.
// TODO: CAPPL-24 check the topic to see if the method is a duplicate and the trigger has been sent, ie PENDING
// TODO: Question asked in Web API trigger about checking for completed Triggers to return COMPLETED
tr := capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: triggerType,
ID: triggerID,
Outputs: payload, // must be *values.Map

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / goreleaser-build-publish-chainlink

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / Analyze go

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_ccip_deployment_tests)

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal

Check failure on line 149 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

cannot use payload (variable of type TriggerRequestPayload) as *values.Map value in struct literal
},
}
trigger <- tr
}
channel := h.triggers[triggerID]
channel <- tr

// TODO: ACCEPTED, PENDING, COMPLETED
response := Response{Success: true}
h.sendResponse(ctx, gatewayID, body, response)
default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
}
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (h *workflowConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
// There's no config to use and validate
func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
_, ok := h.triggers[req.TriggerID]
_, ok := h.registeredWorkflows[req.TriggerID]
if ok {
return nil, fmt.Errorf("triggerId %s already registered", req.TriggerID)
}

callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
h.triggers[req.TriggerID] = callbackCh
// TODO: CAPPL-24 how do we extract the topic and then define the trigger by that?
// It's not TriggerID because TriggerID is concat of workflow ID and the trigger's index in the spec (what does that mean?)
h.registeredWorkflows[req.TriggerID] = callbackCh

return callbackCh, nil
}

func (h *workflowConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
trigger := h.triggers[req.TriggerID]
trigger, ok := h.registeredWorkflows[req.TriggerID]
if ok {
return fmt.Errorf("triggerId %s not registered", req.TriggerID)
}

// Close callback channel
close(trigger)
// Remove from triggers context
delete(h.triggers, req.TriggerID)
delete(h.registeredWorkflows, req.TriggerID)
return nil
}

func (h *workflowConnectorHandler) Start(ctx context.Context) error {
func (h *triggerConnectorHandler) Start(ctx context.Context) error {
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
return nil
return h.connector.AddHandler([]string{"web_trigger"}, h)
})
}
func (h *workflowConnectorHandler) Close() error {
func (h *triggerConnectorHandler) Close() error {
return h.StopOnce("GatewayConnectorServiceWrapper", func() error {
return nil
})
}

func (h *workflowConnectorHandler) HealthReport() map[string]error {
func (h *triggerConnectorHandler) HealthReport() map[string]error {
return map[string]error{h.Name(): h.Healthy()}
}

func (h *workflowConnectorHandler) Name() string {
func (h *triggerConnectorHandler) Name() string {
return "WebAPITrigger"
}

func (h *workflowConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error {
func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error {
payloadJson, err := json.Marshal(payload)
if err != nil {
return err
Expand All @@ -173,6 +226,7 @@ func (h *workflowConnectorHandler) sendResponse(ctx context.Context, gatewayID s
},
}

// TODO remove this and signerKey once Jin's PR is in.
if err = msg.Sign(h.signerKey); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,48 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
)

// https://gateway-us-1.chain.link/web-trigger
// {
// jsonrpc: "2.0",
// id: "...",
// method: "web-trigger",
// params: {
// signature: "...",
// body: {
// don_id: "workflow_123",
// payload: {
// trigger_id: "[email protected]",
// trigger_event_id: "action_1234567890",
// timestamp: 1234567890,
// sub-events: [
// {
// topics: ["daily_price_update"],
// params: {
// bid: "101",
// ask: "102"
// }
// },
// {
// topics: ["daily_message", "summary"],
// params: {
// message: "all good!",
// }
// },
// ]
// }
// }
// }
// }

func main() {
gatewayURL := flag.String("gateway_url", "http://localhost:5002", "Gateway URL")
privateKey := flag.String("private_key", "65456ffb8af4a2b93959256a8e04f6f2fe0943579fb3c9c3350593aabb89023f", "Private key to sign the message with")
messageID := flag.String("message_id", "12345", "Request ID")
methodName := flag.String("method", "add_workflow", "Method name")
messageID := flag.String("id", "12345", "Request ID")
methodName := flag.String("method", "web_trigger", "Method name")
donID := flag.String("don_id", "workflow_don_1", "DON ID")
workflowSpec := flag.String("workflow_spec", "[my spec abcd]", "Workflow Spec")
// workflowSpec := flag.String("workflow_spec", "[my spec abcd]", "Workflow Spec")
// payloadJSON := []byte("{\"spec\": \"" + *workflowSpec + "\"}")

flag.Parse()

if privateKey == nil || *privateKey == "" {
Expand All @@ -41,10 +76,29 @@ func main() {
fmt.Println("error parsing private key", err)
return
}
//address := crypto.PubkeyToAddress(key.PublicKey)

payloadJSON := []byte("{\"spec\": \"" + *workflowSpec + "\"}")

payload := `{
trigger_id: "[email protected]",
trigger_event_id: "action_1234567890",
timestamp: 1234567890,
sub-events: [
{
topics: ["daily_price_update"],
params: {
bid: "101",
ask: "102"
}
},
{
topics: ["daily_message", "summary"],
params: {
message: "all good!",
}
},
]
}
`
payloadJSON := []byte(payload)
msg := &api.Message{
Body: api.MessageBody{
MessageId: *messageID,
Expand All @@ -53,7 +107,6 @@ func main() {
Payload: json.RawMessage(payloadJSON),
},
}

if err = msg.Sign(key); err != nil {
fmt.Println("error signing message", err)
return
Expand Down
1 change: 1 addition & 0 deletions core/services/gateway/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json
switch handlerType {
case FunctionsHandlerType:
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.ds, hf.lggr)
// TODO: remove if Jin's PR goes in first
case workflowHandlerType:
return workflow.NewWorkflowHandler(donConfig, don, hf.lggr)
case DummyHandlerType:
Expand Down
6 changes: 3 additions & 3 deletions core/services/gateway/handlers/workflow/handler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package workflow

// TODO: reconcile with Jin's PR.
import (
"context"
"fmt"
Expand All @@ -14,8 +15,7 @@ import (
)

const (
MethodAddWorkflow = "add_workflow"
// NOTE: more methods will go here: CRUD for workflow specs; HTTP trigger/action/target; etc.
MethodWebAPITrigger = "web_trigger"
)

type workflowHandler struct {
Expand Down Expand Up @@ -53,7 +53,7 @@ func (d *workflowHandler) HandleUserMessage(ctx context.Context, msg *api.Messag
d.mu.Unlock()

// TODO: apply allowlist and rate-limiting here.
if msg.Body.Method != MethodAddWorkflow {
if msg.Body.Method != MethodWebAPITrigger {
d.lggr.Errorw("unsupported method", "method", msg.Body.Method)
return fmt.Errorf("unsupported method")
}
Expand Down
Loading

0 comments on commit fdc4510

Please sign in to comment.