Skip to content

Commit

Permalink
(feat): Cron Trigger Service Capability
Browse files Browse the repository at this point in the history
  • Loading branch information
justinkaseman committed Aug 20, 2024
1 parent ad84133 commit 47b94dc
Show file tree
Hide file tree
Showing 6 changed files with 612 additions and 0 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ require (
sigs.k8s.io/yaml v1.4.0
)

require (
github.com/go-co-op/gocron/v2 v2.11.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
)

require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w=
github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg=
github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE=
github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/go-co-op/gocron/v2 v2.11.0 h1:IOowNA6SzwdRFnD4/Ol3Kj6G2xKfsoiiGq2Jhhm9bvE=
github.com/go-co-op/gocron/v2 v2.11.0/go.mod h1:xY7bJxGazKam1cz04EebrlP4S9q4iWdiAylMGP3jY9w=
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f35nQbASLnvxEde4XOBL+Sn7rFuV+FOJqkljg=
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down Expand Up @@ -186,6 +188,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/riferrei/srclient v0.5.4 h1:dfwyR5u23QF7beuVl2WemUY2KXh5+Sc4DHKyPXBNYuc=
github.com/riferrei/srclient v0.5.4/go.mod h1:vbkLmWcgYa7JgfPvuy/+K8fTS0p1bApqadxrxi/S1MI=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0=
Expand Down
287 changes: 287 additions & 0 deletions pkg/capabilities/triggers/cron_trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
package triggers

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"sync"
"time"

// actually uses cronv3 internally for its
// parser to turn cron strings into cron schedules
"github.com/go-co-op/gocron/v2"
"github.com/robfig/cron/v3"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

const cronTriggerID = "[email protected]"

var cronTriggerInfo = capabilities.MustNewCapabilityInfo(
cronTriggerID,
capabilities.CapabilityTypeTrigger,
"A trigger to schedule workflow execution to run periodically at fixed times, dates, or intervals.",
)

type cronTriggerConfig struct {
// An identifier to register this trigger under, must be unique across all triggers within a workflow.
TriggerID string `json:"triggerId"`
// The crontab to evaluate, with second granularity.
// The seconds field is required. The time zone will always be automatically set to UTC.
Schedule string `json:"schedule"`
}

type cronTriggerInput struct{}

type CronTriggerPayload struct {
ScheduledExecutionTime string
}

type cronJob struct {
ch chan<- capabilities.CapabilityResponse
job gocron.Job
}

type CronTriggerService struct {
capabilities.CapabilityInfo
capabilities.Validator[cronTriggerConfig, cronTriggerInput, capabilities.CapabilityResponse]
jobs map[string]cronJob
lggr logger.Logger
mu sync.Mutex
scheduler gocron.Scheduler
stopCh services.StopChan
wg sync.WaitGroup
}

var _ capabilities.TriggerCapability = (*CronTriggerService)(nil)
var _ services.Service = &CronTriggerService{}

func NewCronTriggerService(lggr logger.Logger) *CronTriggerService {
s, err := gocron.NewScheduler()
if err != nil {
return nil
}
return &CronTriggerService{
CapabilityInfo: cronTriggerInfo,
Validator: capabilities.NewValidator[cronTriggerConfig, cronTriggerInput, capabilities.CapabilityResponse](capabilities.ValidatorArgs{Info: cronTriggerInfo}),
jobs: map[string]cronJob{},
lggr: logger.Named(lggr, "CronTrigger"),
scheduler: s,
stopCh: make(services.StopChan),
}
}

func (cts *CronTriggerService) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cts.mu.Lock()
defer cts.mu.Unlock()

config, err := cts.ValidateConfig(req.Config)
if err != nil {
return nil, err
}

triggerID := getTriggerID(config.TriggerID, req.Metadata.WorkflowID)
if _, ok := cts.jobs[triggerID]; ok {
return nil, fmt.Errorf("triggerId %s already registered", triggerID)
}

job := gocron.CronJob(config.Schedule, true)

cronParser := cron.NewParser(
cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow,
)
schedule, err := cronParser.Parse(config.Schedule)
if err != nil {
return nil, err
}

t := gocron.NewTask(
cts.makeProcess(triggerID, schedule),
)

j, err := cts.scheduler.NewJob(
job,
t,
)

if err != nil {
return nil, err
}

ch := make(chan capabilities.CapabilityResponse, defaultSendChannelBufferSize)
cts.jobs[triggerID] = cronJob{
ch: ch,
job: j,
}
cts.lggr.Debugw("RegisterTrigger", "triggerId", triggerID, "jobId", j.ID())
return ch, nil
}

func (cts *CronTriggerService) makeProcess(triggerId string, schedule cron.Schedule) func() {
return func() { cts.process(triggerId, schedule) }
}

