Skip to content

Commit b192b17

Browse files
committed
LineSequence: throw error when max line length exceeded instead of returning partial (and potentially incorrect) data
1 parent 00bcd99 commit b192b17

File tree

5 files changed

+76
-58
lines changed

5 files changed

+76
-58
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
8181
return Iterator(diskIO: self.diskIO)
8282
}
8383

84+
// [New API: 0.0.1]
8485
public func lines<Encoding: _UnicodeEncoding>(
8586
encoding: Encoding.Type = UTF8.self,
8687
bufferingPolicy: LineSequence<Encoding>.BufferingPolicy = .unbounded
@@ -91,6 +92,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
9192

9293
// MARK: - LineSequence
9394
extension AsyncBufferSequence {
95+
// [New API: 0.0.1]
9496
public struct LineSequence<Encoding: _UnicodeEncoding>: AsyncSequence, Sendable {
9597
public typealias Element = String
9698

@@ -140,25 +142,20 @@ extension AsyncBufferSequence {
140142
}
141143
#else
142144
// Unfortunately here we _have to_ copy the bytes out because
143-
// DisptachIO (rightfully) reuses buffer, which means `buffer.data`
145+
// DispatchIO (rightfully) reuses buffer, which means `buffer.data`
144146
// has the same address on all iterations, therefore we can't directly
145147
// create the result array from buffer.data
146-
let temporary = UnsafeMutableBufferPointer<Encoding.CodeUnit>.allocate(
147-
capacity: buffer.data.count
148-
)
149-
defer { temporary.deallocate() }
150-
let actualBytesCopied = buffer.data.copyBytes(
151-
to: temporary,
152-
count: buffer.data.count
153-
)
154148

155149
// Calculate how many CodeUnit elements we have
156-
let elementCount = actualBytesCopied / MemoryLayout<Encoding.CodeUnit>.stride
150+
let elementCount = buffer.data.count / MemoryLayout<Encoding.CodeUnit>.stride
157151

158152
// Create array by copying from the buffer reinterpreted as CodeUnit
159-
let result: Array<Encoding.CodeUnit> = Array(
160-
UnsafeBufferPointer(start: temporary.baseAddress, count: elementCount)
161-
)
153+
let result: Array<Encoding.CodeUnit> = buffer.data.withUnsafeBytes { ptr -> Array<Encoding.CodeUnit> in
154+
return Array(
155+
UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount)
156+
)
157+
}
158+
162159
#endif
163160
return result.isEmpty ? nil : result
164161
}
@@ -180,18 +177,38 @@ extension AsyncBufferSequence {
180177
/// let formFeed = Encoding.CodeUnit(0x0C)
181178
let carriageReturn = Encoding.CodeUnit(0x0D)
182179
// carriageReturn + lineFeed
183-
let newLine: Encoding.CodeUnit
184-
let lineSeparator: Encoding.CodeUnit
185-
let paragraphSeparator: Encoding.CodeUnit
180+
let newLine1: Encoding.CodeUnit
181+
let newLine2: Encoding.CodeUnit
182+
let lineSeparator1: Encoding.CodeUnit
183+
let lineSeparator2: Encoding.CodeUnit
184+
let lineSeparator3: Encoding.CodeUnit
185+
let paragraphSeparator1: Encoding.CodeUnit
186+
let paragraphSeparator2: Encoding.CodeUnit
187+
let paragraphSeparator3: Encoding.CodeUnit
186188
switch Encoding.CodeUnit.self {
187189
case is UInt8.Type:
188-
newLine = Encoding.CodeUnit(0xC2) // 0xC2 0x85
189-
lineSeparator = Encoding.CodeUnit(0xE2) // 0xE2 0x80 0xA8
190-
paragraphSeparator = Encoding.CodeUnit(0xE2) // 0xE2 0x80 0xA9
190+
newLine1 = Encoding.CodeUnit(0xC2)
191+
newLine2 = Encoding.CodeUnit(0x85)
192+
193+
lineSeparator1 = Encoding.CodeUnit(0xE2)
194+
lineSeparator2 = Encoding.CodeUnit(0x80)
195+
lineSeparator3 = Encoding.CodeUnit(0xA8)
196+
197+
paragraphSeparator1 = Encoding.CodeUnit(0xE2)
198+
paragraphSeparator2 = Encoding.CodeUnit(0x80)
199+
paragraphSeparator3 = Encoding.CodeUnit(0xA9)
191200
case is UInt16.Type, is UInt32.Type:
192-
newLine = Encoding.CodeUnit(0x0085)
193-
lineSeparator = Encoding.CodeUnit(0x2028)
194-
paragraphSeparator = Encoding.CodeUnit(0x2029)
201+
// UTF16 and UTF32 encode each of these separators as a single code unit
202+
newLine1 = Encoding.CodeUnit(0x0085)
203+
newLine2 = Encoding.CodeUnit(0x0085)
204+
205+
lineSeparator1 = Encoding.CodeUnit(0x2028)
206+
lineSeparator2 = Encoding.CodeUnit(0x2028)
207+
lineSeparator3 = Encoding.CodeUnit(0x2028)
208+
209+
paragraphSeparator1 = Encoding.CodeUnit(0x2029)
210+
paragraphSeparator2 = Encoding.CodeUnit(0x2029)
211+
paragraphSeparator3 = Encoding.CodeUnit(0x2029)
195212
default:
196213
fatalError("Unknown encoding type \(Encoding.self)")
197214
}
@@ -210,10 +227,13 @@ extension AsyncBufferSequence {
210227
var currentIndex: Int = self.startIndex
211228
for index in self.startIndex ..< self.buffer.count {
212229
currentIndex = index
213-
// Early return if we exceed max line length
230+
// Throw if we exceed max line length
214231
if case .maxLineLength(let maxLength) = self.bufferingPolicy,
215232
currentIndex >= maxLength {
216-
return yield(at: currentIndex)
233+
throw SubprocessError(
234+
code: .init(.streamOutputExceedsLimit(maxLength)),
235+
underlyingError: nil
236+
)
217237
}
218238
let byte = self.buffer[currentIndex]
219239
switch byte {
@@ -232,12 +252,12 @@ extension AsyncBufferSequence {
232252
continue
233253
}
234254
return result
235-
case newLine:
255+
case newLine1:
236256
var targetIndex = currentIndex
237257
if Encoding.CodeUnit.self is UInt8.Type {
238258
// For UTF8, look for the next 0x85 byte
239259
guard (targetIndex + 1) < self.buffer.count,
240-
self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x85) else {
260+
self.buffer[targetIndex + 1] == newLine2 else {
241261
// Not a valid new line. Keep looking
242262
continue
243263
}
@@ -248,21 +268,22 @@ extension AsyncBufferSequence {
248268
continue
249269
}
250270
return result
251-
case lineSeparator, paragraphSeparator:
271+
case lineSeparator1, paragraphSeparator1:
252272
var targetIndex = currentIndex
253273
if Encoding.CodeUnit.self is UInt8.Type {
254-
// For UTF8, look for the next 0x80 byte
274+
// For UTF8, look for the next byte
255275
guard (targetIndex + 1) < self.buffer.count,
256-
self.buffer[targetIndex + 1] == Encoding.CodeUnit(0x80) else {
276+
self.buffer[targetIndex + 1] == lineSeparator2 ||
277+
self.buffer[targetIndex + 1] == paragraphSeparator2 else {
257278
// Not a valid new line. Keep looking
258279
continue
259280
}
260-
// Swallow 0x80 byte
281+
// Swallow next byte
261282
targetIndex += 1
262-
// Look for the final 0xA8 (lineSeparator) or 0xA9 (paragraphSeparator)
283+
// Look for the final byte
263284
guard (targetIndex + 1) < self.buffer.count,
264-
(self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA8) ||
265-
self.buffer[targetIndex + 1] == Encoding.CodeUnit(0xA9)) else {
285+
(self.buffer[targetIndex + 1] == lineSeparator3 ||
286+
self.buffer[targetIndex + 1] == paragraphSeparator3) else {
266287
// Not a valid new line. Keep looking
267288
continue
268289
}
@@ -308,9 +329,8 @@ extension AsyncBufferSequence.LineSequence {
308329
/// on the number of buffered elements (line length).
309330
case unbounded
310331
/// Impose a max buffer size (line length) limit.
311-
/// When using this policy, `LineSequence` will return a line if:
312-
/// - A newline character is encountered (standard behavior)
313-
/// - The current line in the buffer reaches or exceeds the specified maximum length
332+
/// Subprocess **will throw an error** if the number of buffered
333+
/// elements (line length) exceeds the limit
314334
case maxLineLength(Int)
315335
}
316336
}

Sources/Subprocess/Buffer.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ extension DispatchData {
134134
internal func copyBytes<DestinationType>(
135135
to ptr: UnsafeMutableBufferPointer<DestinationType>, count: Int
136136
) -> Int {
137-
self.bytes.copyBytes(to: ptr, count: count)
137+
let target = UnsafeMutableRawBufferPointer(start: ptr.baseAddress, count: count)
138+
target.copyMemory(from: UnsafeRawBufferPointer(self.bytes))
139+
return self.bytes.count
138140
}
139141

140142
subscript(position: Int) -> UInt8 {

Sources/Subprocess/Error.swift

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ extension SubprocessError {
4040
case failedToReadFromSubprocess
4141
case failedToWriteToSubprocess
4242
case failedToMonitorProcess
43+
case streamOutputExceedsLimit(Int)
4344
// Signal
4445
case failedToSendSignal(Int32)
4546
// Windows Only
@@ -64,18 +65,20 @@ extension SubprocessError {
6465
return 4
6566
case .failedToMonitorProcess:
6667
return 5
67-
case .failedToSendSignal(_):
68+
case .streamOutputExceedsLimit(_):
6869
return 6
69-
case .failedToTerminate:
70+
case .failedToSendSignal(_):
7071
return 7
71-
case .failedToSuspend:
72+
case .failedToTerminate:
7273
return 8
73-
case .failedToResume:
74+
case .failedToSuspend:
7475
return 9
75-
case .failedToCreatePipe:
76+
case .failedToResume:
7677
return 10
77-
case .invalidWindowsPath(_):
78+
case .failedToCreatePipe:
7879
return 11
80+
case .invalidWindowsPath(_):
81+
return 12
7982
}
8083
}
8184

@@ -103,6 +106,8 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible
103106
return "Failed to write bytes to the child process."
104107
case .failedToMonitorProcess:
105108
return "Failed to monitor the state of child process with underlying error: \(self.underlyingError!)"
109+
case .streamOutputExceedsLimit(let limit):
110+
return "Failed to create output from current buffer because the output limit (\(limit)) was reached."
106111
case .failedToSendSignal(let signal):
107112
return "Failed to send signal \(signal) to the child process."
108113
case .failedToTerminate:

Sources/Subprocess/IO/Output.swift

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,10 @@ public struct BytesOutput: OutputProtocol {
144144
internal func captureOutput(
145145
from diskIO: consuming TrackedPlatformDiskIO?
146146
) async throws -> [UInt8] {
147-
guard let diskIO = diskIO else {
148-
// Show not happen due to type system constraints
149-
fatalError("Trying to capture output without file descriptor")
150-
}
151147
#if os(Windows)
152-
return try await diskIO.fileDescriptor.read(upToLength: self.maxSize) ?? []
148+
return try await diskIO!.fileDescriptor.read(upToLength: self.maxSize) ?? []
153149
#else
154-
let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize)
150+
let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize)
155151
return result?.array() ?? []
156152
#endif
157153
}
@@ -270,16 +266,11 @@ extension OutputProtocol {
270266
return () as! OutputType
271267
}
272268

273-
guard let diskIO = diskIO else {
274-
// Show not happen due to type system constraints
275-
fatalError("Trying to capture output without file descriptor")
276-
}
277-
278269
#if os(Windows)
279-
let result = try await diskIO.fileDescriptor.read(upToLength: self.maxSize)
270+
let result = try await diskIO!.fileDescriptor.read(upToLength: self.maxSize)
280271
return try self.output(from: result ?? [])
281272
#else
282-
let result = try await diskIO.dispatchIO.read(upToLength: self.maxSize)
273+
let result = try await diskIO!.dispatchIO.read(upToLength: self.maxSize)
283274
return try self.output(from: result ?? .empty)
284275
#endif
285276
}

Sources/Subprocess/Teardown.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ extension Execution {
8585
using sequence: some Sequence<TeardownStep> & Sendable,
8686
on processIdentifier: ProcessIdentifier
8787
) async {
88-
await withUncanceledTask {
88+
await withUncancelledTask {
8989
await Self.runTeardownSequence(sequence, on: processIdentifier)
9090
}
9191
}
@@ -222,7 +222,7 @@ extension Execution {
222222
}
223223
}
224224

225-
func withUncanceledTask<Result: Sendable>(
225+
func withUncancelledTask<Result: Sendable>(
226226
returning: Result.Type = Result.self,
227227
_ body: @Sendable @escaping () async -> Result
228228
) async -> Result {

0 commit comments

Comments
 (0)