-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
web api trigger connector handler #14459
base: develop
Are you sure you want to change the base?
web api trigger connector handler #14459
Conversation
7c7a231
to
f4c79c3
Compare
f4c79c3
to
9a0ab6a
Compare
// NOTE: more methods will go here: CRUD for workflow specs; HTTP trigger/action/target; etc. | ||
) | ||
|
||
type workflowHandler struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to have a little description here of what a workflow handler is
var _ handlers.Handler = (*workflowHandler)(nil) | ||
|
||
func NewWorkflowHandler(donConfig *config.DONConfig, don handlers.DON, lggr logger.Logger) (*workflowHandler, error) { | ||
lggr.Debugw("-------HNewWorkflowHandler") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stray debug log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't have debug logs until final review?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can, I'm just calling it out in case it had gotten forgotten. Seemed like you removed some others.
} | ||
|
||
func (d *workflowHandler) HandleUserMessage(ctx context.Context, msg *api.Message, callbackCh chan<- handlers.UserCallbackPayload) error { | ||
d.lggr.Debugw("-------HandleUserMessage", "method", msg.Body.Method) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stray debug log
core/capabilities/webapi/trigger.go
Outdated
return nil | ||
}) | ||
} | ||
func (h *workflowConnectorHandler) Close() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you are using services.StateMachine StartOnce, you should use StopOnce here
core/capabilities/webapi/trigger.go
Outdated
return nil | ||
} | ||
|
||
func (h *workflowConnectorHandler) Ready() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let services.StateMachine handle this
core/capabilities/webapi/trigger.go
Outdated
} | ||
|
||
func (h *workflowConnectorHandler) HealthReport() map[string]error { | ||
return map[string]error{h.Name(): nil} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return map[string]error{h.Name(): nil} | |
return map[string]error{h.Name(): h.Healthy()} |
} | ||
|
||
func (h *workflowConnectorHandler) sendResponse(ctx context.Context, gatewayID string, requestBody *api.MessageBody, payload any) error { | ||
payloadJson, err := json.Marshal(payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lint error
core/capabilities/webapi/trigger.go
Outdated
lggr: lggr.Named("WorkflowConnectorHandler"), | ||
} | ||
|
||
// is this the right way to register with gateway connector? Cron trigger doesn't do this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cron trigger doesn't have connectors/handlers.
What I'd consider here is should this be in .Start
instead? Trace through where constructor gets called versus where .Start
gets called - is the connector ready for handlers at that point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, better to add in Start().
core/capabilities/webapi/trigger.go
Outdated
} | ||
|
||
func (h *workflowConnectorHandler) Start(ctx context.Context) error { | ||
// how does the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stray comment
fmt.Println("error parsing private key", err) | ||
return | ||
} | ||
//address := crypto.PubkeyToAddress(key.PublicKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stray commented out code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a copy from the template so I left as is.
|
||
const ( | ||
MethodAddWorkflow = "add_workflow" | ||
// NOTE: more methods will go here: CRUD for workflow specs; HTTP trigger/action/target; etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More of a TODO than a NOTE probably
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO/NOTE is not correct anyway (I left that comment on Jin's PR too). We will not reuse this handler for workflow spec CRUDs.
core/capabilities/webapi/trigger.go
Outdated
ErrorMessage string `json:"error_message,omitempty"` | ||
} | ||
|
||
type workflowConnectorHandler struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A short description of what a connector handler is would be nice to have here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call it ConnectorHandler
or TriggerConnectorHandler
. Not sure if workflow adds useful meaning here. Similar changes were recommended in target PR: #14448
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's not use "workflow" in web capability names. I used it in my example of a handler for spec uploads, which is not relevant here.
core/capabilities/webapi/trigger.go
Outdated
}, | ||
} | ||
|
||
// How do we get the signerKey from the connector? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not from the connector, it comes from the node's keystore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still going through it. Left some comments for the trigger.go
core/capabilities/webapi/trigger.go
Outdated
ErrorMessage string `json:"error_message,omitempty"` | ||
} | ||
|
||
type workflowConnectorHandler struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call it ConnectorHandler
or TriggerConnectorHandler
. Not sure if workflow adds useful meaning here. Similar changes were recommended in target PR: #14448
core/capabilities/webapi/trigger.go
Outdated
} | ||
|
||
// is this the right way to register with gateway connector? Cron trigger doesn't do this. | ||
err := connector.AddHandler([]string{"add_workflow"}, handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe "web_api_trigger" is the suggested method name in the design doc. "add_workflow` was just an example for an unrelated feature in the skeleton
core/capabilities/webapi/trigger.go
Outdated
body := &msg.Body | ||
fromAddr := ethCommon.HexToAddress(body.Sender) | ||
// TODO: apply allowlist and rate-limiting | ||
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr) | |
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "sender", fromAddr) |
core/capabilities/webapi/trigger.go
Outdated
fromAddr := ethCommon.HexToAddress(body.Sender) | ||
// TODO: apply allowlist and rate-limiting | ||
h.lggr.Debugw("handling gateway request", "id", gatewayID, "method", body.Method, "address", fromAddr) | ||
// ERR: payload is error, how to unmarshall? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can define a struct
type TriggerRequestPayload struct {
// add fields here
}
then, pass a pointer to that struct as argument
var payload TriggerRequestPayload
err := json.Unmarshal(body.Payload, &payload)
if err != nil {
return err
}
core/capabilities/webapi/trigger.go
Outdated
triggerID := payload.trigger_id | ||
|
||
switch body.Method { | ||
case workflow.MethodAddWorkflow: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above.
core/capabilities/webapi/trigger.go
Outdated
// we need access to Job ORM or whatever CLO uses to fully launch a new spec | ||
h.lggr.Debugw("added workflow spec", "payload", string(body.Payload)) | ||
|
||
// Question: should this call the registerTrigger and then handleNodeResponse call unregister? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC registerTrigger and unregisterTrigger should be called during Start() or Close(). I think HandleGatewayMessage()
needs to emit an event to workflow engine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes exactly - here you only to send a TriggerEvent on appropriate channel(s) (potentially many), like you do below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From meeting with Bolek & JK, registerTrigger and unregisterTrigger is called by engine, not by any code that I am writing.
core/capabilities/webapi/trigger.go
Outdated
ErrorMessage string `json:"error_message,omitempty"` | ||
} | ||
|
||
type workflowConnectorHandler struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, let's not use "workflow" in web capability names. I used it in my example of a handler for spec uploads, which is not relevant here.
core/capabilities/webapi/trigger.go
Outdated
lggr: lggr.Named("WorkflowConnectorHandler"), | ||
} | ||
|
||
// is this the right way to register with gateway connector? Cron trigger doesn't do this. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, better to add in Start().
core/capabilities/webapi/trigger.go
Outdated
|
||
switch body.Method { | ||
case workflow.MethodAddWorkflow: | ||
// TODO: add a new workflow spec and return success/failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are remnants of my example PR - please remove.
core/capabilities/webapi/trigger.go
Outdated
func (h *workflowConnectorHandler) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { | ||
h.mu.Lock() | ||
defer h.mu.Unlock() | ||
trigger := h.triggers[req.TriggerID] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also check if it exists here.
}, | ||
} | ||
|
||
if err = msg.Sign(h.signerKey); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jin is going to modify Connector to sign automatically. You can remove this (and also don't pass the private key at all).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my changes, as of now, only introduces a new method that sends to first available gateway. In this case, the caller already knows the sender. I will modify my changes to account for this use case.
@@ -33,6 +35,8 @@ func (hf *handlerFactory) NewHandler(handlerType HandlerType, handlerConfig json | |||
switch handlerType { | |||
case FunctionsHandlerType: | |||
return functions.NewFunctionsHandlerFromConfig(handlerConfig, donConfig, don, hf.legacyChains, hf.ds, hf.lggr) | |||
case workflowHandlerType: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinhoonbang is already adding it in his PR, maybe wait for that to be merged and then rebase.
@@ -0,0 +1,106 @@ | |||
package main |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a script to upload workflow specs as this is not something we're implementing right now. If you want to have a test script then modify it to send valid Web Trigger messages. Otherwise you can remove it from this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, again my thinking was that if the trigger needed updating then that would affect the script.
|
||
const ( | ||
MethodAddWorkflow = "add_workflow" | ||
// NOTE: more methods will go here: CRUD for workflow specs; HTTP trigger/action/target; etc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TODO/NOTE is not correct anyway (I left that comment on Jin's PR too). We will not reuse this handler for workflow spec CRUDs.
@@ -0,0 +1,91 @@ | |||
package workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jinhoonbang is already adding this handler in his PR. Best to wait on that and then only extend with the Trigger method name (can be in a separate PR since it's technically part of CAPPL-21).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll proceed with this in mine so that I'm not blocked if that's ok.
@@ -80,7 +80,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser | |||
return nil, errors.New("gateway connector is required for web API Trigger capability") | |||
} | |||
connector := d.gatewayConnectorWrapper.GetGatewayConnector() | |||
triggerSrvc, err := webapi.NewTrigger(spec.StandardCapabilitiesSpec.Config, d.registry, connector, log) | |||
signer := d.gatewayConnectorWrapper.GetSignerKey() | |||
triggerSrvc, err := webapi.NewTrigger(spec.StandardCapabilitiesSpec.Config, d.registry, connector, signer, log) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
signer key not needed
62d4dbc
to
fdc4510
Compare
fdc4510
to
f6d4543
Compare
7464429
to
2231012
Compare
Quality Gate failedFailed conditions See analysis details on SonarQube Catch issues before they fail your Quality Gate with our IDE extension SonarLint |
type Response struct { | ||
Success bool `json:"success"` | ||
ErrorMessage string `json:"error_message,omitempty"` | ||
Status string `json:"ACCEPTED"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need both Success
and Status
? do they both convey request status to gateway?
|
||
// TODO: ACCEPTED, PENDING, COMPLETED | ||
response := Response{Success: true, Status: "ACCEPTED"} | ||
h.sendResponse(ctx, gatewayID, body, response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to make this non-blocking? @bolekk had similar comment here: #14448 (comment)
if err != nil { | ||
return nil, err | ||
} | ||
allowedSendersMap := map[string]bool{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe senders should be defined at workflow level. Should we create a map from triggerID to list of senders and populate that map using TriggerRegistrationRequest.Config
in RegisterTrigger()
?
// TODO (CAPPL-22, CAPPL-24): | ||
// - decode config | ||
// - create an implementation of the capability API and add it to the Registry | ||
// - create a handler and register it with Gateway Connector | ||
// - manage trigger subscriptions | ||
// - process incoming trigger events and related metadata | ||
return nil, nil | ||
|
||
rateLimiter, err := common.NewRateLimiter(config.RateLimiter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to below. rateLimit
is also a workflow config https://docs.google.com/document/d/1mCTAo-ix-P923eUlh4SloZfBN9PCvgf90oHWbmykjsc/edit#heading=h.qqkbzbl5lbmv
|
||
type TriggerConfig struct { | ||
AllowedSenders []ethCommon.Address `toml:"allowedSenders"` | ||
Allowedtopics []string `toml:"allowedTopics"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this become a map also? map from triggerID to topics
|
||
// is this right? | ||
// Sri did wrappedPayload, err := values.WrapMap(log.Data), does that work in this case? | ||
wrappedPayload, _ := values.WrapMap(payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, we would need to populate payloads with topics and params
web api trigger connector handler cappl-22
For high level review not final code yet.
To review
TODO