Skip to content

Prevent race conditions in process teardown sequences and use of Execution in the body closure #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 23, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 38 additions & 14 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,30 +74,29 @@ public struct Configuration: Sendable {
let pid = spawnResults.execution.processIdentifier

var spawnResultBox: SpawnResult?? = consume spawnResults
var _spawnResult = spawnResultBox!.take()!

return try await withAsyncTaskCleanupHandler {
var _spawnResult = spawnResultBox!.take()!
let processIdentifier = _spawnResult.execution.processIdentifier

let result = try await withAsyncTaskCleanupHandler {
let inputIO = _spawnResult.inputWriteEnd()
let outputIO = _spawnResult.outputReadEnd()
let errorIO = _spawnResult.errorReadEnd()
let processIdentifier = _spawnResult.execution.processIdentifier

async let terminationStatus = try monitorProcessTermination(
forProcessWithIdentifier: processIdentifier
)
// Body runs in the same isolation
let result = try await body(_spawnResult.execution, inputIO, outputIO, errorIO)
return ExecutionResult(
terminationStatus: try await terminationStatus,
value: result
)
return try await body(_spawnResult.execution, inputIO, outputIO, errorIO)
} onCleanup: {
// Attempt to terminate the child process
await Execution.runTeardownSequence(
self.platformOptions.teardownSequence,
on: pid
)
}

return ExecutionResult(
terminationStatus: try await monitorProcessTermination(forProcessWithIdentifier: processIdentifier),
value: result
)
}
}

Expand Down Expand Up @@ -752,11 +751,13 @@ extension Optional where Wrapped == String {
}
}

/// Runs `body`, and then runs `onCleanup` if body throws an error, or if the parent task is cancelled. In the latter case, `onCleanup` may be run concurrently with `body`. `body` is guaranteed to run exactly once. `onCleanup` is guaranteed to run only once, or not at all.
internal func withAsyncTaskCleanupHandler<Result>(
_ body: () async throws -> Result,
onCleanup handler: @Sendable @escaping () async -> Void,
isolation: isolated (any Actor)? = #isolation
) async rethrows -> Result {
let (runCancellationHandlerStream, runCancellationHandlerContinuation) = AsyncThrowingStream.makeStream(of: Void.self)
return try await withThrowingTaskGroup(
of: Void.self,
returning: Result.self
Expand All @@ -767,15 +768,38 @@ internal func withAsyncTaskCleanupHandler<Result>(
// before the time ends. We then run the cancel handler.
do { while true { try await Task.sleep(nanoseconds: 1_000_000_000) } } catch {}
// Run task cancel handler
await handler()
runCancellationHandlerContinuation.finish(throwing: CancellationError())
}

group.addTask {
// Enumerate the async stream until it completes or throws an error.
// Since we signal completion of the stream from cancellation or the
// parent task or the body throwing, this ensures that we run the
// cleanup handler exactly once in any failure scenario, and also do
// so _immediately_ if the failure scenario is due to parent task
// cancellation. We do so in a detached Task to prevent cancellation
// of the parent task from interrupting enumeration of the stream itself.
await Task.detached {
do {
var iterator = runCancellationHandlerStream.makeAsyncIterator()
while let _ = try await iterator.next() {
}
} catch {
await handler()
}
}.value
}

defer {
group.cancelAll()
}

do {
let result = try await body()
group.cancelAll()
runCancellationHandlerContinuation.finish()
return result
} catch {
await handler()
runCancellationHandlerContinuation.finish(throwing: error)
throw error
}
}
Expand Down
Loading