Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement HTTP target capability and connector handler #14491

Merged
merged 12 commits into from
Sep 26, 2024
5 changes: 5 additions & 0 deletions .changeset/six-frogs-juggle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added HTTP target capability and gateway connector handler
168 changes: 168 additions & 0 deletions core/capabilities/webapi/target/capability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package target
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/validation"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities"
)

const ID = "[email protected]"

var _ capabilities.TargetCapability = &Capability{}

var capabilityInfo = capabilities.MustNewCapabilityInfo(
ID,
capabilities.CapabilityTypeTarget,
"A target that sends HTTP requests to external clients via the Chainlink Gateway.",
)

// Capability is a target capability that sends HTTP requests to external clients via the Chainlink Gateway.
type Capability struct {
capabilityInfo capabilities.CapabilityInfo
connectorHandler *ConnectorHandler
lggr logger.Logger
registry core.CapabilitiesRegistry
config Config
registeredWorkflows map[string]struct{}
registeredWorkflowsMu sync.Mutex
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
}

func NewCapability(config Config, registry core.CapabilitiesRegistry, connectorHandler *ConnectorHandler, lggr logger.Logger) (*Capability, error) {
return &Capability{
capabilityInfo: capabilityInfo,
config: config,
registry: registry,
connectorHandler: connectorHandler,
registeredWorkflows: make(map[string]struct{}),
registeredWorkflowsMu: sync.Mutex{},
lggr: lggr,
}, nil
}

func (c *Capability) Start(ctx context.Context) error {
return c.registry.Add(ctx, c)
}

func (c *Capability) Close() error {
return nil
}

func (c *Capability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilityInfo, nil
}

func getMessageID(req capabilities.CapabilityRequest) (string, error) {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return "", fmt.Errorf("workflow ID is invalid: %w", err)
}
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowExecutionID); err != nil {
return "", fmt.Errorf("workflow execution ID is invalid: %w", err)
}
messageID := []string{
req.Metadata.WorkflowID,
req.Metadata.WorkflowExecutionID,
webcapabilities.MethodWebAPITarget,
}
return strings.Join(messageID, "/"), nil
}

func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
c.lggr.Debugw("executing http target", "capabilityRequest", req)

var input Input
err := req.Inputs.UnwrapTo(&input)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

var workflowCfg WorkflowConfig
err = req.Config.UnwrapTo(&workflowCfg)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

messageID, err := getMessageID(req)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

c.registeredWorkflowsMu.Lock()
defer c.registeredWorkflowsMu.Unlock()
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok {
return capabilities.CapabilityResponse{}, fmt.Errorf("workflow is not registered: %v", req.Metadata.WorkflowID)
}

