Skip to content

Commit 3290e2a

Browse files
committed
Add configurable backoff
1 parent 5a3109b commit 3290e2a

File tree

7 files changed

+152
-95
lines changed

7 files changed

+152
-95
lines changed

lib/sink/connection/client.ex

Lines changed: 64 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ defmodule Sink.Connection.Client do
1515
require Logger
1616
alias Sink.Event
1717
alias Sink.Connection.ClientConnection
18+
alias Sink.Connection.Client.Backoff
19+
alias Sink.Connection.Client.DefaultBackoff
1820

1921
ClientConnection
2022

@@ -27,14 +29,13 @@ defmodule Sink.Connection.Client do
2729
:ssl_opts,
2830
:handler,
2931
:transport,
30-
:connect_attempt_interval,
3132
:disconnect_reason,
32-
:disconnect_time,
33-
:connection_request_succeeded
33+
:connection_request_succeeded,
34+
:backoff_impl,
35+
connection_attempts: 0,
36+
disconnect_time: nil
3437
]
3538

36-
@first_connect_attempt 50
37-
3839
@doc """
3940
This was meant to mean "is the client connected?". However that may not be accurate since
4041
the client can lose network connection and :ssl/:gen_tcp won't know. Probably need to change
@@ -43,45 +44,34 @@ defmodule Sink.Connection.Client do
4344
4445
"""
4546

46-
def init(port, host, ssl_opts, handler, transport) do
47+
def init(port, host, ssl_opts, handler, transport, backoff) do
4748
%State{
4849
connection_pid: nil,
4950
port: port,
5051
host: host,
5152
ssl_opts: ssl_opts,
5253
handler: handler,
5354
transport: transport,
54-
connect_attempt_interval: @first_connect_attempt,
55-
disconnect_time: nil
55+
backoff_impl: backoff
5656
}
5757
end
5858

59-
def backoff(%State{connect_attempt_interval: nil} = state) do
60-
%State{state | connect_attempt_interval: @first_connect_attempt}
61-
end
62-
63-
def backoff(%State{connect_attempt_interval: @first_connect_attempt} = state) do
64-
%State{state | connect_attempt_interval: 1_000}
65-
end
59+
def backoff(
60+
%State{connection_attempts: attempts} = state,
61+
connection_request_rejected
62+
) do
63+
new_state = %State{state | connection_attempts: attempts + 1}
6664

67-
def backoff(%State{connect_attempt_interval: 1_000} = state) do
68-
%State{state | connect_attempt_interval: 5_000}
69-
end
70-
71-
def backoff(%State{connect_attempt_interval: 5_000} = state) do
72-
%State{state | connect_attempt_interval: 30_000}
73-
end
65+
backoff =
66+
new_state.connection_attempts
67+
|> state.backoff_impl.backoff_duration(connection_request_rejected)
68+
|> Backoff.add_jitter()
7469

75-
def backoff(%State{connect_attempt_interval: _} = state) do
76-
%State{state | connect_attempt_interval: 60_000}
70+
{new_state, backoff}
7771
end
7872

7973
def connected(%State{} = state, connection_pid) do
80-
%State{state | connection_pid: connection_pid, connect_attempt_interval: 300_000}
81-
end
82-
83-
def connection_request_succeeded(%State{} = state) do
84-
%State{state | connect_attempt_interval: nil}
74+
%State{state | connection_pid: connection_pid, connection_attempts: 0}
8575
end
8676

8777
def disconnected(%State{} = state, reason, now) do
@@ -146,21 +136,21 @@ defmodule Sink.Connection.Client do
146136
end
147137

148138
# Server callbacks
149-
139+
@impl true
150140
def init(init_arg) do
151141
port = Keyword.fetch!(init_arg, :port)
152142
host = Keyword.fetch!(init_arg, :host)
153143
ssl_opts = Keyword.fetch!(init_arg, :ssl_opts)
154144
handler = Keyword.fetch!(init_arg, :handler)
155145
transport = Keyword.get(init_arg, :transport, Sink.Connection.Transport.SSL)
146+
backoff = Keyword.get(init_arg, :backoff, DefaultBackoff)
156147
Process.flag(:trap_exit, true)
157-
state = State.init(port, host, ssl_opts, handler, transport)
148+
state = State.init(port, host, ssl_opts, handler, transport, backoff)
158149

