From 5faa58da2aed0dceae60b7d4b3c65e775909515c Mon Sep 17 00:00:00 2001 From: root Date: Mon, 4 May 2026 19:17:32 +0000 Subject: [PATCH 1/3] feat: add bulk_update, bulk_delete, update methods for go sdk scheduled workflow --- sdks/go/features/schedules.go | 99 +++++++++++++++++++ .../examples/scheduled/programatic-async.py | 1 + .../examples/scheduled/programatic-sync.py | 1 + sdks/python/hatchet_sdk/features/scheduled.py | 2 + 4 files changed, 103 insertions(+) diff --git a/sdks/go/features/schedules.go b/sdks/go/features/schedules.go index f5752fe007..fe249c870f 100644 --- a/sdks/go/features/schedules.go +++ b/sdks/go/features/schedules.go @@ -37,6 +37,11 @@ type SchedulesClient struct { namespace *string } +type BulkUpdateScheduledRunItem struct { + TriggerAt time.Time `json:"triggerAt"` + ScheduledRunId string `json:"scheduledRunId"` +} + // NewSchedulesClient creates a new SchedulesClient func NewSchedulesClient( api *rest.ClientWithResponses, @@ -164,3 +169,97 @@ func (s *SchedulesClient) Get(ctx context.Context, scheduledRunId string) (*rest return resp.JSON200, nil } + +func (s *SchedulesClient) Update(ctx context.Context, scheduledRunId string, trigger CreateScheduledRunTrigger) (*rest.ScheduledWorkflows, error) { + scheduledRunIdUUID, err := uuid.Parse(scheduledRunId) + if err != nil { + return nil, errors.Wrap(err, "failed to parse scheduled run id") + } + + request := rest.UpdateScheduledWorkflowRunRequest{ + TriggerAt: trigger.TriggerAt, + } + + resp, err := s.api.WorkflowScheduledUpdateWithResponse( + ctx, + s.tenantId, + scheduledRunIdUUID, + request, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to update scheduled workflow run") + } + + if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil { + return nil, err + } + + return resp.JSON200, nil +} + +func (s *SchedulesClient) BulkDelete(ctx context.Context, scheduledWorkflowRunIds []string, filter *rest.ScheduledWorkflowsBulkDeleteFilter) (*rest.ScheduledWorkflowsBulkDeleteResponse, error) { + if len(scheduledWorkflowRunIds) == 0 && filter == nil { + return nil, errors.New("BulkDelete requires either scheduledRunIds or a filter") + } + + var ids *[]uuid.UUID + if len(scheduledWorkflowRunIds) > 0 { + parsed := make([]uuid.UUID, 0, len(scheduledWorkflowRunIds)) + for _, id := range scheduledWorkflowRunIds { + u, err := uuid.Parse(id) + if err != nil { + return nil, errors.Wrapf(err, "Failed to parse scheduled run id %q", id) + } + parsed = append(parsed, u) + } + ids = &parsed + } + + resp, err := s.api.WorkflowScheduledBulkDeleteWithResponse(ctx, s.tenantId, + rest.ScheduledWorkflowsBulkDeleteRequest{ + ScheduledWorkflowRunIds: ids, + Filter: filter, + }, + ) + + if err != nil { + return nil, errors.Wrap(err, "failed to bulk delete scheduled workflow runs") + } + + if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil { + return nil, err + } + + return resp.JSON200, nil +} + +func (s *SchedulesClient) BulkUpdate(ctx context.Context, updates []BulkUpdateScheduledRunItem) (*rest.ScheduledWorkflowsBulkUpdateResponse, error) { + items := make([]rest.ScheduledWorkflowsBulkUpdateItem, 0, len(updates)) + for _, u := range updates { + id, err := uuid.Parse(u.ScheduledRunId) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse scheduled run id %q", u.ScheduledRunId) + } + + items = append(items, rest.ScheduledWorkflowsBulkUpdateItem{ + Id: id, + TriggerAt: u.TriggerAt, + }) + } + + resp, err := s.api.WorkflowScheduledBulkUpdateWithResponse(ctx, s.tenantId, + rest.ScheduledWorkflowsBulkUpdateRequest{ + Updates: items, + }, + ) + + if err != nil { + return nil, errors.Wrap(err, "failed to bulk update scheduled workflow runs") + } + + if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil { + return nil, err + } + + return resp.JSON200, nil +} diff --git a/sdks/python/examples/scheduled/programatic-async.py b/sdks/python/examples/scheduled/programatic-async.py index d69d73cf2d..bb86c48802 100644 --- a/sdks/python/examples/scheduled/programatic-async.py +++ b/sdks/python/examples/scheduled/programatic-async.py @@ -16,6 +16,7 @@ async def create_scheduled() -> None: additional_metadata={ "customer_id": "customer-a", }, + priority=3 ) scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/python/examples/scheduled/programatic-sync.py b/sdks/python/examples/scheduled/programatic-sync.py index f971587540..79ec5648df 100644 --- a/sdks/python/examples/scheduled/programatic-sync.py +++ b/sdks/python/examples/scheduled/programatic-sync.py @@ -15,6 +15,7 @@ additional_metadata={ "customer_id": "customer-a", }, + priority=2 ) id = scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/python/hatchet_sdk/features/scheduled.py b/sdks/python/hatchet_sdk/features/scheduled.py index 34dac09c03..3075a9dc74 100644 --- a/sdks/python/hatchet_sdk/features/scheduled.py +++ b/sdks/python/hatchet_sdk/features/scheduled.py @@ -64,6 +64,7 @@ def create( trigger_at: datetime.datetime, input: JSONSerializableMapping, additional_metadata: JSONSerializableMapping, + priority: int | None = None, ) -> ScheduledWorkflows: """ Creates a new scheduled workflow run. @@ -85,6 +86,7 @@ def create( triggerAt=trigger_at, input=input, additionalMetadata=additional_metadata, + priority=priority ), ) From 29b44276f0ef98556596623166fe68da0d0fe5a8 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 4 May 2026 20:08:33 +0000 Subject: [PATCH 2/3] fix: lint --- examples/python/scheduled/programatic-async.py | 1 + examples/python/scheduled/programatic-sync.py | 1 + sdks/go/features/schedules.go | 2 +- sdks/python/examples/scheduled/programatic-async.py | 2 +- sdks/python/examples/scheduled/programatic-sync.py | 2 +- sdks/python/hatchet_sdk/features/scheduled.py | 11 +++++------ 6 files changed, 10 insertions(+), 9 deletions(-) diff --git a/examples/python/scheduled/programatic-async.py b/examples/python/scheduled/programatic-async.py index abe9d39319..0364f0a5c4 100644 --- a/examples/python/scheduled/programatic-async.py +++ b/examples/python/scheduled/programatic-async.py @@ -16,6 +16,7 @@ async def create_scheduled() -> None: additional_metadata={ "customer_id": "customer-a", }, + priority=3, ) scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/examples/python/scheduled/programatic-sync.py b/examples/python/scheduled/programatic-sync.py index 8eb6163b8b..c3e5da3e1d 100644 --- a/examples/python/scheduled/programatic-sync.py +++ b/examples/python/scheduled/programatic-sync.py @@ -15,6 +15,7 @@ additional_metadata={ "customer_id": "customer-a", }, + priority=2, ) id = scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/go/features/schedules.go b/sdks/go/features/schedules.go index fe249c870f..5182515173 100644 --- a/sdks/go/features/schedules.go +++ b/sdks/go/features/schedules.go @@ -38,8 +38,8 @@ type SchedulesClient struct { } type BulkUpdateScheduledRunItem struct { - TriggerAt time.Time `json:"triggerAt"` ScheduledRunId string `json:"scheduledRunId"` + TriggerAt time.Time `json:"triggerAt"` } // NewSchedulesClient creates a new SchedulesClient diff --git a/sdks/python/examples/scheduled/programatic-async.py b/sdks/python/examples/scheduled/programatic-async.py index bb86c48802..e848c4b221 100644 --- a/sdks/python/examples/scheduled/programatic-async.py +++ b/sdks/python/examples/scheduled/programatic-async.py @@ -16,7 +16,7 @@ async def create_scheduled() -> None: additional_metadata={ "customer_id": "customer-a", }, - priority=3 + priority=3, ) scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/python/examples/scheduled/programatic-sync.py b/sdks/python/examples/scheduled/programatic-sync.py index 79ec5648df..59bab30c7f 100644 --- a/sdks/python/examples/scheduled/programatic-sync.py +++ b/sdks/python/examples/scheduled/programatic-sync.py @@ -15,7 +15,7 @@ additional_metadata={ "customer_id": "customer-a", }, - priority=2 + priority=2, ) id = scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/python/hatchet_sdk/features/scheduled.py b/sdks/python/hatchet_sdk/features/scheduled.py index 3075a9dc74..437250f861 100644 --- a/sdks/python/hatchet_sdk/features/scheduled.py +++ b/sdks/python/hatchet_sdk/features/scheduled.py @@ -75,6 +75,7 @@ def create( :param trigger_at: The datetime when the run should be triggered. :param input: The input data for the scheduled workflow. :param additional_metadata: Additional metadata associated with the future run as a key-value pair. + :param priority: The priority of the scheduled workflow run. Must be between 1 and 3, inclusive. :return: The created scheduled workflow instance. """ @@ -86,7 +87,7 @@ def create( triggerAt=trigger_at, input=input, additionalMetadata=additional_metadata, - priority=priority + priority=priority, ), ) @@ -96,6 +97,7 @@ async def aio_create( trigger_at: datetime.datetime, input: JSONSerializableMapping, additional_metadata: JSONSerializableMapping, + priority: int | None = None, ) -> ScheduledWorkflows: """ Creates a new scheduled workflow run. @@ -106,16 +108,13 @@ async def aio_create( :param trigger_at: The datetime when the run should be triggered. :param input: The input data for the scheduled workflow. :param additional_metadata: Additional metadata associated with the future run as a key-value pair. + :param priority: The priority of the scheduled workflow run. Must be between 1 and 3, inclusive. :return: The created scheduled workflow instance. """ return await asyncio.to_thread( - self.create, - workflow_name, - trigger_at, - input, - additional_metadata, + self.create, workflow_name, trigger_at, input, additional_metadata, priority ) def delete(self, scheduled_id: str) -> None: From b55976e6c2c153855e0e690ce8bd2c307faebe4a Mon Sep 17 00:00:00 2001 From: jishnundth Date: Tue, 5 May 2026 17:17:24 +0000 Subject: [PATCH 3/3] add: scheduled worfklow doc comments for go sdk --- sdks/go/features/schedules.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/go/features/schedules.go b/sdks/go/features/schedules.go index 5182515173..97982f8a8b 100644 --- a/sdks/go/features/schedules.go +++ b/sdks/go/features/schedules.go @@ -170,6 +170,7 @@ func (s *SchedulesClient) Get(ctx context.Context, scheduledRunId string) (*rest return resp.JSON200, nil } +// Update reschedules a specific scheduled workflow run by its ID. func (s *SchedulesClient) Update(ctx context.Context, scheduledRunId string, trigger CreateScheduledRunTrigger) (*rest.ScheduledWorkflows, error) { scheduledRunIdUUID, err := uuid.Parse(scheduledRunId) if err != nil { @@ -197,6 +198,7 @@ func (s *SchedulesClient) Update(ctx context.Context, scheduledRunId string, tri return resp.JSON200, nil } +// BulkDelete deletes scheduled workflows by a list of their IDs. func (s *SchedulesClient) BulkDelete(ctx context.Context, scheduledWorkflowRunIds []string, filter *rest.ScheduledWorkflowsBulkDeleteFilter) (*rest.ScheduledWorkflowsBulkDeleteResponse, error) { if len(scheduledWorkflowRunIds) == 0 && filter == nil { return nil, errors.New("BulkDelete requires either scheduledRunIds or a filter") @@ -233,6 +235,7 @@ func (s *SchedulesClient) BulkDelete(ctx context.Context, scheduledWorkflowRunId return resp.JSON200, nil } +// BulkUpdate reschedules scheduled workflows by a list of their IDs. func (s *SchedulesClient) BulkUpdate(ctx context.Context, updates []BulkUpdateScheduledRunItem) (*rest.ScheduledWorkflowsBulkUpdateResponse, error) { items := make([]rest.ScheduledWorkflowsBulkUpdateItem, 0, len(updates)) for _, u := range updates {