
New concurrent AggregatingAttestationPool (V2) #9297


Closed

Conversation

tbenr
Contributor

@tbenr tbenr commented Mar 28, 2025

New AggregatingAttestationPoolV2 and MatchingDataAttestationGroupV2 classes implement the new pool.

The --Xaggregating-attestation-pool-v2-enabled CLI flag switches it on (disabled by default).

Code duplication is intentionally left in place so that the two implementations can drift independently over time.

Unit tests run against both implementations (sketched below).

fixes #9291
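
For illustration of the "unit tests run against both implementations" point, a JUnit 5 parameterized test is one common way to drive the same cases through both pools; every name below (test class, stand-in interface, factory method) is an assumption for this sketch, not the PR's actual test code.

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class AttestationPoolBothImplementationsTest {

  // Stand-in for the shared pool contract; real tests would construct the actual
  // AggregatingAttestationPool (V1) or AggregatingAttestationPoolV2 with their dependencies.
  interface PoolUnderTest {
    int getSize();
  }

  @ParameterizedTest
  @ValueSource(booleans = {false, true})
  void everyCaseRunsAgainstBothImplementations(final boolean useV2) {
    final PoolUnderTest pool = createPool(useV2);
    // shared assertions over the common behaviour go here
    assertTrue(pool.getSize() >= 0);
  }

  private PoolUnderTest createPool(final boolean useV2) {
    // placeholder: the real factory would return the V2 pool when useV2 is true, otherwise V1
    return () -> 0;
  }
}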

Documentation

  • I thought about documentation and added the doc-change-required label to this PR if updates are required.

Changelog

  • I thought about adding a changelog entry, and added one if I deemed necessary.

@tbenr tbenr marked this pull request as draft March 28, 2025 12:31
@tbenr tbenr force-pushed the concurrent-aggregatingAttestationPool branch from 2f12a02 to 3023f99 Compare March 28, 2025 13:27
@tbenr tbenr force-pushed the concurrent-aggregatingAttestationPool branch from bd7a49e to 1af17ee Compare March 28, 2025 18:53
@tbenr tbenr force-pushed the concurrent-aggregatingAttestationPool branch from 1af17ee to 838a7b0 Compare March 28, 2025 19:32
improved removeAttestationsPriorToSlot
@tbenr tbenr changed the title Pool v2 New concurrent AggregatingAttestationPool (V2) Mar 29, 2025
@tbenr tbenr marked this pull request as ready for review March 29, 2025 14:26
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.statetransition.attestation;
Contributor

@rolfyone rolfyone Apr 6, 2025

Maybe we should just use the same class name in different packages?
Not set on this, but to demonstrate:
statetransition.attestation.v1
statetransition.attestation.v2
Maybe cache.v1 and cache.v2 or something... just the concept...
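
For illustration only (hypothetical file locations, not necessarily what the PR ended up doing), the suggestion amounts to one class name repeated across version packages:

// .../statetransition/attestation/v1/AggregatingAttestationPool.java
package tech.pegasys.teku.statetransition.attestation.v1;

public class AggregatingAttestationPool {
  // existing (V1) implementation, unchanged
}

// .../statetransition/attestation/v2/AggregatingAttestationPool.java
package tech.pegasys.teku.statetransition.attestation.v2;

public class AggregatingAttestationPool {
  // new concurrent (V2) implementation
}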

Contributor Author

Nice idea... this way we can see that V1 is essentially unchanged.

@rolfyone rolfyone mentioned this pull request Apr 6, 2025
// attestation is not from the current or previous epoch
// this is really an edge case because the current or previous epoch is at least 31 slots
// and the attestation is only valid for 64 slots, so it may be epoch-2 but not beyond.
final UInt64 attestationEpochStartSlot = miscHelpers.computeStartSlotAtEpoch(attestationEpoch);
Contributor

I've been thinking about this a little... I think if we only ever use the start slot of the current epoch or the start slot of the previous epoch, that's probably safest (largely because of the limited cache).
We've got two options; I think the simplest is:

  • like above, if the attestation is from the current or previous epoch, use bestState
  • otherwise use the first slot of the previous epoch (which is generally still cached)

If we can easily log cache misses here, it may be a good thing...

I'd probably avoid using the start slot of the attestation epoch because that may be two epochs behind (per the comment) and would almost certainly require a regeneration from... probably the justified state...
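
A minimal sketch of that suggestion, assuming Teku's UInt64 and MiscHelpers types and a hypothetical helper name (not the PR's actual code):

import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;

class LookupSlotSelector {
  // Choose the slot whose cached state we use when processing an attestation.
  static UInt64 selectLookupSlot(
      final MiscHelpers miscHelpers, final UInt64 currentSlot, final UInt64 attestationEpoch) {
    final UInt64 currentEpoch = miscHelpers.computeEpochAtSlot(currentSlot);
    if (attestationEpoch.isGreaterThanOrEqualTo(currentEpoch.minusMinZero(1))) {
      // attestation from the current or previous epoch: the best/head state is fine
      return currentSlot;
    }
    // older attestation: clamp to the first slot of the previous epoch, which is
    // generally still cached, rather than the attestation's own epoch start slot
    return miscHelpers.computeStartSlotAtEpoch(currentEpoch.minusMinZero(1));
  }
}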

Comment on lines +233 to +256
// Prune based on maximum size if needed
int currentSize = getSize();
if (currentSize > maximumAttestationCount) {
  // Keep removing oldest slots until size is acceptable or only one slot remains
  while (dataHashBySlot.size() > 1 && currentSize > maximumAttestationCount) {
    LOG.trace(
        "V2 Attestation cache at {} exceeds {}. Pruning...",
        currentSize,
        maximumAttestationCount);
    final UInt64 oldestSlot = dataHashBySlot.firstKey();
    // Remove slot immediately following the oldest to ensure we always keep at least one slot
    removeAttestationsPriorToSlot(oldestSlot.plus(1));
    final int newSize = getSize();
    // Break if removal failed to change size or get oldest key (edge case for concurrent
    // modification)
    if (newSize == currentSize || oldestSlot.equals(dataHashBySlot.firstKey())) {
      LOG.warn(
          "V2 Failed to prune oldest slot {}, possibly due to concurrent access or no removable attestations. Skipping further pruning this cycle.",
          oldestSlot);
      break;
    }
    currentSize = newSize;
  }
}
Contributor

I do think we need some lock logic around the cleanup... I think I had a cleanup function in my draft that used a lock here...

The reasoning is really the degenerate scenario where two onSlot calls run concurrently because of a long prune, and hilarity ensues.

I would suggest breaking this cleanup logic out and having a lock where, if it's already running, we just return:

final AtomicBoolean isCleanupRunning = new AtomicBoolean(false);
...
  void cleanupCache(final Optional<UInt64> maybeSlot) {
    // one cleanup at a time can run
    if (!isCleanupRunning.compareAndSet(false, true)) {
      return;
    }

    try {
      if (maybeSlot.isEmpty()) {
        while (dataHashBySlot.size() > 1 && size.get() > maximumAttestationCount) {
          LOG.trace("Attestation cache at {} exceeds {}, ", size.get(), maximumAttestationCount);
          removeAttestationsPriorToSlot(dataHashBySlot.firstKey().plus(1));
        }
      } else {
        removeAttestationsPriorToSlot(maybeSlot.get());
      }
    } finally {
      isCleanupRunning.set(false);
    }
  }

Something like this concept...

I'd prefer to have a cache that's too large for a period than a broken cache or a bunch of onSlot calls taking a long time.

Contributor Author

So you are thinking about that because you think two onSlot calls could run concurrently?
I actually don't think that can happen because, IIRC, the event channel always queues pending events and executes them in sequence. I'll double check.

Contributor

Yeah, so removeAttestationsPriorToSlot is actually a dangerous function; it can't run from multiple sources concurrently...
In v1 it ran in add and in onSlot, and now it just runs in onSlot... that's good, as long as onSlot is not able to run more than once at a time.
The safer thing is to have a cacheCleanup that's behind a lock and just skips if it's already running. It still probably needs a "don't call this anywhere else because..." note on removeAttestationsPriorToSlot.
This could be done in onSlot, just using the same lock concept and exiting if the cache cleanup is already running (so isCleanupRunning could easily live inside onSlot rather than its own function, and it would be equivalent).
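
A rough sketch of that "same lock concept inside onSlot" variant, reusing the field and method names from the earlier snippet (illustrative only, not the PR's actual code; ATTESTATION_RETENTION_SLOTS is a placeholder for however far back the pool keeps attestations):

// requires java.util.concurrent.atomic.AtomicBoolean
private final AtomicBoolean isCleanupRunning = new AtomicBoolean(false);

public void onSlot(final UInt64 slot) {
  // only one cleanup pass may run at a time; if one is still in progress, skip this slot
  if (!isCleanupRunning.compareAndSet(false, true)) {
    return;
  }
  try {
    // normal per-slot expiry
    removeAttestationsPriorToSlot(slot.minusMinZero(ATTESTATION_RETENTION_SLOTS));
    // then size-based pruning, one oldest slot at a time, as in the snippet above
    while (dataHashBySlot.size() > 1 && getSize() > maximumAttestationCount) {
      removeAttestationsPriorToSlot(dataHashBySlot.firstKey().plus(1));
    }
  } finally {
    isCleanupRunning.set(false);
  }
}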

Contributor

Actually, I think more precisely the looping-and-reducing concept is the fun bit; we want to avoid two of them running at the same time, as in the worst case we'd flush out more than we want.

@tbenr
Contributor Author

tbenr commented May 12, 2025

Closing in favour of a new wave of PRs

@tbenr tbenr closed this May 12, 2025

Successfully merging this pull request may close these issues.

Aggregation production is slow (for BNs serving a high number of vals)