Skip to content

Commit 0ca3ba9

Browse files
znullCopilot
andcommitted
Align start-failure cleanup with stream ownership
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 69cd24d commit 0ca3ba9

5 files changed

Lines changed: 56 additions & 29 deletions

File tree

pipe/close_responsibility_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ func (w *writeCloseSpy) Close() error {
3434
return nil
3535
}
3636

37-
// TestGoStageHonorsCloseFlags verifies that a Function stage closes
38-
// stdin/stdout iff the corresponding close flag is true.
39-
func TestGoStageHonorsCloseFlags(t *testing.T) {
37+
// TestGoStageHonorsStreamOwnership verifies that a Function stage closes
38+
// stdin/stdout iff the corresponding stream is closing.
39+
func TestGoStageHonorsStreamOwnership(t *testing.T) {
4040
cases := []struct {
4141
name string
4242
leaveIn, leaveOut bool
@@ -63,8 +63,8 @@ func TestGoStageHonorsCloseFlags(t *testing.T) {
6363
))
6464
require.NoError(t, s.Wait())
6565

66-
assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closeStdin=%v", !tc.leaveIn)
67-
assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closeStdout=%v", !tc.leaveOut)
66+
assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn)
67+
assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut)
6868
})
6969
}
7070
}
@@ -88,8 +88,8 @@ func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) {
8888
}
8989

9090
// TestCommandStageHonorsCloseStdin verifies that a command stage closes a
91-
// non-file stdin (a "late" closer) iff closeStdin is true. An empty
92-
// reader is used so exec.Cmd's input-copy goroutine sees EOF promptly.
91+
// non-file stdin (a "late" closer) iff the input stream is closing. An
92+
// empty reader is used so exec.Cmd's input-copy goroutine sees EOF promptly.
9393
func TestCommandStageHonorsCloseStdin(t *testing.T) {
9494
for _, leave := range []bool{false, true} {
9595
name := "owns stdin"
@@ -109,14 +109,14 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) {
109109
))
110110
require.NoError(t, s.Wait())
111111

112-
assert.Equal(t, !leave, in.closed.Load(), "closeStdin=%v", !leave)
112+
assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave)
113113
})
114114
}
115115
}
116116

117117
// TestCommandStageHonorsCloseStdout verifies the stdout counterpart: a
118118
// non-file stdout (routed through the pooled-copy path) is closed iff
119-
// closeStdout is true.
119+
// the output stream is closing.
120120
func TestCommandStageHonorsCloseStdout(t *testing.T) {
121121
for _, leave := range []bool{false, true} {
122122
name := "owns stdout"
@@ -136,7 +136,7 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) {
136136
))
137137
require.NoError(t, s.Wait())
138138

139-
assert.Equal(t, !leave, out.closed.Load(), "closeStdout=%v", !leave)
139+
assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave)
140140
})
141141
}
142142
}

