Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 63 additions & 80 deletions lib/sink/connection/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ defmodule Sink.Connection.Client do
require Logger
alias Sink.Event
alias Sink.Connection.ClientConnection
alias Sink.Connection.Client.Backoff
alias Sink.Connection.Client.DefaultBackoff

ClientConnection

Expand All @@ -27,14 +29,12 @@ defmodule Sink.Connection.Client do
:ssl_opts,
:handler,
:transport,
:connect_attempt_interval,
:disconnect_reason,
:disconnect_time,
:connection_request_succeeded
:backoff_impl,
connection_attempts: 0,
disconnect_time: nil
]

@first_connect_attempt 50

@doc """
This was meant to mean "is the client connected?". However that may not be accurate since
the client can lose network connection and :ssl/:gen_tcp won't know. Probably need to change
Expand All @@ -43,45 +43,34 @@ defmodule Sink.Connection.Client do

"""

def init(port, host, ssl_opts, handler, transport) do
def init(port, host, ssl_opts, handler, transport, backoff) do
%State{
connection_pid: nil,
port: port,
host: host,
ssl_opts: ssl_opts,
handler: handler,
transport: transport,
connect_attempt_interval: @first_connect_attempt,
disconnect_time: nil
backoff_impl: backoff
}
end

def backoff(%State{connect_attempt_interval: nil} = state) do
%State{state | connect_attempt_interval: @first_connect_attempt}
end

def backoff(%State{connect_attempt_interval: @first_connect_attempt} = state) do
%State{state | connect_attempt_interval: 1_000}
end
def backoff(
%State{connection_attempts: attempts} = state,
connection_request_rejected
) do
new_state = %State{state | connection_attempts: attempts + 1}

def backoff(%State{connect_attempt_interval: 1_000} = state) do
%State{state | connect_attempt_interval: 5_000}
end

def backoff(%State{connect_attempt_interval: 5_000} = state) do
%State{state | connect_attempt_interval: 30_000}
end
backoff =
new_state.connection_attempts
|> state.backoff_impl.backoff_duration(connection_request_rejected)
|> Backoff.add_jitter()

def backoff(%State{connect_attempt_interval: _} = state) do
%State{state | connect_attempt_interval: 60_000}
{new_state, backoff}
end

def connected(%State{} = state, connection_pid) do
%State{state | connection_pid: connection_pid, connect_attempt_interval: 300_000}
end

def connection_request_succeeded(%State{} = state) do
%State{state | connect_attempt_interval: nil}
%State{state | connection_pid: connection_pid, connection_attempts: 0}
end

def disconnected(%State{} = state, reason, now) do
Expand Down Expand Up @@ -146,21 +135,21 @@ defmodule Sink.Connection.Client do
end

# Server callbacks

@impl true
def init(init_arg) do
port = Keyword.fetch!(init_arg, :port)
host = Keyword.fetch!(init_arg, :host)
ssl_opts = Keyword.fetch!(init_arg, :ssl_opts)
handler = Keyword.fetch!(init_arg, :handler)
transport = Keyword.get(init_arg, :transport, Sink.Connection.Transport.SSL)
backoff = Keyword.get(init_arg, :backoff, DefaultBackoff)
Process.flag(:trap_exit, true)
state = State.init(port, host, ssl_opts, handler, transport)
state = State.init(port, host, ssl_opts, handler, transport, backoff)

Process.send_after(self(), :open_connection, state.connect_attempt_interval)

{:ok, state}
{:ok, state, {:continue, :open_connection}}
end

@impl true
def handle_call(:connection_status, _from, %State{disconnect_time: nil} = state) do
{:reply, :disconnected, state}
end
Expand All @@ -170,7 +159,37 @@ defmodule Sink.Connection.Client do
{:reply, {:disconnected, now() - time}, state}
end

@impl true
def handle_info(:open_connection, %State{} = state) do
{:noreply, state, {:continue, :open_connection}}
end

# Ignore error message of `:ssl.connect`
# They are sent to this process before the connection is transfered to `ClientConnection`.
# They are sent even if we handle the `{:error, term}` response above.
def handle_info({err, _}, state)
when err in [:tcp_closed, :ssl_closed, :ssl_error, :tcp_error] do
{:noreply, state}
end

def handle_info({:EXIT, pid, reason}, %{connection_pid: pid} = state) do
case reason do
{:shutdown, :server_rejected_connection} ->
Logger.info("Sink server denied connection")
{:noreply, on_connection_failure(state, connection_request_rejected: true)}

_ ->
Logger.info("Disconnected from Sink server")
{:noreply, on_connection_failure(state)}
end
end

def handle_info({:EXIT, _pid, _reason}, state) do
{:noreply, state}
end

