Skip to content

Commit

Permalink
web api trigger connector handler
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidOrchard committed Sep 17, 2024
1 parent 37c5a2f commit 7c7a231
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 0 deletions.
109 changes: 109 additions & 0 deletions core/capabilities/webapi/trigger.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,127 @@
package webapi

import (
"context"
"fmt"
"sync"

ethCommon "github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/workflow"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

const defaultSendChannelBufferSize = 1000

type Response struct {
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
}

type workflowConnectorHandler struct {
services.StateMachine

capabilities.CapabilityInfo
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
triggers map[string]chan capabilities.TriggerResponse
}

var _ capabilities.TriggerCapability = (*workflowConnectorHandler)(nil)
var _ services.Service = &workflowConnectorHandler{}

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, lggr logger.Logger) (job.ServiceCtx, error) {
// TODO (CAPPL-22, CAPPL-24):
// - decode config
// - create an implementation of the capability API and add it to the Registry
// - create a handler and register it with Gateway Connector
// - manage trigger subscriptions
// - process incoming trigger events and related metadata

handler := &workflowConnectorHandler{
connector: connector,
lggr: lggr.Named("WorkflowConnectorHandler"),
}

// is this the right way to register with gateway connector? Cron trigger doesn't do this.
err := connector.AddHandler([]string{"add_workflow"}, handler)

return handler, err
}

func (h *workflowConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
fromAddr := ethCommon.HexToAddress(body.Sender)
// TODO: apply allowlist and rate-limiting
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr)

switch body.Method {
case workflow.MethodAddWorkflow:
// TODO: add a new workflow spec and return success/failure
// we need access to Job ORM or whatever CLO uses to fully launch a new spec
h.lggr.Debugw("added workflow spec", "payload", string(body.Payload))
// TODO how does the capability respond to the gateway connector
// response := Response{Success: true}
// h.sendResponse(ctx, gatewayId, body, response)
default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
}
}

// Register a new trigger
// Can register triggers before the service is actively scheduling
func (h *workflowConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
// There's no config to use and validate
h.mu.Lock()
defer h.mu.Unlock()
_, ok := h.triggers[req.TriggerID]
if ok {
return nil, fmt.Errorf("triggerId %s already registered", req.TriggerID)
}

callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
h.triggers[req.TriggerID] = callbackCh

return nil, nil
}

func (h *workflowConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
trigger := h.triggers[req.TriggerID]

// Close callback channel
close(trigger)
// Remove from triggers context
delete(h.triggers, req.TriggerID)
return nil
}

func (h *workflowConnectorHandler) Start(ctx context.Context) error {
// how does the
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
return nil
})
}
func (h *workflowConnectorHandler) Close() error {
return nil
}

func (h *workflowConnectorHandler) Ready() error {
return nil
}

func (h *workflowConnectorHandler) HealthReport() map[string]error {
return map[string]error{h.Name(): nil}
}

func (h *workflowConnectorHandler) Name() string {
return "WebAPITrigger"
}
106 changes: 106 additions & 0 deletions core/scripts/gateway/workflow/upload_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"

"github.com/ethereum/go-ethereum/crypto"
"github.com/joho/godotenv"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
)

func main() {
gatewayURL := flag.String("gateway_url", "http://localhost:5002", "Gateway URL")
privateKey := flag.String("private_key", "65456ffb8af4a2b93959256a8e04f6f2fe0943579fb3c9c3350593aabb89023f", "Private key to sign the message with")
messageID := flag.String("message_id", "12345", "Request ID")
methodName := flag.String("method", "add_workflow", "Method name")
donID := flag.String("don_id", "workflow_don_1", "DON ID")
workflowSpec := flag.String("workflow_spec", "[my spec abcd]", "Workflow Spec")
flag.Parse()

if privateKey == nil || *privateKey == "" {
if err := godotenv.Load(); err != nil {
panic(err)
}

privateKeyEnvVar := os.Getenv("PRIVATE_KEY")
privateKey = &privateKeyEnvVar
fmt.Println("Loaded private key from .env")
}

// validate key and extract address
key, err := crypto.HexToECDSA(*privateKey)
if err != nil {
fmt.Println("error parsing private key", err)
return
}
//address := crypto.PubkeyToAddress(key.PublicKey)

payloadJSON := []byte("{\"spec\": \"" + *workflowSpec + "\"}")

msg := &api.Message{
Body: api.MessageBody{
MessageId: *messageID,
Method: *methodName,
DonId: *donID,
Payload: json.RawMessage(payloadJSON),
},
}

if err = msg.Sign(key); err != nil {
fmt.Println("error signing message", err)
return
}
codec := api.JsonRPCCodec{}
rawMsg, err := codec.EncodeRequest(msg)
if err != nil {
fmt.Println("error JSON-RPC encoding", err)
return
}

createRequest := func() (req *http.Request, err error) {
req, err = http.NewRequestWithContext(context.Background(), "POST", *gatewayURL, bytes.NewBuffer(rawMsg))
if err == nil {
req.Header.Set("Content-Type", "application/json")
}
return
}

client := &http.Client{}

sendRequest := func() {
req, err2 := createRequest()
if err2 != nil {
fmt.Println("error creating a request", err2)
return
}

resp, err2 := client.Do(req)
if err2 != nil {
fmt.Println("error sending a request", err2)
return
}
defer resp.Body.Close()

body, err2 := io.ReadAll(resp.Body)
if err2 != nil {
fmt.Println("error sending a request", err2)
return
}

var prettyJSON bytes.Buffer
if err2 = json.Indent(&prettyJSON, body, "", " "); err2 != nil {
fmt.Println(string(body))
} else {
fmt.Println(prettyJSON.String())
}
}
sendRequest()
}
4 changes: 4 additions & 0 deletions core/services/gateway/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/workflow"
)

