Skip to content
4 changes: 2 additions & 2 deletions api/v1/server/handlers/workers/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ func (t *WorkerService) workerGetV1(ctx echo.Context, tenant *sqlcv1.Tenant, req
return nil, err
}

workerIdToActions, err := t.config.V1.Workers().GetWorkerActionsByWorkerId(
workerIdToActions, err := t.config.V1.Workers().GetWorkerActionsForWorkers(
reqCtx,
worker.Worker.TenantId,
[]uuid.UUID{worker.Worker.ID},
[]sqlcv1.Worker{worker.Worker},
)

if err != nil {
Expand Down
22 changes: 14 additions & 8 deletions api/v1/server/handlers/workers/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,27 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re
telemetry.AttributeKV{Key: "tenant.id", Value: tenant.ID},
)

workers, err := t.config.V1.Workers().ListWorkers(listCtx, tenantId, opts)
workerRows, err := t.config.V1.Workers().ListWorkers(listCtx, tenantId, opts)

if err != nil {
listSpan.RecordError(err)
return nil, err
}

workers := make([]sqlcv1.Worker, len(workerRows))

for i, workerRow := range workerRows {
workers[i] = workerRow.Worker
}

telemetry.WithAttributes(listSpan,
telemetry.AttributeKV{Key: "workers.count", Value: len(workers)},
)

workerIdSet := make(map[uuid.UUID]struct{})

for _, worker := range workers {
workerIdSet[worker.Worker.ID] = struct{}{}
workerIdSet[worker.ID] = struct{}{}
}

workerIds := make([]uuid.UUID, 0, len(workerIdSet))
Expand All @@ -128,10 +134,10 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re
telemetry.AttributeKV{Key: "workers.unique_ids.count", Value: len(workerIds)},
)

workerIdToActionIds, err := t.config.V1.Workers().GetWorkerActionsByWorkerId(
workerIdToActionIds, err := t.config.V1.Workers().GetWorkerActionsForWorkers(
listCtx,
tenant.ID,
workerIds,
workers,
)

if err != nil {
Expand Down Expand Up @@ -160,11 +166,11 @@ func (t *WorkerService) workerListV1(ctx echo.Context, tenant *sqlcv1.Tenant, re

for i, worker := range workers {
workerCp := worker
actions := workerIdToActionIds[workerCp.Worker.ID.String()]
slotConfig := workerSlotConfig[workerCp.Worker.ID]
labels := workerIdToLabels[workerCp.Worker.ID]
actions := workerIdToActionIds[workerCp.ID.String()]
slotConfig := workerSlotConfig[workerCp.ID]
labels := workerIdToLabels[workerCp.ID]

rows[i] = *transformersv1.ToWorkerSqlc(&workerCp.Worker, slotConfig, actions, nil, labels)
rows[i] = *transformersv1.ToWorkerSqlc(&workerCp, slotConfig, actions, nil, labels)
}

return gen.WorkerList200JSONResponse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE "Worker" ADD COLUMN "actionHash" BYTEA;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE "Worker" DROP COLUMN "actionHash";
-- +goose StatementEnd
11 changes: 11 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20260508213451_v1_0_107.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- +goose no transaction

-- +goose Up
-- +goose StatementBegin
CREATE INDEX CONCURRENTLY IF NOT EXISTS "Worker_tenantId_actionHash_idx" ON "Worker" ("tenantId", "actionHash");
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP INDEX CONCURRENTLY IF EXISTS "Worker_tenantId_actionHash_idx";
-- +goose StatementEnd
1 change: 1 addition & 0 deletions pkg/repository/sqlcv1/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 19 additions & 9 deletions pkg/repository/sqlcv1/workers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -247,19 +247,27 @@ GROUP BY "tenantId"
;

-- name: GetWorkerActionsByWorkerId :many
WITH inputs AS (
SELECT UNNEST(@workerIds::UUID[]) AS "workerId"
)

SELECT
w."id" AS "workerId",
a."actionId" AS actionId
FROM "Worker" w
JOIN inputs i ON w."id" = i."workerId"
LEFT JOIN "_ActionToWorker" aw ON w.id = aw."B"
LEFT JOIN "Action" a ON aw."A" = a.id
JOIN "_ActionToWorker" aw ON w.id = aw."B"
JOIN "Action" a ON aw."A" = a.id
WHERE
a."tenantId" = @tenantId::UUID
AND w.id = ANY(@workerIds::UUID[])
;

-- name: GetWorkerActionsByWorkerActionHash :many
SELECT DISTINCT
w."actionHash" AS action_hash,
a."actionId" AS action_id
FROM "Worker" w
JOIN "_ActionToWorker" aw ON w.id = aw."B"
JOIN "Action" a ON aw."A" = a.id
WHERE
w."tenantId" = @tenantId::UUID
AND w."actionHash" = ANY(@actionHashes::BYTEA[])
;

-- name: GetWorkerWorkflowsByWorkerId :many
Expand Down Expand Up @@ -438,7 +446,8 @@ INSERT INTO "Worker" (
"language",
"languageVersion",
"os",
"runtimeExtra"
"runtimeExtra",
"actionHash"
) VALUES (
gen_random_uuid(),
CURRENT_TIMESTAMP,
Expand All @@ -451,7 +460,8 @@ INSERT INTO "Worker" (
sqlc.narg('language')::"WorkerSDKS",
sqlc.narg('languageVersion')::text,
sqlc.narg('os')::text,
sqlc.narg('runtimeExtra')::text
sqlc.narg('runtimeExtra')::text,
@actionHash::bytea
) RETURNING *;

-- name: LinkServicesToWorker :exec
Expand Down
Loading
Loading