Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

web api trigger connector handler #14459

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
134 changes: 133 additions & 1 deletion core/capabilities/webapi/trigger.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,150 @@
package webapi

import (
"context"
"encoding/json"
"fmt"
"sync"

ethCommon "github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/workflow"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

const defaultSendChannelBufferSize = 1000

type Response struct {
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
}

type workflowConnectorHandler struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A short description of what a connector handler is would be nice to have here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe call it ConnectorHandler or TriggerConnectorHandler. Not sure if workflow adds useful meaning here. Similar changes were recommended in target PR: #14448

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let's not use "workflow" in web capability names. I used it in my example of a handler for spec uploads, which is not relevant here.

services.StateMachine

capabilities.CapabilityInfo
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
triggers map[string]chan capabilities.TriggerResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe call this "registeredWorkflows"?

}

var _ capabilities.TriggerCapability = (*workflowConnectorHandler)(nil)
var _ services.Service = &workflowConnectorHandler{}

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, lggr logger.Logger) (job.ServiceCtx, error) {
// TODO (CAPPL-22, CAPPL-24):
// - decode config
// - create an implementation of the capability API and add it to the Registry
// - create a handler and register it with Gateway Connector
// - manage trigger subscriptions
// - process incoming trigger events and related metadata
return nil, nil

handler := &workflowConnectorHandler{
connector: connector,
lggr: lggr.Named("WorkflowConnectorHandler"),
}

// is this the right way to register with gateway connector? Cron trigger doesn't do this.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cron trigger doesn't have connectors/handlers.

What I'd consider here is should this be in .Start instead? Trace through where constructor gets called versus where .Start gets called - is the connector ready for handlers at that point?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, better to add in Start().

err := connector.AddHandler([]string{"add_workflow"}, handler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe "web_api_trigger" is the suggested method name in the design doc. "add_workflow` was just an example for an unrelated feature in the skeleton


return handler, err
}

func (h *workflowConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
fromAddr := ethCommon.HexToAddress(body.Sender)
// TODO: apply allowlist and rate-limiting
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr)
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "sender", fromAddr)


switch body.Method {
case workflow.MethodAddWorkflow:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

// TODO: add a new workflow spec and return success/failure
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are remnants of my example PR - please remove.

// we need access to Job ORM or whatever CLO uses to fully launch a new spec
h.lggr.Debugw("added workflow spec", "payload", string(body.Payload))
response := Response{Success: true}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design doc specifies 3 response messages: ACCEPTED, PENDING, COMPLETED, not just a bool flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely, that is a TODO. It's in the ticket too. Perhaps I should have called that out. My thinking is that if the PR as it stands needs work, then that would affect the returns.

h.sendResponse(ctx, gatewayID, body, response)

Check failure on line 72 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `h.sendResponse` is not checked (errcheck)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to make this non-blocking? @bolekk had similar comment here: #14448 (comment)

default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
}
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (h *workflowConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
// There's no config to use and validate
h.mu.Lock()
defer h.mu.Unlock()
_, ok := h.triggers[req.TriggerID]
if ok {
return nil, fmt.Errorf("triggerId %s already registered", req.TriggerID)
}

callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
h.triggers[req.TriggerID] = callbackCh

return callbackCh, nil
}

func (h *workflowConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
trigger := h.triggers[req.TriggerID]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check if it exists here.


// Close callback channel
close(trigger)
// Remove from triggers context
delete(h.triggers, req.TriggerID)
return nil
}

func (h *workflowConnectorHandler) Start(ctx context.Context) error {
// how does the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray comment

return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
return nil
})
}
func (h *workflowConnectorHandler) Close() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are using services.StateMachine StartOnce, you should use StopOnce here

return nil
}

func (h *workflowConnectorHandler) Ready() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let services.StateMachine handle this

return nil
}