const (
FunctionsHandlerType HandlerType = "functions"
DummyHandlerType HandlerType = "dummy"
workflowHandlerType HandlerType = "workflow"
)

type handlerFactory struct {
Expand All @@ -33,6 +35,8 @@ func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json
switch handlerType {
case FunctionsHandlerType:
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.ds, hf.lggr)
case workflowHandlerType:
return workflow.NewWorkflowHandler(donConfig, don, hf.lggr)
case DummyHandlerType:
return handlers.NewDummyHandler(donConfig, don, hf.lggr)
default:
Expand Down
91 changes: 91 additions & 0 deletions core/services/gateway/handlers/workflow/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package workflow

import (
"context"
"fmt"
"sync"

"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/config"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers"
)

const (
MethodAddWorkflow = "add_workflow"
// NOTE: more methods will go here: CRUD for workflow specs; HTTP trigger/action/target; etc.
)

type workflowHandler struct {
donConfig *config.DONConfig
don handlers.DON
savedCallbacks map[string]*savedCallback
mu sync.Mutex
lggr logger.Logger
}

type savedCallback struct {
id string
callbackCh chan<- handlers.UserCallbackPayload
}

var _ handlers.Handler = (*workflowHandler)(nil)

func NewWorkflowHandler(donConfig *config.DONConfig, don handlers.DON, lggr logger.Logger) (*workflowHandler, error) {
lggr.Debugw("-------HNewWorkflowHandler")

return &workflowHandler{
donConfig: donConfig,
don: don,
savedCallbacks: make(map[string]*savedCallback),
lggr: lggr.Named("WorkflowHandler." + donConfig.DonId),
}, nil
}

func (d *workflowHandler) HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error {
d.lggr.Debugw("-------HandleUserMessage", "method", msg.Body.Method)

d.mu.Lock()
d.savedCallbacks[msg.Body.MessageId] = &savedCallback{msg.Body.MessageId, callbackCh}
don := d.don
d.mu.Unlock()

// TODO: apply allowlist and rate-limiting here.
if msg.Body.Method != MethodAddWorkflow {
d.lggr.Errorw("unsupported method", "method", msg.Body.Method)
return fmt.Errorf("unsupported method")
}

var err error
// Send to all nodes.
for _, member := range d.donConfig.Members {
err = multierr.Combine(err, don.SendToNode(ctx, member.Address, msg))
}
return err
}

func (d *workflowHandler) HandleNodeMessage(ctx context.Context, msg *api.Message, nodeAddr string) error {
d.mu.Lock()
savedCb, found := d.savedCallbacks[msg.Body.MessageId]
delete(d.savedCallbacks, msg.Body.MessageId)
d.mu.Unlock()

if found {
// Send first response from a node back to the user, ignore any other ones.
// TODO: in practice, we should wait for at least 2F+1 nodes to respond and then return an aggregated response
// back to the user.
savedCb.callbackCh <- handlers.UserCallbackPayload{Msg: msg, ErrCode: api.NoError, ErrMsg: ""}
close(savedCb.callbackCh)
}
return nil
}

func (d *workflowHandler) Start(context.Context) error {
return nil
}

func (d *workflowHandler) Close() error {
return nil
}

0 comments on commit 7c7a231

Please sign in to comment.