Skip to content

Commit f660527

Browse files
committed
Barrier waits in order of completion/failure.
1 parent a5b59e1 commit f660527

File tree

4 files changed

+47
-11
lines changed

4 files changed

+47
-11
lines changed

lib/async/barrier.rb

+29-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
require_relative "list"
77
require_relative "task"
8+
require_relative "queue"
89

910
module Async
1011
# A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
@@ -16,6 +17,7 @@ class Barrier
1617
# @public Since *Async v1*.
1718
def initialize(parent: nil)
1819
@tasks = List.new
20+
@finished = Queue.new
1921

2022
@parent = parent
2123
end
@@ -41,11 +43,15 @@ def size
4143
# Execute a child task and add it to the barrier.
4244
# @asynchronous Executes the given block concurrently.
4345
def async(*arguments, parent: (@parent or Task.current), **options, &block)
44-
task = parent.async(*arguments, **options, &block)
46+
waiting = nil
4547

46-
@tasks.append(TaskNode.new(task))
47-
48-
return task
48+
parent.async(*arguments, **options) do |task, *arguments|
49+
waiting = TaskNode.new(task)
50+
@tasks.append(waiting)
51+
block.call(task, *arguments)
52+
ensure
53+
@finished.signal(waiting)
54+
end
4955
end
5056

5157
# Whether there are any tasks being held by the barrier.
@@ -55,14 +61,27 @@ def empty?
5561
end
5662

5763
# Wait for all tasks to complete by invoking {Task#wait} on each waiting task, which may raise an error. As long as the task has completed, it will be removed from the barrier.
64+
#
65+
# @yields {|task| ...} If a block is given, the unwaited task is yielded. You must invoke {Task#wait} yourself. In addition, you may `break` if you have captured enough results.
66+
#
5867
# @asynchronous Will wait for tasks to finish executing.
5968
def wait
60-
@tasks.each do |waiting|
69+
while !@tasks.empty?
70+
# Wait for a task to finish (we get the task node):
71+
return unless waiting = @finished.wait
72+
73+
# Remove the task as it is now finishing:
74+
@tasks.remove?(waiting)
75+
76+
# Get the task:
6177
task = waiting.task
62-
begin
78+
79+
# If a block is given, the user can implement their own behaviour:
80+
if block_given?
81+
yield task
82+
else
83+
# Wait for it to either complete or raise an error:
6384
task.wait
64-
ensure
65-
@tasks.remove?(waiting) unless task.alive?
6685
end
6786
end
6887
end
@@ -73,6 +92,8 @@ def stop
7392
@tasks.each do |waiting|
7493
waiting.task.stop
7594
end
95+
96+
@finished.close
7697
end
7798
end
7899
end

lib/async/list.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def remove(node)
134134
return removed(node)
135135
end
136136

137-
# @returns [Boolean] Returns true if the list is empty.
137+
# @returns [Boolean] True if the list is empty.
138138
def empty?
139139
@size == 0
140140
end

lib/async/notification.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ module Async
1010
# @public Since *Async v1*.
1111
class Notification < Condition
1212
# Signal to a given task that it should resume operations.
13+
#
14+
# @returns [Boolean] if a task was signalled.
1315
def signal(value = nil, task: Task.current)
14-
return if @waiting.empty?
16+
return false if @waiting.empty?
1517

1618
Fiber.scheduler.push Signal.new(self.exchange, value)
1719

18-
return nil
20+
return true
1921
end
2022

2123
Signal = Struct.new(:waiting, :value) do

lib/async/queue.rb

+13
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ class Queue
2020
# @parameter available [Notification] The notification to use for signaling when items are available.
2121
def initialize(parent: nil, available: Notification.new)
2222
@items = []
23+
@closed = false
2324
@parent = parent
2425
@available = available
2526
end
2627

28+
def close
29+
@closed = true
30+
31+
while @available.waiting?
32+
@available.signal(nil)
33+
end
34+
end
35+
2736
# @attribute [Array] The items in the queue.
2837
attr :items
2938

@@ -59,6 +68,10 @@ def enqueue(*items)
5968
# Remove and return the next item from the queue.
6069
def dequeue
6170
while @items.empty?
71+
if @closed
72+
return nil
73+
end
74+
6275
@available.wait
6376
end
6477

0 commit comments

Comments
 (0)