diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 13f9e38..163e595 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -26,39 +26,39 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public struct Iterator: AsyncIteratorProtocol { public typealias Element = SequenceOutput.Buffer - private let fileDescriptor: TrackedFileDescriptor + private let diskIO: TrackedPlatformDiskIO private var buffer: [UInt8] private var currentPosition: Int private var finished: Bool - internal init(fileDescriptor: TrackedFileDescriptor) { - self.fileDescriptor = fileDescriptor + internal init(diskIO: TrackedPlatformDiskIO) { + self.diskIO = diskIO self.buffer = [] self.currentPosition = 0 self.finished = false } - public mutating func next() async throws -> SequenceOutput.Buffer? { - let data = try await self.fileDescriptor.wrapped.readChunk( + public func next() async throws -> SequenceOutput.Buffer? { + let data = try await self.diskIO.readChunk( upToLength: readBufferSize ) if data == nil { // We finished reading. Close the file descriptor now - try self.fileDescriptor.safelyClose() + try self.diskIO.safelyClose() return nil } return data } } - private let fileDescriptor: TrackedFileDescriptor + private let diskIO: TrackedPlatformDiskIO - init(fileDescriptor: TrackedFileDescriptor) { - self.fileDescriptor = fileDescriptor + internal init(diskIO: TrackedPlatformDiskIO) { + self.diskIO = diskIO } public func makeAsyncIterator() -> Iterator { - return Iterator(fileDescriptor: self.fileDescriptor) + return Iterator(diskIO: self.diskIO) } } diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index c9d821a..6586e10 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -90,9 +90,6 @@ public struct Configuration: Sendable { // After spawn, cleanup child side fds try await self.cleanup( execution: execution, - inputPipe: inputPipe, - outputPipe: outputPipe, - errorPipe: errorPipe, childSide: true, parentSide: false, attemptToTerminateSubProcess: false @@ -104,7 +101,7 @@ public struct Configuration: Sendable { // Body runs in the same isolation let result = try await body( execution, - .init(fileDescriptor: inputPipe.writeFileDescriptor!) + .init(diskIO: execution.inputPipe.writeEnd!) ) return ExecutionResult( terminationStatus: try await waitingStatus, @@ -116,9 +113,6 @@ public struct Configuration: Sendable { // this is the best we can do try? await self.cleanup( execution: execution, - inputPipe: inputPipe, - outputPipe: outputPipe, - errorPipe: errorPipe, childSide: false, parentSide: true, attemptToTerminateSubProcess: true @@ -154,9 +148,6 @@ public struct Configuration: Sendable { // After spawn, clean up child side try await self.cleanup( execution: execution, - inputPipe: inputPipe, - outputPipe: outputPipe, - errorPipe: errorPipe, childSide: true, parentSide: false, attemptToTerminateSubProcess: false @@ -174,7 +165,7 @@ public struct Configuration: Sendable { standardError ) = try await execution.captureIOs() // Write input in the same scope - guard let writeFd = inputPipe.writeFileDescriptor else { + guard let writeFd = execution.inputPipe.writeEnd else { fatalError("Trying to write to an input that has been closed") } try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in @@ -193,7 +184,7 @@ public struct Configuration: Sendable { ) #endif - writeFd.wrapped.write(bytes) { _, error in + writeFd.write(bytes) { _, error in if let error = error { continuation.resume(throwing: error) } else { @@ -216,9 +207,6 @@ public struct Configuration: Sendable { // this is the best we can do try? await self.cleanup( execution: execution, - inputPipe: inputPipe, - outputPipe: outputPipe, - errorPipe: errorPipe, childSide: false, parentSide: true, attemptToTerminateSubProcess: true @@ -257,9 +245,6 @@ public struct Configuration: Sendable { // After spawn, clean up child side try await self.cleanup( execution: execution, - inputPipe: inputPipe, - outputPipe: outputPipe, - errorPipe: errorPipe, childSide: true, parentSide: false, attemptToTerminateSubProcess: false @@ -271,8 +256,8 @@ public struct Configuration: Sendable { returning: ExecutionResult.self ) { group in group.addTask { - if let writeFd = inputPipe.writeFileDescriptor { - let writer = StandardInputWriter(fileDescriptor: writeFd) + if let writeFd = execution.inputPipe.writeEnd { + let writer = StandardInputWriter(diskIO: writeFd) try await input.write(with: writer) try await writer.finish() } @@ -300,9 +285,6 @@ public struct Configuration: Sendable { // this is the best we can do try? await self.cleanup( execution: execution, - inputPipe: inputPipe, - outputPipe: outputPipe, - errorPipe: errorPipe, childSide: false, parentSide: true, attemptToTerminateSubProcess: true @@ -350,9 +332,6 @@ extension Configuration { Error: OutputProtocol >( execution: Execution, - inputPipe: CreatedPipe, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe, childSide: Bool, parentSide: Bool, attemptToTerminateSubProcess: Bool @@ -384,25 +363,25 @@ extension Configuration { if childSide { inputError = captureError { - try inputPipe.readFileDescriptor?.safelyClose() + try execution.inputPipe.readEnd?.safelyClose() } outputError = captureError { - try outputPipe.writeFileDescriptor?.safelyClose() + try execution.outputPipe.writeEnd?.safelyClose() } errorError = captureError { - try errorPipe.writeFileDescriptor?.safelyClose() + try execution.errorPipe.writeEnd?.safelyClose() } } if parentSide { inputError = captureError { - try inputPipe.writeFileDescriptor?.safelyClose() + try execution.inputPipe.writeEnd?.safelyClose() } outputError = captureError { - try outputPipe.readFileDescriptor?.safelyClose() + try execution.outputPipe.readEnd?.safelyClose() } errorError = captureError { - try errorPipe.readFileDescriptor?.safelyClose() + try execution.errorPipe.readEnd?.safelyClose() } } @@ -822,17 +801,17 @@ internal enum StringOrRawBytes: Sendable, Hashable { } } -/// A simple wrapper on `FileDescriptor` plus a flag indicating -/// whether it should be closed automactially when done. -internal struct TrackedFileDescriptor: Hashable { +/// A wrapped `FileDescriptor` and whether it should be closed +/// automactially when done. +internal struct TrackedFileDescriptor { internal let closeWhenDone: Bool - internal let wrapped: FileDescriptor + internal let fileDescriptor: FileDescriptor internal init( - _ wrapped: FileDescriptor, + _ fileDescriptor: FileDescriptor, closeWhenDone: Bool ) { - self.wrapped = wrapped + self.fileDescriptor = fileDescriptor self.closeWhenDone = closeWhenDone } @@ -842,7 +821,7 @@ internal struct TrackedFileDescriptor: Hashable { } do { - try self.wrapped.close() + try fileDescriptor.close() } catch { guard let errno: Errno = error as? Errno else { throw error @@ -854,9 +833,38 @@ internal struct TrackedFileDescriptor: Hashable { } internal var platformDescriptor: PlatformFileDescriptor { - return self.wrapped.platformDescriptor + return self.fileDescriptor.platformDescriptor + } +} + +#if !os(Windows) +/// A wrapped `DispatchIO` and whether it should be closed +/// automactially when done. +internal struct TrackedDispatchIO { + internal let closeWhenDone: Bool + internal let dispatchIO: DispatchIO + + internal init( + _ dispatchIO: DispatchIO, + closeWhenDone: Bool + ) { + self.dispatchIO = dispatchIO + self.closeWhenDone = closeWhenDone + } + + internal func safelyClose() throws { + guard self.closeWhenDone else { + return + } + + dispatchIO.close() + } + + internal var platformDescriptor: PlatformFileDescriptor { + return self.dispatchIO.fileDescriptor } } +#endif internal struct CreatedPipe { internal let readFileDescriptor: TrackedFileDescriptor? @@ -864,7 +872,7 @@ internal struct CreatedPipe { internal init( readFileDescriptor: TrackedFileDescriptor?, - writeFileDescriptor: TrackedFileDescriptor? + writeFileDescriptor: TrackedFileDescriptor?, ) { self.readFileDescriptor = readFileDescriptor self.writeFileDescriptor = writeFileDescriptor @@ -872,7 +880,6 @@ internal struct CreatedPipe { internal init(closeWhenDone: Bool) throws { let pipe = try FileDescriptor.ssp_pipe() - self.readFileDescriptor = .init( pipe.readEnd, closeWhenDone: closeWhenDone diff --git a/Sources/Subprocess/Error.swift b/Sources/Subprocess/Error.swift index bf1c911..870e72b 100644 --- a/Sources/Subprocess/Error.swift +++ b/Sources/Subprocess/Error.swift @@ -92,7 +92,7 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible public var description: String { switch self.code.storage { case .spawnFailed: - return "Failed to spawn the new process." + return "Failed to spawn the new process with underlying error: \(self.underlyingError!)" case .executableNotFound(let executableName): return "Executable \"\(executableName)\" is not found or cannot be executed." case .failedToChangeWorkingDirectory(let workingDirectory): diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index 3839aac..3c1f763 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -42,8 +42,9 @@ public final class Execution< internal let output: Output internal let error: Error - internal let outputPipe: CreatedPipe - internal let errorPipe: CreatedPipe + internal let inputPipe: InputPipe + internal let outputPipe: OutputPipe + internal let errorPipe: OutputPipe internal let outputConsumptionState: AtomicBox #if os(Windows) @@ -53,13 +54,15 @@ public final class Execution< processIdentifier: ProcessIdentifier, output: Output, error: Error, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe, + inputPipe: InputPipe, + outputPipe: OutputPipe, + errorPipe: OutputPipe, consoleBehavior: PlatformOptions.ConsoleBehavior ) { self.processIdentifier = processIdentifier self.output = output self.error = error + self.inputPipe = inputPipe self.outputPipe = outputPipe self.errorPipe = errorPipe self.outputConsumptionState = AtomicBox() @@ -70,12 +73,14 @@ public final class Execution< processIdentifier: ProcessIdentifier, output: Output, error: Error, - outputPipe: CreatedPipe, - errorPipe: CreatedPipe + inputPipe: InputPipe, + outputPipe: OutputPipe, + errorPipe: OutputPipe ) { self.processIdentifier = processIdentifier self.output = output self.error = error + self.inputPipe = inputPipe self.outputPipe = outputPipe self.errorPipe = errorPipe self.outputConsumptionState = AtomicBox() @@ -98,11 +103,11 @@ extension Execution where Output == SequenceOutput { ) guard consumptionState.contains(.standardOutputConsumed), - let fd = self.outputPipe.readFileDescriptor + let readFd = self.outputPipe.readEnd else { fatalError("The standard output has already been consumed") } - return AsyncBufferSequence(fileDescriptor: fd) + return AsyncBufferSequence(diskIO: readFd) } } @@ -121,11 +126,11 @@ extension Execution where Error == SequenceOutput { ) guard consumptionState.contains(.standardErrorConsumed), - let fd = self.errorPipe.readFileDescriptor + let readFd = self.errorPipe.readEnd else { fatalError("The standard output has already been consumed") } - return AsyncBufferSequence(fileDescriptor: fd) + return AsyncBufferSequence(diskIO: readFd) } } @@ -165,13 +170,13 @@ extension Execution { ) { group in group.addTask { let stdout = try await self.output.captureOutput( - from: self.outputPipe.readFileDescriptor + from: self.outputPipe.readEnd ) return .standardOutputCaptured(stdout) } group.addTask { let stderr = try await self.error.captureOutput( - from: self.errorPipe.readFileDescriptor + from: self.errorPipe.readEnd ) return .standardErrorCaptured(stderr) } diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index d01f6bf..ff5dc45 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -212,10 +212,10 @@ extension InputProtocol { /// A writer that writes to the standard input of the subprocess. public final actor StandardInputWriter: Sendable { - internal let fileDescriptor: TrackedFileDescriptor + internal let diskIO: TrackedPlatformDiskIO - init(fileDescriptor: TrackedFileDescriptor) { - self.fileDescriptor = fileDescriptor + init(diskIO: TrackedPlatformDiskIO) { + self.diskIO = diskIO } /// Write an array of UInt8 to the standard input of the subprocess. @@ -224,7 +224,7 @@ public final actor StandardInputWriter: Sendable { public func write( _ array: [UInt8] ) async throws -> Int { - return try await self.fileDescriptor.wrapped.write(array) + return try await self.diskIO.write(array) } /// Write a `RawSpan` to the standard input of the subprocess. @@ -233,7 +233,7 @@ public final actor StandardInputWriter: Sendable { #if SubprocessSpan @available(SubprocessSpan, *) public func write(_ span: borrowing RawSpan) async throws -> Int { - return try await self.fileDescriptor.wrapped.write(span) + return try await self.diskIO.write(span) } #endif @@ -254,7 +254,24 @@ public final actor StandardInputWriter: Sendable { /// Signal all writes are finished public func finish() async throws { - try self.fileDescriptor.safelyClose() + try self.diskIO.safelyClose() + } +} + + +// MARK: - InputPipe +internal struct InputPipe { + // On Darwin and Linux, parent end (write end) should be + // wrapped as `DispatchIO` for writing + internal let readEnd: TrackedFileDescriptor? + internal let writeEnd: TrackedPlatformDiskIO? + + internal init( + readEnd: TrackedFileDescriptor?, + writeEnd: TrackedPlatformDiskIO? + ) { + self.readEnd = readEnd + self.writeEnd = writeEnd } } diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index bff78ff..e33292d 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -73,8 +73,8 @@ public struct DiscardedOutput: OutputProtocol { #else let devnull: FileDescriptor = try .openDevNull(withAcessMode: .readOnly) return CreatedPipe( - readFileDescriptor: .init(devnull, closeWhenDone: true), - writeFileDescriptor: nil + readFileDescriptor: nil, + writeFileDescriptor: .init(devnull, closeWhenDone: true) ) #endif } @@ -160,13 +160,15 @@ public struct BytesOutput: OutputProtocol { public typealias OutputType = [UInt8] public let maxSize: Int - internal func captureOutput(from fileDescriptor: TrackedFileDescriptor?) async throws -> [UInt8] { + internal func captureOutput( + from diskIO: TrackedPlatformDiskIO? + ) async throws -> [UInt8] { return try await withCheckedThrowingContinuation { continuation in - guard let fileDescriptor = fileDescriptor else { + guard let diskIO = diskIO else { // Show not happen due to type system constraints fatalError("Trying to capture output without file descriptor") } - fileDescriptor.wrapped.readUntilEOF(upToLength: self.maxSize) { result in + diskIO.readUntilEOF(upToLength: self.maxSize) { result in switch result { case .success(let data): // FIXME: remove workaround for @@ -298,6 +300,22 @@ extension OutputProtocol { } #endif +// MARK: - OutputPipe +internal struct OutputPipe { + // On Darwin and Linux, parent end (read end) should be + // wrapped as `DispatchIO` for reading + internal let readEnd: TrackedPlatformDiskIO? + internal let writeEnd: TrackedFileDescriptor? + + internal init( + readEnd: TrackedPlatformDiskIO?, + writeEnd: TrackedFileDescriptor? + ) { + self.readEnd = readEnd + self.writeEnd = writeEnd + } +} + // MARK: - Default Implementations #if SubprocessSpan @available(SubprocessSpan, *) @@ -317,22 +335,22 @@ extension OutputProtocol { /// Capture the output from the subprocess up to maxSize @_disfavoredOverload internal func captureOutput( - from fileDescriptor: TrackedFileDescriptor? + from diskIO: TrackedPlatformDiskIO? ) async throws -> OutputType { if let bytesOutput = self as? BytesOutput { - return try await bytesOutput.captureOutput(from: fileDescriptor) as! Self.OutputType + return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType } return try await withCheckedThrowingContinuation { continuation in if OutputType.self == Void.self { continuation.resume(returning: () as! OutputType) return } - guard let fileDescriptor = fileDescriptor else { + guard let diskIO = diskIO else { // Show not happen due to type system constraints fatalError("Trying to capture output without file descriptor") } - fileDescriptor.wrapped.readUntilEOF(upToLength: self.maxSize) { result in + diskIO.readUntilEOF(upToLength: self.maxSize) { result in do { switch result { case .success(let data): @@ -356,7 +374,7 @@ extension OutputProtocol { @available(SubprocessSpan, *) #endif extension OutputProtocol where OutputType == Void { - internal func captureOutput(from fileDescriptor: TrackedFileDescriptor?) async throws {} + internal func captureOutput(from fileDescriptor: TrackedPlatformDiskIO?) async throws {} #if SubprocessSpan /// Convert the output from Data to expected output type diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index ff6f114..384091b 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -195,7 +195,7 @@ extension Configuration { // Input var result: Int32 = -1 if let inputRead = inputPipe.readFileDescriptor { - result = posix_spawn_file_actions_adddup2(&fileActions, inputRead.wrapped.rawValue, 0) + result = posix_spawn_file_actions_adddup2(&fileActions, inputRead.platformDescriptor, 0) guard result == 0 else { try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) throw SubprocessError( @@ -206,7 +206,7 @@ extension Configuration { } if let inputWrite = inputPipe.writeFileDescriptor { // Close parent side - result = posix_spawn_file_actions_addclose(&fileActions, inputWrite.wrapped.rawValue) + result = posix_spawn_file_actions_addclose(&fileActions, inputWrite.platformDescriptor) guard result == 0 else { try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) throw SubprocessError( @@ -217,7 +217,7 @@ extension Configuration { } // Output if let outputWrite = outputPipe.writeFileDescriptor { - result = posix_spawn_file_actions_adddup2(&fileActions, outputWrite.wrapped.rawValue, 1) + result = posix_spawn_file_actions_adddup2(&fileActions, outputWrite.platformDescriptor, 1) guard result == 0 else { try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) throw SubprocessError( @@ -228,7 +228,7 @@ extension Configuration { } if let outputRead = outputPipe.readFileDescriptor { // Close parent side - result = posix_spawn_file_actions_addclose(&fileActions, outputRead.wrapped.rawValue) + result = posix_spawn_file_actions_addclose(&fileActions, outputRead.platformDescriptor) guard result == 0 else { try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) throw SubprocessError( @@ -239,7 +239,7 @@ extension Configuration { } // Error if let errorWrite = errorPipe.writeFileDescriptor { - result = posix_spawn_file_actions_adddup2(&fileActions, errorWrite.wrapped.rawValue, 2) + result = posix_spawn_file_actions_adddup2(&fileActions, errorWrite.platformDescriptor, 2) guard result == 0 else { try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) throw SubprocessError( @@ -250,7 +250,7 @@ extension Configuration { } if let errorRead = errorPipe.readFileDescriptor { // Close parent side - result = posix_spawn_file_actions_addclose(&fileActions, errorRead.wrapped.rawValue) + result = posix_spawn_file_actions_addclose(&fileActions, errorRead.platformDescriptor) guard result == 0 else { try self.cleanupPreSpawn(input: inputPipe, output: outputPipe, error: errorPipe) throw SubprocessError( @@ -354,8 +354,9 @@ extension Configuration { processIdentifier: .init(value: pid), output: output, error: error, - outputPipe: outputPipe, - errorPipe: errorPipe + inputPipe: inputPipe.createInputPipe(), + outputPipe: outputPipe.createOutputPipe(), + errorPipe: errorPipe.createOutputPipe() ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 16abb9f..fa4b965 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -71,12 +71,12 @@ extension Configuration { } // Setup input let fileDescriptors: [CInt] = [ - inputPipe.readFileDescriptor?.wrapped.rawValue ?? -1, - inputPipe.writeFileDescriptor?.wrapped.rawValue ?? -1, - outputPipe.writeFileDescriptor?.wrapped.rawValue ?? -1, - outputPipe.readFileDescriptor?.wrapped.rawValue ?? -1, - errorPipe.writeFileDescriptor?.wrapped.rawValue ?? -1, - errorPipe.readFileDescriptor?.wrapped.rawValue ?? -1, + inputPipe.readFileDescriptor?.platformDescriptor ?? -1, + inputPipe.writeFileDescriptor?.platformDescriptor ?? -1, + outputPipe.writeFileDescriptor?.platformDescriptor ?? -1, + outputPipe.readFileDescriptor?.platformDescriptor ?? -1, + errorPipe.writeFileDescriptor?.platformDescriptor ?? -1, + errorPipe.readFileDescriptor?.platformDescriptor ?? -1, ] let workingDirectory: String = self.workingDirectory.string @@ -126,8 +126,9 @@ extension Configuration { processIdentifier: .init(value: pid), output: output, error: error, - outputPipe: outputPipe, - errorPipe: errorPipe + inputPipe: inputPipe.createInputPipe(), + outputPipe: outputPipe.createOutputPipe(), + errorPipe: errorPipe.createOutputPipe() ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 2fb7711..19461db 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -29,7 +29,7 @@ import Glibc import Musl #endif -package import Dispatch +internal import Dispatch // MARK: - Signals @@ -390,19 +390,78 @@ extension FileDescriptor { } internal var platformDescriptor: PlatformFileDescriptor { - return self + return self.rawValue + } +} + +internal typealias PlatformFileDescriptor = CInt +internal typealias TrackedPlatformDiskIO = TrackedDispatchIO + +extension CreatedPipe { + internal func createInputPipe() -> InputPipe { + var writeEnd: TrackedPlatformDiskIO? = nil + if let writeFileDescriptor = self.writeFileDescriptor { + let dispatchIO: DispatchIO = DispatchIO( + type: .stream, + fileDescriptor: writeFileDescriptor.platformDescriptor, + queue: .global(), + cleanupHandler: { error in + // Close the file descriptor + if writeFileDescriptor.closeWhenDone { + try? writeFileDescriptor.safelyClose() + } + } + ) + writeEnd = .init( + dispatchIO, + closeWhenDone: writeFileDescriptor.closeWhenDone + ) + } + return InputPipe( + readEnd: self.readFileDescriptor, + writeEnd: writeEnd + ) } - #if SubprocessSpan + internal func createOutputPipe() -> OutputPipe { + var readEnd: TrackedPlatformDiskIO? = nil + if let readFileDescriptor = self.readFileDescriptor { + let dispatchIO: DispatchIO = DispatchIO( + type: .stream, + fileDescriptor: readFileDescriptor.platformDescriptor, + queue: .global(), + cleanupHandler: { error in + // Close the file descriptor + if readFileDescriptor.closeWhenDone { + try? readFileDescriptor.safelyClose() + } + } + ) + readEnd = .init( + dispatchIO, + closeWhenDone: readFileDescriptor.closeWhenDone + ) + } + return OutputPipe( + readEnd: readEnd, + writeEnd: self.writeFileDescriptor + ) + } +} + +// MARK: - TrackedDispatchIO extensions +extension TrackedDispatchIO { +#if SubprocessSpan @available(SubprocessSpan, *) - #endif +#endif package func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { return try await withCheckedThrowingContinuation { continuation in - DispatchIO.read( - fromFileDescriptor: self.rawValue, - maxLength: maxLength, - runningHandlerOn: .global() - ) { data, error in + var buffer: DispatchData = .empty + self.dispatchIO.read( + offset: 0, + length: maxLength, + queue: .global() + ) { done, data, error in if error != 0 { continuation.resume( throwing: SubprocessError( @@ -412,10 +471,19 @@ extension FileDescriptor { ) return } - if data.isEmpty { - continuation.resume(returning: nil) - } else { - continuation.resume(returning: SequenceOutput.Buffer(data: data)) + if let data = data { + if buffer.isEmpty { + buffer = data + } else { + buffer.append(data) + } + } + if done { + if !buffer.isEmpty { + continuation.resume(returning: SequenceOutput.Buffer(data: buffer)) + } else { + continuation.resume(returning: nil) + } } } } @@ -425,19 +493,13 @@ extension FileDescriptor { upToLength maxLength: Int, resultHandler: sending @escaping (Swift.Result) -> Void ) { - let dispatchIO = DispatchIO( - type: .stream, - fileDescriptor: self.rawValue, - queue: .global() - ) { error in } var buffer: DispatchData? - dispatchIO.read( + self.dispatchIO.read( offset: 0, length: maxLength, queue: .global() ) { done, data, error in guard error == 0, let chunkData = data else { - dispatchIO.close() resultHandler( .failure( SubprocessError( @@ -451,7 +513,6 @@ extension FileDescriptor { // Easy case: if we are done and buffer is nil, this means // there is only one chunk of data if done && buffer == nil { - dispatchIO.close() buffer = chunkData resultHandler(.success(chunkData)) return @@ -464,16 +525,15 @@ extension FileDescriptor { } if done { - dispatchIO.close() resultHandler(.success(buffer!)) return } } } - #if SubprocessSpan +#if SubprocessSpan @available(SubprocessSpan, *) - package func write( + internal func write( _ span: borrowing RawSpan ) async throws -> Int { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in @@ -497,9 +557,9 @@ extension FileDescriptor { } } } - #endif // SubprocessSpan +#endif // SubprocessSpan - package func write( + internal func write( _ array: [UInt8] ) async throws -> Int { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in @@ -524,16 +584,21 @@ extension FileDescriptor { } } - package func write( + internal func write( _ dispatchData: DispatchData, queue: DispatchQueue = .global(), completion: @escaping (Int, Error?) -> Void ) { - DispatchIO.write( - toFileDescriptor: self.rawValue, + self.dispatchIO.write( + offset: 0, data: dispatchData, - runningHandlerOn: queue - ) { unwritten, error in + queue: queue + ) { done, unwritten, error in + guard done else { + // Wait until we are done writing or encountered some error + return + } + let unwrittenLength = unwritten?.count ?? 0 let writtenLength = dispatchData.count - unwrittenLength guard error != 0 else { @@ -551,6 +616,4 @@ extension FileDescriptor { } } -internal typealias PlatformFileDescriptor = FileDescriptor - #endif // canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index fbe0a34..627cdaa 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -151,8 +151,9 @@ extension Configuration { processIdentifier: pid, output: output, error: error, - outputPipe: outputPipe, - errorPipe: errorPipe, + inputPipe: inputPipe.createInputPipe(), + outputPipe: outputPipe.createOutputPipe(), + errorPipe: errorPipe.createOutputPipe(), consoleBehavior: self.platformOptions.consoleBehavior ) } @@ -267,8 +268,9 @@ extension Configuration { processIdentifier: pid, output: output, error: error, - outputPipe: outputPipe, - errorPipe: errorPipe, + inputPipe: inputPipe.createInputPipe(), + outputPipe: outputPipe.createOutputPipe(), + errorPipe: errorPipe.createOutputPipe(), consoleBehavior: self.platformOptions.consoleBehavior ) } @@ -971,9 +973,11 @@ extension Configuration { } } -// MARK: - PlatformFileDescriptor Type +// MARK: - Type alias internal typealias PlatformFileDescriptor = HANDLE +internal typealias TrackedPlatformDiskIO = TrackedFileDescriptor + // MARK: - Pipe Support extension FileDescriptor { // NOTE: Not the same as SwiftSystem's FileDescriptor.pipe, which has different behavior, @@ -1019,7 +1023,26 @@ extension FileDescriptor { var platformDescriptor: PlatformFileDescriptor { return HANDLE(bitPattern: _get_osfhandle(self.rawValue))! } +} + +extension CreatedPipe { + /// On Windows, we use file descriptors directly + internal func createInputPipe() -> InputPipe { + return InputPipe( + readEnd: self.readFileDescriptor, + writeEnd: self.writeFileDescriptor + ) + } + + internal func createOutputPipe() -> OutputPipe { + return OutputPipe( + readEnd: self.readFileDescriptor, + writeEnd: self.writeFileDescriptor + ) + } +} +extension TrackedFileDescriptor { internal func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { return try await withCheckedThrowingContinuation { continuation in self.readUntilEOF( @@ -1053,7 +1076,7 @@ extension FileDescriptor { let bufferPtr = baseAddress.advanced(by: totalBytesRead) var bytesRead: DWORD = 0 let readSucceed = ReadFile( - self.platformDescriptor, + self.fileDescriptor.platformDescriptor, UnsafeMutableRawPointer(mutating: bufferPtr), DWORD(maxLength - totalBytesRead), &bytesRead, @@ -1094,7 +1117,7 @@ extension FileDescriptor { } } - #if SubprocessSpan +#if SubprocessSpan @available(SubprocessSpan, *) internal func write( _ span: borrowing RawSpan @@ -1112,7 +1135,7 @@ extension FileDescriptor { } } } - #endif +#endif internal func write( _ array: [UInt8] @@ -1133,32 +1156,26 @@ extension FileDescriptor { } } - package func write( + internal func write( _ ptr: UnsafeRawBufferPointer, completion: @escaping (Int, Swift.Error?) -> Void ) { - func _write( - _ ptr: UnsafeRawBufferPointer, - count: Int, - completion: @escaping (Int, Swift.Error?) -> Void - ) { - var writtenBytes: DWORD = 0 - let writeSucceed = WriteFile( - self.platformDescriptor, - ptr.baseAddress, - DWORD(count), - &writtenBytes, - nil + var writtenBytes: DWORD = 0 + let writeSucceed = WriteFile( + self.fileDescriptor.platformDescriptor, + ptr.baseAddress, + DWORD(ptr.count), + &writtenBytes, + nil + ) + if !writeSucceed { + let error = SubprocessError( + code: .init(.failedToWriteToSubprocess), + underlyingError: .init(rawValue: GetLastError()) ) - if !writeSucceed { - let error = SubprocessError( - code: .init(.failedToWriteToSubprocess), - underlyingError: .init(rawValue: GetLastError()) - ) - completion(Int(writtenBytes), error) - } else { - completion(Int(writtenBytes), nil) - } + completion(Int(writtenBytes), error) + } else { + completion(Int(writtenBytes), nil) } } } diff --git a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift index b00bf5a..3a18818 100644 --- a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift @@ -111,7 +111,7 @@ extension StandardInputWriter { public func write( _ data: Data ) async throws -> Int { - return try await self.fileDescriptor.wrapped.write(data) + return try await self.diskIO.write(data) } /// Write a AsyncSequence of Data to the standard input of the subprocess. @@ -128,8 +128,8 @@ extension StandardInputWriter { } } -extension FileDescriptor { - #if os(Windows) +#if os(Windows) +extension TrackedFileDescriptor { internal func write( _ data: Data ) async throws -> Int { @@ -148,7 +148,9 @@ extension FileDescriptor { } } } - #else +} +#else +extension TrackedDispatchIO { internal func write( _ data: Data ) async throws -> Int { @@ -173,7 +175,7 @@ extension FileDescriptor { } } } - #endif } +#endif // os(Windows) #endif // SubprocessFoundation diff --git a/Sources/_SubprocessCShims/process_shims.c b/Sources/_SubprocessCShims/process_shims.c index c8f59cb..8b0090d 100644 --- a/Sources/_SubprocessCShims/process_shims.c +++ b/Sources/_SubprocessCShims/process_shims.c @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -83,6 +84,42 @@ vm_size_t _subprocess_vm_size(void) { } #endif +// MARK: - Private Helpers +static pthread_mutex_t _subprocess_fork_lock = PTHREAD_MUTEX_INITIALIZER; + +// Used after fork, before exec +static int _subprocess_block_everything_but_something_went_seriously_wrong_signals(sigset_t *old_mask) { + sigset_t mask; + int r = 0; + r |= sigfillset(&mask); + r |= sigdelset(&mask, SIGABRT); + r |= sigdelset(&mask, SIGBUS); + r |= sigdelset(&mask, SIGFPE); + r |= sigdelset(&mask, SIGILL); + r |= sigdelset(&mask, SIGKILL); + r |= sigdelset(&mask, SIGSEGV); + r |= sigdelset(&mask, SIGSTOP); + r |= sigdelset(&mask, SIGSYS); + r |= sigdelset(&mask, SIGTRAP); + + r |= pthread_sigmask(SIG_BLOCK, &mask, old_mask); + return r; +} + +#define _subprocess_precondition(__cond) do { \ + int eval = (__cond); \ + if (!eval) { \ + __builtin_trap(); \ + } \ +} while(0) + +#if __DARWIN_NSIG +# define _SUBPROCESS_SIG_MAX __DARWIN_NSIG +#else +# define _SUBPROCESS_SIG_MAX 32 +#endif + + // MARK: - Darwin (posix_spawn) #if TARGET_OS_MAC static int _subprocess_spawn_prefork( @@ -97,6 +134,11 @@ static int _subprocess_spawn_prefork( int number_of_sgroups, const gid_t * _Nullable sgroups, int create_session ) { +#define write_error_and_exit int error = errno; \ + write(pipefd[1], &error, sizeof(error));\ + close(pipefd[1]); \ + _exit(EXIT_FAILURE) + // Set `POSIX_SPAWN_SETEXEC` flag since we are forking ourselves short flags = 0; int rc = posix_spawnattr_getflags(spawn_attrs, &flags); @@ -147,7 +189,7 @@ static int _subprocess_spawn_prefork( #pragma GCC diagnostic ignored "-Wdeprecated" pid_t childPid = fork(); #pragma GCC diagnostic pop - if (childPid == -1) { + if (childPid < 0) { close(pipefd[0]); close(pipefd[1]); return errno; @@ -160,28 +202,19 @@ static int _subprocess_spawn_prefork( // Perform setups if (number_of_sgroups > 0 && sgroups != NULL) { if (setgroups(number_of_sgroups, sgroups) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } if (uid != NULL) { if (setuid(*uid) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } if (gid != NULL) { if (setgid(*gid) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } @@ -197,20 +230,32 @@ static int _subprocess_spawn_prefork( _exit(EXIT_FAILURE); } else { // Parent process - close(pipefd[1]); // Close unused write end + // Close unused write end + close(pipefd[1]); // Communicate child pid back *pid = childPid; // Read from the pipe until pipe is closed - // Eitehr due to exec succeeds or error is written - int childError = 0; - if (read(pipefd[0], &childError, sizeof(childError)) > 0) { - // We encountered error - close(pipefd[0]); - return childError; - } else { - // Child process exec was successful - close(pipefd[0]); - return 0; + // either due to exec succeeds or error is written + while (true) { + int childError = 0; + ssize_t read_rc = read(pipefd[0], &childError, sizeof(childError)); + if (read_rc == 0) { + // exec worked! + close(pipefd[0]); + return 0; + } else if (read_rc > 0) { + // Child exec failed and reported back + close(pipefd[0]); + return childError; + } else { + // Read failed + if (errno == EINTR) { + continue; + } else { + close(pipefd[0]); + return errno; + } + } } } } @@ -416,6 +461,11 @@ int _subprocess_fork_exec( int create_session, void (* _Nullable configurator)(void) ) { +#define write_error_and_exit int error = errno; \ + write(pipefd[1], &error, sizeof(error));\ + close(pipefd[1]); \ + _exit(EXIT_FAILURE) + int require_pre_fork = _subprocess_is_addchdir_np_available() == 0 || uid != NULL || gid != NULL || @@ -474,12 +524,27 @@ int _subprocess_fork_exec( return errno; } + // Protect the signal masking below + // Note that we only unlock in parent since child + // will be exec'd anyway + int rc = pthread_mutex_lock(&_subprocess_fork_lock); + _subprocess_precondition(rc == 0); + // Block all signals on this thread + sigset_t old_sigmask; + rc = _subprocess_block_everything_but_something_went_seriously_wrong_signals(&old_sigmask); + if (rc != 0) { + close(pipefd[0]); + close(pipefd[1]); + return errno; + } + // Finally, fork #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated" pid_t childPid = fork(); #pragma GCC diagnostic pop - if (childPid == -1) { + if (childPid < 0) { + // Fork failed close(pipefd[0]); close(pipefd[1]); return errno; @@ -489,41 +554,53 @@ int _subprocess_fork_exec( // Child process close(pipefd[0]); // Close unused read end + // Reset signal handlers + for (int signo = 1; signo < _SUBPROCESS_SIG_MAX; signo++) { + if (signo == SIGKILL || signo == SIGSTOP) { + continue; + } + void (*err_ptr)(int) = signal(signo, SIG_DFL); + if (err_ptr != SIG_ERR) { + continue; + } + + if (errno == EINVAL) { + break; // probably too high of a signal + } + + write_error_and_exit; + } + + // Reset signal mask + sigset_t sigset = { 0 }; + sigemptyset(&sigset); + int rc = sigprocmask(SIG_SETMASK, &sigset, NULL) != 0; + if (rc != 0) { + write_error_and_exit; + } + // Perform setups if (working_directory != NULL) { if (chdir(working_directory) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } - if (uid != NULL) { if (setuid(*uid) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } if (gid != NULL) { if (setgid(*gid) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } if (number_of_sgroups > 0 && sgroups != NULL) { if (setgroups(number_of_sgroups, sgroups) != 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } @@ -536,23 +613,16 @@ int _subprocess_fork_exec( } // Bind stdin, stdout, and stderr - int rc = 0; if (file_descriptors[0] >= 0) { rc = dup2(file_descriptors[0], STDIN_FILENO); if (rc < 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } if (file_descriptors[2] >= 0) { rc = dup2(file_descriptors[2], STDOUT_FILENO); if (rc < 0) { - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } } if (file_descriptors[4] >= 0) { @@ -572,7 +642,7 @@ int _subprocess_fork_exec( rc = close(file_descriptors[3]); } if (file_descriptors[4] >= 0) { - rc = close(file_descriptors[5]); + rc = close(file_descriptors[4]); } if (rc != 0) { int error = errno; @@ -587,26 +657,46 @@ int _subprocess_fork_exec( // Finally, exec execve(exec_path, args, env); // If we reached this point, something went wrong - int error = errno; - write(pipefd[1], &error, sizeof(error)); - close(pipefd[1]); - _exit(EXIT_FAILURE); + write_error_and_exit; } else { // Parent process close(pipefd[1]); // Close unused write end + + // Restore old signmask + rc = pthread_sigmask(SIG_SETMASK, &old_sigmask, NULL); + if (rc != 0) { + close(pipefd[0]); + return errno; + } + + // Unlock + rc = pthread_mutex_unlock(&_subprocess_fork_lock); + _subprocess_precondition(rc == 0); + // Communicate child pid back *pid = childPid; // Read from the pipe until pipe is closed - // Eitehr due to exec succeeds or error is written - int childError = 0; - if (read(pipefd[0], &childError, sizeof(childError)) > 0) { - // We encountered error - close(pipefd[0]); - return childError; - } else { - // Child process exec was successful - close(pipefd[0]); - return 0; + // either due to exec succeeds or error is written + while (1) { + int childError = 0; + ssize_t read_rc = read(pipefd[0], &childError, sizeof(childError)); + if (read_rc == 0) { + // exec worked! + close(pipefd[0]); + return 0; + } else if (read_rc > 0) { + // Child exec failed and reported back + close(pipefd[0]); + return childError; + } else { + // Read failed + if (errno == EINTR) { + continue; + } else { + close(pipefd[0]); + return errno; + } + } } } } diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index b59fa7e..3326807 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -339,7 +339,7 @@ extension SubprocessUnixTests { .readOnly ) let cat = try await Subprocess.run( - .name("cat"), + .path("/bin/cat"), input: .fileDescriptor(text, closeAfterSpawningProcess: true), output: .data(limit: 2048 * 1024) ) @@ -658,7 +658,7 @@ extension SubprocessUnixTests { contentsOf: URL(filePath: theMysteriousIsland.string) ) let catResult = try await Subprocess.run( - .name("/bin/bash"), + .path("/bin/bash"), arguments: ["-c", "cat \(theMysteriousIsland.string) 1>&2"], error: .data(limit: 2048 * 1024) ) @@ -729,7 +729,7 @@ extension SubprocessUnixTests { var platformOptions = PlatformOptions() platformOptions.supplementaryGroups = Array(expectedGroups) let idResult = try await Subprocess.run( - .name("/usr/bin/swift"), + .path("/usr/bin/swift"), arguments: [getgroupsSwift.string], platformOptions: platformOptions, output: .string @@ -760,7 +760,7 @@ extension SubprocessUnixTests { // Sets the process group ID to 0, which creates a new session platformOptions.processGroupID = 0 let psResult = try await Subprocess.run( - .name("/bin/bash"), + .path("/bin/bash"), arguments: ["-c", "ps -o pid,pgid -p $$"], platformOptions: platformOptions, output: .string @@ -790,7 +790,7 @@ extension SubprocessUnixTests { // Check the proces ID (pid), pross group ID (pgid), and // controling terminal's process group ID (tpgid) let psResult = try await Subprocess.run( - .name("/bin/bash"), + .path("/bin/bash"), arguments: ["-c", "ps -o pid,pgid,tpgid -p $$"], platformOptions: platformOptions, output: .string @@ -803,7 +803,7 @@ extension SubprocessUnixTests { return } let result = try await Subprocess.run( - .name("/bin/bash"), + .path("/bin/bash"), arguments: [ "-c", """ @@ -911,6 +911,50 @@ extension SubprocessUnixTests { // .standardOutputConsumed ^ .standardOutputConsumed = 0 #expect(atomicBox.bitwiseXor(.standardOutputConsumed) == OutputConsumptionState(rawValue: 0)) } + + @Test func testExitSignal() async throws { + guard #available(SubprocessSpan , *) else { + return + } + + let signalsToTest: [CInt] = [SIGKILL, SIGTERM, SIGINT] + for signal in signalsToTest { + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "kill -\(signal) $$"] + ) + #expect(result.terminationStatus == .unhandledException(signal)) + } + } + + @Test func testCanReliablyKillProcessesEvenWithSigmask() async throws { + guard #available(SubprocessSpan , *) else { + return + } + let result = try await withThrowingTaskGroup( + of: TerminationStatus?.self, + returning: TerminationStatus.self + ) { group in + group.addTask { + return try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "trap 'echo no' TERM; while true; do sleep 1; done"], + ).terminationStatus + } + group.addTask { + try? await Task.sleep(nanoseconds: 100_000_000) + return nil + } + while let result = try await group.next() { + group.cancelAll() + if let result = result { + return result + } + } + preconditionFailure("Task shold have returned a result") + } + #expect(result == .unhandledException(SIGKILL)) + } } // MARK: - Utils @@ -924,7 +968,7 @@ extension SubprocessUnixTests { isEqualTo expected: gid_t ) async throws { let idResult = try await Subprocess.run( - .name("/usr/bin/id"), + .path("/usr/bin/id"), arguments: [argument], platformOptions: platformOptions, output: .string @@ -1081,6 +1125,40 @@ extension SubprocessUnixTests { try await group.waitForAll() } } + + @Test func testCancelProcessVeryEarlyOnStressTest() async throws { + guard #available(SubprocessSpan , *) else { + return + } + + for i in 0..<100 { + let terminationStatus = try await withThrowingTaskGroup( + of: TerminationStatus?.self, + returning: TerminationStatus.self + ) { group in + group.addTask { + return try await Subprocess.run( + .path("/bin/sleep"), + arguments: ["100000"] + ).terminationStatus + } + group.addTask { + let waitNS = UInt64.random(in: 0..<10_000_000) + try? await Task.sleep(nanoseconds: waitNS) + return nil + } + + while let result = try await group.next() { + group.cancelAll() + if let result = result { + return result + } + } + preconditionFailure("this should be impossible, task should've returned a result") + } + #expect(terminationStatus == .unhandledException(SIGKILL), "iteration \(i)") + } + } } #endif // canImport(Darwin) || canImport(Glibc)