@impl true
def handle_continue(:open_connection, %State{} = state) do
opts =
[:binary] ++
Keyword.merge(state.ssl_opts,
Expand All @@ -193,53 +212,19 @@ defmodule Sink.Connection.Client do
:ok ->
Logger.info("Connected to Sink server @ #{state.host}")
# todo: send message to handler that we're connected

Process.send_after(self(), :check_if_connection_request_succeeded, 100)
{:noreply, State.connected(state, pid)}

{:error, reason} ->
_ = ClientConnection.stop(:connection_tranfer_failed)
log_ssl_error(reason)
{:noreply, on_connection_init_failure(state)}
_ = ClientConnection.stop(:connection_transfer_failed)
{:noreply, state, {:continue, {:open_connection_failed, reason}}}
end

{:error, reason} ->
log_ssl_error(reason)
{:noreply, on_connection_init_failure(state)}
end
end

# Ignore error message of `:ssl.connect`
# They are sent to this process before the connection is transfered to `ClientConnection`.
# They are sent even if we handle the `{:error, term}` response above.
def handle_info({err, _}, state)
when err in [:tcp_closed, :ssl_closed, :ssl_error, :tcp_error] do
{:noreply, state}
end

def handle_info(:check_if_connection_request_succeeded, state) do
if connected?() do
{:noreply, State.connection_request_succeeded(state)}
else
Process.send_after(self(), :check_if_connection_request_succeeded, 100)
{:noreply, state}
{:noreply, state, {:continue, {:open_connection_failed, reason}}}
end
end

def handle_info({:EXIT, pid, reason}, state) do
new_state =
if pid == state.connection_pid do
Logger.info("Disconnected from Sink server")
on_connection_init_failure(state)
State.disconnected(state, reason, now())
else
state
end

{:noreply, new_state}
end

defp log_ssl_error(reason) do
def handle_continue({:open_connection_failed, reason}, state) do
case reason do
r when r in [[:econnrefused, :closed], :nxdomain] ->
Logger.info("Can't find Sink server - #{inspect(reason)}")
Expand All @@ -251,20 +236,18 @@ defmodule Sink.Connection.Client do
_ ->
Logger.error("Failed to connect to Sink server, #{inspect(reason)}")
end

{:noreply, on_connection_failure(state)}
end

defp on_connection_init_failure(state) do
new_state = State.backoff(state)
Process.send_after(self(), :open_connection, add_jitter(new_state.connect_attempt_interval))
defp on_connection_failure(state, opts \\ []) do
connection_request_rejected = Keyword.get(opts, :connection_request_rejected, false)
{new_state, backoff} = State.backoff(state, connection_request_rejected)
Process.send_after(self(), :open_connection, backoff)
new_state
end

defp now do
System.monotonic_time(:millisecond)
end

defp add_jitter(interval) do
variance = div(interval, 10)
interval + Enum.random(-variance..variance)
end
end
25 changes: 25 additions & 0 deletions lib/sink/connection/client/backoff.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Sink.Connection.Client.Backoff do
@moduledoc """
A behaviour for implementing client backoff on consecutive unsuccessful connection attempts.
"""

@typedoc "Number of consecutive connection attempts having been unsuccessful before"
@type attempts :: pos_integer()

@typedoc "True if the last connection attempt received a connection request rejection"
@type connection_request_rejected :: boolean()

@typedoc "Backoff duration in milliseconds"
@type backoff :: non_neg_integer()

@doc """
Calculate the backoff to apply.
"""
@callback backoff_duration(attempts, connection_request_rejected) :: backoff

@doc false
def add_jitter(interval) do
variance = div(interval, 10)
interval + Enum.random(-variance..variance)
end
end
6 changes: 6 additions & 0 deletions lib/sink/connection/client/connection_status.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ defmodule Sink.Connection.Client.ConnectionStatus do
def connected?(%__MODULE__{connection_state: :connected}), do: true
def connected?(%__MODULE__{connection_state: _}), do: false

@doc """
Is the client disconnecting due to a rejected connection request?
"""
def disconnecting?(%__MODULE__{connection_state: :disconnecting}), do: true
def disconnecting?(%__MODULE__{connection_state: _}), do: false

def instance_ids(%__MODULE__{instance_ids: map}), do: map

def connection_response(
Expand Down
13 changes: 13 additions & 0 deletions lib/sink/connection/client/default_backoff.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Sink.Connection.Client.DefaultBackoff do
@behaviour Sink.Connection.Client.Backoff

# Always backoff 5 minutes when the server actively denied a connection
def backoff_duration(_, true), do: :timer.minutes(5)

# Backoff in growing steps for other issues in connecting
def backoff_duration(1, false), do: 50
def backoff_duration(2, false), do: :timer.seconds(1)
def backoff_duration(3, false), do: :timer.seconds(5)
def backoff_duration(4, false), do: :timer.seconds(30)
def backoff_duration(_, false), do: :timer.seconds(60)
end
22 changes: 11 additions & 11 deletions lib/sink/connection/client_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ defmodule Sink.Connection.ClientConnection do
ConnectionStatus.connected?(state.connection_status)
end

def disconnecting?(state) do
ConnectionStatus.disconnecting?(state.connection_status)
end

def connection_response(state, result) do
%__MODULE__{
state
Expand Down Expand Up @@ -377,19 +381,15 @@ defmodule Sink.Connection.ClientConnection do
end
end

def handle_info({:tcp_closed, _}, state) do
{:stop, :normal, state}
end

def handle_info({:ssl_closed, _}, state) do
{:stop, :normal, state}
end

def handle_info({:ssl_error, reason}, state) do
{:stop, {:error, reason}, state}
def handle_info({closed, _}, state) when closed in [:tcp_closed, :ssl_closed] do
if State.disconnecting?(state) do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to think about this the other way: if the connection was closed when the connection state was :requesting_connection then we have a pretty good guess (though this is still a bit of a hack) that the server rejected the connection.

Copy link
Collaborator Author

@LostKobrakai LostKobrakai Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that's a reasonable assumption to make. It could just as much be caused by a network or application issue while sink is in that state. Getting a negative connection response should be the reason for sink to consider itself being rejected. Any negative connection response will result in State.disconnecting?(state) being true.

I don't think penalizing a nova on bad network with a 5 minutes timeout would be a good idea and we should rather expect that connection responses are actually received if the nova is not already having a hard time keeping the connection open.

{:stop, {:shutdown, :server_rejected_connection}, state}
else
{:stop, :normal, state}
end
end

def handle_info({:tcp_error, _, reason}, state) do
def handle_info({error, reason}, state) when error in [:tcp_error, :ssl_error] do
{:stop, {:error, reason}, state}
end

Expand Down
Loading