Skip to content

Commit

Permalink
use double locking to balance edges
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 19, 2024
1 parent e9d3828 commit 4d51e14
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 593 deletions.
320 changes: 151 additions & 169 deletions turbopack/crates/turbo-tasks-memory/src/aggregation/balance_edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,29 @@ use std::cmp::Ordering;

use super::{
balance_queue::BalanceQueue,
followers::{
add_follower_count, remove_follower_count, remove_positive_follower_count,
RemovePositveFollowerCountResult,
},
in_progress::is_in_progress,
in_progress::{is_in_progress, start_in_progress_all, start_in_progress_count},
increase::IncreaseReason,
increase_aggregation_number_internal,
uppers::{
add_upper_count, remove_positive_upper_count, remove_upper_count,
RemovePositiveUpperCountResult,
},
AggregationContext, AggregationNode,
notify_lost_follower::notify_lost_follower,
notify_new_follower::notify_new_follower,
util::{get_aggregated_add_change, get_aggregated_remove_change, get_followers_or_children},
AggregationContext, AggregationNode, PreparedInternalOperation, PreparedOperation, StackVec,
};

// Migrated followers to uppers or uppers to followers depending on the
// Migrate followers to uppers or uppers to followers depending on the
// aggregation numbers of the nodes involved in the edge. Might increase targets
// aggregation number if they are equal.
pub(super) fn balance_edge<C: AggregationContext>(
ctx: &C,
balance_queue: &mut BalanceQueue<C::NodeRef>,
upper_id: &C::NodeRef,
mut upper_aggregation_number: u32,
target_id: &C::NodeRef,
mut target_aggregation_number: u32,
) -> (u32, u32) {
// too many uppers on target
let mut extra_uppers = 0;
// too many followers on upper
let mut extra_followers = 0;
// The last info about uppers
let mut uppers_count: Option<isize> = None;
// The last info about followers
let mut followers_count = None;

) {
loop {
let (mut upper, mut target) = ctx.node_pair(upper_id, target_id);
let upper_aggregation_number = upper.aggregation_number();
let target_aggregation_number = target.aggregation_number();

let root = upper_aggregation_number == u32::MAX || target_aggregation_number == u32::MAX;
let order = if root {
Ordering::Greater
Expand All @@ -45,164 +33,158 @@ pub(super) fn balance_edge<C: AggregationContext>(
};
match order {
Ordering::Equal => {
// we probably want to increase the aggregation number of target
let upper = ctx.node(upper_id);
upper_aggregation_number = upper.aggregation_number();
drop(upper);
if upper_aggregation_number != u32::MAX
&& upper_aggregation_number == target_aggregation_number
{
let target = ctx.node(target_id);
target_aggregation_number = target.aggregation_number();
if upper_aggregation_number == target_aggregation_number {
// increase target aggregation number
increase_aggregation_number_internal(
ctx,
balance_queue,
target,
target_id,
target_aggregation_number + 1,
target_aggregation_number + 1,
IncreaseReason::EqualAggregationNumberOnBalance,
);
}
}
// increase target aggregation number
increase_aggregation_number_internal(
ctx,
balance_queue,
target,
target_id,
target_aggregation_number + 1,
target_aggregation_number + 1,
IncreaseReason::EqualAggregationNumberOnBalance,
);
}
Ordering::Less => {
// target should probably be a follower of upper
if uppers_count.map_or(false, |count| count <= 0) {
// We already removed all uppers, maybe too many
if is_in_progress(ctx, upper_id) {
drop(target);
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating
.enqueued_balancing
.push((upper_id.clone(), target_id.clone()));
drop(upper);
// Somebody else will balance this edge
break;
} else if extra_followers == 0 {
let upper = ctx.node(upper_id);
upper_aggregation_number = upper.aggregation_number();
if upper_aggregation_number < target_aggregation_number {
// target should be a follower of upper
// add some extra followers
let count = uppers_count.unwrap_or(1) as usize;
extra_followers += count;
followers_count = Some(add_follower_count(
ctx,
balance_queue,
upper,
upper_id,
target_id,
count,
true,
));
}
}

// target should be a follower of upper
let count = target
.uppers_mut()
.remove_all_positive_clonable_count(upper_id);
if count == 0 {
break;
}
let added = upper
.followers_mut()
.unwrap()
.add_clonable_count(target_id, count);

// target removed as upper
let remove_change = get_aggregated_remove_change(ctx, &target);
let followers = get_followers_or_children(ctx, &target);

let upper_uppers = if added {
// target added as follower
let uppers = upper.uppers().iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);
uppers
} else {
// we already have extra followers, remove some uppers to balance
let count = extra_followers + extra_uppers;
let target = ctx.node(target_id);
if is_in_progress(ctx, upper_id) {
drop(target);
let mut upper = ctx.node(upper_id);
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating.enqueued_balancing.push((
upper_id.clone(),
upper_aggregation_number,
target_id.clone(),
target_aggregation_number,
));
drop(upper);
// Somebody else will balance this edge
return (upper_aggregation_number, target_aggregation_number);
}
} else {
let RemovePositiveUpperCountResult {
removed_count,
remaining_count,
} = remove_positive_upper_count(
ctx,
balance_queue,
target,
upper_id,
count,
);
decrease_numbers(removed_count, &mut extra_uppers, &mut extra_followers);
uppers_count = Some(remaining_count);
}
Default::default()
};

drop(target);

// target removed as upper
let remove_prepared =
remove_change.and_then(|remove_change| upper.apply_change(ctx, remove_change));
start_in_progress_count(ctx, upper_id, followers.len() as u32);
let prepared = followers
.into_iter()
.map(|child_id| {
upper.notify_lost_follower(ctx, balance_queue, upper_id, &child_id)
})
.collect::<StackVec<_>>();
drop(upper);

// target added as follower
for upper_id in upper_uppers {
notify_new_follower(
ctx,
balance_queue,
ctx.node(&upper_id),
&upper_id,
target_id,
false,
);
}

// target removed as upper
remove_prepared.apply(ctx);
prepared.apply(ctx, balance_queue);

break;
}
Ordering::Greater => {
// target should probably be an inner node of upper
if followers_count.map_or(false, |count| count <= 0) {
// We already removed all followers, maybe too many
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating
.enqueued_balancing
.push((upper_id.clone(), target_id.clone()));
drop(upper);
// Somebody else will balance this edge
break;
}

// target should be a inner node of upper
let count = upper
.followers_mut()
.unwrap()
.remove_all_positive_clonable_count(target_id);
if count == 0 {
break;
} else if extra_uppers == 0 {
let target = ctx.node(target_id);
target_aggregation_number = target.aggregation_number();
if root || target_aggregation_number < upper_aggregation_number {
// target should be a inner node of upper
if is_in_progress(ctx, upper_id) {
drop(target);
let mut upper = ctx.node(upper_id);
if is_in_progress(ctx, upper_id) {
let AggregationNode::Aggegating(aggregating) = &mut *upper else {
unreachable!();
};
aggregating.enqueued_balancing.push((
upper_id.clone(),
upper_aggregation_number,
target_id.clone(),
target_aggregation_number,
));
drop(upper);
// Somebody else will balance this edge
return (upper_aggregation_number, target_aggregation_number);
}
} else {
// add some extra uppers
let count = followers_count.unwrap_or(1) as usize;
extra_uppers += count;
uppers_count = Some(
add_upper_count(
ctx,
balance_queue,
target,
target_id,
upper_id,
count,
true,
)
.new_count,
);
}
}
}
let added = target.uppers_mut().add_clonable_count(upper_id, count);

// target removed as follower
let uppers = upper.uppers().iter().cloned().collect::<StackVec<_>>();
start_in_progress_all(ctx, &uppers);

let (add_change, followers) = if added {
// target added as upper
let add_change = get_aggregated_add_change(ctx, &target);
let followers = get_followers_or_children(ctx, &target);
start_in_progress_count(ctx, upper_id, followers.len() as u32);
(add_change, followers)
} else {
// we already have extra uppers, try to remove some followers to balance
let count = extra_followers + extra_uppers;
let upper = ctx.node(upper_id);
let RemovePositveFollowerCountResult {
removed_count,
remaining_count,
} = remove_positive_follower_count(ctx, balance_queue, upper, target_id, count);
decrease_numbers(removed_count, &mut extra_followers, &mut extra_uppers);
followers_count = Some(remaining_count);
(None, Default::default())
};

drop(target);

// target added as upper
let add_prepared =
add_change.and_then(|add_change| upper.apply_change(ctx, add_change));
let prepared = followers
.into_iter()
.filter_map(|child_id| {
upper.notify_new_follower(ctx, balance_queue, upper_id, &child_id, false)
})
.collect::<StackVec<_>>();

drop(upper);

add_prepared.apply(ctx);
for prepared in prepared {
prepared.apply(ctx, balance_queue);
}

// target removed as follower
for upper_id in uppers {
notify_lost_follower(
ctx,
balance_queue,
ctx.node(&upper_id),
&upper_id,
target_id,
);
}

break;
}
}
}
if extra_followers > 0 {
let upper = ctx.node(upper_id);
remove_follower_count(ctx, balance_queue, upper, target_id, extra_followers);
}
if extra_uppers > 0 {
let target = ctx.node(target_id);
remove_upper_count(ctx, balance_queue, target, upper_id, extra_uppers);
}
(upper_aggregation_number, target_aggregation_number)
}

fn decrease_numbers(amount: usize, a: &mut usize, b: &mut usize) {
if *a >= amount {
*a -= amount;
} else {
*b -= amount - *a;
*a = 0;
}
}
Loading

0 comments on commit 4d51e14

Please sign in to comment.