diff --git a/lib/sink/connection/client.ex b/lib/sink/connection/client.ex index 7057c8d..2d8e48b 100644 --- a/lib/sink/connection/client.ex +++ b/lib/sink/connection/client.ex @@ -15,6 +15,8 @@ defmodule Sink.Connection.Client do require Logger alias Sink.Event alias Sink.Connection.ClientConnection + alias Sink.Connection.Client.Backoff + alias Sink.Connection.Client.DefaultBackoff ClientConnection @@ -27,14 +29,12 @@ defmodule Sink.Connection.Client do :ssl_opts, :handler, :transport, - :connect_attempt_interval, :disconnect_reason, - :disconnect_time, - :connection_request_succeeded + :backoff_impl, + connection_attempts: 0, + disconnect_time: nil ] - @first_connect_attempt 50 - @doc """ This was meant to mean "is the client connected?". However that may not be accurate since the client can lose network connection and :ssl/:gen_tcp won't know. Probably need to change @@ -43,7 +43,7 @@ defmodule Sink.Connection.Client do """ - def init(port, host, ssl_opts, handler, transport) do + def init(port, host, ssl_opts, handler, transport, backoff) do %State{ connection_pid: nil, port: port, @@ -51,37 +51,26 @@ defmodule Sink.Connection.Client do ssl_opts: ssl_opts, handler: handler, transport: transport, - connect_attempt_interval: @first_connect_attempt, - disconnect_time: nil + backoff_impl: backoff } end - def backoff(%State{connect_attempt_interval: nil} = state) do - %State{state | connect_attempt_interval: @first_connect_attempt} - end - - def backoff(%State{connect_attempt_interval: @first_connect_attempt} = state) do - %State{state | connect_attempt_interval: 1_000} - end + def backoff( + %State{connection_attempts: attempts} = state, + connection_request_rejected + ) do + new_state = %State{state | connection_attempts: attempts + 1} - def backoff(%State{connect_attempt_interval: 1_000} = state) do - %State{state | connect_attempt_interval: 5_000} - end - - def backoff(%State{connect_attempt_interval: 5_000} = state) do - %State{state | 
connect_attempt_interval: 30_000} - end + backoff = + new_state.connection_attempts + |> state.backoff_impl.backoff_duration(connection_request_rejected) + |> Backoff.add_jitter() - def backoff(%State{connect_attempt_interval: _} = state) do - %State{state | connect_attempt_interval: 60_000} + {new_state, backoff} end def connected(%State{} = state, connection_pid) do - %State{state | connection_pid: connection_pid, connect_attempt_interval: 300_000} - end - - def connection_request_succeeded(%State{} = state) do - %State{state | connect_attempt_interval: nil} + %State{state | connection_pid: connection_pid, connection_attempts: 0} end def disconnected(%State{} = state, reason, now) do @@ -146,21 +135,21 @@ defmodule Sink.Connection.Client do end # Server callbacks - + @impl true def init(init_arg) do port = Keyword.fetch!(init_arg, :port) host = Keyword.fetch!(init_arg, :host) ssl_opts = Keyword.fetch!(init_arg, :ssl_opts) handler = Keyword.fetch!(init_arg, :handler) transport = Keyword.get(init_arg, :transport, Sink.Connection.Transport.SSL) + backoff = Keyword.get(init_arg, :backoff, DefaultBackoff) Process.flag(:trap_exit, true) - state = State.init(port, host, ssl_opts, handler, transport) + state = State.init(port, host, ssl_opts, handler, transport, backoff) - Process.send_after(self(), :open_connection, state.connect_attempt_interval) - - {:ok, state} + {:ok, state, {:continue, :open_connection}} end + @impl true def handle_call(:connection_status, _from, %State{disconnect_time: nil} = state) do {:reply, :disconnected, state} end @@ -170,7 +159,37 @@ defmodule Sink.Connection.Client do {:reply, {:disconnected, now() - time}, state} end + @impl true def handle_info(:open_connection, %State{} = state) do + {:noreply, state, {:continue, :open_connection}} + end + + # Ignore error messages of `:ssl.connect` + # They are sent to this process before the connection is transferred to `ClientConnection`. 
+ # They are sent even if we handle the `{:error, term}` response above. + def handle_info({err, _}, state) + when err in [:tcp_closed, :ssl_closed, :ssl_error, :tcp_error] do + {:noreply, state} + end + + def handle_info({:EXIT, pid, reason}, %{connection_pid: pid} = state) do + case reason do + {:shutdown, :server_rejected_connection} -> + Logger.info("Sink server denied connection") + {:noreply, on_connection_failure(state, connection_request_rejected: true)} + + _ -> + Logger.info("Disconnected from Sink server") + {:noreply, on_connection_failure(state)} + end + end + + def handle_info({:EXIT, _pid, _reason}, state) do + {:noreply, state} + end + + @impl true + def handle_continue(:open_connection, %State{} = state) do opts = [:binary] ++ Keyword.merge(state.ssl_opts, @@ -193,53 +212,19 @@ defmodule Sink.Connection.Client do :ok -> Logger.info("Connected to Sink server @ #{state.host}") # todo: send message to handler that we're connected - - Process.send_after(self(), :check_if_connection_request_succeeded, 100) {:noreply, State.connected(state, pid)} {:error, reason} -> - _ = ClientConnection.stop(:connection_tranfer_failed) - log_ssl_error(reason) - {:noreply, on_connection_init_failure(state)} + _ = ClientConnection.stop(:connection_transfer_failed) + {:noreply, state, {:continue, {:open_connection_failed, reason}}} end {:error, reason} -> - log_ssl_error(reason) - {:noreply, on_connection_init_failure(state)} - end - end - - # Ignore error message of `:ssl.connect` - # They are sent to this process before the connection is transfered to `ClientConnection`. - # They are sent even if we handle the `{:error, term}` response above. 
- def handle_info({err, _}, state) - when err in [:tcp_closed, :ssl_closed, :ssl_error, :tcp_error] do - {:noreply, state} - end - - def handle_info(:check_if_connection_request_succeeded, state) do - if connected?() do - {:noreply, State.connection_request_succeeded(state)} - else - Process.send_after(self(), :check_if_connection_request_succeeded, 100) - {:noreply, state} + {:noreply, state, {:continue, {:open_connection_failed, reason}}} end end - def handle_info({:EXIT, pid, reason}, state) do - new_state = - if pid == state.connection_pid do - Logger.info("Disconnected from Sink server") - on_connection_init_failure(state) - State.disconnected(state, reason, now()) - else - state - end - - {:noreply, new_state} - end - - defp log_ssl_error(reason) do + def handle_continue({:open_connection_failed, reason}, state) do case reason do r when r in [[:econnrefused, :closed], :nxdomain] -> Logger.info("Can't find Sink server - #{inspect(reason)}") @@ -251,20 +236,18 @@ defmodule Sink.Connection.Client do _ -> Logger.error("Failed to connect to Sink server, #{inspect(reason)}") end + + {:noreply, on_connection_failure(state)} end - defp on_connection_init_failure(state) do - new_state = State.backoff(state) - Process.send_after(self(), :open_connection, add_jitter(new_state.connect_attempt_interval)) + defp on_connection_failure(state, opts \\ []) do + connection_request_rejected = Keyword.get(opts, :connection_request_rejected, false) + {new_state, backoff} = State.backoff(state, connection_request_rejected) + Process.send_after(self(), :open_connection, backoff) new_state end defp now do System.monotonic_time(:millisecond) end - - defp add_jitter(interval) do - variance = div(interval, 10) - interval + Enum.random(-variance..variance) - end end diff --git a/lib/sink/connection/client/backoff.ex b/lib/sink/connection/client/backoff.ex new file mode 100644 index 0000000..2a54310 --- /dev/null +++ b/lib/sink/connection/client/backoff.ex @@ -0,0 +1,25 @@ +defmodule 
Sink.Connection.Client.Backoff do + @moduledoc """ + A behaviour for implementing client backoff on consecutive unsuccessful connection attempts. + """ + + @typedoc "Number of consecutive unsuccessful connection attempts so far" + @type attempts :: pos_integer() + + @typedoc "True if the last connection attempt received a connection request rejection" + @type connection_request_rejected :: boolean() + + @typedoc "Backoff duration in milliseconds" + @type backoff :: non_neg_integer() + + @doc """ + Calculate the backoff to apply. + """ + @callback backoff_duration(attempts, connection_request_rejected) :: backoff + + @doc false + def add_jitter(interval) do + variance = div(interval, 10) + interval + Enum.random(-variance..variance) + end +end diff --git a/lib/sink/connection/client/connection_status.ex b/lib/sink/connection/client/connection_status.ex index 88597f8..906516f 100644 --- a/lib/sink/connection/client/connection_status.ex +++ b/lib/sink/connection/client/connection_status.ex @@ -34,6 +34,12 @@ defmodule Sink.Connection.Client.ConnectionStatus do def connected?(%__MODULE__{connection_state: :connected}), do: true def connected?(%__MODULE__{connection_state: _}), do: false + @doc """ + Is the client disconnecting due to a rejected connection request? 
+ """ + def disconnecting?(%__MODULE__{connection_state: :disconnecting}), do: true + def disconnecting?(%__MODULE__{connection_state: _}), do: false + def instance_ids(%__MODULE__{instance_ids: map}), do: map def connection_response( diff --git a/lib/sink/connection/client/default_backoff.ex b/lib/sink/connection/client/default_backoff.ex new file mode 100644 index 0000000..141922f --- /dev/null +++ b/lib/sink/connection/client/default_backoff.ex @@ -0,0 +1,13 @@ +defmodule Sink.Connection.Client.DefaultBackoff do + @behaviour Sink.Connection.Client.Backoff + + # Always backoff 5 minutes when the server actively denied a connection + def backoff_duration(_, true), do: :timer.minutes(5) + + # Backoff in growing steps for other issues in connecting + def backoff_duration(1, false), do: 50 + def backoff_duration(2, false), do: :timer.seconds(1) + def backoff_duration(3, false), do: :timer.seconds(5) + def backoff_duration(4, false), do: :timer.seconds(30) + def backoff_duration(_, false), do: :timer.seconds(60) +end diff --git a/lib/sink/connection/client_connection.ex b/lib/sink/connection/client_connection.ex index b420d11..4bd38e8 100644 --- a/lib/sink/connection/client_connection.ex +++ b/lib/sink/connection/client_connection.ex @@ -36,6 +36,10 @@ defmodule Sink.Connection.ClientConnection do ConnectionStatus.connected?(state.connection_status) end + def disconnecting?(state) do + ConnectionStatus.disconnecting?(state.connection_status) + end + def connection_response(state, result) do %__MODULE__{ state @@ -377,19 +381,15 @@ defmodule Sink.Connection.ClientConnection do end end - def handle_info({:tcp_closed, _}, state) do - {:stop, :normal, state} - end - - def handle_info({:ssl_closed, _}, state) do - {:stop, :normal, state} - end - - def handle_info({:ssl_error, reason}, state) do - {:stop, {:error, reason}, state} + def handle_info({closed, _}, state) when closed in [:tcp_closed, :ssl_closed] do + if State.disconnecting?(state) do + {:stop, {:shutdown, 
:server_rejected_connection}, state} + else + {:stop, :normal, state} + end end - def handle_info({:tcp_error, _, reason}, state) do + def handle_info({error, reason}, state) when error in [:tcp_error, :ssl_error] do {:stop, {:error, reason}, state} end diff --git a/test/sink/connection_test.exs b/test/sink/connection_test.exs index 19cb363..0d5c54e 100644 --- a/test/sink/connection_test.exs +++ b/test/sink/connection_test.exs @@ -9,6 +9,7 @@ defmodule Sink.ConnectionTest do @mod_transport Sink.Connection.Transport.SSLMock @client_handler Sink.Connection.ClientConnectionHandlerMock @server_handler Sink.Connection.ServerConnectionHandlerMock + @backoff Sink.Connection.Client.BackoffMock @event %Event{ event_type_id: 1, key: <<1, 2, 3>>, @@ -266,6 +267,8 @@ defmodule Sink.ConnectionTest do :ok end) + expect(@backoff, :backoff_duration, fn _, true -> :timer.seconds(1) end) + start_supervised!( {Sink.Connection.ServerListener, port: 9999, ssl_opts: server_ssl, handler: @server_handler} @@ -273,7 +276,11 @@ defmodule Sink.ConnectionTest do start_supervised!( {Sink.Connection.Client, - port: 9999, host: "localhost", ssl_opts: client_ssl, handler: @client_handler} + port: 9999, + host: "localhost", + ssl_opts: client_ssl, + handler: @client_handler, + backoff: @backoff} ) # # give it time to connect @@ -311,6 +318,8 @@ defmodule Sink.ConnectionTest do :ok end) + expect(@backoff, :backoff_duration, fn _, true -> :timer.seconds(1) end) + start_supervised!( {Sink.Connection.ServerListener, port: 9999, ssl_opts: server_ssl, handler: @server_handler} @@ -318,7 +327,11 @@ defmodule Sink.ConnectionTest do start_supervised!( {Sink.Connection.Client, - port: 9999, host: "localhost", ssl_opts: client_ssl, handler: @client_handler} + port: 9999, + host: "localhost", + ssl_opts: client_ssl, + handler: @client_handler, + backoff: @backoff} ) # # give it time to connect @@ -390,6 +403,8 @@ defmodule Sink.ConnectionTest do :ok end) + expect(@backoff, :backoff_duration, fn _, true -> 
:timer.seconds(1) end) + start_supervised!( {Sink.Connection.ServerListener, port: 9999, ssl_opts: server_ssl, handler: @server_handler} @@ -397,7 +412,11 @@ defmodule Sink.ConnectionTest do start_supervised!( {Sink.Connection.Client, - port: 9999, host: "localhost", ssl_opts: client_ssl, handler: @client_handler} + port: 9999, + host: "localhost", + ssl_opts: client_ssl, + handler: @client_handler, + backoff: @backoff} ) # give it time to connect @@ -616,6 +635,8 @@ defmodule Sink.ConnectionTest do stub(@client_handler, :instance_ids, fn -> %{client: 1, server: 2} end) stub(@mod_transport, :send, fn _, _ -> :ok end) + expect(@backoff, :backoff_duration, fn 1, false -> :timer.seconds(1) end) + logs = capture_log(fn -> start_supervised!( @@ -626,7 +647,11 @@ defmodule Sink.ConnectionTest do client = start_supervised!( {Sink.Connection.Client, - port: 9999, host: "localhost", ssl_opts: client_ssl, handler: @client_handler} + port: 9999, + host: "localhost", + ssl_opts: client_ssl, + handler: @client_handler, + backoff: @backoff} ) assert Process.alive?(client) diff --git a/test/support/mocks.ex b/test/support/mocks.ex index 6f1db19..efb5c01 100644 --- a/test/support/mocks.ex +++ b/test/support/mocks.ex @@ -8,6 +8,10 @@ Mox.defmock(Sink.Connection.ClientConnectionHandlerMock, for: Sink.Connection.ClientConnectionHandler ) +Mox.defmock(Sink.Connection.Client.BackoffMock, + for: Sink.Connection.Client.Backoff +) + Mox.defmock(Sink.Connection.ServerHandlerMock, for: :ranch_protocol )