Skip to content

Commit

Permalink
feat(pipeline): allow variable substitution in pipeline.tasks[].onError
Browse files Browse the repository at this point in the history
fix #8564

In the reconcile logic of PipelineRun, add the ability to replace the
`pipeline.tasks[].onError` field. At the same time, the validation of
pipelineSpec legality has been delayed.
  • Loading branch information
l-qing committed Mar 12, 2025
1 parent 8b8a098 commit 9fa4f9f
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 54 deletions.
21 changes: 17 additions & 4 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,25 @@ For instructions on using variable substitutions see the relevant section of [th
| `TaskRun` | `spec.workspaces[].csi.driver` |
| `TaskRun` | `spec.workspaces[].csi.nodePublishSecretRef.name` |
| `Pipeline` | `spec.tasks[].params[].value` |
| `Pipeline` | `spec.tasks[].conditions[].params[].value` |
| `Pipeline` | `spec.results[].value` |
| `Pipeline` | `spec.tasks[].matrix.params[].value` |
| `Pipeline` | `spec.tasks[].matrix.include[].params[].value` |
| `Pipeline` | `spec.tasks[].displayName` |
| `Pipeline` | `spec.tasks[].workspaces[].subPath` |
| `Pipeline` | `spec.tasks[].when[].input` |
| `Pipeline` | `spec.tasks[].when[].values` |
| `Pipeline` | `spec.tasks[].workspaces[].subPath` |
| `Pipeline` | `spec.tasks[].displayName` |
| `Pipeline` | `spec.tasks[].taskRef.params[].values` |
| `Pipeline` | `spec.tasks[].taskRef.name` |
| `Pipeline` | `spec.tasks[].onError` |
| `Pipeline` | `spec.finally[].params[].value` |
| `Pipeline` | `spec.finally[].matrix.params[].value` |
| `Pipeline` | `spec.finally[].matrix.include[].params[].value` |
| `Pipeline` | `spec.finally[].displayName` |
| `Pipeline` | `spec.finally[].workspaces[].subPath` |
| `Pipeline` | `spec.finally[].when[].input` |
| `Pipeline` | `spec.finally[].when[].values` |
| `Pipeline` | `spec.finally[].taskRef.params[].values` |
| `Pipeline` | `spec.finally[].taskRef.name` |
| `Pipeline` | `spec.finally[].onError` |
| `PipelineRun` | `spec.workspaces[].subPath` |
| `PipelineRun` | `spec.workspaces[].persistentVolumeClaim.claimName` |
| `PipelineRun` | `spec.workspaces[].configMap.name` |
Expand Down
24 changes: 15 additions & 9 deletions pkg/apis/pipeline/v1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,7 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {
ClusterTaskRefKind: true,
}

if pt.OnError != "" {
errs = errs.Also(config.ValidateEnabledAPIFields(ctx, "OnError", config.BetaAPIFields))
if pt.OnError != PipelineTaskContinue && pt.OnError != PipelineTaskStopAndFail {
errs = errs.Also(apis.ErrInvalidValue(pt.OnError, "OnError", "PipelineTask OnError must be either \"continue\" or \"stopAndFail\""))
}
if pt.OnError == PipelineTaskContinue && pt.Retries > 0 {
errs = errs.Also(apis.ErrGeneric("PipelineTask OnError cannot be set to \"continue\" when Retries is greater than 0"))
}
}
errs = errs.Also(pt.ValidateOnError(ctx))

// Pipeline task having taskRef/taskSpec with APIVersion is classified as custom task
switch {
Expand All @@ -217,6 +209,20 @@ func (pt PipelineTask) Validate(ctx context.Context) (errs *apis.FieldError) {
return errs
}

// ValidateOnError validates the OnError field of a PipelineTask
func (pt PipelineTask) ValidateOnError(ctx context.Context) (errs *apis.FieldError) {
if pt.OnError != "" && !isParamRefs(string(pt.OnError)) {
errs = errs.Also(config.ValidateEnabledAPIFields(ctx, "OnError", config.BetaAPIFields))
if pt.OnError != PipelineTaskContinue && pt.OnError != PipelineTaskStopAndFail {
errs = errs.Also(apis.ErrInvalidValue(pt.OnError, "OnError", "PipelineTask OnError must be either \"continue\" or \"stopAndFail\""))
}
if pt.OnError == PipelineTaskContinue && pt.Retries > 0 {
errs = errs.Also(apis.ErrGeneric("PipelineTask OnError cannot be set to \"continue\" when Retries is greater than 0"))
}
}
return errs
}

func (pt *PipelineTask) validateMatrix(ctx context.Context) (errs *apis.FieldError) {
if pt.IsMatrixed() {
// This is a beta feature and will fail validation if it's used in a pipeline spec
Expand Down
25 changes: 25 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
// Update pipelinespec of pipelinerun's status field
pr.Status.PipelineSpec = pipelineSpec

// validate pipelineSpec after apply parameters
if err := validatePipelineSpecAfterApplyParameters(ctx, pipelineSpec); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(v1.PipelineRunReasonFailedValidation.String(),
"Pipeline %s/%s can't be Run; it has an invalid spec: %s",
pipelineMeta.Namespace, pipelineMeta.Name, pipelineErrors.WrapUserError(err))
return controller.NewPermanentError(err)
}

// pipelineState holds a list of pipeline tasks after fetching their resolved Task specs.
// pipelineState also holds a taskRun for each pipeline task after the taskRun is created
// pipelineState is instantiated and updated on every reconcile cycle
Expand Down Expand Up @@ -1679,3 +1688,19 @@ func conditionFromVerificationResult(verificationResult *trustedresources.Verifi
}
return condition, err
}

// validatePipelineSpecAfterApplyParameters validates the PipelineSpec after apply parameters
// Maybe some fields are modified during apply parameters, need to validate again. For example, tasks[].OnError.
func validatePipelineSpecAfterApplyParameters(ctx context.Context, pipelineSpec *v1.PipelineSpec) (errs *apis.FieldError) {
if pipelineSpec == nil {
errs = errs.Also(apis.ErrMissingField("PipelineSpec"))
return
}
tasks := make([]v1.PipelineTask, 0, len(pipelineSpec.Tasks)+len(pipelineSpec.Finally))
tasks = append(tasks, pipelineSpec.Tasks...)
tasks = append(tasks, pipelineSpec.Finally...)
for _, t := range tasks {
errs = errs.Also(t.ValidateOnError(ctx))
}
return errs
}
49 changes: 49 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17994,6 +17994,55 @@ func Test_runNextSchedulableTask(t *testing.T) {
}
}

func TestReconcile_InvalidOnErrorPipeline(t *testing.T) {
names.TestingSeed()

namespace := "foo"
prName := "test-pipeline-invalid-onerror"

prs := []*v1.PipelineRun{
parse.MustParseV1PipelineRun(t, `
metadata:
name: test-pipeline-invalid-onerror
namespace: foo
spec:
params:
- name: onerror
value: "invalid"
pipelineSpec:
tasks:
- name: echo
onError: $(params.onerror)
taskSpec:
steps:
- name: echo
image: ubuntu
script: |
echo "Hello, World!"
exit 1
`),
}

d := test.Data{
PipelineRuns: prs,
ConfigMaps: []*corev1.ConfigMap{newFeatureFlagsConfigMap()},
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{
"Normal Started",
"(?s)Warning Failed .*PipelineTask OnError must be either \"continue\" or \"stopAndFail\"",
"(?s)Warning InternalError .*OnError\nPipelineTask OnError must be either \"continue\" or \"stopAndFail\"",
}
reconciledRun, clients := prt.reconcileRun(namespace, prName, wantEvents, true)

// Check that the expected TaskRun was not created
taskRuns := getTaskRunsForPipelineRun(prt.TestAssets.Ctx, t, clients, namespace, prName)
validateTaskRunsCount(t, taskRuns, 0)
verifyTaskRunStatusesCount(t, reconciledRun.Status, 0)
}

func getSignedV1Pipeline(unsigned *pipelinev1.Pipeline, signer signature.Signer, name string) (*pipelinev1.Pipeline, error) {
signed := unsigned.DeepCopy()
signed.Name = name
Expand Down
68 changes: 27 additions & 41 deletions pkg/reconciler/pipelinerun/resources/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,55 +298,41 @@ func ApplyWorkspaces(p *v1.PipelineSpec, pr *v1.PipelineRun) *v1.PipelineSpec {
return ApplyReplacements(p, replacements, map[string][]string{}, map[string]map[string]string{})
}

// ApplyReplacements replaces placeholders for declared parameters with the specified replacements.
func ApplyReplacements(p *v1.PipelineSpec, replacements map[string]string, arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) *v1.PipelineSpec {
p = p.DeepCopy()

for i := range p.Tasks {
p.Tasks[i].Params = p.Tasks[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if p.Tasks[i].IsMatrixed() {
p.Tasks[i].Matrix.Params = p.Tasks[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range p.Tasks[i].Matrix.Include {
p.Tasks[i].Matrix.Include[j].Params = p.Tasks[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
// replaceVariablesInPipelineTasks handles variable replacement for a slice of PipelineTasks in-place
func replaceVariablesInPipelineTasks(tasks []v1.PipelineTask, replacements map[string]string,
arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) {
for i := range tasks {
tasks[i].Params = tasks[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if tasks[i].IsMatrixed() {
tasks[i].Matrix.Params = tasks[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range tasks[i].Matrix.Include {
tasks[i].Matrix.Include[j].Params = tasks[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
}
} else {
p.Tasks[i].DisplayName = substitution.ApplyReplacements(p.Tasks[i].DisplayName, replacements)
tasks[i].DisplayName = substitution.ApplyReplacements(tasks[i].DisplayName, replacements)
}
for j := range p.Tasks[i].Workspaces {
p.Tasks[i].Workspaces[j].SubPath = substitution.ApplyReplacements(p.Tasks[i].Workspaces[j].SubPath, replacements)
for j := range tasks[i].Workspaces {
tasks[i].Workspaces[j].SubPath = substitution.ApplyReplacements(tasks[i].Workspaces[j].SubPath, replacements)
}
p.Tasks[i].When = p.Tasks[i].When.ReplaceVariables(replacements, arrayReplacements)
if p.Tasks[i].TaskRef != nil {
if p.Tasks[i].TaskRef.Params != nil {
p.Tasks[i].TaskRef.Params = p.Tasks[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
tasks[i].When = tasks[i].When.ReplaceVariables(replacements, arrayReplacements)
if tasks[i].TaskRef != nil {
if tasks[i].TaskRef.Params != nil {
tasks[i].TaskRef.Params = tasks[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
}
p.Tasks[i].TaskRef.Name = substitution.ApplyReplacements(p.Tasks[i].TaskRef.Name, replacements)
tasks[i].TaskRef.Name = substitution.ApplyReplacements(tasks[i].TaskRef.Name, replacements)
}
p.Tasks[i] = propagateParams(p.Tasks[i], replacements, arrayReplacements, objectReplacements)
tasks[i].OnError = v1.PipelineTaskOnErrorType(substitution.ApplyReplacements(string(tasks[i].OnError), replacements))
tasks[i] = propagateParams(tasks[i], replacements, arrayReplacements, objectReplacements)
}
}

for i := range p.Finally {
p.Finally[i].Params = p.Finally[i].Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
if p.Finally[i].IsMatrixed() {
p.Finally[i].Matrix.Params = p.Finally[i].Matrix.Params.ReplaceVariables(replacements, arrayReplacements, nil)
for j := range p.Finally[i].Matrix.Include {
p.Finally[i].Matrix.Include[j].Params = p.Finally[i].Matrix.Include[j].Params.ReplaceVariables(replacements, nil, nil)
}
} else {
p.Finally[i].DisplayName = substitution.ApplyReplacements(p.Finally[i].DisplayName, replacements)
}
for j := range p.Finally[i].Workspaces {
p.Finally[i].Workspaces[j].SubPath = substitution.ApplyReplacements(p.Finally[i].Workspaces[j].SubPath, replacements)
}
p.Finally[i].When = p.Finally[i].When.ReplaceVariables(replacements, arrayReplacements)
if p.Finally[i].TaskRef != nil {
if p.Finally[i].TaskRef.Params != nil {
p.Finally[i].TaskRef.Params = p.Finally[i].TaskRef.Params.ReplaceVariables(replacements, arrayReplacements, objectReplacements)
}
p.Finally[i].TaskRef.Name = substitution.ApplyReplacements(p.Finally[i].TaskRef.Name, replacements)
}
p.Finally[i] = propagateParams(p.Finally[i], replacements, arrayReplacements, objectReplacements)
}
// ApplyReplacements replaces placeholders for declared parameters with the specified replacements.
func ApplyReplacements(p *v1.PipelineSpec, replacements map[string]string, arrayReplacements map[string][]string, objectReplacements map[string]map[string]string) *v1.PipelineSpec {
p = p.DeepCopy()

// Replace variables in Tasks and Finally tasks
replaceVariablesInPipelineTasks(p.Tasks, replacements, arrayReplacements, objectReplacements)
replaceVariablesInPipelineTasks(p.Finally, replacements, arrayReplacements, objectReplacements)

return p
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/reconciler/pipelinerun/resources/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,26 @@ func TestApplyParameters(t *testing.T) {
}},
},
},
{
name: "parameter in onError",
original: v1.PipelineSpec{
Params: []v1.ParamSpec{
{Name: "onerror", Type: v1.ParamTypeString},
},
Tasks: []v1.PipelineTask{{
OnError: v1.PipelineTaskOnErrorType("$(params.onerror)"),
}},
},
params: v1.Params{{Name: "onerror", Value: *v1.NewStructuredValues("new-onerror-value")}},
expected: v1.PipelineSpec{
Params: []v1.ParamSpec{
{Name: "onerror", Type: v1.ParamTypeString},
},
Tasks: []v1.PipelineTask{{
OnError: v1.PipelineTaskOnErrorType("new-onerror-value"),
}},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 9fa4f9f

Please sign in to comment.