Skip to content

Commit 7324ac4

Browse files
committed
Allow callers to run a subprocess and provide low and high water marks
when using SequenceOutput to emit standard output and standard error as soon as it arrives. Resolves swiftlang#39
1 parent c0bbfa5 commit 7324ac4

File tree

4 files changed

+147
-15
lines changed

4 files changed

+147
-15
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

+74-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
@preconcurrency import SystemPackage
1616
#endif
1717

18+
internal import Dispatch
19+
1820
#if SubprocessSpan
1921
@available(SubprocessSpan, *)
2022
#endif
@@ -27,38 +29,100 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
2729
public typealias Element = SequenceOutput.Buffer
2830

2931
private let diskIO: TrackedPlatformDiskIO
32+
private let bufferSize: Int
3033
private var buffer: [UInt8]
3134
private var currentPosition: Int
3235
private var finished: Bool
36+
private var streamIterator: AsyncThrowingStream<StreamStatus, Swift.Error>.AsyncIterator
3337

34-
internal init(diskIO: TrackedPlatformDiskIO) {
38+
internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) {
3539
self.diskIO = diskIO
40+
self.bufferSize = bufferSize
3641
self.buffer = []
3742
self.currentPosition = 0
3843
self.finished = false
44+
self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
3945
}
4046

41-
public func next() async throws -> SequenceOutput.Buffer? {
42-
let data = try await self.diskIO.readChunk(
43-
upToLength: readBufferSize
44-
)
45-
if data == nil {
46-
// We finished reading. Close the file descriptor now
47+
public mutating func next() async throws -> SequenceOutput.Buffer? {
48+
if let status = try await streamIterator.next() {
49+
switch status {
50+
case .data(let data):
51+
return data
52+
53+
case .endOfStream(let data):
54+
streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
55+
return data
56+
57+
case .endOfFile:
58+
try self.diskIO.safelyClose()
59+
return nil
60+
}
61+
} else {
4762
try self.diskIO.safelyClose()
4863
return nil
4964
}
50-
return data
65+
}
66+
67+
private enum StreamStatus {
68+
case data(SequenceOutput.Buffer)
69+
case endOfStream(SequenceOutput.Buffer)
70+
case endOfFile
71+
}
72+
73+
private static func createDataStream(with dispatchIO: DispatchIO, bufferSize: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
74+
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
75+
dispatchIO.read(
76+
offset: 0,
77+
length: bufferSize,
78+
queue: .global()
79+
) { done, data, error in
80+
if error != 0 {
81+
continuation.finish(throwing: SubprocessError(
82+
code: .init(.failedToReadFromSubprocess),
83+
underlyingError: .init(rawValue: error)
84+
))
85+
return
86+
}
87+
88+
// Treat empty data and nil as the same
89+
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
90+
let status: StreamStatus
91+
92+
switch (buffer, done) {
93+
case (.some(let data), false):
94+
status = .data(SequenceOutput.Buffer(data: data))
95+
96+
case (.some(let data), true):
97+
status = .endOfStream(SequenceOutput.Buffer(data: data))
98+
99+
case (nil, false):
100+
return
101+
102+
case (nil, true):
103+
status = .endOfFile
104+
}
105+
106+
continuation.yield(status)
107+
108+
if done {
109+
continuation.finish()
110+
}
111+
}
112+
}
51113
}
52114
}
53115

54116
private let diskIO: TrackedPlatformDiskIO
117+
private let bufferSize: Int
55118

56-
internal init(diskIO: TrackedPlatformDiskIO) {
119+
internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) {
57120
self.diskIO = diskIO
121+
self.bufferSize = bufferSize
58122
}
59123

60124
public func makeAsyncIterator() -> Iterator {
61-
return Iterator(diskIO: self.diskIO)
125+
return Iterator(diskIO: self.diskIO, bufferSize: bufferSize)
62126
}
63127
}
64128

Sources/Subprocess/Execution.swift

+23-3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import Musl
2727
import WinSDK
2828
#endif
2929

