Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

web api trigger connector handler #14459

Closed
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/capabilities/gateway_connector/service_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,8 @@ func (e *ServiceWrapper) Name() string {
func (e *ServiceWrapper) GetGatewayConnector() connector.GatewayConnector {
return e.connector
}

// TODO: Remove once Jin's PR gets in.
func (e *ServiceWrapper) GetSignerKey() *ecdsa.PrivateKey {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove it (see other comment). Also in the future please be wary of passing private keys around. It's easy to accidentally log it and it will also cause more problems when Keystore is hardened. Gateway wraps it in a Signer interface so let's use that if it's ever needed.

return e.signerKey
}
255 changes: 252 additions & 3 deletions core/capabilities/webapi/trigger.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,267 @@
package webapi

import (
"context"
"crypto/ecdsa"
"encoding/json"
"fmt"
"sync"
"time"

ethCommon "github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/workflow"
)

func NewTrigger(config string, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, lggr logger.Logger) (job.ServiceCtx, error) {
const defaultSendChannelBufferSize = 1000

const triggerType = "[email protected]"

type TriggerConfig struct {
AllowedSenders []ethCommon.Address `toml:"allowedSenders"`
Allowedtopics []string `toml:"allowedTopics"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this become a map also? map from triggerID to topics

RateLimiter common.RateLimiterConfig `toml:"rateLimiter"`
RequiredParams []string `toml:"requiredParams"`
}

type Response struct {
Success bool `json:"success"`
ErrorMessage string `json:"error_message,omitempty"`
Status string `json:"ACCEPTED"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need both Success and Status? do they both convey request status to gateway?

}

// Handles connections to the webapi trigger
type triggerConnectorHandler struct {
services.StateMachine

capabilities.CapabilityInfo
config TriggerConfig
connector connector.GatewayConnector
lggr logger.Logger
mu sync.Mutex
// Will this have to get pulled into a store to have the topic and workflow ID?
registeredWorkflows map[string]chan capabilities.TriggerResponse
allowedSendersMap map[string]bool
signerKey *ecdsa.PrivateKey
rateLimiter *common.RateLimiter
}

var _ capabilities.TriggerCapability = (*triggerConnectorHandler)(nil)
var _ services.Service = &triggerConnectorHandler{}

// TODO: From Design doc,
// Once connected to a Gateway, each connector handler periodically sends metadata messages containing aggregated
// config for all registered workflow specs using web-trigger.

func NewTrigger(config TriggerConfig, registry core.CapabilitiesRegistry, connector connector.GatewayConnector, signerKey *ecdsa.PrivateKey, lggr logger.Logger) (*triggerConnectorHandler, error) {
// TODO (CAPPL-22, CAPPL-24):
// - decode config
// - create an implementation of the capability API and add it to the Registry
// - create a handler and register it with Gateway Connector
// - manage trigger subscriptions
// - process incoming trigger events and related metadata
return nil, nil

rateLimiter, err := common.NewRateLimiter(config.RateLimiter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err != nil {
return nil, err
}
allowedSendersMap := map[string]bool{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe senders should be defined at workflow level. Should we create a map from triggerID to list of senders and populate that map using TriggerRegistrationRequest.Config in RegisterTrigger()?

Copy link
Contributor Author

@DavidOrchard DavidOrchard Sep 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the workflow configs get passed in at the "newTrigger" call. I see now that there's a config in the uploaded workflow spec that is the config in here. thanks. I'll move it there.

for _, k := range config.AllowedSenders {
allowedSendersMap[k.String()] = true
}

handler := &triggerConnectorHandler{
allowedSendersMap: allowedSendersMap,
config: config,
connector: connector,
signerKey: signerKey,
rateLimiter: rateLimiter,
lggr: lggr.Named("WorkflowConnectorHandler"),
}

return handler, nil
}

// https://gateway-us-1.chain.link/web-trigger
// {
// jsonrpc: "2.0",
// id: "...",
// method: "web-trigger",
// params: {
// signature: "...",
// body: {
// don_id: "workflow_123",
// payload: {
// trigger_id: "[email protected]",
// trigger_event_id: "action_1234567890",
// timestamp: 1234567890,
// topics: ["daily_price_update"],
// params: {
// bid: "101",
// ask: "102"
// }
// }
// }
// }
// }

// from Web API Trigger Doc
// trigger_id - ID of the trigger corresponding to the capability ID
// trigger_event_id - uniquely identifies generated event (scoped to trigger_id and sender)
// timestamp - timestamp of the event (unix time), needs to be within certain freshness to be processed
// topics - [OPTIONAL] list of topics (strings) to be started by this event (affects all topics if empty)
// workflow_owners - [OPTIONAL] list of workflow owners allowed to receive this event (affects all workflows if empty)
// params - key-value pairs that will be used as trigger output in the workflow Engine (translated to values.Map)

type TriggerRequestPayload struct {
TriggerId string `json:"trigger_id"`

Check failure on line 125 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: struct field TriggerId should be TriggerID (revive)
TriggerEventId string `json:"trigger_event_id"`

Check failure on line 126 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: struct field TriggerEventId should be TriggerEventID (revive)
Timestamp int64 `json:"timestamp"`
Topics []string `json:"topics"`
Params values.Map `json:"params"`
}

func (h *triggerConnectorHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, msg *api.Message) {
body := &msg.Body
sender := ethCommon.HexToAddress(body.Sender)
if !h.rateLimiter.Allow(body.Sender) {
h.lggr.Errorw("request rate-limited")
return
}
if !h.allowedSendersMap[sender.String()] {
h.lggr.Errorw("Unauthorized Sender")
return
}
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "sender", sender)
var payload TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
return
}

switch body.Method {
case workflow.MethodWebAPITrigger:
h.lggr.Debugw("added MethodWebAPITrigger message", "payload", string(body.Payload))
// TODO: Is the staleness check supposed to be in the gateway?
currentTime := time.Now()
// TODO: check against h.config.MaxAllowedMessageAgeSec
if currentTime.Unix()-3000 > payload.Timestamp {
// TODO: fix up with error handling update in design doc
response := Response{Success: false}
h.sendResponse(ctx, gatewayID, body, response)

Check failure on line 159 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `h.sendResponse` is not checked (errcheck)
}

// is this right?
// Sri did wrappedPayload, err := values.WrapMap(log.Data), does that work in this case?
wrappedPayload, _ := values.WrapMap(payload)

This comment was marked as resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The payload that is received here is the payload with

 payload: {
          trigger_id: "[email protected]",
          trigger_event_id: "action_1234567890",
          timestamp: 1234567890,
          topics: ["daily_price_update"],
          params: {
            bid: "101",
            ask: "102"
          }
        }

so isn't this correct?

I've added a comment asking about the conversion to a different format for the executor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked with JT, he agrees this is likely sufficient, at least for now.


for _, trigger := range h.registeredWorkflows {

Check failure on line 166 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

// TODO: CAPPL-24 extract the topic and then match the subscriber to this messages triggers.
// TODO: CAPPL-24 check the topic to see if the method is a duplicate and the trigger has been sent, ie PENDING
// TODO: Question asked in Web API trigger about checking for completed Triggers to return COMPLETED
// "TriggerEventID used internally by the Engine is a pair (sender, trigger_event_id).
// This is to protect against a case where two different authorized senders use the same event ID in their messages.

// TODO: how do we know PENDING state, that is node received the event but processing hasn't finished.
TriggerEventID := body.Sender + payload.TriggerEventId
tr := capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: triggerType,
ID: TriggerEventID,
Outputs: wrappedPayload, // must be *values.Map
},
}
trigger <- tr
}

// TODO: ACCEPTED, PENDING, COMPLETED
response := Response{Success: true, Status: "ACCEPTED"}
h.sendResponse(ctx, gatewayID, body, response)

Check failure on line 188 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `h.sendResponse` is not checked (errcheck)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to make this non-blocking? @bolekk had similar comment here: #14448 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where your PR does the non-blocking call any more, I can't find the HTTP write. I see a send to a channel but no more http client. Can you link to it so I can see how you made it non blocking pls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked on your revised PR where the code is to making the http request non-blocking as I don't see it any more.

default:
h.lggr.Errorw("unsupported method", "id", gatewayID, "method", body.Method)
}
}

