Skip to content
11 changes: 11 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20260428164911_v1_0_102.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE v1_task ADD COLUMN step_name TEXT;
ALTER TABLE v1_tasks_olap ADD COLUMN step_name TEXT;
Comment thread
mrkaye97 marked this conversation as resolved.
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE v1_task DROP COLUMN step_name;
ALTER TABLE v1_tasks_olap DROP COLUMN step_name;
-- +goose StatementEnd
48 changes: 44 additions & 4 deletions pkg/repository/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,10 @@ func (r *OLAPRepositoryImpl) ReadTaskRun(ctx context.Context, taskExternalId uui
}

type TaskMetadata struct {
TaskID int64 `json:"task_id"`
TaskInsertedAt time.Time `json:"task_inserted_at"`
TaskID int64 `json:"task_id"`
TaskInsertedAt time.Time `json:"task_inserted_at"`
OutputEventExternalId *uuid.UUID `json:"output_event_external_id,omitempty"`
StepName *string `json:"step_name"`
}

func ParseTaskMetadata(jsonData []byte) ([]TaskMetadata, error) {
Expand Down Expand Up @@ -570,10 +572,47 @@ func (r *OLAPRepositoryImpl) ReadWorkflowRun(ctx context.Context, workflowRunExt
return nil, err
}

outputEventExternalIdToStepName := make(map[uuid.UUID]string)
outputEventExternalIds := make([]uuid.UUID, 0)

for _, taskMeta := range taskMetadata {
if taskMeta.OutputEventExternalId != nil && taskMeta.StepName != nil {
outputEventExternalIds = append(outputEventExternalIds, *taskMeta.OutputEventExternalId)
outputEventExternalIdToStepName[*taskMeta.OutputEventExternalId] = *taskMeta.StepName
}
}

outputPayloads, err := r.ReadPayloads(ctx, row.TenantID, outputEventExternalIds...)

Comment thread
mrkaye97 marked this conversation as resolved.
if err != nil {
return nil, err
}

output := make(map[string]interface{})

for externalId, payload := range outputPayloads {
stepName, ok := outputEventExternalIdToStepName[externalId]

if !ok {
continue
}

payloadMap := make(map[string]interface{})

err = json.Unmarshal(payload, &payloadMap)

if err != nil {
r.l.Error().Err(err).Msgf("failed to unmarshal payload for step %s, externalId %s", stepName, externalId)
continue
}

output[stepName] = payloadMap
Comment thread
mrkaye97 marked this conversation as resolved.
}

var outputPayload []byte

if row.OutputEventExternalID != nil {
outputPayload, err = r.ReadPayload(ctx, row.TenantID, *row.OutputEventExternalID)
if len(output) > 0 {
outputPayload, err = json.Marshal(output)

if err != nil {
return nil, err
Expand Down Expand Up @@ -1896,6 +1935,7 @@ func (r *OLAPRepositoryImpl) writeTaskBatch(ctx context.Context, tenantId uuid.U
WorkflowRunID: task.WorkflowRunID,
Input: payloadToWriteToTask,
IsDurable: task.IsDurable.Bool,
StepName: task.StepName,
})

putPayloadOpts = append(putPayloadOpts, StoreOLAPPayloadOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/repository/sqlcv1/batch.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/repository/sqlcv1/copyfrom.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/repository/sqlcv1/durable_event_log.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/repository/sqlcv1/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions pkg/repository/sqlcv1/olap.sql
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ INSERT INTO v1_tasks_olap (
dag_id,
dag_inserted_at,
parent_task_external_id,
is_durable
is_durable,
step_name
) VALUES (
$1,
$2,
Expand All @@ -200,7 +201,8 @@ INSERT INTO v1_tasks_olap (
$19,
$20,
$21,
$22
$22,
$23
);

-- name: CreateDAGsOLAP :copyfrom
Expand Down Expand Up @@ -1519,9 +1521,16 @@ WITH runs AS (
MIN(e.inserted_at)::timestamptz AS created_at,
MIN(e.inserted_at) FILTER (WHERE e.readable_status = 'RUNNING')::timestamptz AS started_at,
MAX(e.inserted_at) FILTER (WHERE e.readable_status IN ('COMPLETED', 'CANCELLED', 'FAILED'))::timestamptz AS finished_at,
JSON_AGG(JSON_BUILD_OBJECT('task_id', e.task_id,'task_inserted_at', e.task_inserted_at)) AS task_metadata
FROM
relevant_events e
JSON_AGG(
JSON_BUILD_OBJECT(
'task_id', e.task_id,
'task_inserted_at', e.task_inserted_at,
'output_event_external_id', CASE WHEN e.event_type = 'FINISHED' THEN e.external_id END,
'step_name', t.step_name
)
) AS task_metadata
FROM relevant_events e
JOIN v1_tasks_olap t ON (e.task_id, e.task_inserted_at) = (t.id, t.inserted_at)
JOIN max_retry_counts mrc ON (e.task_id, e.retry_count) = (mrc.task_id, mrc.max_retry_count)
), error_message AS (
SELECT
Expand Down
22 changes: 17 additions & 5 deletions pkg/repository/sqlcv1/olap.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions pkg/repository/sqlcv1/tasks-overwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ WITH input AS (
unnest($34::boolean[]) AS is_durable,
unnest($35::jsonb[]) AS desired_worker_label,
unnest($36::uuid[]) AS triggering_event_external_id,
unnest($37::text[]) AS triggering_event_key
unnest($37::text[]) AS triggering_event_key,
unnest($38::text[]) AS step_name
Comment thread
mrkaye97 marked this conversation as resolved.
)
INSERT INTO v1_task (
tenant_id,
Expand Down Expand Up @@ -87,7 +88,8 @@ INSERT INTO v1_task (
is_durable,
desired_worker_label,
triggering_event_external_id,
triggering_event_key
triggering_event_key,
step_name
)
SELECT
i.tenant_id,
Expand Down Expand Up @@ -126,11 +128,12 @@ SELECT
i.is_durable,
i.desired_worker_label,
i.triggering_event_external_id,
i.triggering_event_key
i.triggering_event_key,
i.step_name
FROM
input i
RETURNING
id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, additional_metadata, initial_state, dag_id, dag_inserted_at, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, initial_state_reason, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, step_index, retry_backoff_factor, retry_max_backoff, workflow_version_id, workflow_run_id, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key
id, inserted_at, tenant_id, queue, action_id, step_id, step_readable_id, workflow_id, schedule_timeout, step_timeout, priority, sticky, desired_worker_id, external_id, display_name, input, retry_count, internal_retry_count, app_retry_count, additional_metadata, initial_state, dag_id, dag_inserted_at, concurrency_parent_strategy_ids, concurrency_strategy_ids, concurrency_keys, initial_state_reason, parent_task_external_id, parent_task_id, parent_task_inserted_at, child_index, child_key, step_index, retry_backoff_factor, retry_max_backoff, workflow_version_id, workflow_run_id, is_durable, desired_worker_label, triggering_event_external_id, triggering_event_key, step_name
`

type CreateTasksParams struct {
Expand Down Expand Up @@ -174,6 +177,7 @@ type CreateTasksParams struct {
DesiredWorkerLabels [][]byte `json:"desiredWorkerLabels"`
TriggeringEventExternalIds []*uuid.UUID `json:"triggeringEventExternalIds"`
TriggeringEventKeys []pgtype.Text `json:"triggeringEventKeys"`
StepNames []pgtype.Text `json:"stepNames"`
}

func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParams) ([]*V1Task, error) {
Expand Down Expand Up @@ -228,6 +232,7 @@ func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParam
arg.DesiredWorkerLabels,
arg.TriggeringEventExternalIds,
arg.TriggeringEventKeys,
arg.StepNames,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -278,6 +283,7 @@ func (q *Queries) CreateTasks(ctx context.Context, db DBTX, arg CreateTasksParam
&i.DesiredWorkerLabel,
&i.TriggeringEventExternalID,
&i.TriggeringEventKey,
&i.StepName,
); err != nil {
return nil, err
}
Expand Down
Loading
Loading