-
-
Notifications
You must be signed in to change notification settings - Fork 141
async pathfinding: fix path race condition #620
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,60 +1,38 @@ | ||
| package org.dreeam.leaf.async.path; | ||
|
|
||
| import ca.spottedleaf.moonrise.common.util.TickThread; | ||
| import net.minecraft.core.BlockPos; | ||
| import net.minecraft.server.MinecraftServer; | ||
| import net.minecraft.world.entity.Entity; | ||
| import net.minecraft.world.level.pathfinder.Node; | ||
| import net.minecraft.world.level.pathfinder.Path; | ||
| import net.minecraft.world.phys.Vec3; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.function.Consumer; | ||
| import java.util.function.Supplier; | ||
|
|
||
| /** | ||
| * I'll be using this to represent a path that not be processed yet! | ||
| */ | ||
| public class AsyncPath extends Path { | ||
| public final class AsyncPath extends Path { | ||
|
|
||
| /** | ||
| * Instead of three states, only one is actually required | ||
| * This will update when any thread is done with the path | ||
| */ | ||
| private volatile boolean ready = false; | ||
| private boolean ready = false; | ||
|
|
||
| /** | ||
| * Runnable waiting for this to be processed | ||
| * ConcurrentLinkedQueue is thread-safe, non-blocking and non-synchronized | ||
| */ | ||
| private final ConcurrentLinkedQueue<Runnable> postProcessing = new ConcurrentLinkedQueue<>(); | ||
| private final ArrayList<Consumer<Path>> postProcessing = new ArrayList<>(); | ||
|
|
||
| /** | ||
| * A list of positions that this path could path towards | ||
| */ | ||
| private final Set<BlockPos> positions; | ||
|
|
||
| /** | ||
| * The supplier of the real processed path | ||
| */ | ||
| private final Supplier<Path> pathSupplier; | ||
| private @Nullable Supplier<Path> pathSupplier; | ||
|
|
||
| /* | ||
| * Processed values | ||
| */ | ||
| /// Represents an asynchronous task. `null` indicates that is not ready. | ||
| private volatile @Nullable Path ret; | ||
|
|
||
| /** | ||
| * This is a reference to the nodes list in the parent `Path` object | ||
| */ | ||
| private final List<Node> nodes; | ||
| /** | ||
| * The block we're trying to path to | ||
| * <p> | ||
| * While processing, we have no idea where this is so consumers of `Path` should check that the path is processed before checking the target block | ||
| */ | ||
| private BlockPos target; | ||
| /** | ||
| * How far we are to the target | ||
| * <p> | ||
|
|
@@ -72,29 +50,37 @@ public class AsyncPath extends Path { | |
| public AsyncPath(List<Node> emptyNodeList, Set<BlockPos> positions, Supplier<Path> pathSupplier) { | ||
| super(emptyNodeList, null, false); | ||
|
|
||
| this.nodes = emptyNodeList; | ||
| this.positions = positions; | ||
| this.pathSupplier = pathSupplier; | ||
|
|
||
| AsyncPathProcessor.queue(this); | ||
| AsyncPathProcessor.queue(() -> { | ||
| if (this.ret == null) { | ||
| this.ret = pathSupplier.get(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isProcessed() { | ||
| return this.ready; | ||
| if (this.ready) { | ||
| return true; | ||
| } | ||
| Path ret = this.ret; | ||
| if (ret != null) { | ||
| complete(ret); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the future representing the processing state of this path | ||
| */ | ||
| public final void schedulePostProcessing(Runnable runnable) { | ||
| public void schedulePostProcessing(Consumer<Path> runnable) { | ||
| if (this.ready) { | ||
| runnable.run(); | ||
| runnable.accept(this); | ||
| } else { | ||
| this.postProcessing.offer(runnable); | ||
| if (this.ready) { | ||
hayanesuru marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| this.runAllPostProcessing(true); | ||
| } | ||
| this.postProcessing.add(runnable); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -104,48 +90,38 @@ public final void schedulePostProcessing(Runnable runnable) { | |
| * @param positions - the positions to compare against | ||
| * @return true if we are processing the same positions | ||
| */ | ||
| public final boolean hasSameProcessingPositions(final Set<BlockPos> positions) { | ||
| if (this.positions.size() != positions.size()) { | ||
| return false; | ||
| } | ||
|
|
||
| // For single position (common case), do direct comparison | ||
| if (positions.size() == 1) { // Both have the same size at this point | ||
| return this.positions.iterator().next().equals(positions.iterator().next()); | ||
| } | ||
|
|
||
| return this.positions.containsAll(positions); | ||
| public boolean hasSameProcessingPositions(final Set<BlockPos> positions) { | ||
| return this.positions.equals(positions); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Original is |
||
| } | ||
|
|
||
| /** | ||
| * Starts processing this path | ||
| * Since this is no longer a synchronized function, checkProcessed is no longer required | ||
| */ | ||
| public final void process() { | ||
| if (this.ready) return; | ||
|
|
||
| synchronized (this) { | ||
| if (this.ready) return; // In the worst case, the main thread only waits until any async thread is done and returns immediately | ||
| final Path bestPath = this.pathSupplier.get(); | ||
| this.nodes.addAll(bestPath.nodes); // We mutate this list to reuse the logic in Path | ||
| this.target = bestPath.getTarget(); | ||
| this.distToTarget = bestPath.getDistToTarget(); | ||
| this.canReach = bestPath.canReach(); | ||
| this.ready = true; | ||
| private void process() { | ||
| if (this.ready) { | ||
| return; | ||
| } | ||
|
|
||
| this.runAllPostProcessing(TickThread.isTickThread()); | ||
| } | ||
|
|
||
| private void runAllPostProcessing(boolean isTickThread) { | ||
| Runnable runnable; | ||
| while ((runnable = this.postProcessing.poll()) != null) { | ||
| if (isTickThread) { | ||
| runnable.run(); | ||
| } else { | ||
| MinecraftServer.getServer().scheduleOnMain(runnable); | ||
| } | ||
| final Path ret = this.ret; | ||
| final Path bestPath = ret != null ? ret : (this.ret = Objects.requireNonNull(pathSupplier).get()); | ||
| complete(bestPath); | ||
| } | ||
|
|
||
| /// not [#ready] | ||
| /// | ||
| /// @see #isDone | ||
| /// @see #process | ||
| private void complete(Path bestPath) { | ||
| this.nodes = bestPath.nodes; | ||
| this.target = bestPath.getTarget(); | ||
| this.distToTarget = bestPath.getDistToTarget(); | ||
| this.canReach = bestPath.canReach(); | ||
| this.pathSupplier = null; | ||
| this.ready = true; | ||
| for (Consumer<Path> consumer : this.postProcessing) { | ||
| consumer.accept(this); | ||
| } | ||
| this.postProcessing.clear(); | ||
| } | ||
|
|
||
| /* | ||
|
|
@@ -173,9 +149,15 @@ public boolean canReach() { | |
| /* | ||
| * Overrides to ensure we're processed first | ||
| */ | ||
|
|
||
| @Override | ||
| public boolean isDone() { | ||
| boolean ready = this.ready; | ||
| if (!ready) { | ||
| Path ret = this.ret; | ||
| if (ret != null) { | ||
| complete(ret); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this path is only completed upon I assume |
||
| } | ||
| } | ||
| return this.ready && super.isDone(); | ||
| } | ||
|
|
||
|
|
@@ -268,5 +250,4 @@ public Node getNextNode() { | |
| this.process(); | ||
| return super.getPreviousNode(); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,8 +46,8 @@ public static void init() { | |
| } | ||
| } | ||
|
|
||
| protected static CompletableFuture<Void> queue(AsyncPath path) { | ||
| return CompletableFuture.runAsync(path::process, PATH_PROCESSING_EXECUTOR) | ||
| protected static CompletableFuture<Void> queue(Runnable path) { | ||
| return CompletableFuture.runAsync(path, PATH_PROCESSING_EXECUTOR) | ||
| .orTimeout(60L, TimeUnit.SECONDS) | ||
| .exceptionally(throwable -> { | ||
| if (throwable instanceof TimeoutException e) { | ||
|
|
@@ -67,7 +67,7 @@ protected static CompletableFuture<Void> queue(AsyncPath path) { | |
| */ | ||
| public static void awaitProcessing(@Nullable Path path, Consumer<@Nullable Path> afterProcessing) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This already existed in the original, but the name |
||
| if (path != null && !path.isProcessed() && path instanceof AsyncPath asyncPath) { | ||
| asyncPath.schedulePostProcessing(() -> afterProcessing.accept(path)); // Reduce double lambda allocation | ||
| asyncPath.schedulePostProcessing(afterProcessing); // Reduce double lambda allocation | ||
| } else { | ||
| afterProcessing.accept(path); | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good change!
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm assuming
schedulePostProcessingis only called from the main thread, correct? Otherwise the.addwill cause issues when called from multiple threads