func (h *triggerConnectorHandler) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
h.mu.Lock()
defer h.mu.Unlock()
_, ok := h.registeredWorkflows[req.TriggerID]
if ok {
return nil, fmt.Errorf("triggerId %s already registered", req.TriggerID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have idempotent registrations and return nil here? can an instance of workflow engine call register more than once during its lifecycle? cc @cedric-cordenier

}

callbackCh := make(chan capabilities.TriggerResponse, defaultSendChannelBufferSize)
// TODO: CAPPL-24 how do we extract the topic and then define the trigger by that?
// It's not TriggerID because TriggerID is concat of workflow ID and the trigger's index in the spec (what does that mean?)
// I'm not sure if the workflow config comes in via the req.Config

h.registeredWorkflows[req.TriggerID] = callbackCh

return callbackCh, nil
}

func (h *triggerConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error {
h.mu.Lock()
defer h.mu.Unlock()
trigger, ok := h.registeredWorkflows[req.TriggerID]
if ok {
return fmt.Errorf("triggerId %s not registered", req.TriggerID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar question as above. maybe it's safer to return nil here but I would need to understand wf engine lifecycle first.

}

// Close callback channel
close(trigger)
// Remove from triggers context
delete(h.registeredWorkflows, req.TriggerID)
return nil
}

func (h *triggerConnectorHandler) Start(ctx context.Context) error {
return h.StartOnce("GatewayConnectorServiceWrapper", func() error {
return h.connector.AddHandler([]string{"web_trigger"}, h)
})
}
func (h *triggerConnectorHandler) Close() error {
return h.StopOnce("GatewayConnectorServiceWrapper", func() error {
return nil
})
}

func (h *triggerConnectorHandler) HealthReport() map[string]error {
return map[string]error{h.Name(): h.Healthy()}
}

func (h *triggerConnectorHandler) Name() string {
return "WebAPITrigger"
}

func (h *triggerConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error {
payloadJson, err := json.Marshal(payload)

Check failure on line 247 in core/capabilities/webapi/trigger.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: var payloadJson should be payloadJSON (revive)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lint error

if err != nil {
return err
}

msg := &api.Message{
Body: api.MessageBody{
MessageId: requestBody.MessageId,
DonId: requestBody.DonId,
Method: requestBody.Method,
Receiver: requestBody.Sender,
Payload: payloadJson,
},
}

// TODO remove this and signerKey once Jin's PR is in.
if err = msg.Sign(h.signerKey); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jin is going to modify Connector to sign automatically. You can remove this (and also don't pass the private key at all).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my changes, as of now, only introduces a new method that sends to first available gateway. In this case, the caller already knows the sender. I will modify my changes to account for this use case.

return err
}
return h.connector.SendToGateway(ctx, gatewayID, msg)
}
Loading
Loading