func (h *workflowConnectorHandler) HealthReport() map[string]error {
return map[string]error{h.Name(): nil}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return map[string]error{h.Name(): nil}
return map[string]error{h.Name(): h.Healthy()}

}

func (h *workflowConnectorHandler) Name() string {
return "WebAPITrigger"
}

func (h *workflowConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error {
payloadJson, err := json.Marshal(payload)

Check failure on line 130 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var payloadJson should be payloadJSON (revive)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lint error

if err != nil {
return err
}

msg := &api.Message{
Body: api.MessageBody{
MessageId: requestBody.MessageId,
DonId: requestBody.DonId,
Method: requestBody.Method,
Receiver: requestBody.Sender,
Payload: payloadJson,
},
}

// How do we get the signerKey from the connector?
Copy link
Contributor

@justinkaseman justinkaseman Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// if err = msg.Sign(h.signerKey); err != nil {
// return err
// }
return h.connector.SendToGateway(ctx, gatewayID, msg)
}
106 changes: 106 additions & 0 deletions core/scripts/gateway/workflow/upload_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a script to upload workflow specs as this is not something we're implementing right now. If you want to have a test script then modify it to send valid Web Trigger messages. Otherwise you can remove it from this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, again my thinking was that if the trigger needed updating then that would affect the script.


import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"

"github.com/ethereum/go-ethereum/crypto"
"github.com/joho/godotenv"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
)

func main() {
gatewayURL := flag.String("gateway_url", "http://localhost:5002", "Gateway URL")
privateKey := flag.String("private_key", "65456ffb8af4a2b93959256a8e04f6f2fe0943579fb3c9c3350593aabb89023f", "Private key to sign the message with")
messageID := flag.String("message_id", "12345", "Request ID")
methodName := flag.String("method", "add_workflow", "Method name")
donID := flag.String("don_id", "workflow_don_1", "DON ID")
workflowSpec := flag.String("workflow_spec", "[my spec abcd]", "Workflow Spec")
flag.Parse()

if privateKey == nil || *privateKey == "" {
if err := godotenv.Load(); err != nil {
panic(err)
}

privateKeyEnvVar := os.Getenv("PRIVATE_KEY")
privateKey = &privateKeyEnvVar
fmt.Println("Loaded private key from .env")
}

// validate key and extract address
key, err := crypto.HexToECDSA(*privateKey)
if err != nil {
fmt.Println("error parsing private key", err)
return
}
//address := crypto.PubkeyToAddress(key.PublicKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray commented out code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a copy from the template so I left as is.


payloadJSON := []byte("{\"spec\": \"" + *workflowSpec + "\"}")

msg := &api.Message{
Body: api.MessageBody{
MessageId: *messageID,
Method: *methodName,
DonId: *donID,
Payload: json.RawMessage(payloadJSON),
},
}

if err = msg.Sign(key); err != nil {
fmt.Println("error signing message", err)
return
}
codec := api.JsonRPCCodec{}
rawMsg, err := codec.EncodeRequest(msg)
if err != nil {
fmt.Println("error JSON-RPC encoding", err)
return
}

createRequest := func() (req *http.Request, err error) {
req, err = http.NewRequestWithContext(context.Background(), "POST", *gatewayURL, bytes.NewBuffer(rawMsg))
if err == nil {
req.Header.Set("Content-Type", "application/json")
}
return
}

client := &http.Client{}

sendRequest := func() {
req, err2 := createRequest()
if err2 != nil {
fmt.Println("error creating a request", err2)
return
}

resp, err2 := client.Do(req)
if err2 != nil {
fmt.Println("error sending a request", err2)
return
}
defer resp.Body.Close()

body, err2 := io.ReadAll(resp.Body)
if err2 != nil {
fmt.Println("error sending a request", err2)
return
}

var prettyJSON bytes.Buffer
if err2 = json.Indent(&prettyJSON, body, "", " "); err2 != nil {
fmt.Println(string(body))
} else {
fmt.Println(prettyJSON.String())
}
}
sendRequest()
}
4 changes: 4 additions & 0 deletions core/services/gateway/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/workflow"
)

