64 changes: 33 additions & 31 deletions Sources/SWBTaskConstruction/ProductPlanning/BuildPlan.swift
@@ -12,6 +12,8 @@

package import SWBUtil
package import SWBCore
import os
Review comment (Collaborator): The os module is Darwin-only and doesn't seem to be used here.

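A side note, not part of this PR: if a Darwin-only module such as os were actually needed, the usual pattern is to guard the import so the file still builds on Linux and Windows:

#if canImport(os)
import os  // Darwin-only; provides os_log / signpost APIs
#endif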
import Foundation

/// Information describing a complete build plan request.
package struct BuildPlanRequest: Sendable {
@@ -88,8 +90,10 @@ package final class BuildPlan: StaleFileRemovalContext {
// Create a planner to produce the actual product plans.
let planner = ProductPlanner(planRequest: planRequest, taskPlanningDelegate: delegate)

// Create the queues to produce and aggregate the tasks.
let aggregationQueue = SWBQueue(label: "SWBTaskConstruction.BuildPlan.aggregationQueue", qos: planRequest.buildRequest.qos, autoreleaseFrequency: .workItem)
// Compute adaptive parallelism based on available cores
let processorCount = ProcessInfo.processInfo.activeProcessorCount
let highParallelism = max(processorCount * 2, 4) // Allow oversubscription for I/O-bound tasks
let mediumParallelism = max(processorCount, 2) // CPU-bound tasks

// Compute a collated list of result contexts and task producers, so we can do a single parallel dispatch.
//
@@ -177,16 +181,13 @@ package final class BuildPlan: StaleFileRemovalContext {
await ext.generateAdditionalTasks(&tasks, producer)
}

aggregationQueue.async { [tasks] in
productPlanResultContext.addPlannedTasks(tasks)
}
// Direct call - thread-safe via ProductPlanResultContext's internal lock
productPlanResultContext.addPlannedTasks(tasks)
}

// Wait for task production.
await group.waitForAll()
}

await aggregationQueue.sync{ }
if delegate.cancelled {
// Reset any deferred producers, which may participate in cycles.
for context in productPlanResultContexts {
@@ -200,23 +201,19 @@

// Compute all of the deferred tasks (in parallel).
delegate.updateProgress(statusMessage: messageShortening == .full ? "Planning deferred tasks" : "Constructing deferred tasks", showInLog: false)
await TaskGroup.concurrentPerform(iterations: productPlanResultContexts.count, maximumParallelism: 10) { i in
await TaskGroup.concurrentPerform(iterations: productPlanResultContexts.count, maximumParallelism: mediumParallelism) { i in
Review comment (Collaborator): The maximumParallelism is intended to put an upper bound on the number of active tasks rather than fan out as widely as possible, since we fan out at multiple levels of the build planning process. What kind of speedup do you see if this change and the one below are applied in isolation?

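For context, a self-contained sketch of a bounded concurrentPerform built on a standard task group; this is illustrative only and assumes semantics similar to SWBUtil's helper (a fixed cap on in-flight bodies rather than full fan-out):

// Illustrative helper, not SWBUtil's implementation: run `iterations` bodies
// with at most `maximumParallelism` of them in flight at any time.
func boundedConcurrentPerform(
    iterations: Int,
    maximumParallelism: Int,
    _ body: @escaping @Sendable (Int) async -> Void
) async {
    await withTaskGroup(of: Void.self) { group in
        var next = 0
        // Seed the group up to the parallelism limit.
        while next < min(maximumParallelism, iterations) {
            let i = next
            next += 1
            group.addTask { await body(i) }
        }
        // Each completion frees a slot for the next iteration; the group
        // implicitly awaits whatever is still running when this closure ends.
        while await group.next() != nil, next < iterations {
            let i = next
            next += 1
            group.addTask { await body(i) }
        }
    }
}

Because these loops nest (product plans outside, deferred producers inside), the caps multiply: two levels at a fixed 10 allow up to roughly 100 concurrent bodies, whereas scaling each level with activeProcessorCount makes that product grow quadratically with core count.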
let productPlanResultContext = productPlanResultContexts[i]
let plan = productPlanResultContext.productPlan
plan.taskProducerContext.outputsOfMainTaskProducers = productPlanResultContext.outputNodes
let deferredProducers = plan.taskProducerContext.takeDeferredProducers()

if delegate.cancelled { return }
await TaskGroup.concurrentPerform(iterations: deferredProducers.count, maximumParallelism: 10) { i in
await TaskGroup.concurrentPerform(iterations: deferredProducers.count, maximumParallelism: mediumParallelism) { i in
let tasks = await deferredProducers[i]()
aggregationQueue.async {
productPlanResultContext.addPlannedTasks(tasks)
}
// Direct call - thread-safe via ProductPlanResultContext's internal lock
productPlanResultContext.addPlannedTasks(tasks)
Review comment (Collaborator): Changing this from an asynchronous dispatch to a blocking call with internal locking means that the aggregation and task production are less pipelined here - is this change beneficial in isolation?

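To make the trade-off concrete, a minimal sketch of the two aggregation patterns, written with DispatchQueue and NSLock as stand-ins for the SWBQueue and Lock types used in this codebase:

import Foundation

// Pattern used before this PR: producers hand results to a serial queue and
// return immediately, so aggregation overlaps ("pipelines") with production.
final class QueueAggregator: @unchecked Sendable {
    private let queue = DispatchQueue(label: "aggregation")
    private var tasks: [String] = []

    func add(_ newTasks: [String]) {
        queue.async { self.tasks.append(contentsOf: newTasks) }
    }

    func finishAndRead() -> [String] {
        queue.sync { tasks }  // drains any pending adds, then reads
    }
}

// Pattern introduced by this PR: producers append inline under a lock; simpler
// and with no final drain step, but each producer blocks while aggregating.
final class LockAggregator: @unchecked Sendable {
    private let lock = NSLock()
    private var tasks: [String] = []

    func add(_ newTasks: [String]) {
        lock.lock()
        defer { lock.unlock() }
        tasks.append(contentsOf: newTasks)
    }

    func read() -> [String] {
        lock.lock()
        defer { lock.unlock() }
        return tasks
    }
}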
}
}

// Wait for product plan aggregation.
await aggregationQueue.sync {}
if delegate.cancelled {
return nil
}
@@ -230,16 +227,13 @@
if delegate.cancelled { return [] }

// Get the list of effective planned tasks for the product plan.
return await aggregationQueue.sync { resultContext.plannedTasks }
return resultContext.plannedTasks
}
}

// Serially add the tasks for this product plan to the array for the whole build request.
return await group.reduce(into: [], { $0.append(contentsOf: $1) })
}

// Wait for task validation.
await aggregationQueue.sync{ }
if delegate.cancelled {
return nil
}
@@ -298,14 +292,20 @@



/// This context stores the results of task generation for a product plan. It is used by a build plan to collect results of task generation, and once task generation is complete to compute the final set of planned tasks to be used for a product plan by evaluating task validity criteria..
/// This context stores the results of task generation for a product plan. It is used by a build plan to collect results of task generation, and once task generation is complete to compute the final set of planned tasks to be used for a product plan by evaluating task validity criteria.
///
/// This class is not thread-safe; the build plan is expected to build up the context in a manner that accounts for that.
/// **Thread-Safety**: This class is thread-safe for concurrent task additions.
/// - `addPlannedTask()` and `addPlannedTasks()` are protected by an internal lock, allowing multiple producers to add tasks concurrently
/// - Properties like `plannedTasks`, `outputNodes`, `inputPaths`, etc. must only be accessed after all task additions are complete
/// - The BuildPlan ensures proper ordering: all writes complete before any reads of these properties
private final class ProductPlanResultContext: TaskValidationContext, CustomStringConvertible {
fileprivate let productPlan: ProductPlan

private let targetName: String

/// Lock to protect concurrent access to mutable state
private let lock = Lock()
Review comment (Collaborator): We should use the SWBMutex shim over Lock in all new code (and allow annotating ProductPlanResultContext as Sendable), but based on the other comment I'm not convinced this is an improvement over the existing queue.

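For reference, the shape the comment suggests would look roughly like the following, using the standard library's Mutex as a stand-in for the SWBMutex shim (whose exact API isn't shown in this diff); the type and member names here are illustrative, not the real ones:

import Synchronization  // Swift 6 standard library; Mutex needs a recent deployment target

// Sketch only: keeping the mutable state inside the mutex is what lets the
// containing type be annotated Sendable without resorting to @unchecked.
final class ResultContextSketch: Sendable {
    private struct State {
        var plannedTasks: [String] = []
        var outputPaths: Set<String> = []
    }

    private let state = Mutex(State())

    func addPlannedTask(named name: String, outputs: [String]) {
        state.withLock { protected in
            protected.plannedTasks.append(name)
            protected.outputPaths.formUnion(outputs)
        }
    }

    var plannedTasks: [String] {
        state.withLock { $0.plannedTasks }
    }
}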

/// All planned tasks for the product plan.
private var allPlannedTasks: Set<Ref<any PlannedTask>>

@@ -351,16 +351,18 @@ private final class ProductPlanResultContext: TaskValidationContext, CustomStringConvertible {
}

func addPlannedTask(_ plannedTask: any PlannedTask) {
allPlannedTasks.insert(Ref(plannedTask))

// Add the task's inputs and outputs to the result context. However, we only do this if the task doesn't have validity criteria.
// Otherwise, a task which we later determine is not valid might cause another to be considered valid when it otherwise would not be.
if plannedTask.validityCriteria == nil {
for input in plannedTask.inputs {
addInputPath(input.path)
}
for output in plannedTask.outputs {
addOutputPath(output.path)
lock.withLock {
allPlannedTasks.insert(Ref(plannedTask))

// Add the task's inputs and outputs to the result context. However, we only do this if the task doesn't have validity criteria.
// Otherwise, a task which we later determine is not valid might cause another to be considered valid when it otherwise would not be.
if plannedTask.validityCriteria == nil {
for input in plannedTask.inputs {
addInputPath(input.path)
}
for output in plannedTask.outputs {
addOutputPath(output.path)
}
}
}
}

ProductPlanner.swift
@@ -13,6 +13,7 @@
import SWBCore
import SWBUtil
import SWBMacro
import Foundation

@PluginExtensionSystemActor internal func taskProducerExtensions(_ workspaceContext: WorkspaceContext) -> [any TaskProducerExtension] {
let extensions = workspaceContext.core.pluginManager.extensions(of: TaskProducerExtensionPoint.self)
@@ -48,7 +49,8 @@ package struct ProductPlanner
let targetTaskInfos = globalProductPlan.targetGateNodes

// Create the plans themselves in parallel.
var productPlans = await globalProductPlan.allTargets.asyncMap { configuredTarget in
let maxParallelism = max(1, ProcessInfo.processInfo.activeProcessorCount)
var productPlans = await globalProductPlan.allTargets.concurrentMap(maximumParallelism: maxParallelism) { configuredTarget in
Review comment (Collaborator): This looks reasonable to switch over to concurrentMap, but like the other changes I'm not convinced it should scale with core count unless that change yields a meaningful speedup in isolation.

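For illustration, an order-preserving concurrent map with a parallelism cap can be sketched over a task group as below; the real asyncMap/concurrentMap helpers live in SWBUtil and may differ in detail:

// Illustrative only: maps `elements` concurrently, preserving result order and
// keeping at most `maximumParallelism` transforms in flight.
func concurrentMapSketch<T: Sendable, U: Sendable>(
    _ elements: [T],
    maximumParallelism: Int,
    _ transform: @escaping @Sendable (T) async -> U
) async -> [U] {
    var results = [U?](repeating: nil, count: elements.count)
    await withTaskGroup(of: (Int, U).self) { group in
        var next = 0
        // Seed up to the parallelism limit.
        while next < min(maximumParallelism, elements.count) {
            let index = next
            next += 1
            group.addTask {
                let value = await transform(elements[index])
                return (index, value)
            }
        }
        // Refill one slot per completion; indices keep the output ordered.
        while let (index, value) = await group.next() {
            results[index] = value
            if next < elements.count {
                let i = next
                next += 1
                group.addTask {
                    let value = await transform(elements[i])
                    return (i, value)
                }
            }
        }
    }
    return results.map { $0! }
}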
// Create the product plan for this target, and serially add it to the list of product plans.
return await ProductPlanBuilder(configuredTarget: configuredTarget, workspaceContext: self.planRequest.workspaceContext, delegate: self.delegate).createProductPlan(targetTaskInfos[configuredTarget]!, globalProductPlan)
}