Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
63c46a6
refactor: improve operation pool / tenant for controller logic
mrkaye97 Apr 5, 2026
a62e0c2
refactor: remove some more loops
mrkaye97 Apr 5, 2026
6a748e0
feat: initial work on mq-based status updates
mrkaye97 Apr 5, 2026
d53b3f0
feat: initial update query impl (will need to revise)
mrkaye97 Apr 5, 2026
515a878
fix: couple bugs
mrkaye97 Apr 5, 2026
521576f
fix: don't need to write into the temp table anymore
mrkaye97 Apr 5, 2026
98bb161
refactor: attempt to do status updates inline
mrkaye97 Apr 6, 2026
9095a91
fix: try to handle deadlocking
mrkaye97 Apr 6, 2026
42b10c9
chore: rm dead code
mrkaye97 Apr 6, 2026
9d418e8
feat: dag status updates directly
mrkaye97 Apr 6, 2026
4385840
fix: query changes, remove unneeded stuff
mrkaye97 Apr 6, 2026
05fc639
fix: migration first pass
mrkaye97 Apr 8, 2026
86094ce
fix: move migration
mrkaye97 Apr 10, 2026
eb4cec6
chore: rm old idx from schema
mrkaye97 Apr 10, 2026
503ebfb
feat: remove status partitioning function + update call sites
mrkaye97 Apr 10, 2026
dc55923
fix: add analyze
mrkaye97 Apr 10, 2026
85cb210
fix: oops don't actually need analyze
mrkaye97 Apr 10, 2026
ad9e1bf
fix: timestamps, add link to note
mrkaye97 Apr 10, 2026
677ffd5
Merge branch 'mk/remove-status-partitioning' into mk/mq-for-status-up…
mrkaye97 Apr 10, 2026
af0d257
Merge branch 'main' into mk/remove-status-partitioning
mrkaye97 Apr 10, 2026
9c0020d
Merge branch 'mk/remove-status-partitioning' into mk/mq-for-status-up…
mrkaye97 Apr 10, 2026
d5e9d57
Merge branch 'main' into mk/remove-status-partitioning
mrkaye97 Apr 13, 2026
7f2f68d
fix: had locking backwards all along :facepalm:
mrkaye97 Apr 14, 2026
69ab53d
fix: rm target partition check, since I don't think we need this anymore
mrkaye97 Apr 14, 2026
611123b
fix: improve / simplify locking more
mrkaye97 Apr 14, 2026
40e607b
fix: is_dag_task
mrkaye97 Apr 14, 2026
92af111
fix: add requeueing logic, comment out some stuff for dev
mrkaye97 Apr 14, 2026
09be02e
refactor: simplify more
mrkaye97 Apr 14, 2026
0dcc2d6
fix: copy paste bug
mrkaye97 Apr 14, 2026
37da81f
fix: simplify more
mrkaye97 Apr 14, 2026
8dd39a4
fix: compiler
mrkaye97 Apr 14, 2026
734e180
chore: gen
mrkaye97 Apr 14, 2026
9b1d4d4
fix: more copy paste
mrkaye97 Apr 14, 2026
128ed8f
fix: yet more copy paste
mrkaye97 Apr 14, 2026
b4ee0dd
fix: add where clause
mrkaye97 Apr 14, 2026
ce56a94
chore: remove debug prints
mrkaye97 Apr 14, 2026
f082507
refactor: simplify the dag status update query a bunch to be more in …
mrkaye97 Apr 14, 2026
d5538da
chore: rm old comment
mrkaye97 Apr 14, 2026
97d0231
Merge branch 'mk/mq-for-status-updates' into mk/python-saga
mrkaye97 Apr 14, 2026
623f510
feat: add first pass of saga pattern
mrkaye97 Apr 15, 2026
73fdc39
fix: naming
mrkaye97 Apr 15, 2026
71da199
Merge branch 'main' into mk/remove-status-partitioning
mrkaye97 Apr 15, 2026
5740134
Merge branch 'mk/remove-status-partitioning' into mk/mq-for-status-up…
mrkaye97 Apr 15, 2026
46d20c5
Merge branch 'mk/mq-for-status-updates' into mk/python-saga
mrkaye97 Apr 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20260410190713_v1_0_97.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package migrations