pipe/command_stdout_fastpath_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ import (
1818
// subprocess can detect when that fd is closed.
1919
func TestCommandStageStdoutFastPath(t *testing.T) {
2020
cases := []struct {
21-
name string
22-
closeStdout bool
21+
name string
22+
closingStdout bool
2323
}{
2424
{
25-
name: "raw *os.File with closeStdout",
26-
closeStdout: true,
25+
name: "raw *os.File with closing stdout",
26+
closingStdout: true,
2727
},
2828
{
29-
name: "raw *os.File without closeStdout",
29+
name: "raw *os.File with non-closing stdout",
3030
},
3131
}
3232
for _, tc := range cases {
@@ -43,7 +43,7 @@ func TestCommandStageStdoutFastPath(t *testing.T) {
4343
s := CommandStage("true", cmd).(*commandStage)
4444

4545
stdout := OutputStream{writer: f}
46-
if tc.closeStdout {
46+
if tc.closingStdout {
4747
stdout = ClosingOutput(f)
4848
}
4949

pipe/pipeline.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,16 +347,19 @@ func (p *Pipeline) Start(ctx context.Context) error {
347347
}
348348
}
349349

350-
// Clean up any processes and pipes that have been created. `i` is
351-
// the index of the stage that failed to start (whose output pipe
352-
// has already been cleaned up if necessary).
350+
// Clean up any processes and pipes that have been created. `i` is the
351+
// index of the stage that failed to start. If the stage already received
352+
// its streams, it owns any closing stream.
353353
abort := func(i int, err error, closeFailedStageStdin bool) error {
354-
// Close the pipe that the previous stage was writing to.
355-
// That should cause it to exit even if it's not minding
356-
// its context.
354+
// If the failing stage never received its stdin, close the pipe that
355+
// the previous stage was writing to. That should cause it to exit
356+
// even if it's not minding its context.
357357
if closeFailedStageStdin {
358358
stageStarters[i].stdin.Close()
359359
}
360+
361+
// If stdout was supplied with WithStdoutCloser but the final stage
362+
// was never started, then the pipeline still owns that closer.
360363
if i < len(p.stages)-1 && p.stdoutCloser != nil {
361364
_ = p.stdoutCloser.Close()
362365
}

pipe/pipeline_test.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ func TestPipelineFirstStageFailsToStart(t *testing.T) {
8383
t.Parallel()
8484
ctx := context.Background()
8585
startErr := errors.New("foo")
86+
stdout := &closeTrackingWriter{}
8687

87-
p := pipe.New()
88+
p := pipe.New(pipe.WithStdoutCloser(stdout))
8889
p.Add(
8990
ErrorStartingStage{startErr},
9091
ErrorStartingStage{errors.New("this error should never happen")},
9192
)
9293
assert.ErrorIs(t, p.Run(ctx), startErr)
94+
assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed")
9395
}
9496

9597
func TestPipelineFirstStageFailsToStartClosesStdoutCloser(t *testing.T) {
@@ -120,6 +122,22 @@ func TestPipelineSecondStageFailsToStart(t *testing.T) {
120122
assert.ErrorIs(t, p.Run(ctx), startErr)
121123
}
122124

125+
func TestPipelineMiddleStageFailsToStartClosesUnstartedStdoutCloser(t *testing.T) {
126+
t.Parallel()
127+
ctx := context.Background()
128+
startErr := errors.New("foo")
129+
stdout := &closeTrackingWriter{}
130+
131+
p := pipe.New(pipe.WithStdoutCloser(stdout))
132+
p.Add(
133+
seqFunction(20000),
134+
ErrorStartingStage{startErr},
135+
ErrorStartingStage{errors.New("this error should never happen")},
136+
)
137+
assert.ErrorIs(t, p.Run(ctx), startErr)
138+
assert.True(t, stdout.closed, "WithStdoutCloser destination should be closed")
139+
}
140+
123141
func TestPipelineSingleCommandOutput(t *testing.T) {
124142
t.Parallel()
125143
ctx := context.Background()

pipe/stage.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,18 +50,24 @@ import (
5050
// f.Close() // close our copy
5151
// cmd.Wait()
5252
//
53-
// If the stage is an external command and one of its arguments is not
54-
// an `*os.File`, then `exec.Cmd` will take care of creating an
55-
// `os.Pipe()`, copying from the provided argument in/out of the pipe,
56-
// and eventually closing both ends of the pipe. The stage must close
57-
// the argument itself, but only _after_ the external command has
53+
// If the stage is an external command and its stdin is not an
54+
// `*os.File`, then `exec.Cmd` will take care of creating an
55+
// `os.Pipe()`, copying from the provided reader into the pipe, and
56+
// eventually closing both ends of the pipe. The stage must close the
57+
// provided stdin itself, but only _after_ the external command has
5858
// finished, like so:
5959
//
60-
// cmd.Stdin = r // Similarly for stdout
60+
// cmd.Stdin = r
6161
// cmd.Start(…)
6262
// cmd.Wait()
6363
// r.Close()
6464
//
65+
// If the stage is an external command and its stdout is not an
66+
// `*os.File`, the stage creates a pipe, passes the write end to the
67+
// command, and copies from the read end to the provided writer. The
68+
// stage must close the provided stdout itself, but only _after_ the
69+
// external command and the copy have finished.
70+
//
6571
// If the stage is a Go function, then it holds the only copy of
6672
// stdin/stdout, so it must wait until the function is done before
6773
// closing them (regardless of their underlying type, like so:

0 commit comments

Comments
 (0)