Skip to content

Barrier waits in order of completion/failure. #384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions lib/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

require_relative "list"
require_relative "task"
require_relative "queue"

module Async
# A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
Expand All @@ -16,6 +17,7 @@ class Barrier
# @public Since *Async v1*.
def initialize(parent: nil)
@tasks = List.new
@finished = Queue.new

@parent = parent
end
Expand All @@ -41,11 +43,15 @@ def size
# Execute a child task and add it to the barrier.
# @asynchronous Executes the given block concurrently.
def async(*arguments, parent: (@parent or Task.current), **options, &block)
task = parent.async(*arguments, **options, &block)
waiting = nil

@tasks.append(TaskNode.new(task))

return task
parent.async(*arguments, **options) do |task, *arguments|
waiting = TaskNode.new(task)
@tasks.append(waiting)
block.call(task, *arguments)
ensure
@finished.signal(waiting)
end
end

# Whether there are any tasks being held by the barrier.
Expand All @@ -55,14 +61,27 @@ def empty?
end

# Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
#
# @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results.
#
# @asynchronous Will wait for tasks to finish executing.
def wait
@tasks.each do |waiting|
while [email protected]?
# Wait for a task to finish (we get the task node):
return unless waiting = @finished.wait

# Remove the task as it is now finishing:
@tasks.remove?(waiting)

# Get the task:
task = waiting.task
begin

# If a block is given, the user can implement their own behaviour:
if block_given?
yield task
else
# Wait for it to either complete or raise an error:
task.wait
ensure
@tasks.remove?(waiting) unless task.alive?
end
end
end
Expand All @@ -73,6 +92,8 @@ def stop
@tasks.each do |waiting|
waiting.task.stop
end

@finished.close
end
end
end
2 changes: 1 addition & 1 deletion lib/async/list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def remove(node)
return removed(node)
end

# @returns [Boolean] Returns true if the list is empty.
# @returns [Boolean] True if the list is empty.
def empty?
@size == 0
end
Expand Down
6 changes: 4 additions & 2 deletions lib/async/notification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ module Async
# @public Since *Async v1*.
class Notification < Condition
# Signal to a given task that it should resume operations.
#
# @returns [Boolean] if a task was signalled.
def signal(value = nil, task: Task.current)
return if @waiting.empty?
return false if @waiting.empty?

Fiber.scheduler.push Signal.new(self.exchange, value)

return nil
return true
end

Signal = Struct.new(:waiting, :value) do
Expand Down
13 changes: 13 additions & 0 deletions lib/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,19 @@ class Queue
# @parameter available [Notification] The notification to use for signaling when items are available.
def initialize(parent: nil, available: Notification.new)
@items = []
@closed = false
@parent = parent
@available = available
end

def close
@closed = true

while @available.waiting?
@available.signal(nil)
end
end

# @attribute [Array] The items in the queue.
attr :items

Expand Down Expand Up @@ -59,6 +68,10 @@ def enqueue(*items)
# Remove and return the next item from the queue.
def dequeue
while @items.empty?
if @closed
return nil
end

@available.wait
end

Expand Down
1 change: 1 addition & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Ruby v3.1 support is dropped.
- `Async::Wrapper` which was previously deprecated, is now removed.
- `Async::Barrier` now waits in order of completion rather than order of creation. This means that if you create a barrier with 3 tasks, and one of them completes (or fails) before the others, it will be the first to be yielded to the barrier.

## v2.23.0

Expand Down
17 changes: 12 additions & 5 deletions test/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,28 @@

# It's possible for Barrier#wait to be interrupted with an unexpected exception, and this should not cause the barrier to incorrectly remove that task from the wait list.
it "waits for tasks with timeouts" do
repeats = 5
count = 0

begin
reactor.with_timeout(5/100.0/2) do
5.times do |i|
reactor.with_timeout(repeats/100.0/2) do
repeats.times do |i|
barrier.async do |task|
sleep(i/100.0)
end
end

expect(barrier.tasks.size).to be == 5
barrier.wait
expect(barrier.tasks.size).to be == repeats

barrier.wait do |task|
task.wait
count += 1
end
end
rescue Async::TimeoutError
# Expected.
ensure
expect(barrier.tasks.size).to be == 2
expect(barrier.tasks.size).to be == (repeats - count)
barrier.stop
end
end
Expand Down
Loading