import (
"context"
"database/sql"
"fmt"

"github.com/pressly/goose/v3"
)

func init() {
	// Register this migration with goose. The NoTx variant is required
	// because up/down run CREATE INDEX CONCURRENTLY, which cannot execute
	// inside a transaction.
	goose.AddMigrationNoTxContext(up20260410190713, down20260410190713)
}

// v1RunsOlapTenantStatusInsAtIdxName returns the name of the
// (tenant_id, readable_status, inserted_at) index for the given table
// or partition, e.g. "ix_v1_runs_olap_tenant_status_ins_at".
func v1RunsOlapTenantStatusInsAtIdxName(table string) string {
	const pattern = "ix_%s_tenant_status_ins_at"

	return fmt.Sprintf(pattern, table)
}

// up20260410190713 replaces the old ix_v1_runs_olap_tenant_id index with a
// (tenant_id, readable_status, inserted_at DESC) index across the whole
// v1_runs_olap partition hierarchy.
//
// Order matters: leaf (grandchild) partitions are indexed first, using
// CONCURRENTLY so writes are not blocked; the intermediate (child) and root
// partitioned tables are indexed afterwards, which lets Postgres attach the
// already-built leaf indexes instead of rebuilding them.
func up20260410190713(ctx context.Context, db *sql.DB) error {
	// Drop the old, outdated index first.
	// NOTE: this can't be done concurrently or in parts (i.e. dropping
	// children first); see https://stackoverflow.com/a/76167838
	if _, err := db.ExecContext(ctx, "DROP INDEX IF EXISTS ix_v1_runs_olap_tenant_id"); err != nil {
		return fmt.Errorf("drop old index on %s: %w", v1RunsOlapTable, err)
	}

	const columns = "(tenant_id, readable_status, inserted_at DESC)"

	// Build the index on every leaf partition without blocking writers.
	leaves, err := listLeafPartitions(ctx, db, v1RunsOlapTable, 2)
	if err != nil {
		return err
	}

	for _, leaf := range leaves {
		ddl := fmt.Sprintf(
			`CREATE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s %s`,
			quoteIdent(v1RunsOlapTenantStatusInsAtIdxName(leaf)),
			quoteIdent(leaf),
			columns,
		)
		if _, err := db.ExecContext(ctx, ddl); err != nil {
			return fmt.Errorf("create index concurrently on %s: %w", leaf, err)
		}
	}

	// Index the intermediate partitioned tables; the leaf indexes created
	// above are attached rather than rebuilt.
	children, err := listLeafPartitions(ctx, db, v1RunsOlapTable, 1)
	if err != nil {
		return err
	}

	for _, child := range children {
		ddl := fmt.Sprintf(
			`CREATE INDEX IF NOT EXISTS %s ON %s %s`,
			quoteIdent(v1RunsOlapTenantStatusInsAtIdxName(child)),
			quoteIdent(child),
			columns,
		)
		if _, err := db.ExecContext(ctx, ddl); err != nil {
			return fmt.Errorf("create index on partition %s: %w", child, err)
		}
	}

	// Finally, index the root partitioned table.
	rootDDL := fmt.Sprintf(
		`CREATE INDEX IF NOT EXISTS %s ON %s %s`,
		quoteIdent(v1RunsOlapTenantStatusInsAtIdxName(v1RunsOlapTable)),
		quoteIdent(v1RunsOlapTable),
		columns,
	)
	if _, err := db.ExecContext(ctx, rootDDL); err != nil {
		return fmt.Errorf("create index on %s: %w", v1RunsOlapTable, err)
	}

	return nil
}

