diff --git a/examples/python/scheduled/programatic-async.py b/examples/python/scheduled/programatic-async.py index abe9d39319..0364f0a5c4 100644 --- a/examples/python/scheduled/programatic-async.py +++ b/examples/python/scheduled/programatic-async.py @@ -16,6 +16,7 @@ async def create_scheduled() -> None: additional_metadata={ "customer_id": "customer-a", }, + priority=3, ) scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/examples/python/scheduled/programatic-sync.py b/examples/python/scheduled/programatic-sync.py index 8eb6163b8b..c3e5da3e1d 100644 --- a/examples/python/scheduled/programatic-sync.py +++ b/examples/python/scheduled/programatic-sync.py @@ -15,6 +15,7 @@ additional_metadata={ "customer_id": "customer-a", }, + priority=2, ) id = scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/go/features/schedules.go b/sdks/go/features/schedules.go index f5752fe007..97982f8a8b 100644 --- a/sdks/go/features/schedules.go +++ b/sdks/go/features/schedules.go @@ -37,6 +37,11 @@ type SchedulesClient struct { namespace *string } +type BulkUpdateScheduledRunItem struct { + ScheduledRunId string `json:"scheduledRunId"` + TriggerAt time.Time `json:"triggerAt"` +} + // NewSchedulesClient creates a new SchedulesClient func NewSchedulesClient( api *rest.ClientWithResponses, @@ -164,3 +169,100 @@ func (s *SchedulesClient) Get(ctx context.Context, scheduledRunId string) (*rest return resp.JSON200, nil } + +// Update reschedules a specific scheduled workflow run by its ID. +func (s *SchedulesClient) Update(ctx context.Context, scheduledRunId string, trigger CreateScheduledRunTrigger) (*rest.ScheduledWorkflows, error) { + scheduledRunIdUUID, err := uuid.Parse(scheduledRunId) + if err != nil { + return nil, errors.Wrap(err, "failed to parse scheduled run id") + } + + request := rest.UpdateScheduledWorkflowRunRequest{ + TriggerAt: trigger.TriggerAt, + } + + resp, err := s.api.WorkflowScheduledUpdateWithResponse( + ctx, + s.tenantId, + scheduledRunIdUUID, + request, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to update scheduled workflow run") + } + + if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil { + return nil, err + } + + return resp.JSON200, nil +} + +// BulkDelete deletes scheduled workflows by a list of their IDs. +func (s *SchedulesClient) BulkDelete(ctx context.Context, scheduledWorkflowRunIds []string, filter *rest.ScheduledWorkflowsBulkDeleteFilter) (*rest.ScheduledWorkflowsBulkDeleteResponse, error) { + if len(scheduledWorkflowRunIds) == 0 && filter == nil { + return nil, errors.New("BulkDelete requires either scheduledRunIds or a filter") + } + + var ids *[]uuid.UUID + if len(scheduledWorkflowRunIds) > 0 { + parsed := make([]uuid.UUID, 0, len(scheduledWorkflowRunIds)) + for _, id := range scheduledWorkflowRunIds { + u, err := uuid.Parse(id) + if err != nil { + return nil, errors.Wrapf(err, "Failed to parse scheduled run id %q", id) + } + parsed = append(parsed, u) + } + ids = &parsed + } + + resp, err := s.api.WorkflowScheduledBulkDeleteWithResponse(ctx, s.tenantId, + rest.ScheduledWorkflowsBulkDeleteRequest{ + ScheduledWorkflowRunIds: ids, + Filter: filter, + }, + ) + + if err != nil { + return nil, errors.Wrap(err, "failed to bulk delete scheduled workflow runs") + } + + if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil { + return nil, err + } + + return resp.JSON200, nil +} + +// BulkUpdate reschedules scheduled workflows by a list of their IDs. +func (s *SchedulesClient) BulkUpdate(ctx context.Context, updates []BulkUpdateScheduledRunItem) (*rest.ScheduledWorkflowsBulkUpdateResponse, error) { + items := make([]rest.ScheduledWorkflowsBulkUpdateItem, 0, len(updates)) + for _, u := range updates { + id, err := uuid.Parse(u.ScheduledRunId) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse scheduled run id %q", u.ScheduledRunId) + } + + items = append(items, rest.ScheduledWorkflowsBulkUpdateItem{ + Id: id, + TriggerAt: u.TriggerAt, + }) + } + + resp, err := s.api.WorkflowScheduledBulkUpdateWithResponse(ctx, s.tenantId, + rest.ScheduledWorkflowsBulkUpdateRequest{ + Updates: items, + }, + ) + + if err != nil { + return nil, errors.Wrap(err, "failed to bulk update scheduled workflow runs") + } + + if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil { + return nil, err + } + + return resp.JSON200, nil +} diff --git a/sdks/python/examples/scheduled/programatic-async.py b/sdks/python/examples/scheduled/programatic-async.py index d69d73cf2d..e848c4b221 100644 --- a/sdks/python/examples/scheduled/programatic-async.py +++ b/sdks/python/examples/scheduled/programatic-async.py @@ -16,6 +16,7 @@ async def create_scheduled() -> None: additional_metadata={ "customer_id": "customer-a", }, + priority=3, ) scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/python/examples/scheduled/programatic-sync.py b/sdks/python/examples/scheduled/programatic-sync.py index f971587540..59bab30c7f 100644 --- a/sdks/python/examples/scheduled/programatic-sync.py +++ b/sdks/python/examples/scheduled/programatic-sync.py @@ -15,6 +15,7 @@ additional_metadata={ "customer_id": "customer-a", }, + priority=2, ) id = scheduled_run.metadata.id # the id of the scheduled run trigger diff --git a/sdks/python/hatchet_sdk/features/scheduled.py b/sdks/python/hatchet_sdk/features/scheduled.py index 34dac09c03..437250f861 100644 --- a/sdks/python/hatchet_sdk/features/scheduled.py +++ b/sdks/python/hatchet_sdk/features/scheduled.py @@ -64,6 +64,7 @@ def create( trigger_at: datetime.datetime, input: JSONSerializableMapping, additional_metadata: JSONSerializableMapping, + priority: int | None = None, ) -> ScheduledWorkflows: """ Creates a new scheduled workflow run. @@ -74,6 +75,7 @@ def create( :param trigger_at: The datetime when the run should be triggered. :param input: The input data for the scheduled workflow. :param additional_metadata: Additional metadata associated with the future run as a key-value pair. + :param priority: The priority of the scheduled workflow run. Must be between 1 and 3, inclusive. :return: The created scheduled workflow instance. """ @@ -85,6 +87,7 @@ def create( triggerAt=trigger_at, input=input, additionalMetadata=additional_metadata, + priority=priority, ), ) @@ -94,6 +97,7 @@ async def aio_create( trigger_at: datetime.datetime, input: JSONSerializableMapping, additional_metadata: JSONSerializableMapping, + priority: int | None = None, ) -> ScheduledWorkflows: """ Creates a new scheduled workflow run. @@ -104,16 +108,13 @@ async def aio_create( :param trigger_at: The datetime when the run should be triggered. :param input: The input data for the scheduled workflow. :param additional_metadata: Additional metadata associated with the future run as a key-value pair. + :param priority: The priority of the scheduled workflow run. Must be between 1 and 3, inclusive. :return: The created scheduled workflow instance. """ return await asyncio.to_thread( - self.create, - workflow_name, - trigger_at, - input, - additional_metadata, + self.create, workflow_name, trigger_at, input, additional_metadata, priority ) def delete(self, scheduled_id: str) -> None: