Skip to content

Commit

Permalink
Fix FAILOVER replica attach workflow (#293)
Browse files Browse the repository at this point in the history
* code cleanup

* handle replica attach during failover with unreachable replicas

* more cleanup

* unify client retrieval for failover and add gossip re-use connection flag

* refactor failoverDeadline bookkeeping

* attach primary as a new replica on default option failover

---------

Co-authored-by: Badrish Chandramouli <[email protected]>
  • Loading branch information
vazois and badrishc authored Apr 24, 2024
1 parent ad2d4b7 commit 47da479
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 224 deletions.
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void TryStopWrites(string replicaId)
}

/// <summary>
/// Takeover as new primary but forcefully claiming ownernship of old primary's slots.
/// Takeover as new primary but forcefully claim ownership of old primary's slots.
/// </summary>
public void TryTakeOverForPrimary()
{
Expand Down
45 changes: 17 additions & 28 deletions libs/cluster/Server/Failover/FailoverManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private void Reset()
/// <returns></returns>
public string GetFailoverStatus()
{
var status = currentFailoverSession?.GetStatus;
var status = currentFailoverSession?.status;
return status.HasValue ? FailoverUtils.GetFailoverStatus(status) :
FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER);
}
Expand All @@ -57,31 +57,21 @@ public bool TryStartReplicaFailover(FailoverOption option, TimeSpan failoverTime
if (!failoverTaskLock.TryWriteLock())
return false;

var (address, port) = clusterProvider.clusterManager.CurrentConfig.GetLocalNodePrimaryAddress();
if (address == null)
{
failoverTaskLock.WriteUnlock();
return false;
}

currentFailoverSession = new FailoverSession(
clusterProvider,
option,
clusterTimeout: clusterTimeout,
failoverTimeout: failoverTimeout,
hostAddress: address,
hostPort: port,
isReplicaSession: true,
logger: logger);
Task.Run(ReplicaFailoverAsyncTask);
_ = Task.Run(async () =>
{
_ = await currentFailoverSession.BeginAsyncReplicaFailover();
Reset();
});
return true;
}

private async void ReplicaFailoverAsyncTask()
{
await currentFailoverSession.BeginAsyncReplicaFailover();
Reset();
}

/// <summary>
/// Method used to initiate a failover from a primary (FAILOVER command).
/// </summary>
Expand All @@ -96,21 +86,20 @@ public bool TryStartPrimaryFailover(string replicaAddress, int replicaPort, Fail
return false;

currentFailoverSession = new FailoverSession(
clusterProvider,
option,
clusterTimeout,
timeout,
clusterProvider: clusterProvider,
option: option,
clusterTimeout: clusterTimeout,
failoverTimeout: timeout,
isReplicaSession: false,
hostAddress: replicaAddress,
hostPort: replicaPort,
logger: logger);
Task.Run(PrimaryFailoverAsyncTask);
_ = Task.Run(async () =>
{
_ = await currentFailoverSession.BeginAsyncPrimaryFailover();
Reset();
});
return true;
}

private async void PrimaryFailoverAsyncTask()
{
await currentFailoverSession.BeginAsyncPrimaryFailover();
Reset();
}
}
}
63 changes: 34 additions & 29 deletions libs/cluster/Server/Failover/FailoverSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,67 @@ internal sealed partial class FailoverSession : IDisposable
readonly ILogger logger;

readonly GarnetClient[] clients = null;
readonly long failoverStart;
readonly long failoverEnd;
FailoverStatus status;
readonly DateTime failoverDeadline;

public FailoverStatus GetStatus => status;
public FailoverStatus status { get; private set; }

public bool FailoverTimeout => failoverEnd < DateTimeOffset.UtcNow.Ticks;
public bool FailoverTimeout => failoverDeadline < DateTime.UtcNow;

readonly ClusterConfig currentConfig;

