diff --git a/cmd/hatchet-migrate/migrate/migrations/20260505195258_v1_0_104.sql b/cmd/hatchet-migrate/migrate/migrations/20260505195258_v1_0_104.sql new file mode 100644 index 0000000000..9b73c40fe7 --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20260505195258_v1_0_104.sql @@ -0,0 +1,56 @@ +-- +goose Up +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION v1_tasks_olap_status_update_function() +RETURNS TRIGGER AS +$$ +BEGIN + UPDATE + v1_runs_olap r + SET + readable_status = n.readable_status + FROM new_rows n + WHERE + r.id = n.id + AND r.inserted_at = n.inserted_at + AND r.kind = 'TASK'; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION v1_tasks_olap_status_update_function() +RETURNS TRIGGER AS +$$ +BEGIN + UPDATE + v1_runs_olap r + SET + readable_status = n.readable_status + FROM new_rows n + WHERE + r.id = n.id + AND r.inserted_at = n.inserted_at + AND r.kind = 'TASK'; + + -- insert tmp events into task status updates table if we have a dag_id + INSERT INTO v1_task_status_updates_tmp ( + tenant_id, + dag_id, + dag_inserted_at + ) + SELECT + tenant_id, + dag_id, + dag_inserted_at + FROM new_rows + WHERE dag_id IS NOT NULL; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; +-- +goose StatementEnd diff --git a/pkg/repository/sqlcv1/copyfrom.go b/pkg/repository/sqlcv1/copyfrom.go index 90ea943700..6dfb4d7399 100644 --- a/pkg/repository/sqlcv1/copyfrom.go +++ b/pkg/repository/sqlcv1/copyfrom.go @@ -203,44 +203,6 @@ func (q *Queries) CreateTaskEventsOLAP(ctx context.Context, db DBTX, arg []Creat return db.CopyFrom(ctx, []string{"v1_task_events_olap"}, []string{"tenant_id", "task_id", "task_inserted_at", "event_type", "workflow_id", "event_timestamp", "readable_status", "retry_count", "error_message", "output", "worker_id", "additional__event_data", "additional__event_message", "external_id", "durable_invocation_count"}, &iteratorForCreateTaskEventsOLAP{rows: arg}) } -// iteratorForCreateTaskEventsOLAPTmp implements pgx.CopyFromSource. -type iteratorForCreateTaskEventsOLAPTmp struct { - rows []CreateTaskEventsOLAPTmpParams - skippedFirstNextCall bool -} - -func (r *iteratorForCreateTaskEventsOLAPTmp) Next() bool { - if len(r.rows) == 0 { - return false - } - if !r.skippedFirstNextCall { - r.skippedFirstNextCall = true - return true - } - r.rows = r.rows[1:] - return len(r.rows) > 0 -} - -func (r iteratorForCreateTaskEventsOLAPTmp) Values() ([]interface{}, error) { - return []interface{}{ - r.rows[0].TenantID, - r.rows[0].TaskID, - r.rows[0].TaskInsertedAt, - r.rows[0].EventType, - r.rows[0].ReadableStatus, - r.rows[0].RetryCount, - r.rows[0].WorkerID, - }, nil -} - -func (r iteratorForCreateTaskEventsOLAPTmp) Err() error { - return nil -} - -func (q *Queries) CreateTaskEventsOLAPTmp(ctx context.Context, db DBTX, arg []CreateTaskEventsOLAPTmpParams) (int64, error) { - return db.CopyFrom(ctx, []string{"v1_task_events_olap_tmp"}, []string{"tenant_id", "task_id", "task_inserted_at", "event_type", "readable_status", "retry_count", "worker_id"}, &iteratorForCreateTaskEventsOLAPTmp{rows: arg}) -} - // iteratorForInsertLogLine implements pgx.CopyFromSource. type iteratorForInsertLogLine struct { rows []InsertLogLineParams diff --git a/pkg/repository/sqlcv1/olap.sql b/pkg/repository/sqlcv1/olap.sql index b1503e2ce8..d6219cc932 100644 --- a/pkg/repository/sqlcv1/olap.sql +++ b/pkg/repository/sqlcv1/olap.sql @@ -154,25 +154,6 @@ WHERE END ; --- name: CreateTaskEventsOLAPTmp :copyfrom -INSERT INTO v1_task_events_olap_tmp ( - tenant_id, - task_id, - task_inserted_at, - event_type, - readable_status, - retry_count, - worker_id -) VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7 -); - -- name: CreateTaskEventsOLAP :copyfrom INSERT INTO v1_task_events_olap ( tenant_id, diff --git a/pkg/repository/sqlcv1/olap.sql.go b/pkg/repository/sqlcv1/olap.sql.go index a5410d81df..15f6f97388 100644 --- a/pkg/repository/sqlcv1/olap.sql.go +++ b/pkg/repository/sqlcv1/olap.sql.go @@ -530,16 +530,6 @@ type CreateTaskEventsOLAPParams struct { DurableInvocationCount int32 `json:"durable_invocation_count"` } -type CreateTaskEventsOLAPTmpParams struct { - TenantID uuid.UUID `json:"tenant_id"` - TaskID int64 `json:"task_id"` - TaskInsertedAt pgtype.Timestamptz `json:"task_inserted_at"` - EventType V1EventTypeOlap `json:"event_type"` - ReadableStatus V1ReadableStatusOlap `json:"readable_status"` - RetryCount int32 `json:"retry_count"` - WorkerID *uuid.UUID `json:"worker_id"` -} - const createV1PayloadOLAPCutoverTemporaryTable = `-- name: CreateV1PayloadOLAPCutoverTemporaryTable :exec SELECT copy_v1_payloads_olap_partition_structure($1::DATE) ` diff --git a/sql/schema/v1-olap.sql b/sql/schema/v1-olap.sql index 32f3d1f159..d267ffdb66 100644 --- a/sql/schema/v1-olap.sql +++ b/sql/schema/v1-olap.sql @@ -628,19 +628,6 @@ BEGIN AND r.inserted_at = n.inserted_at AND r.kind = 'TASK'; - -- insert tmp events into task status updates table if we have a dag_id - INSERT INTO v1_task_status_updates_tmp ( - tenant_id, - dag_id, - dag_inserted_at - ) - SELECT - tenant_id, - dag_id, - dag_inserted_at - FROM new_rows - WHERE dag_id IS NOT NULL; - RETURN NULL; END; $$