diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 66e8de6..7d4880e 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -37,6 +37,7 @@ extension Configuration { outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe ) throws -> SpawnResult { + // Ensure the waiter thread is running. _setupMonitorSignalHandler() // Instead of checking if every possible executable path @@ -266,32 +267,53 @@ extension String { internal func monitorProcessTermination( forProcessWithIdentifier pid: ProcessIdentifier ) async throws -> TerminationStatus { - return try await withCheckedThrowingContinuation { continuation in + try await withCheckedThrowingContinuation { continuation in _childProcessContinuations.withLock { continuations in - if let existing = continuations.removeValue(forKey: pid.value), - case .status(let existingStatus) = existing - { - // We already have existing status to report - continuation.resume(returning: existingStatus) - } else { - // Save the continuation for handler - continuations[pid.value] = .continuation(continuation) - } + // We don't need to worry about a race condition here because waitid() + // does not clear the wait/zombie state of the child process. If it sees + // the child process has terminated and manages to acquire the lock before + // we add this continuation to the dictionary, then it will simply loop + // and report the status again. + let oldContinuation = continuations.updateValue(continuation, forKey: pid.value) + precondition(oldContinuation == nil) + + // Wake up the waiter thread if it is waiting for more child processes. + _ = pthread_cond_signal(_waitThreadNoChildrenCondition) } } } -private enum ContinuationOrStatus { - case continuation(CheckedContinuation) - case status(TerminationStatus) +// Small helper to provide thread-safe access to the child process to continuations map as well as a condition variable to suspend the calling thread when there are no subprocesses to wait for. Note that Mutex cannot be used here because we need the semantics of pthread_cond_wait, which requires passing the pthread_mutex_t instance as a parameter, something the Mutex API does not provide access to. +private final class ChildProcessContinuations: Sendable { + private nonisolated(unsafe) var continuations = [pid_t: CheckedContinuation]() + private nonisolated(unsafe) let mutex = UnsafeMutablePointer.allocate(capacity: 1) + + init() { + pthread_mutex_init(mutex, nil) + } + + func withLock(_ body: (inout [pid_t: CheckedContinuation]) throws -> R) rethrows -> R { + try withUnsafeUnderlyingLock { _, continuations in + try body(&continuations) + } + } + + func withUnsafeUnderlyingLock(_ body: (UnsafeMutablePointer, inout [pid_t: CheckedContinuation]) throws -> R) rethrows -> R { + pthread_mutex_lock(mutex) + defer { + pthread_mutex_unlock(mutex) + } + return try body(mutex, &continuations) + } } -private let _childProcessContinuations: - Mutex< - [pid_t: ContinuationOrStatus] - > = Mutex([:]) +private let _childProcessContinuations = ChildProcessContinuations() -private let signalSource: SendableSourceSignal = SendableSourceSignal() +private nonisolated(unsafe) let _waitThreadNoChildrenCondition = { + let result = UnsafeMutablePointer.allocate(capacity: 1) + _ = pthread_cond_init(result, nil) + return result +}() private extension siginfo_t { var si_status: Int32 { @@ -316,64 +338,70 @@ private extension siginfo_t { } private let setup: () = { - signalSource.setEventHandler { - while true { - var siginfo = siginfo_t() - guard waitid(P_ALL, id_t(0), &siginfo, WEXITED) == 0 || errno == EINTR else { - return - } - var status: TerminationStatus? = nil - switch siginfo.si_code { - case .init(CLD_EXITED): - status = .exited(siginfo.si_status) - case .init(CLD_KILLED), .init(CLD_DUMPED): - status = .unhandledException(siginfo.si_status) - case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED): - // Ignore these signals because they are not related to - // process exiting - break - default: - fatalError("Unexpected exit status: \(siginfo.si_code)") - } - if let status = status { - _childProcessContinuations.withLock { continuations in + // Create the thread. It will run immediately; because it runs in an infinite + // loop, we aren't worried about detaching or joining it. + var thread = pthread_t() + _ = pthread_create( + &thread, + nil, + { _ -> UnsafeMutableRawPointer? in + // Run an infinite loop that waits for child processes to terminate and + // captures their exit statuses. + while true { + // Listen for child process exit events. WNOWAIT means we don't perturb the + // state of a terminated (zombie) child process, allowing us to fetch the + // continuation (if available) before reaping. + var siginfo = siginfo_t() + errno = 0 + if waitid(P_ALL, id_t(0), &siginfo, WEXITED | WNOWAIT) == 0 { let pid = siginfo.si_pid - if let existing = continuations.removeValue(forKey: pid), - case .continuation(let c) = existing - { - c.resume(returning: status) - } else { - // We don't have continuation yet, just state status - continuations[pid] = .status(status) + + // If we had a continuation for this PID, allow the process to be reaped + // and pass the resulting exit condition back to the calling task. If + // there is no continuation, then either it hasn't been stored yet or + // this child process is not tracked by the waiter thread. + guard pid != 0, let c = _childProcessContinuations.withLock({ $0.removeValue(forKey: pid) }) else { + continue + } + + c.resume(with: Result { + // Here waitid should not block because `pid` has already terminated at this point. + while true { + var siginfo = siginfo_t() + errno = 0 + if waitid(P_PID, numericCast(pid), &siginfo, WEXITED) == 0 { + var status: TerminationStatus? = nil + switch siginfo.si_code { + case .init(CLD_EXITED): + return .exited(siginfo.si_status) + case .init(CLD_KILLED), .init(CLD_DUMPED): + return .unhandledException(siginfo.si_status) + default: + fatalError("Unexpected exit status: \(siginfo.si_code)") + } + } else if errno != EINTR { + throw SubprocessError.UnderlyingError(rawValue: errno) + } + } + }) + } else if errno == ECHILD { + // We got ECHILD. If there are no continuations added right now, we should + // suspend this thread on the no-children condition until it's awoken by a + // newly-scheduled waiter process. (If this condition is spuriously + // woken, we'll just loop again, which is fine.) Note that we read errno + // outside the lock in case acquiring the lock perturbs it. + _childProcessContinuations.withUnsafeUnderlyingLock { lock, childProcessContinuations in + if childProcessContinuations.isEmpty { + _ = pthread_cond_wait(_waitThreadNoChildrenCondition, lock) + } } } } - } - } - signalSource.resume() + }, + nil + ) }() -/// Unchecked Sendable here since this class is only explicitly -/// initialized once during the lifetime of the process -final class SendableSourceSignal: @unchecked Sendable { - private let signalSource: DispatchSourceSignal - - func setEventHandler(handler: @escaping DispatchSourceHandler) { - self.signalSource.setEventHandler(handler: handler) - } - - func resume() { - self.signalSource.resume() - } - - init() { - self.signalSource = DispatchSource.makeSignalSource( - signal: SIGCHLD, - queue: .global() - ) - } -} - private func _setupMonitorSignalHandler() { // Only executed once setup