159-
Process.send_after(self(), :open_connection, state.connect_attempt_interval)
160-
161-
{:ok, state}
150+
{:ok, state, {:continue, :open_connection}}
162151
end
163152

153+
@impl true
164154
def handle_call(:connection_status, _from, %State{disconnect_time: nil} = state) do
165155
{:reply, :disconnected, state}
166156
end
@@ -170,7 +160,37 @@ defmodule Sink.Connection.Client do
170160
{:reply, {:disconnected, now() - time}, state}
171161
end
172162

163+
@impl true
173164
def handle_info(:open_connection, %State{} = state) do
165+
{:noreply, state, {:continue, :open_connection}}
166+
end
167+
168+
# Ignore error message of `:ssl.connect`
169+
# They are sent to this process before the connection is transfered to `ClientConnection`.
170+
# They are sent even if we handle the `{:error, term}` response above.
171+
def handle_info({err, _}, state)
172+
when err in [:tcp_closed, :ssl_closed, :ssl_error, :tcp_error] do
173+
{:noreply, state}
174+
end
175+
176+
def handle_info({:EXIT, pid, reason}, %{connection_pid: pid} = state) do
177+
case reason do
178+
{:shutdown, :server_rejected_connection} ->
179+
Logger.info("Sink server denied connection")
180+
{:noreply, on_connection_failure(state, connection_request_rejected: true)}
181+
182+
_ ->
183+
Logger.info("Disconnected from Sink server")
184+
{:noreply, on_connection_failure(state)}
185+
end
186+
end
187+
188+
def handle_info({:EXIT, _pid, _reason}, state) do
189+
{:noreply, state}
190+
end
191+
192+
@impl true
193+
def handle_continue(:open_connection, %State{} = state) do
174194
opts =
175195
[:binary] ++
176196
Keyword.merge(state.ssl_opts,
@@ -193,53 +213,19 @@ defmodule Sink.Connection.Client do
193213
:ok ->
194214
Logger.info("Connected to Sink server @ #{state.host}")
195215
# todo: send message to handler that we're connected
196-
197-
Process.send_after(self(), :check_if_connection_request_succeeded, 100)
198216
{:noreply, State.connected(state, pid)}
199217

200218
{:error, reason} ->
201-
_ = ClientConnection.stop(:connection_tranfer_failed)
202-
log_ssl_error(reason)
203-
{:noreply, on_connection_init_failure(state)}
219+
_ = ClientConnection.stop(:connection_transfer_failed)
220+
{:noreply, state, {:continue, {:open_connection_failed, reason}}}
204221
end
205222

206223
{:error, reason} ->
207-
log_ssl_error(reason)
208-
{:noreply, on_connection_init_failure(state)}
209-
end
210-
end
211-
212-
# Ignore error message of `:ssl.connect`
213-
# They are sent to this process before the connection is transfered to `ClientConnection`.
214-
# They are sent even if we handle the `{:error, term}` response above.
215-
def handle_info({err, _}, state)
216-
when err in [:tcp_closed, :ssl_closed, :ssl_error, :tcp_error] do
217-
{:noreply, state}
218-
end
219-
220-
def handle_info(:check_if_connection_request_succeeded, state) do
221-
if connected?() do
222-
{:noreply, State.connection_request_succeeded(state)}
223-
else
224-
Process.send_after(self(), :check_if_connection_request_succeeded, 100)
225-
{:noreply, state}
224+
{:noreply, state, {:continue, {:open_connection_failed, reason}}}
226225
end
227226
end
228227

229-
def handle_info({:EXIT, pid, reason}, state) do
230-
new_state =
231-
if pid == state.connection_pid do
232-
Logger.info("Disconnected from Sink server")
233-
on_connection_init_failure(state)
234-
State.disconnected(state, reason, now())
235-
else
236-
state
237-
end
238-
239-
{:noreply, new_state}
240-
end
241-
242-
defp log_ssl_error(reason) do
228+
def handle_continue({:open_connection_failed, reason}, state) do
243229
case reason do
244230
r when r in [[:econnrefused, :closed], :nxdomain] ->
245231
Logger.info("Can't find Sink server - #{inspect(reason)}")
@@ -251,20 +237,18 @@ defmodule Sink.Connection.Client do
251237
_ ->
252238
Logger.error("Failed to connect to Sink server, #{inspect(reason)}")
253239
end
240+
241+
{:noreply, on_connection_failure(state)}
254242
end
255243

256-
defp on_connection_init_failure(state) do
257-
new_state = State.backoff(state)
258-
Process.send_after(self(), :open_connection, add_jitter(new_state.connect_attempt_interval))
244+
defp on_connection_failure(state, opts \\ []) do
245+
connection_request_rejected = Keyword.get(opts, :connection_request_rejected, false)
246+
{new_state, backoff} = State.backoff(state, connection_request_rejected)
247+
Process.send_after(self(), :open_connection, backoff)
259248
new_state
260249
end
261250

262251
defp now do
263252
System.monotonic_time(:millisecond)
264253
end
265-
266-
defp add_jitter(interval) do
267-
variance = div(interval, 10)
268-
interval + Enum.random(-variance..variance)
269-
end
270254
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
defmodule Sink.Connection.Client.Backoff do
2+
@moduledoc """
3+
A behaviour for implementing client backoff on consecutive unsuccessful connection attempts.
4+
"""
5+
6+
@typedoc "Number of consecutive connection attempts having been unsuccessful before"
7+
@type attempts :: pos_integer()
8+
9+
@typedoc "True if the last connection attempt received a connection request rejection"
10+
@type connection_request_rejected :: boolean()
11+
12+
@typedoc "Backoff duration in milliseconds"
13+
@type backoff :: non_neg_integer()
14+
15+
@doc """
16+
Calculate the backoff to apply.
17+
"""
18+
@callback backoff_duration(attempts, connection_request_rejected) :: backoff
19+
20+
@doc false
21+
def add_jitter(interval) do
22+
variance = div(interval, 10)
23+
interval + Enum.random(-variance..variance)
24+
end
25+
end

lib/sink/connection/client/connection_status.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ defmodule Sink.Connection.Client.ConnectionStatus do
3434
def connected?(%__MODULE__{connection_state: :connected}), do: true
3535
def connected?(%__MODULE__{connection_state: _}), do: false
3636

37+
@doc """
38+
Is the client disconnecting due to a rejected connection request?
39+
"""
40+
def disconnecting?(%__MODULE__{connection_state: :disconnecting}), do: true
41+
def disconnecting?(%__MODULE__{connection_state: _}), do: false
42+
3743
def instance_ids(%__MODULE__{instance_ids: map}), do: map
3844

3945
def connection_response(
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
defmodule Sink.Connection.Client.DefaultBackoff do
2+
@behaviour Sink.Connection.Client.Backoff
3+
4+
# Always backoff 5 minutes when the server actively denied a connection
5+
def backoff_duration(_, true), do: :timer.minutes(5)
6+
7+
# Backoff in growing steps for other issues in connecting
8+
def backoff_duration(1, false), do: 50
9+
def backoff_duration(2, false), do: :timer.seconds(1)
10+
def backoff_duration(3, false), do: :timer.seconds(5)
11+
def backoff_duration(4, false), do: :timer.seconds(30)
12+
def backoff_duration(_, false), do: :timer.seconds(60)
13+
end

lib/sink/connection/client_connection.ex

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ defmodule Sink.Connection.ClientConnection do
3636
ConnectionStatus.connected?(state.connection_status)
3737
end
3838

39+
def disconnecting?(state) do
40+
ConnectionStatus.disconnecting?(state.connection_status)
41+
end
42+
3943
def connection_response(state, result) do
4044
%__MODULE__{
4145
state
@@ -377,19 +381,15 @@ defmodule Sink.Connection.ClientConnection do
377381
end
378382
end
379383

380-
def handle_info({:tcp_closed, _}, state) do
381-
{:stop, :normal, state}
382-
end
383-
384-
def handle_info({:ssl_closed, _}, state) do
385-
{:stop, :normal, state}
386-
end
387-
388-
def handle_info({:ssl_error, reason}, state) do
389-
{:stop, {:error, reason}, state}
384+
def handle_info({closed, _}, state) when closed in [:tcp_closed, :ssl_closed] do
385+
if State.disconnecting?(state) do
386+
{:stop, {:shutdown, :server_rejected_connection}, state}
387+
else
388+
{:stop, :normal, state}
389+
end
390390
end
391391

392-
def handle_info({:tcp_error, _, reason}, state) do
392+
def handle_info({error, reason}, state) when error in [:tcp_error, :ssl_error] do
393393
{:stop, {:error, reason}, state}
394394
end
395395

0 commit comments

Comments
 (0)