Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
Add parameters and test for shard case
Browse files Browse the repository at this point in the history
It's the same test as AMB_Basic_Test, except with shard params.
  • Loading branch information
Shannon Joyner committed Dec 7, 2019
1 parent 81180e0 commit aa0b15a
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 19 deletions.
28 changes: 26 additions & 2 deletions Ambrosia/Ambrosia/Initialize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class AmbrosiaRuntimeParams
public string storageConnectionString;
public long currentVersion;
public long upgradeToVersion;
public long shardID;
public string oldShards;
public string newShards;
}

public static class AmbrosiaRuntimeParms
Expand All @@ -35,6 +38,23 @@ public AmbrosiaNonShardedRuntime()
Runtime = new AmbrosiaRuntime();
}

/// <summary>
/// Parses a comma separated list of integers (for example "1,2,3") into an array.
/// </summary>
/// <param name="s">Comma separated list of integers; may be null, empty, or whitespace only.</param>
/// <returns>
/// The parsed values in their original order, or an empty array when
/// <paramref name="s"/> is null, empty, or whitespace only.
/// </returns>
/// <exception cref="System.FormatException">An entry is not a valid integer (this includes empty entries such as in "1,,2").</exception>
/// <exception cref="System.OverflowException">An entry does not fit in a 64-bit signed integer.</exception>
private static long[] ParseLongs(string s)
{
    // A list that was never set (null) is treated the same as an empty list
    // rather than failing with a NullReferenceException.
    if (string.IsNullOrWhiteSpace(s))
    {
        return new long[0];
    }

    string[] shards = s.Split(',');
    long[] ids = new long[shards.Length];

    for (int i = 0; i < shards.Length; i++)
    {
        // The IDs are machine-readable, so parse them independently of the current culture.
        // NumberStyles.Integer is the default style of long.Parse and tolerates surrounding whitespace.
        ids[i] = long.Parse(
            shards[i],
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture);
    }

    return ids;
}

public override async Task InitializeAsync(object param)
{
// Workaround because of parameter type limitation in CRA
Expand Down Expand Up @@ -62,7 +82,9 @@ public override async Task InitializeAsync(object param)
p.upgradeToVersion,
ClientLibrary,
AddAsyncInputEndpoint,
AddAsyncOutputEndpoint
AddAsyncOutputEndpoint,
ParseLongs(p.oldShards),
ParseLongs(p.newShards)
);

return;
Expand Down Expand Up @@ -104,7 +126,9 @@ public override async Task InitializeAsync(int shardId, ShardingInfo shardingInf
p.upgradeToVersion,
ClientLibrary,
AddAsyncInputEndpoint,
AddAsyncOutputEndpoint
AddAsyncOutputEndpoint,
new long[0],
new long[0]
);