// down20260410190713 reverts to the previous composite index
// (tenant_id, inserted_at, id, readable_status, kind), rebuilding it
// bottom-up across the v1_runs_olap partition hierarchy after dropping the
// index introduced by the up migration.
func down20260410190713(ctx context.Context, db *sql.DB) error {
	// Drop the new index first so we can rebuild the old one bottom-up.
	if _, err := db.ExecContext(ctx, "DROP INDEX IF EXISTS ix_v1_runs_olap_tenant_status_ins_at"); err != nil {
		return fmt.Errorf("drop new index on %s: %w", v1RunsOlapTable, err)
	}

	const columns = "(tenant_id, inserted_at, id, readable_status, kind)"

	// Rebuild on leaf partitions concurrently so writers are not blocked.
	leaves, err := listLeafPartitions(ctx, db, v1RunsOlapTable, 2)
	if err != nil {
		return err
	}

	for _, leaf := range leaves {
		ddl := fmt.Sprintf(
			`CREATE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s %s`,
			quoteIdent(idxNameForPartition(leaf)),
			quoteIdent(leaf),
			columns,
		)
		if _, err := db.ExecContext(ctx, ddl); err != nil {
			return fmt.Errorf("create index concurrently on %s: %w", leaf, err)
		}
	}

	// Intermediate partitioned tables: attaches the leaf indexes built above.
	children, err := listLeafPartitions(ctx, db, v1RunsOlapTable, 1)
	if err != nil {
		return err
	}

	for _, child := range children {
		ddl := fmt.Sprintf(
			`CREATE INDEX IF NOT EXISTS %s ON %s %s`,
			quoteIdent(idxNameForPartition(child)),
			quoteIdent(child),
			columns,
		)
		if _, err := db.ExecContext(ctx, ddl); err != nil {
			return fmt.Errorf("create index on partition %s: %w", child, err)
		}
	}

	// Finally, restore the index on the root partitioned table.
	rootDDL := fmt.Sprintf(
		`CREATE INDEX IF NOT EXISTS %s ON %s %s`,
		quoteIdent(idxNameForPartition(v1RunsOlapTable)),
		quoteIdent(v1RunsOlapTable),
		columns,
	)
	if _, err := db.ExecContext(ctx, rootDDL); err != nil {
		return fmt.Errorf("create index on %s: %w", v1RunsOlapTable, err)
	}

	return nil
}
41 changes: 41 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20260410202520_v1_0_98.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- +goose Up
-- +goose StatementBegin
-- Status-based sub-partitioning is being removed, so this per-day/per-status
-- partition helper is no longer needed. IF EXISTS makes the migration safe to
-- re-run if a previous attempt was interrupted after the drop succeeded.
DROP FUNCTION IF EXISTS create_v1_olap_partition_with_date_and_status(text, date);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- Restores create_v1_olap_partition_with_date_and_status: creates one day's
-- partition of the given OLAP table, sub-partitioned by readable_status via
-- LIST, and attaches it to the parent RANGE-partitioned table.
-- Always returns 1.
CREATE OR REPLACE FUNCTION create_v1_olap_partition_with_date_and_status(
targetTableName text,
targetDate date
) RETURNS integer
LANGUAGE plpgsql AS
$$
DECLARE
targetDateStr varchar;
targetDatePlusOneDayStr varchar;
newTableName varchar;
BEGIN
-- Day boundaries for the RANGE partition, formatted YYYYMMDD.
SELECT to_char(targetDate, 'YYYYMMDD') INTO targetDateStr;
SELECT to_char(targetDate + INTERVAL '1 day', 'YYYYMMDD') INTO targetDatePlusOneDayStr;
-- Per-day table name, e.g. v1_runs_olap_20260410.
SELECT format('%s_%s', targetTableName, targetDateStr) INTO newTableName;
-- Create the per-day table (itself LIST-partitioned by readable_status)
-- only if it does not already exist.
-- NOTE(review): format() uses %s rather than %I, so names are not quoted as
-- identifiers; presumably callers only pass trusted lowercase table names --
-- confirm before reusing this pattern elsewhere.
IF NOT EXISTS (SELECT 1 FROM pg_tables WHERE tablename = newTableName) THEN
EXECUTE format('CREATE TABLE %s (LIKE %s INCLUDING INDEXES) PARTITION BY LIST (readable_status)', newTableName, targetTableName);
END IF;

-- One LIST child partition per readable_status value.
PERFORM create_v1_partition_with_status(newTableName, 'QUEUED');
PERFORM create_v1_partition_with_status(newTableName, 'RUNNING');
PERFORM create_v1_partition_with_status(newTableName, 'COMPLETED');
PERFORM create_v1_partition_with_status(newTableName, 'CANCELLED');
PERFORM create_v1_partition_with_status(newTableName, 'FAILED');
PERFORM create_v1_partition_with_status(newTableName, 'EVICTED');

