Skip to content

Commit c0e5202

Browse files
author
Petr Chalupa
committed
Merge pull request #428 from pitr-ch/futures
Fix Future#flat when failures happen
2 parents 262bd58 + 4f08f90 commit c0e5202

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

lib/concurrent/edge/future.rb

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,17 +1141,24 @@ def blocked_by
11411141

11421142
def process_on_done(future)
11431143
countdown = super(future)
1144-
value = future.value!
11451144
if countdown.nonzero?
1145+
internal_state = future.internal_state
1146+
1147+
unless internal_state.success?
1148+
complete_with internal_state
1149+
return countdown
1150+
end
1151+
1152+
value = internal_state.value
11461153
case value
11471154
when Future
11481155
@BlockedBy.push value
11491156
value.add_callback :pr_callback_notify_blocked, self
11501157
@Countdown.value
11511158
when Event
1152-
raise TypeError, 'cannot flatten to Event'
1159+
evaluate_to(lambda { raise TypeError, 'cannot flatten to Event' })
11531160
else
1154-
raise TypeError, "returned value #{value.inspect} is not a Future"
1161+
evaluate_to(lambda { raise TypeError, "returned value #{value.inspect} is not a Future" })
11551162
end
11561163
end
11571164
countdown
@@ -1174,6 +1181,10 @@ def clear_blocked_by!
11741181
@BlockedBy.clear
11751182
nil
11761183
end
1184+
1185+
def completable?(countdown)
1186+
!@Future.internal_state.completed? && super(countdown)
1187+
end
11771188
end
11781189

11791190
# @!visibility private

spec/concurrent/edge/future_spec.rb

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@
194194
describe 'Future' do
195195
it 'has sync and async callbacks' do
196196
callbacks_tester = ->(future) do
197-
queue = Queue.new
197+
queue = Queue.new
198198
future.on_completion(:io) { |result| queue.push("async on_completion #{ result.inspect }") }
199199
future.on_completion! { |result| queue.push("sync on_completion #{ result.inspect }") }
200200
future.on_success(:io) { |value| queue.push("async on_success #{ value.inspect }") }
@@ -309,9 +309,30 @@
309309
expect(Concurrent.zip(branch1, branch2).value!).to eq [2, 3]
310310
end
311311

312-
it 'has flat map' do
313-
f = Concurrent.future { Concurrent.future { 1 } }.flat.then(&:succ)
314-
expect(f.value!).to eq 2
312+
describe '#flat' do
313+
it 'returns value of inner future' do
314+
f = Concurrent.future { Concurrent.future { 1 } }.flat.then(&:succ)
315+
expect(f.value!).to eq 2
316+
end
317+
318+
it 'propagates failure of inner future' do
319+
err = StandardError.new('boo')
320+
f = Concurrent.future { Concurrent.failed_future(err) }.flat
321+
expect(f.reason).to eq err
322+
end
323+
324+
it 'it propagates failure of the future which was suppose to provide inner future' do
325+
f = Concurrent.future { raise 'boo' }.flat
326+
expect(f.reason.message).to eq 'boo'
327+
end
328+
329+
it 'fails if inner value is not a future' do
330+
f = Concurrent.future { 'boo' }.flat
331+
expect(f.reason).to be_an_instance_of TypeError
332+
333+
f = Concurrent.future { Concurrent.completed_event }.flat
334+
expect(f.reason).to be_an_instance_of TypeError
335+
end
315336
end
316337
end
317338

0 commit comments

Comments
 (0)