const (
FunctionsHandlerType HandlerType = "functions"
DummyHandlerType HandlerType = "dummy"
workflowHandlerType HandlerType = "workflow"
)

type handlerFactory struct {
Expand All @@ -33,6 +35,8 @@ func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json
switch handlerType {
case FunctionsHandlerType:
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.ds, hf.lggr)
case workflowHandlerType:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinhoonbang is already adding it in his PR, maybe wait for that to be merged and then rebase.

return workflow.NewWorkflowHandler(donConfig, don, hf.lggr)
case DummyHandlerType:
return handlers.NewDummyHandler(donConfig, don, hf.lggr)
default:
Expand Down
91 changes: 91 additions & 0 deletions core/services/gateway/handlers/workflow/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package workflow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinhoonbang is already adding this handler in his PR. Best to wait on that and then only extend with the Trigger method name (can be in a separate PR since it's technically part of CAPPL-21).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll proceed with this in mine so that I'm not blocked if that's ok.


import (
"context"
"fmt"
"sync"

"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
)

const (
MethodAddWorkflow = "add_workflow"
// NOTE: more methods will go here: CRUD for workflow specs; HTTP trigger/action/target; etc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a TODO than a NOTE probably

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO/NOTE is not correct anyway (I left that comment on Jin's PR too). We will not reuse this handler for workflow spec CRUDs.

)

type workflowHandler struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have a little description here of what a workflow handler is

donConfig *config.DONConfig
don handlers.DON
savedCallbacks map[string]*savedCallback
mu sync.Mutex
lggr logger.Logger
}

type savedCallback struct {
id string
callbackCh chan<- handlers.UserCallbackPayload
}

var _ handlers.Handler = (*workflowHandler)(nil)

func NewWorkflowHandler(donConfig *config.DONConfig, don handlers.DON, lggr logger.Logger) (*workflowHandler, error) {
lggr.Debugw("-------HNewWorkflowHandler")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray debug log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't have debug logs until final review?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can, I'm just calling it out in case it had gotten forgotten. Seemed like you removed some others.


return &workflowHandler{
donConfig: donConfig,
don: don,
savedCallbacks: make(map[string]*savedCallback),
lggr: lggr.Named("WorkflowHandler." + donConfig.DonId),
}, nil
}

func (d *workflowHandler) HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error {
d.lggr.Debugw("-------HandleUserMessage", "method", msg.Body.Method)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray debug log


d.mu.Lock()
d.savedCallbacks[msg.Body.MessageId] = &savedCallback{msg.Body.MessageId, callbackCh}
don := d.don
d.mu.Unlock()

// TODO: apply allowlist and rate-limiting here.
if msg.Body.Method != MethodAddWorkflow {
d.lggr.Errorw("unsupported method", "method", msg.Body.Method)
return fmt.Errorf("unsupported method")
}

var err error
// Send to all nodes.
for _, member := range d.donConfig.Members {
err = multierr.Combine(err, don.SendToNode(ctx, member.Address, msg))
}
return err
}

func (d *workflowHandler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error {
d.mu.Lock()
savedCb, found := d.savedCallbacks[msg.Body.MessageId]
delete(d.savedCallbacks, msg.Body.MessageId)
d.mu.Unlock()

if found {
// Send first response from a node back to the user, ignore any other ones.
// TODO: in practice, we should wait for at least 2F+1 nodes to respond and then return an aggregated response
// back to the user.
savedCb.callbackCh <- handlers.UserCallbackPayload{Msg: msg, ErrCode: api.NoError, ErrMsg: ""}
close(savedCb.callbackCh)
}
return nil
}

func (d *workflowHandler) Start(context.Context) error {
return nil
}

func (d *workflowHandler) Close() error {
return nil
}
Loading