Skip to content

Commit ab7bd85

Browse files
committed
Send events to dispatchers even if there are no consumers
If you are using the demand dispatcher (the default) and a consumer leaves, any demand from this consumer is stored as pending. The idea is, if a consumer asks for 10 events, and then leaves, the producer was already notified of those 10 events. If another consumer joins, then those previously asked 10 events should be sent to next consumer. However, if a consumer left and there were no additional consumers, any event sent by the producer would be buffered immediately, and the dispatcher would never know its pending demand was satisfied. This would also happen in cases you had 2 consumers, both asked for 5 events, and then one left. The producer could send 8 events, and the dispatcher would never take into account those 3 additional events were satisfying a pending demand. This patch addresses both issues. Closes #311. Closes #312.
1 parent cbeca52 commit ab7bd85

File tree

5 files changed

+47
-11
lines changed

5 files changed

+47
-11
lines changed

lib/gen_stage.ex

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2415,11 +2415,6 @@ defmodule GenStage do
24152415
stage
24162416
end
24172417

2418-
defp dispatch_events(events, _length, %{consumers: consumers} = stage)
2419-
when map_size(consumers) == 0 do
2420-
buffer_events(events, stage)
2421-
end
2422-
24232418
defp dispatch_events(events, length, stage) do
24242419
%{dispatcher_mod: dispatcher_mod, dispatcher_state: dispatcher_state} = stage
24252420
{:ok, events, dispatcher_state} = dispatcher_mod.dispatch(events, length, dispatcher_state)

lib/gen_stage/dispatchers/demand_dispatcher.ex

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,20 @@ defmodule GenStage.DemandDispatcher do
7878
end
7979

8080
def dispatch(events, length, {demands, pending, max, false}) do
81-
{events, demands} = dispatch_demand(events, length, demands)
82-
{:ok, events, {demands, pending, max, false}}
81+
{events, to_buffer, demands} = dispatch_demand(events, length, demands)
82+
{:ok, events, {demands, max(pending - to_buffer, 0), max, false}}
8383
end
8484

85-
defp dispatch_demand([], _length, demands) do
86-
{[], demands}
85+
defp dispatch_demand([], length, demands) do
86+
{[], length, demands}
8787
end
8888

89-
defp dispatch_demand(events, _length, [{0, _, _} | _] = demands) do
90-
{events, demands}
89+
defp dispatch_demand(events, length, []) do
90+
{events, length, []}
91+
end
92+
93+
defp dispatch_demand(events, length, [{0, _, _} | _] = demands) do
94+
{events, length, demands}
9195
end
9296

9397
defp dispatch_demand(events, length, [{counter, pid, ref} | demands]) do

test/gen_stage/broadcast_dispatcher_test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ defmodule GenStage.BroadcastDispatcherTest do
3535

3636
{:ok, 0, disp} = D.cancel({pid, ref}, disp)
3737
assert disp == {[], 0, MapSet.new()}
38+
39+
# Now attempt to dispatch with no consumers
40+
{:ok, [1, 2, 3], disp} = D.dispatch([1, 2, 3], 3, disp)
41+
assert disp == {[], 0, MapSet.new()}
3842
end
3943

4044
test "multiple subscriptions with early demand" do

test/gen_stage/demand_dispatcher_test.exs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,35 @@ defmodule GenStage.DemandDispatcherTest do
4949

5050
{:ok, 0, disp} = D.cancel({pid, ref}, disp)
5151
assert disp == {[], 10, 10, @default_shuffle_flag}
52+
53+
# Now attempt to dispatch with no consumers
54+
{:ok, [:a, :b, :c], disp} = D.dispatch([:a, :b, :c], 3, disp)
55+
assert disp == {[], 7, 10, @default_shuffle_flag}
56+
57+
{:ok, _, disp} = D.dispatch(Enum.to_list(1..10), 10, disp)
58+
assert disp == {[], 0, 10, @default_shuffle_flag}
59+
end
60+
61+
test "subscribes twice, asks twice and cancel one" do
62+
pid = self()
63+
ref1 = make_ref()
64+
ref2 = make_ref()
65+
disp = dispatcher([])
66+
67+
# Subscribe twice and ask twice
68+
{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
69+
{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
70+
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)
71+
{:ok, 3, disp} = D.ask(3, {pid, ref2}, disp)
72+
assert disp == {[{3, pid, ref1}, {3, pid, ref2}], 0, 3, @default_shuffle_flag}
73+
74+
# Cancel and leave some demand
75+
{:ok, 0, disp} = D.cancel({pid, ref1}, disp)
76+
assert disp == {[{3, pid, ref2}], 3, 3, @default_shuffle_flag}
77+
78+
# Send events for both subscriptions
79+
{:ok, [:d, :e], disp} = D.dispatch([:a, :b, :c, :d, :e], 5, disp)
80+
assert disp == {[{0, pid, ref2}], 1, 3, @default_shuffle_flag}
5281
end
5382

5483
test "subscribes, asks and dispatches" do

test/gen_stage/partition_dispatcher_test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ defmodule GenStage.PartitionDispatcherTest do
3030
assert {10, 5} = waiting_and_pending(disp)
3131
{:ok, 0, disp} = D.cancel({pid, ref}, disp)
3232
assert {10, 10} = waiting_and_pending(disp)
33+
34+
# Now attempt to dispatch with no consumers
35+
{:ok, [], disp} = D.dispatch([0, 1, 2, 3], 4, disp)
36+
assert {6, 10} = waiting_and_pending(disp)
3337
end
3438

3539
test "subscribes, asks and dispatches" do

0 commit comments

Comments
 (0)