diff --git a/engine/collector_retry.go b/engine/collector_retry.go index 89f9b570f..bfec02d7b 100644 --- a/engine/collector_retry.go +++ b/engine/collector_retry.go @@ -53,7 +53,8 @@ func getUpdateErrorResolution(dbp zesty.DBProvider) (*resolution.Resolution, err SELECT id FROM "resolution" WHERE ((instance_id = $1 AND state = $2) OR - ((state = $3 OR state = $4) AND next_retry < NOW())) + ((state = $3 OR state = $4) AND next_retry < NOW()) OR + (state = $5 AND next_retry > last_start AND next_retry < NOW())) LIMIT 1 FOR UPDATE SKIP LOCKED ) @@ -62,7 +63,16 @@ func getUpdateErrorResolution(dbp zesty.DBProvider) (*resolution.Resolution, err var r resolution.Resolution instanceID := utask.InstanceID - if err := dbp.DB().SelectOne(&r, sqlStmt, instanceID, resolution.StateRetry, resolution.StateError, resolution.StateToAutorunDelayed); err != nil { + err := dbp.DB().SelectOne( + &r, + sqlStmt, + instanceID, + resolution.StateRetry, + resolution.StateError, + resolution.StateToAutorunDelayed, + resolution.StateWaiting, + ) + if err != nil { return nil, pgjuju.Interpret(err) } diff --git a/engine/engine.go b/engine/engine.go index 55f23129c..5392ab25f 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -610,12 +610,13 @@ forLoop: utask.ReleaseResource("template:" + t.TemplateName) utask.ReleaseExecutionSlot() - if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil { + if err := wakeParentTask(dbp, t, debugLogger); err != nil { debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err) } } -func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error { +// wakeParentTask wakes up the current task's parent if needed by changing its next_retry to now.
+func wakeParentTask(dbp zesty.DBProvider, currentTask *task.Task, debugLogger *logrus.Entry) error { parentTask, err := taskutils.ShouldResumeParentTask(dbp, currentTask) if err != nil { return err @@ -625,9 +626,17 @@ func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphor return nil } - debugLogger.Debugf("resuming parent task %q resolution %q", parentTask.PublicID, *parentTask.Resolution) - debugLogger.WithFields(logrus.Fields{"task_id": parentTask.PublicID, "resolution_id": *parentTask.Resolution}).Debugf("resuming resolution %q as child task %q state changed", *parentTask.Resolution, currentTask.PublicID) - return GetEngine().Resolve(*parentTask.Resolution, sm) + res, err := resolution.LoadFromPublicID(dbp, *parentTask.Resolution) + if err != nil { + return err + } + + if _, err := res.UpdateNextRetry(dbp, time.Now()); err != nil { + return errors.Annotatef(err, "next_retry update failure for parent task '%s'", parentTask.PublicID) + } + + debugLogger.Debugf("updated parent task %q resolution %q next_retry", parentTask.PublicID, *parentTask.Resolution) + return nil } func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) error { diff --git a/engine/engine_test.go b/engine/engine_test.go index d8ca7824c..78d0698a5 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -1258,6 +1258,11 @@ func TestResolveSubTask(t *testing.T) { require.NotNil(t, res) assert.Equal(t, resolution.StateWaiting, res.State) + nextRetryBeforeRun := time.Time{} + if res.NextRetry != nil { + nextRetryBeforeRun = *res.NextRetry + } + for _, subtaskName := range []string{"subtaskCreation", "jsonInputSubtask", "templatingJsonInputSubtask"} { subtaskCreationOutput := res.Steps[subtaskName].Output.(map[string]interface{}) subtaskPublicID := subtaskCreationOutput["id"].(string) @@ -1285,27 +1290,25 @@ func TestResolveSubTask(t *testing.T) { assert.Equal(t, res.TaskID, parentTaskToResume.ID) } - // checking if the parent task is picked 
up after that the subtask is resolved. - // need to sleep a bit because the parent task is resumed asynchronously - ti := time.Second - i := time.Duration(0) - for i < ti { - res, err = resolution.LoadFromPublicID(dbp, res.PublicID) - require.Nil(t, err) - if res.State != resolution.StateWaiting { - break - } + // checking whether the parent task will be picked up by the RetryCollector after the subtask is resolved. + res, err = resolution.LoadFromPublicID(dbp, res.PublicID) + require.Nil(t, err) + assert.NotNil(t, res.NextRetry) + assert.False(t, res.NextRetry.IsZero()) + assert.True(t, res.NextRetry.After(nextRetryBeforeRun)) + assert.True(t, res.NextRetry.Before(time.Now())) - time.Sleep(time.Millisecond * 10) - i += time.Millisecond * 10 - } + // Starting the RetryCollector to resume the parent task + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + engine.RetryCollector(ctx) - ti = time.Second - i = time.Duration(0) + ti := time.Second + i := time.Duration(0) for i < ti { res, err = resolution.LoadFromPublicID(dbp, res.PublicID) require.Nil(t, err) - if res.State != resolution.StateRunning { + if res.State == resolution.StateDone { break } @@ -1409,6 +1412,11 @@ func TestBatch(t *testing.T) { require.NotNil(t, res) assert.Equal(t, resolution.StateWaiting, res.State) + nextRetryBeforeRun := time.Time{} + if res.NextRetry != nil { + nextRetryBeforeRun = *res.NextRetry + } + for _, batchStepName := range []string{"batchJsonInputs", "batchYamlInputs"} { batchStepMetadataRaw, ok := res.Steps[batchStepName].Metadata.(string) assert.True(t, ok, "wrong type of metadata for step '%s'", batchStepName) @@ -1463,32 +1471,79 @@ func TestBatch(t *testing.T) { } } - // checking if the parent task is picked up after the subtask is resolved. - // We need to sleep a bit because the parent task is resumed asynchronously + // checking whether the parent task will be picked up by the RetryCollector after the subtask is resolved. 
+ res, err = resolution.LoadFromPublicID(dbp, res.PublicID) + require.Nil(t, err) + assert.NotNil(t, res.NextRetry) + assert.False(t, res.NextRetry.IsZero()) + assert.True(t, res.NextRetry.After(nextRetryBeforeRun)) + assert.True(t, res.NextRetry.Before(time.Now())) + + // Starting the RetryCollector to resume the parent task + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + engine.RetryCollector(ctx) + ti := time.Second i := time.Duration(0) for i < ti { res, err = resolution.LoadFromPublicID(dbp, res.PublicID) require.Nil(t, err) - if res.State != resolution.StateWaiting { + if res.State == resolution.StateDone { break } time.Sleep(time.Millisecond * 10) i += time.Millisecond * 10 } + assert.Equal(t, resolution.StateDone, res.State) +} - ti = time.Second - i = time.Duration(0) - for i < ti { - res, err = resolution.LoadFromPublicID(dbp, res.PublicID) - require.Nil(t, err) - if res.State != resolution.StateRunning { - break - } +func TestWakeParent(t *testing.T) { + dbp, err := zesty.NewDBProvider(utask.DBName) + require.Nil(t, err) - time.Sleep(time.Millisecond * 10) - i += time.Millisecond * 10 - } - assert.Equal(t, resolution.StateDone, res.State) + // Create a task that spawns two simple subtasks + _, err = templateFromYAML(dbp, "no-output.yaml") + require.Nil(t, err) + + res, err := createResolution("noOutputSubtask.yaml", map[string]interface{}{}, nil) + require.Nil(t, err, "failed to create resolution: %s", err) + + res, err = runResolution(res) + require.Nil(t, err) + require.NotNil(t, res) + require.Equal(t, resolution.StateWaiting, res.State) + assert.True(t, res.NextRetry.IsZero()) + + // Force the parent task to the RUNNING state + res.SetState(resolution.StateRunning) + res.Update(dbp) + + // Create and run one of the subtasks + subtaskCreationOutput := res.Steps["subtaskCreation"].Output.(map[string]interface{}) + subtaskPublicID := subtaskCreationOutput["id"].(string) + + subtask, err := task.LoadFromPublicID(dbp, 
subtaskPublicID) + require.Nil(t, err) + require.Equal(t, task.StateTODO, subtask.State) + + subtaskResolution, err := resolution.Create(dbp, subtask, nil, "", false, nil) + require.Nil(t, err) + + beforeRun := time.Now() + subtaskResolution, err = runResolution(subtaskResolution) + require.Nil(t, err) + require.Equal(t, task.StateDone, subtaskResolution.State) + afterRun := time.Now() + + // Refreshing parent resolution to check its next_retry value + res, err = resolution.LoadFromPublicID(dbp, res.PublicID) + require.Nil(t, err) + assert.Equal(t, res.State, resolution.StateRunning) // Parent should still be RUNNING + + // The parent's next_retry should have been updated so that the RetryCollector would pick it up + assert.NotNil(t, res.NextRetry) + assert.True(t, res.NextRetry.After(beforeRun)) + assert.True(t, res.NextRetry.Before(afterRun)) } diff --git a/engine/templates_tests/noOutputSubtask.yaml b/engine/templates_tests/noOutputSubtask.yaml new file mode 100644 index 000000000..4901eaff4 --- /dev/null +++ b/engine/templates_tests/noOutputSubtask.yaml @@ -0,0 +1,18 @@ +name: noOutputSubtaskTemplate +description: Template that spawns two simple subtasks +title_format: "[test] simple subtasks template test" + +steps: + subtaskCreation: + description: creating a subtask + action: + type: subtask + configuration: + template: no-output + + otherSubtaskCreation: + description: creating another subtask + action: + type: subtask + configuration: + template: no-output diff --git a/models/resolution/resolution.go b/models/resolution/resolution.go index 3b4440ae5..c7d5241cc 100644 --- a/models/resolution/resolution.go +++ b/models/resolution/resolution.go @@ -6,6 +6,11 @@ import ( "encoding/json" "time" + "github.com/Masterminds/squirrel" + "github.com/gofrs/uuid" + "github.com/juju/errors" + "github.com/loopfz/gadgeto/zesty" + "github.com/ovh/utask" "github.com/ovh/utask/db/pgjuju" "github.com/ovh/utask/db/sqlgenerator" @@ -17,11 +22,6 @@ import ( 
"github.com/ovh/utask/pkg/compress" "github.com/ovh/utask/pkg/now" "github.com/ovh/utask/pkg/utils" - - "github.com/Masterminds/squirrel" - "github.com/gofrs/uuid" - "github.com/juju/errors" - "github.com/loopfz/gadgeto/zesty" ) // all valid resolution states @@ -410,6 +410,22 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) { // force empty to stop using old crypto code r.CryptKey = []byte{} + var next_retry time.Time + if r.NextRetry == nil || r.NextRetry.IsZero() { + // No local next_retry value. It may have been set in DB since the last read, so we need to refresh it + next_retry, err = getNextRetry(dbp, r.ID) + if err != nil { + return err + } + } else { + // Updating the next_retry individually to prevent overriding it with a nil value or a later date + next_retry, err = r.UpdateNextRetry(dbp, *r.NextRetry) + if err != nil { + return errors.Annotatef(err, "failed to update resolution's next_retry") + } + } + r.NextRetry = &next_retry + rows, err := dbp.DB().Update(&r.DBModel) if err != nil { return pgjuju.Interpret(err) @@ -420,6 +436,63 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) { return nil } +// UpdateNextRetry updates the Resolution's next_retry field while respecting the current next_retry value in DB. It +// can only shorten the time before the resolution will be retried next, not increase it. +func (r *Resolution) UpdateNextRetry(dbp zesty.DBProvider, newNextRetry time.Time) (time.Time, error) { + sp, err := dbp.TxSavepoint() + defer dbp.RollbackTo(sp) + if err != nil { + return time.Time{}, err + } + + // Using the Resolution's ID to soft lock the update and prevent concurrent updates + if _, err := dbp.DB().Exec(`SELECT pg_advisory_xact_lock($1)`, r.ID); err != nil { + return time.Time{}, err + } + + query, params, err := sqlgenerator.PGsql. + Update("resolution"). + Where(squirrel.Eq{"public_id": r.PublicID}). + // Is the new next_retry valid + Where("? > last_start", newNextRetry). 
+ // And, is the current next_retry outdated or further in time than the new one + Where("(next_retry < last_start OR ? < next_retry OR next_retry IS NULL)", newNextRetry). + Set("next_retry", newNextRetry). + Suffix("RETURNING next_retry"). + ToSql() + if err != nil { + return time.Time{}, err + } + + res, err := dbp.DB().Query(query, params...) + if err != nil { + return time.Time{}, err + } + defer res.Close() + + var nextRetry time.Time + if !res.Next() { + // Update returned no result + if err := res.Err(); err != nil { + // An error happened when reading the query's result + return time.Time{}, err + } + + // The next_retry wasn't updated, we need to fetch its current value + next_retry, err := getNextRetry(dbp, r.ID) + if err != nil { + return time.Time{}, err + } + return next_retry, nil + } + + if err := res.Scan(&nextRetry); err != nil { + return time.Time{}, err + } + + return nextRetry, dbp.Commit() +} + // Delete removes the Resolution from DB func (r *Resolution) Delete(dbp zesty.DBProvider) (err error) { defer errors.DeferredAnnotatef(&err, "Failed to update resolution") @@ -581,6 +654,21 @@ func (r *Resolution) SetInput(input map[string]interface{}) { r.ResolverInput = input } +// getNextRetry fetches from the database the current next_retry value of the resolution with given ID +func getNextRetry(dbp zesty.DBProvider, resolutionID int64) (time.Time, error) { + var tmpRes Resolution + err := dbp.DB().SelectOne(&tmpRes, `SELECT next_retry FROM resolution WHERE id = $1`, resolutionID) + if err != nil { + return time.Time{}, err + } + + if tmpRes.NextRetry == nil { + return time.Time{}, nil + } + + return *tmpRes.NextRetry, nil +} + var rSelector = sqlgenerator.PGsql.Select( `"resolution".id, "resolution".public_id, "resolution".id_task, "resolution".resolver_username, "resolution".state, "resolution".instance_id, "resolution".created, "resolution".last_start, "resolution".last_stop, "resolution".next_retry, "resolution".run_count, 
"resolution".run_max, "resolution".crypt_key, "resolution".encrypted_steps, "resolution".steps_compression_alg, "resolution".encrypted_resolver_input, "resolution".base_configurations, "task".public_id as task_public_id, "task".title as task_title`, ).From(