diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 24bd26757ac..d887df355a4 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -51,6 +51,7 @@ type registrationKey struct { type pubRegState struct { callback <-chan commoncap.TriggerResponse request commoncap.TriggerRegistrationRequest + cancel context.CancelFunc } type batchedResponse struct { @@ -162,16 +163,17 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { } ctx, cancel := p.stopCh.NewCtx() callbackCh, err := p.underlying.RegisterTrigger(ctx, unmarshaled) - cancel() if err == nil { p.registrations[key] = &pubRegState{ callback: callbackCh, request: unmarshaled, + cancel: cancel, } p.wg.Add(1) go p.triggerEventLoop(callbackCh, key) p.lggr.Debugw("updated trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID) } else { + cancel() p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) } } else { @@ -198,6 +200,7 @@ func (p *triggerPublisher) registrationCleanupLoop() { ctx, cancel := p.stopCh.NewCtx() err := p.underlying.UnregisterTrigger(ctx, req.request) cancel() + p.registrations[key].cancel() // Cancel context on register trigger p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonID, "workflowId", key.workflowID, "err", err) // after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel delete(p.registrations, key)