diff --git a/cmd/hatchet-migrate/migrate/migrations/20260428164911_v1_0_102.sql b/cmd/hatchet-migrate/migrate/migrations/20260428164911_v1_0_102.sql new file mode 100644 index 0000000000..3977a0af6d --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20260428164911_v1_0_102.sql @@ -0,0 +1,11 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE v1_task ADD COLUMN step_name TEXT; +ALTER TABLE v1_tasks_olap ADD COLUMN step_name TEXT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE v1_task DROP COLUMN step_name; +ALTER TABLE v1_tasks_olap DROP COLUMN step_name; +-- +goose StatementEnd diff --git a/pkg/repository/olap.go b/pkg/repository/olap.go index ce95727227..a3bb46144a 100644 --- a/pkg/repository/olap.go +++ b/pkg/repository/olap.go @@ -533,8 +533,10 @@ func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId uui } type TaskMetadata struct { - TaskID int64 `json:"task_id"` - TaskInsertedAt time.Time `json:"task_inserted_at"` + TaskID int64 `json:"task_id"` + TaskInsertedAt time.Time `json:"task_inserted_at"` + OutputEventExternalId *uuid.UUID `json:"output_event_external_id,omitempty"` + StepName *string `json:"step_name"` } func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error) { @@ -570,10 +572,47 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt return nil, err } + outputEventExternalIdToStepName := make(map[uuid.UUID]string) + outputEventExternalIds := make([]uuid.UUID, 0) + + for _, taskMeta := range taskMetadata { + if taskMeta.OutputEventExternalId != nil && taskMeta.StepName != nil { + outputEventExternalIds = append(outputEventExternalIds, *taskMeta.OutputEventExternalId) + outputEventExternalIdToStepName[*taskMeta.OutputEventExternalId] = *taskMeta.StepName + } + } + + outputPayloads, err := r.ReadPayloads(ctx, row.TenantID, outputEventExternalIds...) + + if err != nil { + return nil, err + } + + output := make(map[string]interface{}) + + for externalId, payload := range outputPayloads { + stepName, ok := outputEventExternalIdToStepName[externalId] + + if !ok { + continue + } + + payloadMap := make(map[string]interface{}) + + err = json.Unmarshal(payload, &payloadMap) + + if err != nil { + r.l.Error().Err(err).Msgf("failed to unmarshal payload for step %s, externalId %s", stepName, externalId) + continue + } + + output[stepName] = payloadMap + } + var outputPayload []byte - if row.OutputEventExternalID != nil { - outputPayload, err = r.ReadPayload(ctx, row.TenantID, *row.OutputEventExternalID) + if len(output) > 0 { + outputPayload, err = json.Marshal(output) if err != nil { return nil, err @@ -1896,6 +1935,7 @@ func (r *OLAPRepositoryImpl) writeTaskBatch(ctx context.Context, tenantId uuid.U WorkflowRunID: task.WorkflowRunID, Input: payloadToWriteToTask, IsDurable: task.IsDurable.Bool, + StepName: task.StepName, }) putPayloadOpts = append(putPayloadOpts, StoreOLAPPayloadOpts{ diff --git a/pkg/repository/sqlcv1/batch.go b/pkg/repository/sqlcv1/batch.go index ddf90c1f06..d74262d8ae 100644 --- a/pkg/repository/sqlcv1/batch.go +++ b/pkg/repository/sqlcv1/batch.go @@ -17,7 +17,7 @@ var ( ) const registerBatch = `-- name: RegisterBatch :batchexec -SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key FROM v1_task WHERE id = $1 +SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name FROM v1_task WHERE id = $1 ` type RegisterBatchBatchResults struct { diff --git a/pkg/repository/sqlcv1/copyfrom.go b/pkg/repository/sqlcv1/copyfrom.go index 1eb1eeeedb..6f73c8db48 100644 --- a/pkg/repository/sqlcv1/copyfrom.go +++ b/pkg/repository/sqlcv1/copyfrom.go @@ -325,6 +325,7 @@ func (r iteratorForCreateTasksOLAP) Values() ([]interface{}, error) { r.rows[0].DagInsertedAt, r.rows[0].ParentTaskExternalID, r.rows[0].IsDurable, + r.rows[0].StepName, }, nil } @@ -333,7 +334,7 @@ func (r iteratorForCreateTasksOLAP) Err() error { } func (q *Queries) CreateTasksOLAP(ctx context.Context, db DBTX, arg []CreateTasksOLAPParams) (int64, error) { - return db.CopyFrom(ctx, []string{"v1_tasks_olap"}, []string{"tenant_id", "id", "inserted_at", "queue", "action_id", "step_id", "workflow_id", "workflow_version_id", "workflow_run_id", "schedule_timeout", "step_timeout", "priority", "sticky", "desired_worker_id", "external_id", "display_name", "input", "additional_metadata", "dag_id", "dag_inserted_at", "parent_task_external_id", "is_durable"}, &iteratorForCreateTasksOLAP{rows: arg}) + return db.CopyFrom(ctx, []string{"v1_tasks_olap"}, []string{"tenant_id", "id", "inserted_at", "queue", "action_id", "step_id", "workflow_id", "workflow_version_id", "workflow_run_id", "schedule_timeout", "step_timeout", "priority", "sticky", "desired_worker_id", "external_id", "display_name", "input", "additional_metadata", "dag_id", "dag_inserted_at", "parent_task_external_id", "is_durable", "step_name"}, &iteratorForCreateTasksOLAP{rows: arg}) } // iteratorForInsertLogLine implements pgx.CopyFromSource. diff --git a/pkg/repository/sqlcv1/durable_event_log.sql.go b/pkg/repository/sqlcv1/durable_event_log.sql.go index 1a0f39dc86..e52b41baf3 100644 --- a/pkg/repository/sqlcv1/durable_event_log.sql.go +++ b/pkg/repository/sqlcv1/durable_event_log.sql.go @@ -590,7 +590,7 @@ WITH inputs AS ( UNNEST($2::BIGINT[]) AS node_id, UNNEST($3::BIGINT[]) AS branch_id ), tasks_with_nodes AS ( - SELECT t.id, t.inserted_at, t.tenant_id, t.queue, t.action_id, t.step_id, t.step_readable_id, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.external_id, t.display_name, t.input, t.retry_count, t.internal_retry_count, t.app_retry_count, t.step_index, t.additional_metadata, t.dag_id, t.dag_inserted_at, t.parent_task_external_id, t.parent_task_id, t.parent_task_inserted_at, t.child_index, t.child_key, t.initial_state, t.initial_state_reason, t.concurrency_parent_strategy_ids, t.concurrency_strategy_ids, t.concurrency_keys, t.retry_backoff_factor, t.retry_max_backoff, t.is_durable, t.desired_worker_label, t.triggering_event_external_id, t.triggering_event_key, i.node_id AS requested_node_id, i.branch_id AS requested_branch_id + SELECT t.id, t.inserted_at, t.tenant_id, t.queue, t.action_id, t.step_id, t.step_readable_id, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.external_id, t.display_name, t.input, t.retry_count, t.internal_retry_count, t.app_retry_count, t.step_index, t.additional_metadata, t.dag_id, t.dag_inserted_at, t.parent_task_external_id, t.parent_task_id, t.parent_task_inserted_at, t.child_index, t.child_key, t.initial_state, t.initial_state_reason, t.concurrency_parent_strategy_ids, t.concurrency_strategy_ids, t.concurrency_keys, t.retry_backoff_factor, t.retry_max_backoff, t.is_durable, t.desired_worker_label, t.triggering_event_external_id, t.triggering_event_key, t.step_name, i.node_id AS requested_node_id, i.branch_id AS requested_branch_id FROM inputs i JOIN v1_lookup_table lt ON lt.external_id = i.external_id JOIN v1_task t ON (t.id, t.inserted_at) = (lt.task_id, lt.inserted_at) diff --git a/pkg/repository/sqlcv1/models.go b/pkg/repository/sqlcv1/models.go index 3e881b85d6..9aeaf2bcdc 100644 --- a/pkg/repository/sqlcv1/models.go +++ b/pkg/repository/sqlcv1/models.go @@ -3614,6 +3614,7 @@ type V1Task struct { DesiredWorkerLabel []byte `json:"desired_worker_label"` TriggeringEventExternalID *uuid.UUID `json:"triggering_event_external_id"` TriggeringEventKey pgtype.Text `json:"triggering_event_key"` + StepName pgtype.Text `json:"step_name"` } type V1TaskEvent struct { @@ -3711,6 +3712,7 @@ type V1TasksOlap struct { Queue string `json:"queue"` ActionID string `json:"action_id"` StepID uuid.UUID `json:"step_id"` + StepName pgtype.Text `json:"step_name"` WorkflowID uuid.UUID `json:"workflow_id"` WorkflowVersionID uuid.UUID `json:"workflow_version_id"` WorkflowRunID uuid.UUID `json:"workflow_run_id"` diff --git a/pkg/repository/sqlcv1/olap.sql b/pkg/repository/sqlcv1/olap.sql index bd3cdab63a..c9c308c996 100644 --- a/pkg/repository/sqlcv1/olap.sql +++ b/pkg/repository/sqlcv1/olap.sql @@ -177,7 +177,8 @@ INSERT INTO v1_tasks_olap ( dag_id, dag_inserted_at, parent_task_external_id, - is_durable + is_durable, + step_name ) VALUES ( $1, $2, @@ -200,7 +201,8 @@ INSERT INTO v1_tasks_olap ( $19, $20, $21, - $22 + $22, + $23 ); -- name: CreateDAGsOLAP :copyfrom @@ -1519,9 +1521,16 @@ WITH runs AS ( MIN(e.inserted_at)::timestamptz AS created_at, MIN(e.inserted_at) FILTER (WHERE e.readable_status = 'RUNNING')::timestamptz AS started_at, MAX(e.inserted_at) FILTER (WHERE e.readable_status IN ('COMPLETED', 'CANCELLED', 'FAILED'))::timestamptz AS finished_at, - JSON_AGG(JSON_BUILD_OBJECT('task_id', e.task_id,'task_inserted_at', e.task_inserted_at)) AS task_metadata - FROM - relevant_events e + JSON_AGG( + JSON_BUILD_OBJECT( + 'task_id', e.task_id, + 'task_inserted_at', e.task_inserted_at, + 'output_event_external_id', CASE WHEN e.event_type = 'FINISHED' THEN e.external_id END, + 'step_name', t.step_name + ) + ) AS task_metadata + FROM relevant_events e + JOIN v1_tasks_olap t ON (e.task_id, e.task_inserted_at) = (t.id, t.inserted_at) JOIN max_retry_counts mrc ON (e.task_id, e.retry_count) = (mrc.task_id, mrc.max_retry_count) ), error_message AS ( SELECT diff --git a/pkg/repository/sqlcv1/olap.sql.go b/pkg/repository/sqlcv1/olap.sql.go index 5bc89faa4d..13de11305e 100644 --- a/pkg/repository/sqlcv1/olap.sql.go +++ b/pkg/repository/sqlcv1/olap.sql.go @@ -577,6 +577,7 @@ type CreateTasksOLAPParams struct { DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"` ParentTaskExternalID *uuid.UUID `json:"parent_task_external_id"` IsDurable bool `json:"is_durable"` + StepName pgtype.Text `json:"step_name"` } const createV1PayloadOLAPCutoverTemporaryTable = `-- name: CreateV1PayloadOLAPCutoverTemporaryTable :exec @@ -2562,7 +2563,7 @@ WITH selected_retry_count AS ( ) ) SELECT - t.tenant_id, t.id, t.inserted_at, t.external_id, t.queue, t.action_id, t.step_id, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.display_name, t.input, t.additional_metadata, t.readable_status, t.latest_retry_count, t.latest_worker_id, t.dag_id, t.dag_inserted_at, t.parent_task_external_id, t.is_durable, + t.tenant_id, t.id, t.inserted_at, t.external_id, t.queue, t.action_id, t.step_id, t.step_name, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.display_name, t.input, t.additional_metadata, t.readable_status, t.latest_retry_count, t.latest_worker_id, t.dag_id, t.dag_inserted_at, t.parent_task_external_id, t.is_durable, (t.dag_id IS NULL)::BOOLEAN AS is_standalone, st.readable_status::v1_readable_status_olap as status, f.finished_at::timestamptz as finished_at, @@ -2608,6 +2609,7 @@ type PopulateSingleTaskRunDataRow struct { Queue string `json:"queue"` ActionID string `json:"action_id"` StepID uuid.UUID `json:"step_id"` + StepName pgtype.Text `json:"step_name"` WorkflowID uuid.UUID `json:"workflow_id"` WorkflowVersionID uuid.UUID `json:"workflow_version_id"` WorkflowRunID uuid.UUID `json:"workflow_run_id"` @@ -2654,6 +2656,7 @@ func (q *Queries) PopulateSingleTaskRunData(ctx context.Context, db DBTX, arg Po &i.Queue, &i.ActionID, &i.StepID, + &i.StepName, &i.WorkflowID, &i.WorkflowVersionID, &i.WorkflowRunID, @@ -3112,7 +3115,7 @@ WITH lookup_task AS ( external_id = $1::uuid ) SELECT - t.tenant_id, t.id, t.inserted_at, t.external_id, t.queue, t.action_id, t.step_id, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.display_name, t.input, t.additional_metadata, t.readable_status, t.latest_retry_count, t.latest_worker_id, t.dag_id, t.dag_inserted_at, t.parent_task_external_id, t.is_durable, + t.tenant_id, t.id, t.inserted_at, t.external_id, t.queue, t.action_id, t.step_id, t.step_name, t.workflow_id, t.workflow_version_id, t.workflow_run_id, t.schedule_timeout, t.step_timeout, t.priority, t.sticky, t.desired_worker_id, t.display_name, t.input, t.additional_metadata, t.readable_status, t.latest_retry_count, t.latest_worker_id, t.dag_id, t.dag_inserted_at, t.parent_task_external_id, t.is_durable, e.output, e.external_id AS event_external_id, e.error_message @@ -3132,6 +3135,7 @@ type ReadTaskByExternalIDRow struct { Queue string `json:"queue"` ActionID string `json:"action_id"` StepID uuid.UUID `json:"step_id"` + StepName pgtype.Text `json:"step_name"` WorkflowID uuid.UUID `json:"workflow_id"` WorkflowVersionID uuid.UUID `json:"workflow_version_id"` WorkflowRunID uuid.UUID `json:"workflow_run_id"` @@ -3166,6 +3170,7 @@ func (q *Queries) ReadTaskByExternalID(ctx context.Context, db DBTX, externalid &i.Queue, &i.ActionID, &i.StepID, + &i.StepName, &i.WorkflowID, &i.WorkflowVersionID, &i.WorkflowRunID, @@ -3268,9 +3273,16 @@ WITH runs AS ( MIN(e.inserted_at)::timestamptz AS created_at, MIN(e.inserted_at) FILTER (WHERE e.readable_status = 'RUNNING')::timestamptz AS started_at, MAX(e.inserted_at) FILTER (WHERE e.readable_status IN ('COMPLETED', 'CANCELLED', 'FAILED'))::timestamptz AS finished_at, - JSON_AGG(JSON_BUILD_OBJECT('task_id', e.task_id,'task_inserted_at', e.task_inserted_at)) AS task_metadata - FROM - relevant_events e + JSON_AGG( + JSON_BUILD_OBJECT( + 'task_id', e.task_id, + 'task_inserted_at', e.task_inserted_at, + 'output_event_external_id', CASE WHEN e.event_type = 'FINISHED' THEN e.external_id END, + 'step_name', t.step_name + ) + ) AS task_metadata + FROM relevant_events e + JOIN v1_tasks_olap t ON (e.task_id, e.task_inserted_at) = (t.id, t.inserted_at) JOIN max_retry_counts mrc ON (e.task_id, e.retry_count) = (mrc.task_id, mrc.max_retry_count) ), error_message AS ( SELECT diff --git a/pkg/repository/sqlcv1/tasks-overwrite.go b/pkg/repository/sqlcv1/tasks-overwrite.go index bbc9cf519f..0058bfa736 100644 --- a/pkg/repository/sqlcv1/tasks-overwrite.go +++ b/pkg/repository/sqlcv1/tasks-overwrite.go @@ -48,7 +48,8 @@ WITH input AS ( unnest($34::boolean[]) AS is_durable, unnest($35::jsonb[]) AS desired_worker_label, unnest($36::uuid[]) AS triggering_event_external_id, - unnest($37::text[]) AS triggering_event_key + unnest($37::text[]) AS triggering_event_key, + unnest($38::text[]) AS step_name ) INSERT INTO v1_task ( tenant_id, @@ -87,7 +88,8 @@ INSERT INTO v1_task ( is_durable, desired_worker_label, triggering_event_external_id, - triggering_event_key + triggering_event_key, + step_name ) SELECT i.tenant_id, @@ -126,11 +128,12 @@ SELECT i.is_durable, i.desired_worker_label, i.triggering_event_external_id, - i.triggering_event_key + i.triggering_event_key, + i.step_name FROM input i RETURNING - id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, additional_metadata, initial_state, dag_id, dag_inserted_at, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, initial_state_reason, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, step_index, retry_backoff_factor, retry_max_backoff, workflow_version_id, workflow_run_id, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key + id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, additional_metadata, initial_state, dag_id, dag_inserted_at, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, initial_state_reason, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, step_index, retry_backoff_factor, retry_max_backoff, workflow_version_id, workflow_run_id, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name ` type CreateTasksParams struct { @@ -174,6 +177,7 @@ type CreateTasksParams struct { DesiredWorkerLabels [][]byte `json:"desiredWorkerLabels"` TriggeringEventExternalIds []*uuid.UUID `json:"triggeringEventExternalIds"` TriggeringEventKeys []pgtype.Text `json:"triggeringEventKeys"` + StepNames []pgtype.Text `json:"stepNames"` } func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParams) ([]*V1Task, error) { @@ -228,6 +232,7 @@ func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParam arg.DesiredWorkerLabels, arg.TriggeringEventExternalIds, arg.TriggeringEventKeys, + arg.StepNames, ) if err != nil { return nil, err @@ -278,6 +283,7 @@ func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParam &i.DesiredWorkerLabel, &i.TriggeringEventExternalID, &i.TriggeringEventKey, + &i.StepName, ); err != nil { return nil, err } diff --git a/pkg/repository/sqlcv1/tasks.sql.go b/pkg/repository/sqlcv1/tasks.sql.go index 47eb2d315b..3493ba7898 100644 --- a/pkg/repository/sqlcv1/tasks.sql.go +++ b/pkg/repository/sqlcv1/tasks.sql.go @@ -722,7 +722,7 @@ func (q *Queries) FindOldestRunningTask(ctx context.Context, db DBTX) (*FindOlde } const findOldestTask = `-- name: FindOldestTask :one -SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key +SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name FROM v1_task ORDER BY id, inserted_at LIMIT 1 @@ -773,6 +773,7 @@ func (q *Queries) FindOldestTask(ctx context.Context, db DBTX) (*V1Task, error) &i.DesiredWorkerLabel, &i.TriggeringEventExternalID, &i.TriggeringEventKey, + &i.StepName, ) return &i, err } @@ -1752,7 +1753,7 @@ func (q *Queries) ListTaskRunningStatuses(ctx context.Context, db DBTX, arg List } const listTasks = `-- name: ListTasks :many -SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key +SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name FROM v1_task WHERE @@ -1816,6 +1817,7 @@ func (q *Queries) ListTasks(ctx context.Context, db DBTX, arg ListTasksParams) ( &i.DesiredWorkerLabel, &i.TriggeringEventExternalID, &i.TriggeringEventKey, + &i.StepName, ); err != nil { return nil, err } @@ -2508,7 +2510,7 @@ WITH input AS ( UNNEST($3::bigint[]) AS task_id, UNNEST($4::timestamptz[]) AS task_inserted_at ), relevant_tasks AS ( - SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, task_id, task_inserted_at + SELECT id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name, task_id, task_inserted_at FROM v1_task t JOIN diff --git a/pkg/repository/sqlcv1/workers.sql.go b/pkg/repository/sqlcv1/workers.sql.go index 288fe1309f..31502b3a1d 100644 --- a/pkg/repository/sqlcv1/workers.sql.go +++ b/pkg/repository/sqlcv1/workers.sql.go @@ -906,7 +906,7 @@ func (q *Queries) ListManyWorkerLabels(ctx context.Context, db DBTX, workerids [ const listSemaphoreSlotsWithStateForWorker = `-- name: ListSemaphoreSlotsWithStateForWorker :many SELECT - task_id, task_inserted_at, runtime.retry_count, worker_id, runtime.tenant_id, timeout_at, evicted_at, id, inserted_at, v1_task.tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, v1_task.retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key + task_id, task_inserted_at, runtime.retry_count, worker_id, runtime.tenant_id, timeout_at, evicted_at, id, inserted_at, v1_task.tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, workflow_version_id, workflow_run_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, v1_task.retry_count, internal_retry_count, app_retry_count, step_index, additional_metadata, dag_id, dag_inserted_at, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, initial_state, initial_state_reason, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, retry_backoff_factor, retry_max_backoff, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name FROM v1_task_runtime runtime JOIN @@ -973,6 +973,7 @@ type ListSemaphoreSlotsWithStateForWorkerRow struct { DesiredWorkerLabel []byte `json:"desired_worker_label"` TriggeringEventExternalID *uuid.UUID `json:"triggering_event_external_id"` TriggeringEventKey pgtype.Text `json:"triggering_event_key"` + StepName pgtype.Text `json:"step_name"` } func (q *Queries) ListSemaphoreSlotsWithStateForWorker(ctx context.Context, db DBTX, arg ListSemaphoreSlotsWithStateForWorkerParams) ([]*ListSemaphoreSlotsWithStateForWorkerRow, error) { @@ -1033,6 +1034,7 @@ func (q *Queries) ListSemaphoreSlotsWithStateForWorker(ctx context.Context, db D &i.DesiredWorkerLabel, &i.TriggeringEventExternalID, &i.TriggeringEventKey, + &i.StepName, ); err != nil { return nil, err } diff --git a/pkg/repository/task.go b/pkg/repository/task.go index 51135dd6d2..675d3656ba 100644 --- a/pkg/repository/task.go +++ b/pkg/repository/task.go @@ -1849,6 +1849,7 @@ func (r *sharedRepository) insertTasks( workflowRunIds := make([]uuid.UUID, len(tasks)) isDurables := make([]bool, len(tasks)) desiredWorkerLabels := make([][]byte, len(tasks)) + stepNames := make([]pgtype.Text, len(tasks)) externalIdToInput := make(map[uuid.UUID][]byte, len(tasks)) @@ -1876,6 +1877,7 @@ func (r *sharedRepository) insertTasks( retryMaxBackoffs[i] = stepConfig.RetryMaxBackoff workflowRunIds[i] = task.WorkflowRunId isDurables[i] = stepConfig.IsDurable + stepNames[i] = stepConfig.ReadableId // TODO: case on whether this is a v1 or v2 task by looking at the step data. for now, // we're assuming a v1 task. @@ -2171,6 +2173,7 @@ func (r *sharedRepository) insertTasks( DesiredWorkerLabels: make([][]byte, 0), TriggeringEventExternalIds: make([]*uuid.UUID, 0), TriggeringEventKeys: make([]pgtype.Text, 0), + StepNames: make([]pgtype.Text, 0), } } @@ -2209,6 +2212,7 @@ func (r *sharedRepository) insertTasks( params.WorkflowRunIds = append(params.WorkflowRunIds, workflowRunIds[i]) params.IsDurables = append(params.IsDurables, isDurables[i]) params.TriggeringEventExternalIds = append(params.TriggeringEventExternalIds, task.TriggeringEventExternalId) + params.StepNames = append(params.StepNames, stepNames[i]) triggeringEventKey := pgtype.Text{} diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index a75a86b305..505d7b821e 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -309,6 +309,7 @@ CREATE TABLE v1_task ( desired_worker_label JSONB, triggering_event_external_id UUID, triggering_event_key TEXT, + step_name TEXT, CONSTRAINT v1_task_pkey PRIMARY KEY (id, inserted_at) ) PARTITION BY RANGE(inserted_at); diff --git a/sql/schema/v1-olap.sql b/sql/schema/v1-olap.sql index 46dc783246..6bc34b0fcb 100644 --- a/sql/schema/v1-olap.sql +++ b/sql/schema/v1-olap.sql @@ -138,6 +138,7 @@ CREATE TABLE v1_tasks_olap ( queue TEXT NOT NULL, action_id TEXT NOT NULL, step_id UUID NOT NULL, + step_name TEXT, workflow_id UUID NOT NULL, workflow_version_id UUID NOT NULL, workflow_run_id UUID NOT NULL,