-- If it's not already attached, attach the partition
IF NOT EXISTS (SELECT 1 FROM pg_inherits WHERE inhrelid = newTableName::regclass) THEN
EXECUTE format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', targetTableName, newTableName, targetDateStr, targetDatePlusOneDayStr);
END IF;

RETURN 1;
END;
$$;
-- +goose StatementEnd
4 changes: 2 additions & 2 deletions internal/operation/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func (p *TenantOperationPool) Cleanup() {
})
}

func (p *TenantOperationPool) setTenants(tenants []*sqlcv1.Tenant) {
func (p *TenantOperationPool) setTenants(tenants []uuid.UUID) {
tenantMap := make(map[uuid.UUID]bool)

for _, t := range tenants {
tenantMap[t.ID] = true
tenantMap[t] = true
}

// init ops for new tenants
Expand Down
6 changes: 3 additions & 3 deletions internal/queueutils/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package queueutils
import (
"time"

"github.com/google/uuid"
"github.com/hatchet-dev/hatchet/internal/syncx"
"github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1"

"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -33,11 +33,11 @@ func (p *OperationPool) WithJitter(maxJitter time.Duration) *OperationPool {
return p
}

func (p *OperationPool) SetTenants(tenants []*sqlcv1.Tenant) {
func (p *OperationPool) SetTenants(tenants []uuid.UUID) {
tenantMap := make(map[string]bool)

for _, t := range tenants {
tenantMap[t.ID.String()] = true
tenantMap[t.String()] = true
}

// delete tenants that are not in the list
Expand Down
95 changes: 68 additions & 27 deletions internal/services/controllers/olap/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,38 +304,38 @@ func (o *OLAPControllerImpl) Start() (func() error, error) {
}

// Default poll interval
pollIntervalSec := 2
// pollIntervalSec := 2

// Override with config value if available
if o.olapConfig != nil && o.olapConfig.PollInterval > 0 {
pollIntervalSec = o.olapConfig.PollInterval
}
// if o.olapConfig != nil && o.olapConfig.PollInterval > 0 {
// pollIntervalSec = o.olapConfig.PollInterval
// }

_, err = o.s.NewJob(
gocron.DurationJob(time.Second*time.Duration(pollIntervalSec)),
gocron.NewTask(
o.runTaskStatusUpdates(ctx),
),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)
// _, err = o.s.NewJob(
// gocron.DurationJob(time.Second*time.Duration(pollIntervalSec)),
// gocron.NewTask(
// o.runTaskStatusUpdates(ctx),
// ),
// gocron.WithSingletonMode(gocron.LimitModeReschedule),
// )

if err != nil {
cancel()
return nil, fmt.Errorf("could not schedule task status updates: %w", err)
}
// if err != nil {
// cancel()
// return nil, fmt.Errorf("could not schedule task status updates: %w", err)
// }

_, err = o.s.NewJob(
gocron.DurationJob(time.Second*time.Duration(pollIntervalSec)),
gocron.NewTask(
o.runDAGStatusUpdates(ctx),
),
gocron.WithSingletonMode(gocron.LimitModeReschedule),
)
// _, err = o.s.NewJob(
// gocron.DurationJob(time.Second*time.Duration(pollIntervalSec)),
// gocron.NewTask(
// o.runDAGStatusUpdates(ctx),
// ),
// gocron.WithSingletonMode(gocron.LimitModeReschedule),
// )

if err != nil {
cancel()
return nil, fmt.Errorf("could not schedule dag status updates: %w", err)
}
// if err != nil {
// cancel()
// return nil, fmt.Errorf("could not schedule dag status updates: %w", err)
// }

_, err = o.s.NewJob(
gocron.DurationJob(time.Second*60),
Expand Down Expand Up @@ -766,6 +766,7 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
durableInvocationCounts := make([]int32, 0)
workerIds := make([]uuid.UUID, 0)
workflowIds := make([]uuid.UUID, 0)
workflowRunIDs := make([]uuid.UUID, 0)
eventTypes := make([]sqlcv1.V1EventTypeOlap, 0)
readableStatuses := make([]sqlcv1.V1ReadableStatusOlap, 0)
eventPayloads := make([]string, 0)
Expand All @@ -790,6 +791,7 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
taskIds = append(taskIds, msg.TaskId)
taskInsertedAts = append(taskInsertedAts, taskMeta.InsertedAt)
workflowIds = append(workflowIds, taskMeta.WorkflowID)
workflowRunIDs = append(workflowRunIDs, taskMeta.WorkflowRunID)
retryCounts = append(retryCounts, msg.RetryCount)
durableInvocationCounts = append(durableInvocationCounts, msg.DurableInvocationCount)
eventTypes = append(eventTypes, msg.EventType)
Expand Down Expand Up @@ -913,12 +915,51 @@ func (tc *OLAPControllerImpl) handleCreateMonitoringEvent(ctx context.Context, t
opts = append(opts, event)
}

err = tc.repo.OLAP().CreateTaskEvents(ctx, tenantId, opts)
notFoundEvents, err := tc.repo.OLAP().CreateTaskEvents(ctx, tenantId, opts)

if err != nil {
return err
}

if len(notFoundEvents) > 0 {
notFoundTaskIDs := make(map[int64]struct{}, len(notFoundEvents))
for _, e := range notFoundEvents {
notFoundTaskIDs[e.TaskID] = struct{}{}
}

requeueCount := 0

for _, msg := range msgs {
if _, ok := notFoundTaskIDs[msg.TaskId]; !ok {
continue
}

if msg.RequeueCount >= 10 {
tc.l.Error().Ctx(ctx).Msgf("giving up on requeuing monitoring event for task %d after %d attempts", msg.TaskId, msg.RequeueCount)
continue
}

requeued := *msg
requeued.RequeueCount++

requeueMsg, requeueErr := tasktypes.MonitoringEventMessageFromInternal(tenantId, requeued)
if requeueErr != nil {
tc.l.Error().Ctx(ctx).Err(requeueErr).Msgf("could not create requeue message for task %d", msg.TaskId)
continue
}

if requeueErr = tc.mq.SendMessage(ctx, msgqueue.OLAP_QUEUE, requeueMsg); requeueErr != nil {
tc.l.Error().Ctx(ctx).Err(requeueErr).Msgf("could not requeue monitoring event for task %d", msg.TaskId)
} else {
requeueCount++
}
}

if requeueCount > 0 {
tc.l.Warn().Ctx(ctx).Msgf("requeued %d monitoring events for tasks not yet in OLAP table", requeueCount)
}
}

tc.synthesizeEngineSpans(ctx, tenantId, spanEvents)

if !tc.repo.OLAP().PayloadStore().ExternalStoreEnabled() {
Expand Down
7 changes: 2 additions & 5 deletions internal/services/controllers/olap/process_alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func (o *OLAPControllerImpl) runTenantProcessAlerts(ctx context.Context) func()
return func() {
o.l.Debug().Ctx(ctx).Msgf("partition: processing tenant alerts")

// list all tenants
tenants, err := o.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV1)

if err != nil {
Expand All @@ -26,10 +25,8 @@ func (o *OLAPControllerImpl) runTenantProcessAlerts(ctx context.Context) func()

o.processTenantAlertOperations.SetTenants(tenants)

for i := range tenants {
tenantId := tenants[i].ID.String()

o.processTenantAlertOperations.RunOrContinue(tenantId)
for _, tenantId := range tenants {
o.processTenantAlertOperations.RunOrContinue(tenantId.String())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,13 @@ func (o *OLAPControllerImpl) runDAGStatusUpdates(ctx context.Context) func() {
for shouldContinue {
o.l.Debug().Ctx(ctx).Msgf("partition: running status updates for dags")

// list all tenants
tenants, err := o.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV1)
tenantIds, err := o.p.ListTenantsForController(ctx, sqlcv1.TenantMajorEngineVersionV1)

if err != nil {
o.l.Error().Ctx(ctx).Err(err).Msg("could not list tenants")
return
}

tenantIds := make([]uuid.UUID, 0, len(tenants))

for _, tenant := range tenants {
tenantId := tenant.ID
tenantIds = append(tenantIds, tenantId)
}

var rows []v1.UpdateDAGStatusRow

shouldContinue, rows, err = o.repo.OLAP().UpdateDAGStatuses(ctx, tenantIds)
Expand Down
Loading
Loading