Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/python/scheduled/programatic-async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async def create_scheduled() -> None:
additional_metadata={
"customer_id": "customer-a",
},
priority=3,
)

scheduled_run.metadata.id # the id of the scheduled run trigger
Expand Down
1 change: 1 addition & 0 deletions examples/python/scheduled/programatic-sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
additional_metadata={
"customer_id": "customer-a",
},
priority=2,
)

id = scheduled_run.metadata.id # the id of the scheduled run trigger
Expand Down
102 changes: 102 additions & 0 deletions sdks/go/features/schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type SchedulesClient struct {
namespace *string
}

type BulkUpdateScheduledRunItem struct {
ScheduledRunId string `json:"scheduledRunId"`
TriggerAt time.Time `json:"triggerAt"`
}

// NewSchedulesClient creates a new SchedulesClient
func NewSchedulesClient(
api *rest.ClientWithResponses,
Expand Down Expand Up @@ -164,3 +169,100 @@ func (s *SchedulesClient) Get(ctx context.Context, scheduledRunId string) (*rest

return resp.JSON200, nil
}

// Update reschedules a specific scheduled workflow run by its ID.
func (s *SchedulesClient) Update(ctx context.Context, scheduledRunId string, trigger CreateScheduledRunTrigger) (*rest.ScheduledWorkflows, error) {
scheduledRunIdUUID, err := uuid.Parse(scheduledRunId)
if err != nil {
return nil, errors.Wrap(err, "failed to parse scheduled run id")
}

request := rest.UpdateScheduledWorkflowRunRequest{
TriggerAt: trigger.TriggerAt,
}

resp, err := s.api.WorkflowScheduledUpdateWithResponse(
ctx,
s.tenantId,
scheduledRunIdUUID,
request,
)
if err != nil {
return nil, errors.Wrap(err, "failed to update scheduled workflow run")
}

if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil {
return nil, err
}

return resp.JSON200, nil
}

// BulkDelete deletes scheduled workflows by a list of their IDs.
func (s *SchedulesClient) BulkDelete(ctx context.Context, scheduledWorkflowRunIds []string, filter *rest.ScheduledWorkflowsBulkDeleteFilter) (*rest.ScheduledWorkflowsBulkDeleteResponse, error) {
if len(scheduledWorkflowRunIds) == 0 && filter == nil {
return nil, errors.New("BulkDelete requires either scheduledRunIds or a filter")
}

var ids *[]uuid.UUID
if len(scheduledWorkflowRunIds) > 0 {
parsed := make([]uuid.UUID, 0, len(scheduledWorkflowRunIds))
for _, id := range scheduledWorkflowRunIds {
u, err := uuid.Parse(id)
if err != nil {
return nil, errors.Wrapf(err, "Failed to parse scheduled run id %q", id)
}
parsed = append(parsed, u)
}
ids = &parsed
}

resp, err := s.api.WorkflowScheduledBulkDeleteWithResponse(ctx, s.tenantId,
rest.ScheduledWorkflowsBulkDeleteRequest{
ScheduledWorkflowRunIds: ids,
Filter: filter,
},
)

if err != nil {
return nil, errors.Wrap(err, "failed to bulk delete scheduled workflow runs")
}

if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil {
return nil, err
}

return resp.JSON200, nil
}

// BulkUpdate reschedules scheduled workflows by a list of their IDs.
func (s *SchedulesClient) BulkUpdate(ctx context.Context, updates []BulkUpdateScheduledRunItem) (*rest.ScheduledWorkflowsBulkUpdateResponse, error) {
items := make([]rest.ScheduledWorkflowsBulkUpdateItem, 0, len(updates))
for _, u := range updates {
id, err := uuid.Parse(u.ScheduledRunId)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse scheduled run id %q", u.ScheduledRunId)
}

items = append(items, rest.ScheduledWorkflowsBulkUpdateItem{
Id: id,
TriggerAt: u.TriggerAt,
})
}

resp, err := s.api.WorkflowScheduledBulkUpdateWithResponse(ctx, s.tenantId,
rest.ScheduledWorkflowsBulkUpdateRequest{
Updates: items,
},
)

if err != nil {
return nil, errors.Wrap(err, "failed to bulk update scheduled workflow runs")
}

if err := validateJSON200Response(resp.StatusCode(), resp.Body, resp.JSON200); err != nil {
return nil, err
}

return resp.JSON200, nil
}
1 change: 1 addition & 0 deletions sdks/python/examples/scheduled/programatic-async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async def create_scheduled() -> None:
additional_metadata={
"customer_id": "customer-a",
},
priority=3,
)

scheduled_run.metadata.id # the id of the scheduled run trigger
Expand Down
1 change: 1 addition & 0 deletions sdks/python/examples/scheduled/programatic-sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
additional_metadata={
"customer_id": "customer-a",
},
priority=2,
)

id = scheduled_run.metadata.id # the id of the scheduled run trigger
Expand Down
11 changes: 6 additions & 5 deletions sdks/python/hatchet_sdk/features/scheduled.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def create(
trigger_at: datetime.datetime,
input: JSONSerializableMapping,
additional_metadata: JSONSerializableMapping,
priority: int | None = None,
) -> ScheduledWorkflows:
"""
Creates a new scheduled workflow run.
Expand All @@ -74,6 +75,7 @@ def create(
:param trigger_at: The datetime when the run should be triggered.
:param input: The input data for the scheduled workflow.
:param additional_metadata: Additional metadata associated with the future run as a key-value pair.
:param priority: The priority of the scheduled workflow run. Must be between 1 and 3, inclusive.

:return: The created scheduled workflow instance.
"""
Expand All @@ -85,6 +87,7 @@ def create(
triggerAt=trigger_at,
input=input,
additionalMetadata=additional_metadata,
priority=priority,
),
)

Expand All @@ -94,6 +97,7 @@ async def aio_create(
trigger_at: datetime.datetime,
input: JSONSerializableMapping,
additional_metadata: JSONSerializableMapping,
priority: int | None = None,
) -> ScheduledWorkflows:
"""
Creates a new scheduled workflow run.
Expand All @@ -104,16 +108,13 @@ async def aio_create(
:param trigger_at: The datetime when the run should be triggered.
:param input: The input data for the scheduled workflow.
:param additional_metadata: Additional metadata associated with the future run as a key-value pair.
:param priority: The priority of the scheduled workflow run. Must be between 1 and 3, inclusive.

:return: The created scheduled workflow instance.
"""

return await asyncio.to_thread(
self.create,
workflow_name,
trigger_at,
input,
additional_metadata,
self.create, workflow_name, trigger_at, input, additional_metadata, priority
)

def delete(self, scheduled_id: str) -> None:
Expand Down
Loading