return;
Expand Down
82 changes: 71 additions & 11 deletions Ambrosia/Ambrosia/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2038,6 +2038,8 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth
bool _sharded;
internal bool _createService;
long _shardID;
long[] _oldShards;
long[] _newShards;
bool _runningRepro;
long _currentVersion;
long _upgradeToVersion;
Expand Down Expand Up @@ -3903,7 +3905,9 @@ public void Initialize(int serviceReceiveFromPort,
long upgradeToVersion,
CRAClientLibrary coral,
Action<string, IAsyncVertexInputEndpoint> addInput,
Action<string, IAsyncVertexOutputEndpoint> addOutput
Action<string, IAsyncVertexOutputEndpoint> addOutput,
long[] oldShards,
long[] newShards
)
{
InitializeLogWriterStatics();
Expand All @@ -3921,6 +3925,8 @@ Action<string, IAsyncVertexOutputEndpoint> addOutput
{
Console.WriteLine("Ready ...");
}
_oldShards = oldShards;
_newShards = newShards;
_persistLogs = persistLogs;
_activeActive = activeActive;
_newLogTriggerSize = logTriggerSizeMB * 1000000;
Expand Down Expand Up @@ -3992,6 +3998,45 @@ class Program
private static long _logTriggerSizeMB = 1000;
private static int _currentVersion = 0;
private static long _upgradeVersion = -1;
// Shard ID set by the "si|shardID=" option; -1 means no shard ID was supplied.
private static long _shardID = -1;
// Raw comma separated list set by the "os|oldShards=" option (shards to recover from); empty when not supplied.
private static string _oldShards = "";
// Raw comma separated list set by the "ns|newShards=" option (new shards being created); empty when not supplied.
private static string _newShards = "";

/// <summary>
/// Registers a vertex definition with CRA, using either the sharded or the
/// non-sharded Ambrosia runtime as the vertex factory, and blocks until CRA answers.
/// </summary>
/// <param name="client">CRA client library used to register the definition.</param>
/// <param name="vertexDefinition">Definition identifier passed through to <c>DefineVertexAsync</c>.</param>
/// <param name="sharded">
/// True to register <c>AmbrosiaShardedRuntime</c>; false to register <c>AmbrosiaNonShardedRuntime</c>.
/// </param>
/// <exception cref="Exception">CRA returned an error code other than <c>CRAErrorCode.Success</c>.</exception>
private static void DefineVertex(CRAClientLibrary client, string vertexDefinition, bool sharded)
{
    // The two factories are kept in separate calls because each lambda creates a different runtime type.
    Task<CRAErrorCode> pending;
    if (sharded)
    {
        pending = client.DefineVertexAsync(vertexDefinition, () => new AmbrosiaShardedRuntime());
    }
    else
    {
        pending = client.DefineVertexAsync(vertexDefinition, () => new AmbrosiaNonShardedRuntime());
    }

    // Wait synchronously for the result, as the original code did.
    CRAErrorCode errorCode = pending.GetAwaiter().GetResult();
    if (errorCode != CRAErrorCode.Success)
    {
        // Same exception type as before, but now carrying the error code so the failure can be diagnosed.
        throw new Exception($"Failed to define vertex '{vertexDefinition}' (sharded: {sharded}): {errorCode}");
    }
}

/// <summary>
/// Instantiates a vertex through CRA, either as a plain vertex or as a single
/// shard of a sharded vertex, and blocks until CRA answers.
/// </summary>
/// <param name="client">CRA client library used to instantiate the vertex.</param>
/// <param name="instanceName">Name of the CRA instance that hosts the vertex.</param>
/// <param name="vertexName">Name of the vertex to instantiate.</param>
/// <param name="vertexDefinition">Definition identifier previously registered with CRA.</param>
/// <param name="vertexParameter">Parameter object handed to the vertex on initialization.</param>
/// <param name="shardID">
/// Shard to instantiate. A negative value (the "not specified" default is -1) instantiates a
/// non-sharded vertex, matching the caller's <c>_shardID &gt;= 0</c> rule for choosing the sharded definition.
/// </param>
/// <exception cref="OverflowException"><paramref name="shardID"/> does not fit in a 32-bit integer.</exception>
/// <exception cref="Exception">CRA returned an error code other than <c>CRAErrorCode.Success</c>.</exception>
private static void InstantiateVertex(CRAClientLibrary client, string instanceName, string vertexName, string vertexDefinition, object vertexParameter, long shardID)
{
    CRAErrorCode result;
    if (shardID < 0)
    {
        result = client.InstantiateVertexAsync(instanceName, vertexName, vertexDefinition, vertexParameter).GetAwaiter().GetResult();
    }
    else
    {
        // CRA keys shards by int; fail loudly instead of silently truncating a large shard ID.
        int shard = checked((int)shardID);

        ConcurrentDictionary<string, int> vertexShards = new ConcurrentDictionary<string, int>();
        vertexShards[instanceName] = shard;
        result = client.InstantiateShardedVertex(vertexName, vertexDefinition, vertexParameter, vertexShards);
    }

    if (result != CRAErrorCode.Success)
    {
        // Same exception type as before, but now carrying the error code so the failure can be diagnosed.
        throw new Exception($"Failed to instantiate vertex '{vertexName}' on instance '{instanceName}' (shard ID: {shardID}): {result}");
    }
}

static void Main(string[] args)
{
Expand All @@ -4005,6 +4050,7 @@ static void Main(string[] args)
_isTestingUpgrade, _serviceReceiveFromPort, _serviceSendToPort);
return;
case LocalAmbrosiaRuntimeModes.AddReplica:
case LocalAmbrosiaRuntimeModes.AddShard:
case LocalAmbrosiaRuntimeModes.RegisterInstance:
if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddReplica)
{
Expand All @@ -4016,6 +4062,7 @@ static void Main(string[] args)
client.DisableArtifactUploading();

var replicaName = $"{_instanceName}{_replicaNumber}";

AmbrosiaRuntimeParams param = new AmbrosiaRuntimeParams();
param.createService = _recoveryMode == AmbrosiaRecoveryModes.A
? (bool?)null
Expand All @@ -4032,13 +4079,13 @@ static void Main(string[] args)
param.serviceLogPath = _serviceLogPath;
param.AmbrosiaBinariesLocation = _binariesLocation;
param.storageConnectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONN_STRING");
param.shardID = _shardID;
param.oldShards = _oldShards;
param.newShards = _newShards;

try
{
if (client.DefineVertexAsync(param.AmbrosiaBinariesLocation, () => new AmbrosiaNonShardedRuntime()).GetAwaiter().GetResult() != CRAErrorCode.Success)
{
throw new Exception();
}
DefineVertex(client, param.AmbrosiaBinariesLocation, _shardID >= 0);

// Workaround because of limitation in parameter serialization in CRA
XmlSerializer xmlSerializer = new XmlSerializer(param.GetType());
Expand All @@ -4048,11 +4095,7 @@ static void Main(string[] args)
xmlSerializer.Serialize(textWriter, param);
serializedParams = textWriter.ToString();
}

if (client.InstantiateVertexAsync(replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams).GetAwaiter().GetResult() != CRAErrorCode.Success)
{
throw new Exception();
}
InstantiateVertex(client, replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams, _shardID);
client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaDataInputsName, true, true).Wait();
client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaDataOutputsName, false, true).Wait();
client.AddEndpointAsync(param.serviceName, AmbrosiaRuntime.AmbrosiaControlInputsName, true, true).Wait();
Expand Down Expand Up @@ -4085,6 +4128,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp)
{ "rp|receivePort=", "The service receive from port [REQUIRED].", rp => _serviceReceiveFromPort = int.Parse(rp) },
{ "sp|sendPort=", "The service send to port. [REQUIRED]", sp => _serviceSendToPort = int.Parse(sp) },
{ "l|log=", "The service log path.", l => _serviceLogPath = l },
{"si|shardID=", "The shard ID of the instance", si => _shardID = long.Parse(si) },
};

