Skip to content

fix: Fixed tasks using Batch or Subtask plugin getting stuck in the WAITING state #561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions engine/collector_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func getUpdateErrorResolution(dbp zesty.DBProvider) (*resolution.Resolution, err
SELECT id
FROM "resolution"
WHERE ((instance_id = $1 AND state = $2) OR
((state = $3 OR state = $4) AND next_retry < NOW()))
((state = $3 OR state = $4) AND next_retry < NOW()) OR
(state = $5 AND next_retry > last_start AND next_retry < NOW()))
LIMIT 1
FOR UPDATE SKIP LOCKED
)
Expand All @@ -62,7 +63,16 @@ func getUpdateErrorResolution(dbp zesty.DBProvider) (*resolution.Resolution, err
var r resolution.Resolution

instanceID := utask.InstanceID
if err := dbp.DB().SelectOne(&r, sqlStmt, instanceID, resolution.StateRetry, resolution.StateError, resolution.StateToAutorunDelayed); err != nil {
err := dbp.DB().SelectOne(
&r,
sqlStmt,
instanceID,
resolution.StateRetry,
resolution.StateError,
resolution.StateToAutorunDelayed,
resolution.StateWaiting,
)
if err != nil {
return nil, pgjuju.Interpret(err)
}

Expand Down
19 changes: 14 additions & 5 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,13 @@ forLoop:

utask.ReleaseResource("template:" + t.TemplateName)
utask.ReleaseExecutionSlot()
if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil {
if err := wakeParentTask(dbp, t, debugLogger); err != nil {
debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err)
}
}

func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error {
// wakeParentTask wakes up the current task's parent if needed by changing it's next_retry to now.
func wakeParentTask(dbp zesty.DBProvider, currentTask *task.Task, debugLogger *logrus.Entry) error {
parentTask, err := taskutils.ShouldResumeParentTask(dbp, currentTask)
if err != nil {
return err
Expand All @@ -625,9 +626,17 @@ func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphor
return nil
}

debugLogger.Debugf("resuming parent task %q resolution %q", parentTask.PublicID, *parentTask.Resolution)
debugLogger.WithFields(logrus.Fields{"task_id": parentTask.PublicID, "resolution_id": *parentTask.Resolution}).Debugf("resuming resolution %q as child task %q state changed", *parentTask.Resolution, currentTask.PublicID)
return GetEngine().Resolve(*parentTask.Resolution, sm)
res, err := resolution.LoadFromPublicID(dbp, *parentTask.Resolution)
if err != nil {
return err
}

if _, err := res.UpdateNextRetry(dbp, time.Now()); err != nil {
return errors.Annotatef(err, "next_retry update failure for parent task '%s'", parentTask.PublicID)
}

debugLogger.Debugf("updated parent task %q resolution %q next_retry", parentTask.PublicID, *parentTask.Resolution)
return nil
}

func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) error {
Expand Down
117 changes: 86 additions & 31 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,11 @@ func TestResolveSubTask(t *testing.T) {
require.NotNil(t, res)
assert.Equal(t, resolution.StateWaiting, res.State)

nextRetryBeforeRun := time.Time{}
if res.NextRetry != nil {
nextRetryBeforeRun = *res.NextRetry
}

for _, subtaskName := range []string{"subtaskCreation", "jsonInputSubtask", "templatingJsonInputSubtask"} {
subtaskCreationOutput := res.Steps[subtaskName].Output.(map[string]interface{})
subtaskPublicID := subtaskCreationOutput["id"].(string)
Expand Down Expand Up @@ -1285,27 +1290,25 @@ func TestResolveSubTask(t *testing.T) {
assert.Equal(t, res.TaskID, parentTaskToResume.ID)
}

// checking if the parent task is picked up after that the subtask is resolved.
// need to sleep a bit because the parent task is resumed asynchronously
ti := time.Second
i := time.Duration(0)
for i < ti {
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
if res.State != resolution.StateWaiting {
break
}
// checking whether the parent task will be picked up by the RetryCollector after the subtask is resolved.
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
assert.NotNil(t, res.NextRetry)
assert.False(t, res.NextRetry.IsZero())
assert.True(t, res.NextRetry.After(nextRetryBeforeRun))
assert.True(t, res.NextRetry.Before(time.Now()))

time.Sleep(time.Millisecond * 10)
i += time.Millisecond * 10
}
// Starting the RetryCollector to resume the parent task
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
engine.RetryCollector(ctx)

ti = time.Second
i = time.Duration(0)
ti := time.Second
i := time.Duration(0)
for i < ti {
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
if res.State != resolution.StateRunning {
if res.State == resolution.StateDone {
break
}

Expand Down Expand Up @@ -1409,6 +1412,11 @@ func TestBatch(t *testing.T) {
require.NotNil(t, res)
assert.Equal(t, resolution.StateWaiting, res.State)

nextRetryBeforeRun := time.Time{}
if res.NextRetry != nil {
nextRetryBeforeRun = *res.NextRetry
}

for _, batchStepName := range []string{"batchJsonInputs", "batchYamlInputs"} {
batchStepMetadataRaw, ok := res.Steps[batchStepName].Metadata.(string)
assert.True(t, ok, "wrong type of metadata for step '%s'", batchStepName)
Expand Down Expand Up @@ -1463,32 +1471,79 @@ func TestBatch(t *testing.T) {
}
}

// checking if the parent task is picked up after the subtask is resolved.
// We need to sleep a bit because the parent task is resumed asynchronously
// checking whether the parent task will be picked up by the RetryCollector after the subtask is resolved.
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
assert.NotNil(t, res.NextRetry)
assert.False(t, res.NextRetry.IsZero())
assert.True(t, res.NextRetry.After(nextRetryBeforeRun))
assert.True(t, res.NextRetry.Before(time.Now()))

// Starting the RetryCollector to resume the parent task
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
engine.RetryCollector(ctx)

ti := time.Second
i := time.Duration(0)
for i < ti {
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
if res.State != resolution.StateWaiting {
if res.State == resolution.StateDone {
break
}

time.Sleep(time.Millisecond * 10)
i += time.Millisecond * 10
}
assert.Equal(t, resolution.StateDone, res.State)
}

ti = time.Second
i = time.Duration(0)
for i < ti {
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
if res.State != resolution.StateRunning {
break
}
func TestWakeParent(t *testing.T) {
dbp, err := zesty.NewDBProvider(utask.DBName)
require.Nil(t, err)

time.Sleep(time.Millisecond * 10)
i += time.Millisecond * 10
}
assert.Equal(t, resolution.StateDone, res.State)
// Create a task that spawns two simple subtasks
_, err = templateFromYAML(dbp, "no-output.yaml")
require.Nil(t, err)

res, err := createResolution("noOutputSubtask.yaml", map[string]interface{}{}, nil)
require.Nil(t, err, "failed to create resolution: %s", err)

res, err = runResolution(res)
require.Nil(t, err)
require.NotNil(t, res)
require.Equal(t, resolution.StateWaiting, res.State)
assert.True(t, res.NextRetry.IsZero())

// Force the parent task to the RUNNING state
res.SetState(resolution.StateRunning)
res.Update(dbp)

// Create and run one of the subtasks
subtaskCreationOutput := res.Steps["subtaskCreation"].Output.(map[string]interface{})
subtaskPublicID := subtaskCreationOutput["id"].(string)

subtask, err := task.LoadFromPublicID(dbp, subtaskPublicID)
require.Nil(t, err)
require.Equal(t, task.StateTODO, subtask.State)

subtaskResolution, err := resolution.Create(dbp, subtask, nil, "", false, nil)
require.Nil(t, err)

beforeRun := time.Now()
subtaskResolution, err = runResolution(subtaskResolution)
require.Nil(t, err)
require.Equal(t, task.StateDone, subtaskResolution.State)
afterRun := time.Now()

// Refreshing parent resolution to check its next_retry value
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
require.Nil(t, err)
assert.Equal(t, res.State, resolution.StateRunning) // Parent should still be RUNNING

// The parent's next_retry should have been updated so that the RetryCollector would pick it up
assert.NotNil(t, res.NextRetry)
assert.True(t, res.NextRetry.After(beforeRun))
assert.True(t, res.NextRetry.Before(afterRun))
}
18 changes: 18 additions & 0 deletions engine/templates_tests/noOutputSubtask.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: noOutputSubtaskTemplate
description: Template that spawns two simple subtasks
title_format: "[test] simple subtasks template test"

steps:
subtaskCreation:
description: creating a subtask
action:
type: subtask
configuration:
template: no-output

otherSubtaskCreation:
description: creating another subtask
action:
type: subtask
configuration:
template: no-output
98 changes: 93 additions & 5 deletions models/resolution/resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"encoding/json"
"time"

"github.com/Masterminds/squirrel"
"github.com/gofrs/uuid"
"github.com/juju/errors"
"github.com/loopfz/gadgeto/zesty"

"github.com/ovh/utask"
"github.com/ovh/utask/db/pgjuju"
"github.com/ovh/utask/db/sqlgenerator"
Expand All @@ -17,11 +22,6 @@ import (
"github.com/ovh/utask/pkg/compress"
"github.com/ovh/utask/pkg/now"
"github.com/ovh/utask/pkg/utils"

"github.com/Masterminds/squirrel"
"github.com/gofrs/uuid"
"github.com/juju/errors"
"github.com/loopfz/gadgeto/zesty"
)

// all valid resolution states
Expand Down Expand Up @@ -410,6 +410,22 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) {
// force empty to stop using old crypto code
r.CryptKey = []byte{}

var next_retry time.Time
if r.NextRetry == nil || r.NextRetry.IsZero() {
// No local next_retry value. It may have been set in DB since the last read, so we need to refresh it
next_retry, err = getNextRetry(dbp, r.ID)
if err != nil {
return err
}
} else {
// Updating the next_retry individually to prevent overriding it with a nil value or a later date
next_retry, err = r.UpdateNextRetry(dbp, *r.NextRetry)
if err != nil {
return errors.Annotatef(err, "failed to update resolution's next_retry")
}
}
r.NextRetry = &next_retry

rows, err := dbp.DB().Update(&r.DBModel)
if err != nil {
return pgjuju.Interpret(err)
Expand All @@ -420,6 +436,63 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) {
return nil
}

// UpdateNextRetry updates the Resolution's next_retry field while respecting the current next_retry value in DB. It
// can only shorten the time before the resolution will be retried next, not increase it.
func (r *Resolution) UpdateNextRetry(dbp zesty.DBProvider, newNextRetry time.Time) (time.Time, error) {
sp, err := dbp.TxSavepoint()
defer dbp.RollbackTo(sp)
if err != nil {
return time.Time{}, err
}

// Using the Resolution's ID to soft lock the update and prevent concurrent updates
if _, err := dbp.DB().Exec(`SELECT pg_advisory_xact_lock($1)`, r.ID); err != nil {
return time.Time{}, err
}

query, params, err := sqlgenerator.PGsql.
Update("resolution").
Where(squirrel.Eq{"public_id": r.PublicID}).
// Is the new next_retry valid
Where("? > last_start", newNextRetry).
// And, is the current next_retry outdated or further in time than the new one
Where("(next_retry < last_start OR ? < next_retry OR next_retry IS NULL)", newNextRetry).
Set("next_retry", newNextRetry).
Suffix("RETURNING next_retry").
ToSql()
if err != nil {
return time.Time{}, err
}

res, err := dbp.DB().Query(query, params...)
if err != nil {
return time.Time{}, err
}
defer res.Close()

var nextRetry time.Time
if !res.Next() {
// Update returned no result
if err := res.Err(); err != nil {
// An error happened when reading the query's result
return time.Time{}, err
}

// The next_retry wasn't updated, we need to fetch its current value
next_retry, err := getNextRetry(dbp, r.ID)
if err != nil {
return time.Time{}, err
}
return next_retry, nil
}

if err := res.Scan(&nextRetry); err != nil {
return time.Time{}, err
}

return nextRetry, dbp.Commit()
}

// Delete removes the Resolution from DB
func (r *Resolution) Delete(dbp zesty.DBProvider) (err error) {
defer errors.DeferredAnnotatef(&err, "Failed to update resolution")
Expand Down Expand Up @@ -581,6 +654,21 @@ func (r *Resolution) SetInput(input map[string]interface{}) {
r.ResolverInput = input
}

// getNextRetry fetches from the database the current next_retry value of the resolution with given ID
func getNextRetry(dbp zesty.DBProvider, resolutionID int64) (time.Time, error) {
var tmpRes Resolution
err := dbp.DB().SelectOne(&tmpRes, `SELECT next_retry FROM resolution WHERE id = $1`, resolutionID)
if err != nil {
return time.Time{}, err
}

if tmpRes.NextRetry == nil {
return time.Time{}, nil
}

return *tmpRes.NextRetry, nil
}

var rSelector = sqlgenerator.PGsql.Select(
`"resolution".id, "resolution".public_id, "resolution".id_task, "resolution".resolver_username, "resolution".state, "resolution".instance_id, "resolution".created, "resolution".last_start, "resolution".last_stop, "resolution".next_retry, "resolution".run_count, "resolution".run_max, "resolution".crypt_key, "resolution".encrypted_steps, "resolution".steps_compression_alg, "resolution".encrypted_resolver_input, "resolution".base_configurations, "task".public_id as task_public_id, "task".title as task_title`,
).From(
Expand Down