func (cts *CronTriggerService) process(triggerId string, schedule cron.Schedule) {
cts.mu.Lock()
defer cts.mu.Unlock()

now := time.Now()
scheduledExecutionTime := schedule.Next(now)
// if scheduledExecutionTime is before now, get the next one
// this can be due to time sync / sleep
for {
if scheduledExecutionTime.Before(now) {
scheduledExecutionTime = schedule.Next(now.Add(time.Second))
} else {
break
}
}

// timestamp in ns precision, to show the discrepancy between the scheduled time and the actual time
// we need scheduled time to generate a deterministic triggerEventId
timestampNs := time.Now().UTC().UnixNano()
// format to ISO 8601
timestampMsIso8601 := formatUnixNanoToISO8601(timestampNs)

scheduledExecutionTimeFormatted := scheduledExecutionTime.Format(time.RFC3339)
hash := sha256.Sum256([]byte(scheduledExecutionTimeFormatted))
triggerEventId := hex.EncodeToString(hash[:])

cts.lggr.Debugw("process", "scheduledExecTime", scheduledExecutionTime, "actualExecTime", timestampMsIso8601, "triggerEventId", triggerEventId)

capabilityResponse, err := wrapCronTriggerEvent(triggerEventId, timestampMsIso8601, scheduledExecutionTimeFormatted)
if err != nil {
cts.lggr.Errorw("error wrapping trigger event", "err", err)
}

select {
case cts.jobs[triggerId].ch <- capabilityResponse:
default:
cts.lggr.Errorw("channel full, dropping event", "eventID", triggerEventId, "triggerID", triggerID, "capabilityResponse", capabilityResponse)
}
}

func (cts *CronTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error {
cts.mu.Lock()
defer cts.mu.Unlock()

config, err := cts.ValidateConfig(req.Config)
if err != nil {
return err
}

triggerID := getTriggerID(config.TriggerID, req.Metadata.WorkflowID)

if _, ok := cts.jobs[triggerID]; !ok {
return fmt.Errorf("triggerId %s not found", triggerID)
}

trigger, ok := cts.jobs[triggerID]
jobId := trigger.job.ID()

if ok {
cts.scheduler.RemoveJob(jobId)
close(trigger.ch)
}
delete(cts.jobs, triggerID)
cts.lggr.Debugw("UnregisterTrigger", "triggerId", triggerID, "jobId", jobId)
return nil
}

// Start the service.
func (cts *CronTriggerService) Start(ctx context.Context) error {
cts.wg.Add(1)

if cts.scheduler == nil {
return errors.New("no scheduler initialized")
}
cts.scheduler.Start()

cts.lggr.Info(cts.Name() + " started")

// block until ready to shut down
go func() {
defer cts.wg.Done()
<-cts.stopCh
}()

return nil
}

// Close stops the Service.
// Invariants: After this call the Service cannot be started
// again, new Service will need to be built to do so.
func (cts *CronTriggerService) Close() error {
err := cts.scheduler.Shutdown()
if err != nil {
return fmt.Errorf("scheduler shutdown encountered a problem: %s", err)
}

close(cts.stopCh)

cts.wg.Wait()

cts.lggr.Info(cts.Name() + " closed")

return nil
}

func (cts *CronTriggerService) Ready() error {
if cts.scheduler == nil {
return errors.New("no scheduler initialized")
}
return nil
}

func (cts *CronTriggerService) HealthReport() map[string]error {
return map[string]error{cts.Name(): nil}
}

func (cts *CronTriggerService) Name() string {
return "CronTriggerService"
}

func getTriggerID(triggerID string, wid string) string {
return wid + "|" + triggerID
}

func wrapCronTriggerEvent(eventID string, timestamp string, scheduledExecutionTime string) (capabilities.CapabilityResponse, error) {
payload, err := values.Wrap(CronTriggerPayload{ScheduledExecutionTime: scheduledExecutionTime})
if err != nil {
return capabilities.CapabilityResponse{}, err
}

metadata, err := values.Wrap(nil)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

triggerEvent := capabilities.TriggerEvent{
TriggerType: cronTriggerID,
ID: eventID,
Timestamp: timestamp,
Metadata: metadata,
Payload: payload,
}

eventVal, err := values.Wrap(triggerEvent)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

return capabilities.CapabilityResponse{
Value: eventVal.(*values.Map),
}, nil
}

func formatUnixNanoToISO8601(unixNano int64) string {
seconds := unixNano / int64(time.Second)
nanoseconds := unixNano % int64(time.Second)
t := time.Unix(seconds, nanoseconds)
return t.Format(time.RFC3339Nano)
}
Loading

0 comments on commit 47b94dc

Please sign in to comment.