var helpOption = new OptionSet
Expand Down Expand Up @@ -4118,16 +4162,23 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp)
{ "tu|testingUpgrade", "Is testing upgrade.", u => _isTestingUpgrade = true },
});

var addShardOptionSet = new OptionSet
{
{"os|oldShards=", "Comma separated list of shards to recover from [REQUIRED].", os => _oldShards = os },
{"ns|newShards=", "Comma separated list of new shards being created [REQUIRED].", ns => _newShards = ns }
}.AddMany(registerInstanceOptionSet);

registerInstanceOptionSet = registerInstanceOptionSet.AddMany(helpOption);
addReplicaOptionSet = addReplicaOptionSet.AddMany(helpOption);
debugInstanceOptionSet = debugInstanceOptionSet.AddMany(helpOption);

addShardOptionSet = addShardOptionSet.AddMany(helpOption);

var runtimeModeToOptionSet = new Dictionary<LocalAmbrosiaRuntimeModes, OptionSet>
{
{ LocalAmbrosiaRuntimeModes.RegisterInstance, registerInstanceOptionSet},
{ LocalAmbrosiaRuntimeModes.AddReplica, addReplicaOptionSet},
{ LocalAmbrosiaRuntimeModes.DebugInstance, debugInstanceOptionSet},
{ LocalAmbrosiaRuntimeModes.AddShard, addShardOptionSet },
};

_runtimeMode = default(LocalAmbrosiaRuntimeModes);
Expand Down Expand Up @@ -4160,6 +4211,7 @@ public enum LocalAmbrosiaRuntimeModes
AddReplica,
RegisterInstance,
DebugInstance,
AddShard,
}

public enum AmbrosiaRecoveryModes
Expand Down Expand Up @@ -4194,6 +4246,14 @@ private static void ValidateOptions(OptionSet options, bool shouldShowHelp)
}
}