30+
internal import Dispatch
31+
3032
/// An object that repersents a subprocess that has been
3133
/// executed. You can use this object to send signals to the
3234
/// child process as well as stream its output and error.
@@ -107,7 +109,16 @@ extension Execution where Output == SequenceOutput {
107109
else {
108110
fatalError("The standard output has already been consumed")
109111
}
110-
return AsyncBufferSequence(diskIO: readFd)
112+
113+
if let lowWater = output.lowWater {
114+
readFd.dispatchIO.setLimit(lowWater: lowWater)
115+
}
116+
117+
if let highWater = output.highWater {
118+
readFd.dispatchIO.setLimit(highWater: highWater)
119+
}
120+
121+
return AsyncBufferSequence(diskIO: readFd, bufferSize: output.bufferSize)
111122
}
112123
}
113124

@@ -122,15 +133,24 @@ extension Execution where Error == SequenceOutput {
122133
/// via pipe under the hood and each pipe can only be consumed once.
123134
public var standardError: AsyncBufferSequence {
124135
let consumptionState = self.outputConsumptionState.bitwiseXor(
125-
OutputConsumptionState.standardOutputConsumed
136+
OutputConsumptionState.standardErrorConsumed
126137
)
127138

128139
guard consumptionState.contains(.standardErrorConsumed),
129140
let readFd = self.errorPipe.readEnd
130141
else {
131142
fatalError("The standard output has already been consumed")
132143
}
133-
return AsyncBufferSequence(diskIO: readFd)
144+
145+
if let lowWater = error.lowWater {
146+
readFd.dispatchIO.setLimit(lowWater: lowWater)
147+
}
148+
149+
if let highWater = error.highWater {
150+
readFd.dispatchIO.setLimit(highWater: highWater)
151+
}
152+
153+
return AsyncBufferSequence(diskIO: readFd, bufferSize: error.bufferSize)
134154
}
135155
}
136156

Sources/Subprocess/IO/Output.swift

+12-2
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,15 @@ public struct BytesOutput: OutputProtocol {
210210
#endif
211211
public struct SequenceOutput: OutputProtocol {
212212
public typealias OutputType = Void
213-
214-
internal init() {}
213+
internal let lowWater: Int?
214+
internal let highWater: Int?
215+
internal let bufferSize: Int
216+
217+
internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) {
218+
self.lowWater = lowWater
219+
self.highWater = highWater
220+
self.bufferSize = bufferSize
221+
}
215222
}
216223

217224
#if SubprocessSpan
@@ -284,6 +291,9 @@ extension OutputProtocol where Self == SequenceOutput {
284291
/// to the `.standardOutput` (or `.standardError`) property
285292
/// of `Execution` as `AsyncSequence<Data>`.
286293
public static var sequence: Self { .init() }
294+
public static func sequence(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int? = nil) -> Self {
295+
.init(lowWater: lowWater, highWater: highWater, bufferSize: bufferSize ?? readBufferSize)
296+
}
287297
}
288298

289299
// MARK: - Span Default Implementations

Tests/SubprocessTests/SubprocessTests+Unix.swift

+38
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,44 @@ extension SubprocessUnixTests {
665665
#expect(catResult.terminationStatus.isSuccess)
666666
#expect(catResult.standardError == expected)
667667
}
668+
669+
@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws {
670+
let threshold: Double = 0.5
671+
672+
let script = """
673+
echo "DONE"
674+
sleep \(threshold)
675+
"""
676+
677+
let start = ContinuousClock().now
678+
679+
let catResult = try await Subprocess.run(
680+
.path("/bin/bash"),
681+
arguments: ["-c", script],
682+
output: .sequence(lowWater: 0),
683+
error: .discarded,
684+
body: { (execution, _) in
685+
for try await chunk in execution.standardOutput {
686+
let string = chunk.withUnsafeBytes { String(decoding: $0, as: UTF8.self) }
687+
688+
if string.hasPrefix("DONE") {
689+
let end = ContinuousClock().now
690+
691+
if (end - start) > .seconds(threshold) {
692+
return "Failure"
693+
694+
} else {
695+
return "Success"
696+
}
697+
}
698+
}
699+
700+
return "Failure"
701+
}
702+
)
703+
#expect(catResult.terminationStatus.isSuccess)
704+
#expect(catResult.value == "Success")
705+
}
668706
}
669707

670708
// MARK: - PlatformOption Tests

0 commit comments

Comments
 (0)