Skip to content

Block signal masks pre-fork and restore post-fork. #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,39 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
public struct Iterator: AsyncIteratorProtocol {
public typealias Element = SequenceOutput.Buffer

private let fileDescriptor: TrackedFileDescriptor
private let diskIO: TrackedPlatformDiskIO
private var buffer: [UInt8]
private var currentPosition: Int
private var finished: Bool

internal init(fileDescriptor: TrackedFileDescriptor) {
self.fileDescriptor = fileDescriptor
internal init(diskIO: TrackedPlatformDiskIO) {
self.diskIO = diskIO
self.buffer = []
self.currentPosition = 0
self.finished = false
}

public mutating func next() async throws -> SequenceOutput.Buffer? {
let data = try await self.fileDescriptor.wrapped.readChunk(
public func next() async throws -> SequenceOutput.Buffer? {
let data = try await self.diskIO.readChunk(
upToLength: readBufferSize
)
if data == nil {
// We finished reading. Close the file descriptor now
try self.fileDescriptor.safelyClose()
try self.diskIO.safelyClose()
return nil
}
return data
}
}

private let fileDescriptor: TrackedFileDescriptor
private let diskIO: TrackedPlatformDiskIO

init(fileDescriptor: TrackedFileDescriptor) {
self.fileDescriptor = fileDescriptor
internal init(diskIO: TrackedPlatformDiskIO) {
self.diskIO = diskIO
}

public func makeAsyncIterator() -> Iterator {
return Iterator(fileDescriptor: self.fileDescriptor)
return Iterator(diskIO: self.diskIO)
}
}

Expand Down
91 changes: 49 additions & 42 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ public struct Configuration: Sendable {
// After spawn, cleanup child side fds
try await self.cleanup(
execution: execution,
inputPipe: inputPipe,
outputPipe: outputPipe,
errorPipe: errorPipe,
childSide: true,
parentSide: false,
attemptToTerminateSubProcess: false
Expand All @@ -104,7 +101,7 @@ public struct Configuration: Sendable {
// Body runs in the same isolation
let result = try await body(
execution,
.init(fileDescriptor: inputPipe.writeFileDescriptor!)
.init(diskIO: execution.inputPipe.writeEnd!)
)
return ExecutionResult(
terminationStatus: try await waitingStatus,
Expand All @@ -116,9 +113,6 @@ public struct Configuration: Sendable {
// this is the best we can do
try? await self.cleanup(
execution: execution,
inputPipe: inputPipe,
outputPipe: outputPipe,
errorPipe: errorPipe,
childSide: false,
parentSide: true,
attemptToTerminateSubProcess: true
Expand Down Expand Up @@ -154,9 +148,6 @@ public struct Configuration: Sendable {
// After spawn, clean up child side
try await self.cleanup(
execution: execution,
inputPipe: inputPipe,
outputPipe: outputPipe,
errorPipe: errorPipe,
childSide: true,
parentSide: false,
attemptToTerminateSubProcess: false
Expand All @@ -174,7 +165,7 @@ public struct Configuration: Sendable {
standardError
) = try await execution.captureIOs()
// Write input in the same scope
guard let writeFd = inputPipe.writeFileDescriptor else {
guard let writeFd = execution.inputPipe.writeEnd else {
fatalError("Trying to write to an input that has been closed")
}
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, any Swift.Error>) in
Expand All @@ -193,7 +184,7 @@ public struct Configuration: Sendable {
)
#endif

writeFd.wrapped.write(bytes) { _, error in
writeFd.write(bytes) { _, error in
if let error = error {
continuation.resume(throwing: error)
} else {
Expand All @@ -216,9 +207,6 @@ public struct Configuration: Sendable {
// this is the best we can do
try? await self.cleanup(
execution: execution,
inputPipe: inputPipe,
outputPipe: outputPipe,
errorPipe: errorPipe,
childSide: false,
parentSide: true,
attemptToTerminateSubProcess: true
Expand Down Expand Up @@ -257,9 +245,6 @@ public struct Configuration: Sendable {
// After spawn, clean up child side
try await self.cleanup(
execution: execution,
inputPipe: inputPipe,
outputPipe: outputPipe,
errorPipe: errorPipe,
childSide: true,
parentSide: false,
attemptToTerminateSubProcess: false
Expand All @@ -271,8 +256,8 @@ public struct Configuration: Sendable {
returning: ExecutionResult.self
) { group in
group.addTask {
if let writeFd = inputPipe.writeFileDescriptor {
let writer = StandardInputWriter(fileDescriptor: writeFd)
if let writeFd = execution.inputPipe.writeEnd {
let writer = StandardInputWriter(diskIO: writeFd)
try await input.write(with: writer)
try await writer.finish()
}
Expand Down Expand Up @@ -300,9 +285,6 @@ public struct Configuration: Sendable {
// this is the best we can do
try? await self.cleanup(
execution: execution,
inputPipe: inputPipe,
outputPipe: outputPipe,
errorPipe: errorPipe,
childSide: false,
parentSide: true,
attemptToTerminateSubProcess: true
Expand Down Expand Up @@ -350,9 +332,6 @@ extension Configuration {
Error: OutputProtocol
>(
execution: Execution<Output, Error>,
inputPipe: CreatedPipe,
outputPipe: CreatedPipe,
errorPipe: CreatedPipe,
childSide: Bool,
parentSide: Bool,
attemptToTerminateSubProcess: Bool
Expand Down Expand Up @@ -384,25 +363,25 @@ extension Configuration {

if childSide {
inputError = captureError {
try inputPipe.readFileDescriptor?.safelyClose()
try execution.inputPipe.readEnd?.safelyClose()
}
outputError = captureError {
try outputPipe.writeFileDescriptor?.safelyClose()
try execution.outputPipe.writeEnd?.safelyClose()
}
errorError = captureError {
try errorPipe.writeFileDescriptor?.safelyClose()
try execution.errorPipe.writeEnd?.safelyClose()
}
}

if parentSide {
inputError = captureError {
try inputPipe.writeFileDescriptor?.safelyClose()
try execution.inputPipe.writeEnd?.safelyClose()
}
outputError = captureError {
try outputPipe.readFileDescriptor?.safelyClose()
try execution.outputPipe.readEnd?.safelyClose()
}
errorError = captureError {
try errorPipe.readFileDescriptor?.safelyClose()
try execution.errorPipe.readEnd?.safelyClose()
}
}

Expand Down Expand Up @@ -822,17 +801,17 @@ internal enum StringOrRawBytes: Sendable, Hashable {
}
}

/// A simple wrapper on `FileDescriptor` plus a flag indicating
/// whether it should be closed automactially when done.
internal struct TrackedFileDescriptor: Hashable {
/// A wrapped `FileDescriptor` and whether it should be closed
/// automactially when done.
internal struct TrackedFileDescriptor {
internal let closeWhenDone: Bool
internal let wrapped: FileDescriptor
internal let fileDescriptor: FileDescriptor

internal init(
_ wrapped: FileDescriptor,
_ fileDescriptor: FileDescriptor,
closeWhenDone: Bool
) {
self.wrapped = wrapped
self.fileDescriptor = fileDescriptor
self.closeWhenDone = closeWhenDone
}

Expand All @@ -842,7 +821,7 @@ internal struct TrackedFileDescriptor: Hashable {
}

do {
try self.wrapped.close()
try fileDescriptor.close()
} catch {
guard let errno: Errno = error as? Errno else {
throw error
Expand All @@ -854,25 +833,53 @@ internal struct TrackedFileDescriptor: Hashable {
}

internal var platformDescriptor: PlatformFileDescriptor {
return self.wrapped.platformDescriptor
return self.fileDescriptor.platformDescriptor
}
}

#if !os(Windows)
/// A wrapped `DispatchIO` and whether it should be closed
/// automactially when done.
internal struct TrackedDispatchIO {
internal let closeWhenDone: Bool
internal let dispatchIO: DispatchIO

internal init(
_ dispatchIO: DispatchIO,
closeWhenDone: Bool
) {
self.dispatchIO = dispatchIO
self.closeWhenDone = closeWhenDone
}

internal func safelyClose() throws {
guard self.closeWhenDone else {
return
}

dispatchIO.close()
}

internal var platformDescriptor: PlatformFileDescriptor {
return self.dispatchIO.fileDescriptor
}
}
#endif

internal struct CreatedPipe {
internal let readFileDescriptor: TrackedFileDescriptor?
internal let writeFileDescriptor: TrackedFileDescriptor?

internal init(
readFileDescriptor: TrackedFileDescriptor?,
writeFileDescriptor: TrackedFileDescriptor?
writeFileDescriptor: TrackedFileDescriptor?,
) {
self.readFileDescriptor = readFileDescriptor
self.writeFileDescriptor = writeFileDescriptor
}

internal init(closeWhenDone: Bool) throws {
let pipe = try FileDescriptor.ssp_pipe()

self.readFileDescriptor = .init(
pipe.readEnd,
closeWhenDone: closeWhenDone
Expand Down
2 changes: 1 addition & 1 deletion Sources/Subprocess/Error.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ extension SubprocessError: CustomStringConvertible, CustomDebugStringConvertible
public var description: String {
switch self.code.storage {
case .spawnFailed:
return "Failed to spawn the new process."
return "Failed to spawn the new process with underlying error: \(self.underlyingError!)"
case .executableNotFound(let executableName):
return "Executable \"\(executableName)\" is not found or cannot be executed."
case .failedToChangeWorkingDirectory(let workingDirectory):
Expand Down
29 changes: 17 additions & 12 deletions Sources/Subprocess/Execution.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public final class Execution<

internal let output: Output
internal let error: Error
internal let outputPipe: CreatedPipe
internal let errorPipe: CreatedPipe
internal let inputPipe: InputPipe
internal let outputPipe: OutputPipe
internal let errorPipe: OutputPipe
internal let outputConsumptionState: AtomicBox

#if os(Windows)
Expand All @@ -53,13 +54,15 @@ public final class Execution<
processIdentifier: ProcessIdentifier,
output: Output,
error: Error,
outputPipe: CreatedPipe,
errorPipe: CreatedPipe,
inputPipe: InputPipe,
outputPipe: OutputPipe,
errorPipe: OutputPipe,
consoleBehavior: PlatformOptions.ConsoleBehavior
) {
self.processIdentifier = processIdentifier
self.output = output
self.error = error
self.inputPipe = inputPipe
self.outputPipe = outputPipe
self.errorPipe = errorPipe
self.outputConsumptionState = AtomicBox()
Expand All @@ -70,12 +73,14 @@ public final class Execution<
processIdentifier: ProcessIdentifier,
output: Output,
error: Error,
outputPipe: CreatedPipe,
errorPipe: CreatedPipe
inputPipe: InputPipe,
outputPipe: OutputPipe,
errorPipe: OutputPipe
) {
self.processIdentifier = processIdentifier
self.output = output
self.error = error
self.inputPipe = inputPipe
self.outputPipe = outputPipe
self.errorPipe = errorPipe
self.outputConsumptionState = AtomicBox()
Expand All @@ -98,11 +103,11 @@ extension Execution where Output == SequenceOutput {
)

guard consumptionState.contains(.standardOutputConsumed),
let fd = self.outputPipe.readFileDescriptor
let readFd = self.outputPipe.readEnd
else {
fatalError("The standard output has already been consumed")
}
return AsyncBufferSequence(fileDescriptor: fd)
return AsyncBufferSequence(diskIO: readFd)
}
}

Expand All @@ -121,11 +126,11 @@ extension Execution where Error == SequenceOutput {
)

guard consumptionState.contains(.standardErrorConsumed),
let fd = self.errorPipe.readFileDescriptor
let readFd = self.errorPipe.readEnd
else {
fatalError("The standard output has already been consumed")
}
return AsyncBufferSequence(fileDescriptor: fd)
return AsyncBufferSequence(diskIO: readFd)
}
}

Expand Down Expand Up @@ -165,13 +170,13 @@ extension Execution {
) { group in
group.addTask {
let stdout = try await self.output.captureOutput(
from: self.outputPipe.readFileDescriptor
from: self.outputPipe.readEnd
)
return .standardOutputCaptured(stdout)
}
group.addTask {
let stderr = try await self.error.captureOutput(
from: self.errorPipe.readFileDescriptor
from: self.errorPipe.readEnd
)
return .standardErrorCaptured(stderr)
}
Expand Down
Loading