payload := webcapabilities.TargetRequestPayload{
URL: input.URL,
Method: input.Method,
Headers: input.Headers,
Body: []byte(input.Body),
TimeoutMs: workflowCfg.TimeoutMs,
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

switch workflowCfg.Schedule {
case SingleNode:
// blocking call to handle single node request. waits for response from gateway
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
c.lggr.Debugw("received gateway response", "resp", resp)
var payload webcapabilities.TargetResponsePayload
err = json.Unmarshal(resp.Body.Payload, &payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// TODO: check target response format and fields
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
values, err := values.NewMap(map[string]any{
"statusCode": payload.StatusCode,
"headers": payload.Headers,
"body": string(payload.Body),
})
if err != nil {
return capabilities.CapabilityResponse{}, err
}
return capabilities.CapabilityResponse{
Value: values,
}, nil
default:
return capabilities.CapabilityResponse{}, fmt.Errorf("unsupported schedule: %v", workflowCfg.Schedule)
}
}

func (c *Capability) RegisterToWorkflow(ctx context.Context, req capabilities.RegisterToWorkflowRequest) error {
if err := validation.ValidateWorkflowOrExecutionID(req.Metadata.WorkflowID); err != nil {
return fmt.Errorf("workflow ID is invalid: %w", err)
}
c.registeredWorkflowsMu.Lock()
defer c.registeredWorkflowsMu.Unlock()
c.registeredWorkflows[req.Metadata.WorkflowID] = struct{}{}
return nil
}

func (c *Capability) UnregisterFromWorkflow(ctx context.Context, req capabilities.UnregisterFromWorkflowRequest) error {
// if workflow is not found for some reason, just log a warning
c.registeredWorkflowsMu.Lock()
defer c.registeredWorkflowsMu.Unlock()
if _, ok := c.registeredWorkflows[req.Metadata.WorkflowID]; !ok {
c.lggr.Warnw("workflow not found", "workflowID", req.Metadata.WorkflowID)
} else {
delete(c.registeredWorkflows, req.Metadata.WorkflowID)
}
return nil
}
115 changes: 115 additions & 0 deletions core/capabilities/webapi/target/connector_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package target

import (
"context"
"encoding/json"
"sort"
"sync"

"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/webcapabilities"
)

var _ connector.GatewayConnectorHandler = &ConnectorHandler{}

type ConnectorHandler struct {
gc connector.GatewayConnector
lggr logger.Logger
responseChs map[string]chan *api.Message
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
responseChsMu sync.Mutex
rateLimiter *common.RateLimiter
}

func NewConnectorHandler(gc connector.GatewayConnector, config Config, lgger logger.Logger) (*ConnectorHandler, error) {
rateLimiter, err := common.NewRateLimiter(config.RateLimiter)
if err != nil {
return nil, err
}
responseChs := make(map[string]chan *api.Message)
return &ConnectorHandler{
gc: gc,
responseChs: responseChs,
responseChsMu: sync.Mutex{},
rateLimiter: rateLimiter,
lggr: lgger,
}, nil
}

// HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received
// TODO: handle retries and timeouts
func (c *ConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) {
ch := make(chan *api.Message, 1)
c.responseChsMu.Lock()
c.responseChs[messageID] = ch
c.responseChsMu.Unlock()
l := logger.With(c.lggr, "messageID", messageID)
l.Debugw("sending request to gateway")

body := &api.MessageBody{
MessageId: messageID,
DonId: c.gc.DonID(),
Method: webcapabilities.MethodWebAPITarget,
Payload: payload,
}

// simply, send request to first available gateway node from sorted list
// this allows for deterministic selection of gateay node receiver for easier debugging
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
gatewayIDs := c.gc.GatewayIDs()
sort.Strings(gatewayIDs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinhoonbang 🤔 Doesn't this also mean we'll always go to the same gateway assuming the same list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I had a discussion with @bolekk on this and decided to keep it simple here. Random gateway nodes will be harder to debug so reusing the same gateway node is preferred


err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}

select {
case resp := <-ch:
MStreet3 marked this conversation as resolved.
Show resolved Hide resolved
return resp, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (c *ConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
l := logger.With(c.lggr, "gatewayID", gatewayID, "method", body.Method, "messageID", msg.Body.MessageId)
if !c.rateLimiter.Allow(body.Sender) {
// error is logged here instead of warning because if a message from gateway is rate-limited,
// the workflow will eventually fail with timeout as there are no retries in place yet
c.lggr.Errorw("request rate-limited")
return
}
l.Debugw("handling gateway request")
switch body.Method {
case webcapabilities.MethodWebAPITarget:
var payload webcapabilities.TargetResponsePayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
l.Errorw("failed to unmarshal payload", "err", err)
return
}
c.responseChsMu.Lock()
defer c.responseChsMu.Unlock()
ch, ok := c.responseChs[body.MessageId]
if !ok {
l.Errorw("no response channel found")
return
}
ch <- msg
jinhoonbang marked this conversation as resolved.
Show resolved Hide resolved
default:
l.Errorw("unsupported method")
}
}

func (c *ConnectorHandler) Start(ctx context.Context) error {
return c.gc.AddHandler([]string{webcapabilities.MethodWebAPITarget}, c)
}

func (c *ConnectorHandler) Close() error {
return nil
}
Loading
Loading