From 7faeeb898fd823a7b945e20fec9d9e59f980bcca Mon Sep 17 00:00:00 2001 From: Jake Petroules Date: Mon, 16 Jun 2025 10:23:08 -0700 Subject: [PATCH] Process termination monitoring implementation on Linux conflicts with processes spawned by other means The approach used by Subprocess to monitor subprocess termination on Linux is fundamentally flawed as it calls waitid with P_ALL, and WEXITED without WNOWAIT, which will end up reaping pids that were spawned outside the Subprocess library. Use an implementation more like swift-testing does for wait tests, which doesn't suffer from this issue. Closes #82 --- .../Platforms/Subprocess+Linux.swift | 168 ++++++++++-------- 1 file changed, 98 insertions(+), 70 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 66e8de6..7d4880e 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -37,6 +37,7 @@ extension Configuration { outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe ) throws -> SpawnResult { + // Ensure the waiter thread is running. _setupMonitorSignalHandler() // Instead of checking if every possible executable path @@ -266,32 +267,53 @@ extension String { internal func monitorProcessTermination( forProcessWithIdentifier pid: ProcessIdentifier ) async throws -> TerminationStatus { - return try await withCheckedThrowingContinuation { continuation in + try await withCheckedThrowingContinuation { continuation in _childProcessContinuations.withLock { continuations in - if let existing = continuations.removeValue(forKey: pid.value), - case .status(let existingStatus) = existing - { - // We already have existing status to report - continuation.resume(returning: existingStatus) - } else { - // Save the continuation for handler - continuations[pid.value] = .continuation(continuation) - } + // We don't need to worry about a race condition here because waitid() + // does not clear the wait/zombie state of the child process. If it sees + // the child process has terminated and manages to acquire the lock before + // we add this continuation to the dictionary, then it will simply loop + // and report the status again. + let oldContinuation = continuations.updateValue(continuation, forKey: pid.value) + precondition(oldContinuation == nil) + + // Wake up the waiter thread if it is waiting for more child processes. + _ = pthread_cond_signal(_waitThreadNoChildrenCondition) } } } -private enum ContinuationOrStatus { - case continuation(CheckedContinuation) - case status(TerminationStatus) +// Small helper to provide thread-safe access to the child process to continuations map as well as a condition variable to suspend the calling thread when there are no subprocesses to wait for. Note that Mutex cannot be used here because we need the semantics of pthread_cond_wait, which requires passing the pthread_mutex_t instance as a parameter, something the Mutex API does not provide access to. +private final class ChildProcessContinuations: Sendable { + private nonisolated(unsafe) var continuations = [pid_t: CheckedContinuation]() + private nonisolated(unsafe) let mutex = UnsafeMutablePointer.allocate(capacity: 1) + + init() { + pthread_mutex_init(mutex, nil) + } + + func withLock(_ body: (inout [pid_t: CheckedContinuation]) throws -> R) rethrows -> R { + try withUnsafeUnderlyingLock { _, continuations in + try body(&continuations) + } + } + + func withUnsafeUnderlyingLock(_ body: (UnsafeMutablePointer, inout [pid_t: CheckedContinuation]) throws -> R) rethrows -> R { + pthread_mutex_lock(mutex) + defer { + pthread_mutex_unlock(mutex) + } + return try body(mutex, &continuations) + } } -private let _childProcessContinuations: - Mutex< - [pid_t: ContinuationOrStatus] - > = Mutex([:]) +private let _childProcessContinuations = ChildProcessContinuations() -private let signalSource: SendableSourceSignal = SendableSourceSignal() +private nonisolated(unsafe) let _waitThreadNoChildrenCondition = { + let result = UnsafeMutablePointer.allocate(capacity: 1) + _ = pthread_cond_init(result, nil) + return result +}() private extension siginfo_t { var si_status: Int32 { @@ -316,64 +338,70 @@ private extension siginfo_t { } private let setup: () = { - signalSource.setEventHandler { - while true { - var siginfo = siginfo_t() - guard waitid(P_ALL, id_t(0), &siginfo, WEXITED) == 0 || errno == EINTR else { - return - } - var status: TerminationStatus? = nil - switch siginfo.si_code { - case .init(CLD_EXITED): - status = .exited(siginfo.si_status) - case .init(CLD_KILLED), .init(CLD_DUMPED): - status = .unhandledException(siginfo.si_status) - case .init(CLD_TRAPPED), .init(CLD_STOPPED), .init(CLD_CONTINUED): - // Ignore these signals because they are not related to - // process exiting - break - default: - fatalError("Unexpected exit status: \(siginfo.si_code)") - } - if let status = status { - _childProcessContinuations.withLock { continuations in + // Create the thread. It will run immediately; because it runs in an infinite + // loop, we aren't worried about detaching or joining it. + var thread = pthread_t() + _ = pthread_create( + &thread, + nil, + { _ -> UnsafeMutableRawPointer? in + // Run an infinite loop that waits for child processes to terminate and + // captures their exit statuses. + while true { + // Listen for child process exit events. WNOWAIT means we don't perturb the + // state of a terminated (zombie) child process, allowing us to fetch the + // continuation (if available) before reaping. + var siginfo = siginfo_t() + errno = 0 + if waitid(P_ALL, id_t(0), &siginfo, WEXITED | WNOWAIT) == 0 { let pid = siginfo.si_pid - if let existing = continuations.removeValue(forKey: pid), - case .continuation(let c) = existing - { - c.resume(returning: status) - } else { - // We don't have continuation yet, just state status - continuations[pid] = .status(status) + + // If we had a continuation for this PID, allow the process to be reaped + // and pass the resulting exit condition back to the calling task. If + // there is no continuation, then either it hasn't been stored yet or + // this child process is not tracked by the waiter thread. + guard pid != 0, let c = _childProcessContinuations.withLock({ $0.removeValue(forKey: pid) }) else { + continue + } + + c.resume(with: Result { + // Here waitid should not block because `pid` has already terminated at this point. + while true { + var siginfo = siginfo_t() + errno = 0 + if waitid(P_PID, numericCast(pid), &siginfo, WEXITED) == 0 { + var status: TerminationStatus? = nil + switch siginfo.si_code { + case .init(CLD_EXITED): + return .exited(siginfo.si_status) + case .init(CLD_KILLED), .init(CLD_DUMPED): + return .unhandledException(siginfo.si_status) + default: + fatalError("Unexpected exit status: \(siginfo.si_code)") + } + } else if errno != EINTR { + throw SubprocessError.UnderlyingError(rawValue: errno) + } + } + }) + } else if errno == ECHILD { + // We got ECHILD. If there are no continuations added right now, we should + // suspend this thread on the no-children condition until it's awoken by a + // newly-scheduled waiter process. (If this condition is spuriously + // woken, we'll just loop again, which is fine.) Note that we read errno + // outside the lock in case acquiring the lock perturbs it. + _childProcessContinuations.withUnsafeUnderlyingLock { lock, childProcessContinuations in + if childProcessContinuations.isEmpty { + _ = pthread_cond_wait(_waitThreadNoChildrenCondition, lock) + } } } } - } - } - signalSource.resume() + }, + nil + ) }() -/// Unchecked Sendable here since this class is only explicitly -/// initialized once during the lifetime of the process -final class SendableSourceSignal: @unchecked Sendable { - private let signalSource: DispatchSourceSignal - - func setEventHandler(handler: @escaping DispatchSourceHandler) { - self.signalSource.setEventHandler(handler: handler) - } - - func resume() { - self.signalSource.resume() - } - - init() { - self.signalSource = DispatchSource.makeSignalSource( - signal: SIGCHLD, - queue: .global() - ) - } -} - private func _setupMonitorSignalHandler() { // Only executed once setup