From c1546bd16bb19a76e132ffec5571e5741c09203d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 24 Apr 2025 08:35:11 +0900 Subject: [PATCH] Barrier waits in order of completion/failure. --- lib/async/barrier.rb | 37 +++++++++++++++++++++++++++++-------- lib/async/list.rb | 2 +- lib/async/notification.rb | 6 ++++-- lib/async/queue.rb | 13 +++++++++++++ releases.md | 1 + test/async/barrier.rb | 17 ++++++++++++----- 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/lib/async/barrier.rb b/lib/async/barrier.rb index 274a2c75..6b9b795d 100644 --- a/lib/async/barrier.rb +++ b/lib/async/barrier.rb @@ -5,6 +5,7 @@ require_relative "list" require_relative "task" +require_relative "queue" module Async # A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}. @@ -16,6 +17,7 @@ class Barrier # @public Since *Async v1*. def initialize(parent: nil) @tasks = List.new + @finished = Queue.new @parent = parent end @@ -41,11 +43,15 @@ def size # Execute a child task and add it to the barrier. # @asynchronous Executes the given block concurrently. def async(*arguments, parent: (@parent or Task.current), **options, &block) - task = parent.async(*arguments, **options, &block) + waiting = nil - @tasks.append(TaskNode.new(task)) - - return task + parent.async(*arguments, **options) do |task, *arguments| + waiting = TaskNode.new(task) + @tasks.append(waiting) + block.call(task, *arguments) + ensure + @finished.signal(waiting) + end end # Whether there are any tasks being held by the barrier. @@ -55,14 +61,27 @@ def empty? end # Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier. + # + # @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results. + # # @asynchronous Will wait for tasks to finish executing. def wait - @tasks.each do |waiting| + while !@tasks.empty? + # Wait for a task to finish (we get the task node): + return unless waiting = @finished.wait + + # Remove the task as it is now finishing: + @tasks.remove?(waiting) + + # Get the task: task = waiting.task - begin + + # If a block is given, the user can implement their own behaviour: + if block_given? + yield task + else + # Wait for it to either complete or raise an error: task.wait - ensure - @tasks.remove?(waiting) unless task.alive? end end end @@ -73,6 +92,8 @@ def stop @tasks.each do |waiting| waiting.task.stop end + + @finished.close end end end diff --git a/lib/async/list.rb b/lib/async/list.rb index 11e66164..1209ec3a 100644 --- a/lib/async/list.rb +++ b/lib/async/list.rb @@ -134,7 +134,7 @@ def remove(node) return removed(node) end - # @returns [Boolean] Returns true if the list is empty. + # @returns [Boolean] True if the list is empty. def empty? @size == 0 end diff --git a/lib/async/notification.rb b/lib/async/notification.rb index f7ac780f..9665aec6 100644 --- a/lib/async/notification.rb +++ b/lib/async/notification.rb @@ -10,12 +10,14 @@ module Async # @public Since *Async v1*. class Notification < Condition # Signal to a given task that it should resume operations. + # + # @returns [Boolean] if a task was signalled. def signal(value = nil, task: Task.current) - return if @waiting.empty? + return false if @waiting.empty? Fiber.scheduler.push Signal.new(self.exchange, value) - return nil + return true end Signal = Struct.new(:waiting, :value) do diff --git a/lib/async/queue.rb b/lib/async/queue.rb index 429b5868..1307cc71 100644 --- a/lib/async/queue.rb +++ b/lib/async/queue.rb @@ -20,10 +20,19 @@ class Queue # @parameter available [Notification] The notification to use for signaling when items are available. def initialize(parent: nil, available: Notification.new) @items = [] + @closed = false @parent = parent @available = available end + def close + @closed = true + + while @available.waiting? + @available.signal(nil) + end + end + # @attribute [Array] The items in the queue. attr :items @@ -59,6 +68,10 @@ def enqueue(*items) # Remove and return the next item from the queue. def dequeue while @items.empty? + if @closed + return nil + end + @available.wait end diff --git a/releases.md b/releases.md index 965d6ded..8908516f 100644 --- a/releases.md +++ b/releases.md @@ -4,6 +4,7 @@ - Ruby v3.1 support is dropped. - `Async::Wrapper` which was previously deprecated, is now removed. + - `Async::Barrier` now waits in order of completion rather than order of creation. This means that if you create a barrier with 3 tasks, and one of them completes (or fails) before the others, it will be the first to be yielded to the barrier. ## v2.23.0 diff --git a/test/async/barrier.rb b/test/async/barrier.rb index 3ab96886..068bb19d 100644 --- a/test/async/barrier.rb +++ b/test/async/barrier.rb @@ -85,21 +85,28 @@ # It's possible for Barrier#wait to be interrupted with an unexpected exception, and this should not cause the barrier to incorrectly remove that task from the wait list. it "waits for tasks with timeouts" do + repeats = 5 + count = 0 + begin - reactor.with_timeout(5/100.0/2) do - 5.times do |i| + reactor.with_timeout(repeats/100.0/2) do + repeats.times do |i| barrier.async do |task| sleep(i/100.0) end end - expect(barrier.tasks.size).to be == 5 - barrier.wait + expect(barrier.tasks.size).to be == repeats + + barrier.wait do |task| + task.wait + count += 1 + end end rescue Async::TimeoutError # Expected. ensure - expect(barrier.tasks.size).to be == 2 + expect(barrier.tasks.size).to be == (repeats - count) barrier.stop end end