diff --git a/lib/gen_stage/fair_dispatcher.ex b/lib/gen_stage/fair_dispatcher.ex new file mode 100644 index 0000000..d703ab0 --- /dev/null +++ b/lib/gen_stage/fair_dispatcher.ex @@ -0,0 +1,111 @@ +alias Experimental.GenStage + +defmodule GenStage.FairDispatcher do + @moduledoc """ + A dispatcher that sends batches to consumers using round robin. + + ## Options + + The partition dispatcher accepts the following options + on initialization: + + * `:batch` - the maximum batch size before moving on to the next consumer + when dispatching. For example `batch: 5` will send at most 5 events to a + consumer at a time when there are other consumers. + """ + + @behaviour GenStage.Dispatcher + + @doc false + def init(opts) do + {:ok, {:queue.new(), %{}, 0, 0, batch_size(opts)}} + end + + @doc false + def notify(msg, {_, demands, _, _, _} = state) do + Enum.each(demands, fn {ref, {pid, _}} -> + Process.send(pid, {:"$gen_consumer", {self(), ref}, {:notification, msg}}, [:noconnect]) + end) + {:ok, state} + end + + @doc false + def subscribe(_, {pid, ref}, {robin, demands, held, pending, batch}) do + robin = :queue.in(ref, robin) + demands = Map.put(demands, ref, {pid, 0}) + {:ok, 0, {robin, demands, held, pending, batch}} + end + + @doc false + def cancel({_, ref}, {robin, demands, held, pending, batch}) do + robin = :queue.filter(&(&1 != ref), robin) + {{_, current}, demands} = Map.pop(demands, ref) + {:ok, 0, {robin, demands, held-current, current+pending, batch}} + end + + @doc false + def ask(counter, {_, ref}, {robin, demands, held, pending, batch}) do + update = fn({pid, current}) -> {pid, current+counter} end + demands = Map.update!(demands, ref, update) + + already_sent = min(pending, counter) + + new = counter-already_sent + held = held+counter + pending = pending-already_sent + + {:ok, new, {robin, demands, held, pending, batch}} + end + + @doc false + def dispatch(events, {robin, demands, held, pending, batch}) do + {events, robin, demands, held} = + dispatch(events, robin, demands, held, batch) + {:ok, events, {robin, demands, held, pending, batch}} + end + + defp batch_size(opts) do + case Keyword.get(opts, :batch, 100) do + batch when is_integer(batch) and batch > 0 -> + batch + batch -> + msg = "expected :batch to be a positive integer, got #{inspect batch}" + raise ArgumentError, msg + end + end + + defp dispatch(events, robin, demands, 0, _batch) do + {events, robin, demands, 0} + end + + defp dispatch([], robin, demands, held, _batch) do + {[], robin, demands, held} + end + + defp dispatch(events, robin, demands, held, batch) do + {{:value, ref}, robin} = :queue.out(robin) + dispatch_to(ref, events, :queue.in(ref, robin), demands, held, batch) + end + + defp dispatch_to(ref, events, robin, demands, held, batch) do + case demands do + %{^ref => {_pid, 0}} -> + dispatch(events, robin, demands, held, batch) + %{^ref => {pid, counter}} -> + {deliver_now, deliver_later} = split_events(events, counter, held, batch) + Process.send(pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect]) + len = length(deliver_now) + counter = counter - len + held = held - len + demands = %{demands | ref => {pid, counter}} + dispatch(deliver_later, robin, demands, held, batch) + end + end + + defp split_events(events, held, held, _batch) do + Enum.split(events, held) + end + defp split_events(events, counter, _held, batch) do + Enum.split(events, min(counter, batch)) + end +end diff --git a/test/gen_stage/fair_dispatcher_test.exs b/test/gen_stage/fair_dispatcher_test.exs new file mode 100644 index 0000000..b47448c --- /dev/null +++ b/test/gen_stage/fair_dispatcher_test.exs @@ -0,0 +1,231 @@ +alias Experimental.GenStage + +defmodule GenStage.FairDispatcherTest do + use ExUnit.Case, async: true + + alias GenStage.FairDispatcher, as: D + + defp dispatcher(opts) do + {:ok, {_, %{}, 0, 0, _} = state} = D.init(opts) + state + end + + test "subscribes and cancels" do + pid = self() + ref = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 0}}, 0, 0, 100} + + {:ok, 0, disp} = D.cancel({pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [] + assert rest == {%{}, 0, 0, 100} + end + + test "subscribes, asks and cancels" do + pid = self() + ref = make_ref() + disp = dispatcher([]) + + # Subscribe, ask and cancel and leave some demand + {:ok, 0, disp} = D.subscribe([], {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 0}}, 0, 0, 100} + + {:ok, 10, disp} = D.ask(10, {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 10}}, 10, 0, 100} + + {:ok, 0, disp} = D.cancel({pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [] + assert rest == {%{}, 0, 10, 100} + + # Subscribe, ask and cancel and leave the same demand + {:ok, 0, disp} = D.subscribe([], {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 0}}, 0, 10, 100} + + {:ok, 0, disp} = D.ask(5, {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 5}}, 5, 5, 100} + + {:ok, 0, disp} = D.cancel({pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [] + assert rest == {%{}, 0, 10, 100} + end + + test "subscribes, asks and dispatches" do + pid = self() + ref = make_ref() + disp = dispatcher([]) + {:ok, 0, disp} = D.subscribe([], {pid, ref}, disp) + + {:ok, 3, disp} = D.ask(3, {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 3}}, 3, 0, 100} + + {:ok, [], disp} = D.dispatch([:a], disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 2}}, 2, 0, 100} + assert_received {:"$gen_consumer", {_, ^ref}, [:a]} + + {:ok, 3, disp} = D.ask(3, {pid, ref}, disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 5}}, 5, 0, 100} + + {:ok, [:g, :h], disp} = D.dispatch([:b, :c, :d, :e, :f, :g, :h], disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 0}}, 0, 0, 100} + assert_received {:"$gen_consumer", {_, ^ref}, [:b, :c, :d, :e, :f]} + + {:ok, [:i, :j], disp} = D.dispatch([:i, :j], disp) + {queue, rest} = split(disp) + assert queue == [ref] + assert rest == {%{ref => {pid, 0}}, 0, 0, 100} + refute_received {:"$gen_consumer", {_, ^ref}, _} + end + + test "subscribes, asks multiple consumers" do + pid = self() + ref1 = make_ref() + ref2 = make_ref() + ref3 = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp) + {:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp) + {:ok, 0, disp} = D.subscribe([], {pid, ref3}, disp) + + {:ok, 4, disp} = D.ask(4, {pid, ref1}, disp) + {:ok, 2, disp} = D.ask(2, {pid, ref2}, disp) + {:ok, 3, disp} = D.ask(3, {pid, ref3}, disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2, ref3] + assert rest == {%{ref1 => {pid, 4}, ref2 => {pid, 2}, ref3 => {pid, 3}}, + 9, 0, 100} + + {:ok, 2, disp} = D.ask(2, {pid, ref3}, disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2, ref3] + assert rest == {%{ref1 => {pid, 4}, ref2 => {pid, 2}, ref3 => {pid, 5}}, + 11, 0, 100} + + {:ok, 4, disp} = D.ask(4, {pid, ref2}, disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2, ref3] + assert rest == {%{ref1 => {pid, 4}, ref2 => {pid, 6}, ref3 => {pid, 5}}, + 15, 0, 100} + end + + test "subscribes, asks and dispatches to multiple consumers" do + pid = self() + ref1 = make_ref() + ref2 = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp) + {:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp) + + {:ok, 3, disp} = D.ask(3, {pid, ref1}, disp) + {:ok, 2, disp} = D.ask(2, {pid, ref2}, disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2] + assert rest == {%{ref1 => {pid, 3}, ref2 => {pid, 2}}, 5, 0, 100} + + # One batch fits all + {:ok, [], disp} = D.dispatch([:a, :b, :c, :d, :e], disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2] + assert rest == {%{ref1 => {pid, 0}, ref2 => {pid, 0}}, 0, 0, 100} + assert_received {:"$gen_consumer", {_, ^ref1}, [:a, :b, :c]} + assert_received {:"$gen_consumer", {_, ^ref2}, [:d, :e]} + + {:ok, [:a, :b, :c], disp} = D.dispatch([:a, :b, :c], disp) + assert {^queue, ^rest} = split(disp) + refute_received {:"$gen_consumer", {_, _}, _} + + # Two batches with left over + {:ok, 3, disp} = D.ask(3, {pid, ref1}, disp) + {:ok, 3, disp} = D.ask(3, {pid, ref2}, disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2] + assert rest == {%{ref1 => {pid, 3}, ref2 => {pid, 3}}, 6, 0, 100} + + {:ok, [], disp} = D.dispatch([:a, :b], disp) + {queue, rest} = split(disp) + assert queue == [ref2, ref1] + assert rest == {%{ref1 => {pid, 1}, ref2 => {pid, 3}}, 4, 0, 100} + assert_received {:"$gen_consumer", {_, ^ref1}, [:a, :b]} + + {:ok, [], disp} = D.dispatch([:c, :d], disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2] + assert rest == {%{ref1 => {pid, 1}, ref2 => {pid, 1}}, 2, 0, 100} + assert_received {:"$gen_consumer", {_, ^ref2}, [:c, :d]} + + # Eliminate the left-over + {:ok, [:g], disp} = D.dispatch([:e, :f, :g], disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2] + assert rest == {%{ref1 => {pid, 0}, ref2 => {pid, 0}}, 0, 0, 100} + assert_received {:"$gen_consumer", {_, ^ref1}, [:e]} + assert_received {:"$gen_consumer", {_, ^ref2}, [:f]} + end + + test "delivers notifications to all consumers" do + pid = self() + ref1 = make_ref() + ref2 = make_ref() + disp = dispatcher([]) + + {:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp) + {:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp) + {:ok, 3, disp} = D.ask(3, {pid, ref1}, disp) + + {:ok, notify_disp} = D.notify(:hello, disp) + assert disp == notify_disp + + assert_received {:"$gen_consumer", {_, ^ref1}, {:notification, :hello}} + assert_received {:"$gen_consumer", {_, ^ref2}, {:notification, :hello}} + end + + test "batch limits max events per consumer" do + pid = self() + ref1 = make_ref() + ref2 = make_ref() + disp = dispatcher([batch: 2]) + {_, rest} = split(disp) + assert rest == {%{}, 0, 0, 2} + + {:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp) + {:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp) + + {:ok, 3, disp} = D.ask(3, {pid, ref1}, disp) + {:ok, 4, disp} = D.ask(4, {pid, ref2}, disp) + + {:ok, [], disp} = D.dispatch([:a, :b, :c], disp) + {queue, rest} = split(disp) + assert queue == [ref1, ref2] + assert rest == {%{ref1 => {pid, 1}, ref2 => {pid, 3}}, 4, 0, 2} + assert_received {:"$gen_consumer", {_, ^ref1}, [:a, :b]} + assert_received {:"$gen_consumer", {_, ^ref2}, [:c]} + end + + defp split(disp) do + {:queue.to_list(elem(disp, 0)), Tuple.delete_at(disp, 0)} + end +end