if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddShard)
{
if (_shardID == -1)
{
errorMessage += "Shard ID is required.\n";
}
}

// handles the case when an upgradeversion is not specified
if (_upgradeVersion == -1)
{
Expand Down
1 change: 1 addition & 0 deletions AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB1.cmp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically
1 change: 1 addition & 0 deletions AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_AMB2.cmp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically
9 changes: 9 additions & 0 deletions AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_ClientJob.cmp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Bytes per RPC Throughput (GB/sec)
*X* 32768 0.0538327740000449
Service Received 1024 MB so far
*X* 16384 0.0709862409498754
Service Received 2048 MB so far
*X* 8192 0.0695878693925042
Service Received 3072 MB so far
Bytes received: 3221225472
DONE
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Bytes per RPC Throughput (GB/sec)
*X* 32768 0.00943044129323776
Service Received 1024 MB so far
*X* 16384 0.00989352845861985
Service Received 2048 MB so far
*X* 8192 0.00993638850272688
Service Received 3072 MB so far
Bytes received: 3221225472
DONE
18 changes: 18 additions & 0 deletions AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server.cmp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
*X* At checkpoint, received 0 messages
*X* becoming primary
*X* Server in Entry Point
*X* I'm healthy after 3000 checks at time:10/24/2018 1:15:13 PM
*X* I'm healthy after 6000 checks at time:10/24/2018 1:15:19 PM
*X* At checkpoint, received 30564 messages
Received 1024 MB so far
*X* I'm healthy after 9000 checks at time:10/24/2018 1:15:25 PM
*X* I'm healthy after 12000 checks at time:10/24/2018 1:15:31 PM
*X* At checkpoint, received 89584 messages
Received 2048 MB so far
*X* I'm healthy after 15000 checks at time:10/24/2018 1:15:37 PM
*X* I'm healthy after 18000 checks at time:10/24/2018 1:15:43 PM
*X* At checkpoint, received 202934 messages
*X* I'm healthy after 21000 checks at time:10/24/2018 1:15:49 PM
Received 3072 MB so far
Bytes received: 3221225472
DONE
18 changes: 18 additions & 0 deletions AmbrosiaTest/AmbrosiaTest/Cmp/shardbasictest_Server_Verify.cmp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
*X* Server in Entry Point
*X* I'm healthy after 3000 checks at time:10/24/2018 1:20:10 PM
*X* I'm healthy after 6000 checks at time:10/24/2018 1:20:16 PM
Received 1024 MB so far
*X* I'm healthy after 9000 checks at time:10/24/2018 1:20:22 PM
*X* I'm healthy after 12000 checks at time:10/24/2018 1:20:28 PM
Received 2048 MB so far
*X* I'm healthy after 15000 checks at time:10/24/2018 1:20:34 PM
*X* I'm healthy after 18000 checks at time:10/24/2018 1:20:40 PM
*X* I'm healthy after 21000 checks at time:10/24/2018 1:20:46 PM
*X* I'm healthy after 24000 checks at time:10/24/2018 1:20:52 PM
*X* I'm healthy after 27000 checks at time:10/24/2018 1:20:58 PM
*X* I'm healthy after 30000 checks at time:10/24/2018 1:21:04 PM
Received 3072 MB so far
Bytes received: 3221225472
DONE
*X* I'm healthy after 33000 checks at time:10/24/2018 1:21:10 PM
*X* I'm healthy after 36000 checks at time:10/24/2018 1:21:16 PM
87 changes: 87 additions & 0 deletions AmbrosiaTest/AmbrosiaTest/EndToEndStressIntegration_Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,93 @@ public void AMB_Basic_Test()
MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true,AMB1.AMB_Version);
}