/// <summary>
/// FailoverSession constructor
/// </summary>
/// <param name="clusterProvider"></param>
/// <param name="option"></param>
/// <param name="clusterTimeout">network request timeout</param>
/// <param name="failoverTimeout">failover timeout</param>
/// <param name="clusterProvider">ClusterProvider object</param>
/// <param name="option">Failover options for replica failover session.</param>
/// <param name="clusterTimeout">Timeout for individual communication between replica.</param>
/// <param name="failoverTimeout">End to end timeout for failover</param>
/// <param name="isReplicaSession">Flag indicating if this session is controlled by a replica</param>
/// <param name="hostAddress"></param>
/// <param name="hostPort"></param>
/// <param name="logger"></param>
public FailoverSession(
ClusterProvider clusterProvider,
FailoverOption option,
TimeSpan clusterTimeout,
TimeSpan failoverTimeout = default,
TimeSpan failoverTimeout,
bool isReplicaSession = true,
string hostAddress = "",
int hostPort = -1,
ILogger logger = null)
{
this.clusterProvider = clusterProvider;
this.clusterTimeout = clusterTimeout;
this.failoverTimeout = failoverTimeout == default ? TimeSpan.FromSeconds(300) : failoverTimeout;
this.option = option;
this.cts = new();
this.logger = logger;
currentConfig = clusterProvider.clusterManager.CurrentConfig;
cts = new();

var endpoints = hostPort == -1 ?
clusterProvider.clusterManager.CurrentConfig.GetLocalNodePrimaryEndpoints(includeMyPrimaryFirst: true) :
hostPort == 0 ? clusterProvider.clusterManager.CurrentConfig.GetLocalNodeReplicaEndpoints() : null;

clients = endpoints != null ? new GarnetClient[endpoints.Count] : new GarnetClient[1];

if (clients.Length > 1)
// Initialize connections only when failover is initiated by the primary
if (!isReplicaSession)
{
for (int i = 0; i < endpoints.Count; i++)
var endpoints = hostPort == -1
? currentConfig.GetLocalNodePrimaryEndpoints(includeMyPrimaryFirst: true)
: hostPort == 0 ? currentConfig.GetLocalNodeReplicaEndpoints() : null;
clients = endpoints != null ? new GarnetClient[endpoints.Count] : new GarnetClient[1];

if (clients.Length > 1)
{
clients[i] = new GarnetClient(endpoints[i].Item1, endpoints[i].Item2, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger);
for (var i = 0; i < endpoints.Count; i++)
{
clients[i] = new GarnetClient(endpoints[i].Item1, endpoints[i].Item2, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger);
}
}
else
{
clients[0] = new GarnetClient(hostAddress, hostPort, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger);
}
}
else
{
clients[0] = new GarnetClient(hostAddress, hostPort, clusterProvider.serverOptions.TlsOptions?.TlsClientOptions, authUsername: clusterProvider.ClusterUsername, authPassword: clusterProvider.ClusterPassword, logger: logger);
}

//Timeout deadline
this.failoverStart = DateTimeOffset.UtcNow.Ticks;
this.failoverEnd = failoverStart + this.failoverTimeout.Ticks;
this.status = FailoverStatus.BEGIN_FAILOVER;
// Timeout deadline
this.failoverTimeout = failoverTimeout == default ? TimeSpan.FromSeconds(600) : failoverTimeout;
failoverDeadline = DateTime.UtcNow.Add(failoverTimeout);
status = FailoverStatus.BEGIN_FAILOVER;
}

public void Dispose()
Expand Down
18 changes: 9 additions & 9 deletions libs/cluster/Server/Failover/PrimaryFailoverSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,24 @@ private async Task<GarnetClient> WaitForFirstReplicaSync()
{
if (clients.Length > 1)
{
Task<long>[] tasks = new Task<long>[clients.Length + 1];
var tasks = new Task<long>[clients.Length + 1];

int tcount = 0;
var tcount = 0;
foreach (var _gclient in clients)
tasks[tcount++] = CheckReplicaSync(_gclient);

tasks[clients.Length] = Task.Delay(failoverTimeout).ContinueWith(_ => default(long));
var completedTask = await Task.WhenAny(tasks);

//No replica was able to catch up with primary so timeout
// No replica was able to catch up with primary so timeout
if (completedTask == tasks[clients.Length])
{
logger?.LogError("WaitForReplicasSync timeout");
return null;
}

//Return client for replica that has caught up with replication primary
for (int i = 0; i < tasks.Length; i++)
// Return client for replica that has caught up with replication primary
for (var i = 0; i < tasks.Length; i++)
{
if (completedTask == tasks[i] && tasks[i].Result == clusterProvider.replicationManager.ReplicationOffset)
return clients[i];
Expand All @@ -61,7 +61,7 @@ private async Task<GarnetClient> WaitForFirstReplicaSync()
var timeoutTask = Task.Delay(failoverTimeout, cts.Token);
var completedTask = await Task.WhenAny(syncTask, timeoutTask);

//Replica trying to failover did not caught up on time so timeout
// Replica trying to failover did not caught up on time so timeout
if (completedTask == timeoutTask)
{
logger?.LogError("WaitForFirstReplicaSync timeout");
Expand Down Expand Up @@ -95,16 +95,16 @@ public async Task<bool> BeginAsyncPrimaryFailover()
{
try
{
//Change local node role to suspend any write workload
// Change local node role to suspend any write workload
status = FailoverStatus.ISSUING_PAUSE_WRITES;
var localId = clusterProvider.clusterManager.CurrentConfig.LocalNodeId;
var oldRole = clusterProvider.clusterManager.CurrentConfig.LocalNodeRole;
var replicas = clusterProvider.clusterManager.CurrentConfig.GetReplicaIds(localId);
clusterProvider.clusterManager.TryStopWrites(replicas[0]);
clusterProvider.WaitForConfigTransition();
_ = clusterProvider.WaitForConfigTransition();

status = FailoverStatus.WAITING_FOR_SYNC;
GarnetClient newPrimary = await WaitForFirstReplicaSync();
var newPrimary = await WaitForFirstReplicaSync();
if (newPrimary == null) return false;

status = FailoverStatus.TAKING_OVER_AS_PRIMARY;
Expand Down
Loading

0 comments on commit 47da479

Please sign in to comment.