// End-to-end shard test: registers a client job instance and a server instance, each with a
// shard ID, runs the perf client and server through their immortal coordinators, then compares
// the produced logs against the stored .cmp files and replays the Ambrosia logs for verification.
// The steps below are order dependent (register -> coordinators -> apps -> wait -> kill -> verify).
[TestMethod]
public void AMB_Shard_Basic_Test()
{
// Test that one shard per server and client works
string testName = "shardbasictest";
string clientJobName = testName + "clientjob";
string serverName = testName + "server";
string ambrosiaLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\";
// Total number of bytes the client and server logs are expected to report (3 GB = 3 * 1024^3).
string byteSize = "3221225472";

Utilities MyUtils = new Utilities();

//AMB1 - Job
// Registers the client job instance with shard ID 0.
string logOutputFileName_AMB1 = testName + "_AMB1.log";
AMB_Settings AMB1 = new AMB_Settings
{
AMB_ServiceName = clientJobName,
AMB_PortAppReceives = "1000",
AMB_PortAMBSends = "1001",
AMB_ServiceLogPath = ambrosiaLogDir,
AMB_CreateService = "A",
AMB_PauseAtStart = "N",
AMB_PersistLogs = "Y",
AMB_NewLogTriggerSize = "1000",
AMB_ActiveActive = "N",
AMB_Version = "0",
AMB_ShardID = "0",
};
MyUtils.CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.RegisterInstance);

//AMB2
// Registers the server instance with shard ID 0, using a different port pair than the client job.
string logOutputFileName_AMB2 = testName + "_AMB2.log";
AMB_Settings AMB2 = new AMB_Settings
{
AMB_ServiceName = serverName,
AMB_PortAppReceives = "2000",
AMB_PortAMBSends = "2001",
AMB_ServiceLogPath = ambrosiaLogDir,
AMB_CreateService = "A",
AMB_PauseAtStart = "N",
AMB_PersistLogs = "Y",
AMB_NewLogTriggerSize = "1000",
AMB_ActiveActive = "N",
AMB_Version = "0",
AMB_ShardID = "0",
};
MyUtils.CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.RegisterInstance);

//ImmCoord1
// Immortal coordinator for the client job.
string logOutputFileName_ImmCoord1 = testName + "_ImmCoord1.log";
int ImmCoordProcessID1 = MyUtils.StartImmCoord(clientJobName, 1500, logOutputFileName_ImmCoord1);

//ImmCoord2
// Immortal coordinator for the server.
string logOutputFileName_ImmCoord2 = testName + "_ImmCoord2.log";
int ImmCoordProcessID2 = MyUtils.StartImmCoord(serverName, 2500, logOutputFileName_ImmCoord2);

//Client Job Call
// The port arguments are the same values as AMB1's AMB_PortAMBSends / AMB_PortAppReceives.
string logOutputFileName_ClientJob = testName + "_ClientJob.log";
int clientJobProcessID = MyUtils.StartPerfClientJob("1001", "1000", clientJobName, serverName, "32768", "3", logOutputFileName_ClientJob);

//Server Call
// The port arguments are the same values as AMB2's AMB_PortAMBSends / AMB_PortAppReceives.
string logOutputFileName_Server = testName + "_Server.log";
int serverProcessID = MyUtils.StartPerfServer("2001", "2000", clientJobName, serverName, logOutputFileName_Server, 1, false);

//Delay until client is done - also check Server just to make sure
// NOTE(review): 'pass' is overwritten by the second call and never asserted in this method;
// presumably WaitForProcessToFinish fails the test itself on timeout - confirm in Utilities.
bool pass = MyUtils.WaitForProcessToFinish(logOutputFileName_ClientJob, byteSize, 15, false, testName, true); // number of bytes processed
pass = MyUtils.WaitForProcessToFinish(logOutputFileName_Server, byteSize, 15, false, testName, true);

// Stop things so file is freed up and can be opened in verify
MyUtils.KillProcess(clientJobProcessID);
MyUtils.KillProcess(serverProcessID);
MyUtils.KillProcess(ImmCoordProcessID1);
MyUtils.KillProcess(ImmCoordProcessID2);

//Verify AMB
MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB1);
MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB2);

// Verify Client
MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_ClientJob);

// Verify Server
MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_Server);

// Verify integrity of Ambrosia logs by replaying
// NOTE(review): shardID is 1 here while both instances were registered with AMB_ShardID = "0" - confirm this is intended.
MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true, AMB1.AMB_Version, shardID: 1);
}

//** This test does 5 rounds of messages starting with 64MB and cutting in half each time
//** Basically same as the basic test but passing giant message - the difference is in the job.exe call and that is it
Expand Down
Loading

0 comments on commit aa0b15a

Please sign in to comment.