Skip to content

Commit

Permalink
Merge pull request #322 from microsoft/dev
Browse files Browse the repository at this point in the history
merge dev into main
  • Loading branch information
markusheiliger authored Feb 14, 2022
2 parents d0b6e8b + 0bf357e commit d644a83
Show file tree
Hide file tree
Showing 37 changed files with 314 additions and 19,277 deletions.
8 changes: 5 additions & 3 deletions src/TeamCloud.Data.Expanders/ComponentTaskExpander.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public async Task ExpandAsync(ComponentTask document)

private async Task<string> GetEventsAsync(ComponentTask document)
{
var output = default(string);

if (document.TaskState.IsActive() && AzureResourceIdentifier.TryParse(document.ResourceId, out var resourceId))
{
try
Expand All @@ -83,8 +85,8 @@ private async Task<string> GetEventsAsync(ComponentTask document)

if (containerGroup is not null)
{
return await containerGroup
.GetEventContentAsync("runner")
output = await containerGroup
.GetEventContentAsync(document.Id)
.ConfigureAwait(false);
}
}
Expand All @@ -94,7 +96,7 @@ private async Task<string> GetEventsAsync(ComponentTask document)
}
}

return default;
return output;
}

private async Task<string> GetOutputAsync(ComponentTask document)
Expand Down
26 changes: 19 additions & 7 deletions src/TeamCloud.Data/IDocumentSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,36 @@
*/

using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading.Tasks;
using TeamCloud.Model.Data.Core;

namespace TeamCloud.Data;

public abstract class DocumentSubscription : IDocumentSubscription
{
private static readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, MethodInfo>> HandleMethodCache = new ConcurrentDictionary<Type,ConcurrentDictionary<Type, MethodInfo>>();

private MethodInfo GetHandleMethod(IContainerDocument containerDocument) => HandleMethodCache
.GetOrAdd(GetType(), _ => new ConcurrentDictionary<Type, MethodInfo>())
.GetOrAdd(containerDocument.GetType(), containerDocumentType =>
{
var subscriberInterface = typeof(IDocumentSubscription<>)
.MakeGenericType(containerDocument.GetType());
if (subscriberInterface.IsAssignableFrom(GetType()))
return subscriberInterface.GetMethod(nameof(HandleAsync), new Type[] { containerDocument.GetType(), typeof(DocumentSubscriptionEvent) });
return null;
});

public virtual bool CanHandle(IContainerDocument containerDocument)
{
if (containerDocument is null)
throw new ArgumentNullException(nameof(containerDocument));

return typeof(IDocumentSubscription<>).MakeGenericType(containerDocument.GetType()).IsAssignableFrom(GetType());
return GetHandleMethod(containerDocument) is not null;
}

public virtual Task HandleAsync(IContainerDocument containerDocument, DocumentSubscriptionEvent subscriptionEvent)
Expand All @@ -25,12 +42,7 @@ public virtual Task HandleAsync(IContainerDocument containerDocument, DocumentSu
throw new ArgumentNullException(nameof(containerDocument));

if (CanHandle(containerDocument))
{
return (Task)typeof(IDocumentExpander<>)
.MakeGenericType(containerDocument.GetType())
.GetMethod(nameof(HandleAsync), new Type[] { containerDocument.GetType(), typeof(DocumentSubscriptionEvent) })
.Invoke(this, new object[] { containerDocument, subscriptionEvent });
}
return (Task)GetHandleMethod(containerDocument).Invoke(this, new object[] { containerDocument, subscriptionEvent });

throw new NotImplementedException($"Missing document subscription implementation IDocumentSubscription<{containerDocument.GetType().Name}> at {GetType()}");
}
Expand Down
2 changes: 1 addition & 1 deletion src/TeamCloud.Orchestrator/API/CommandTrigger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ await commandAuditWriter
else
{
commandResult = await commandHandler
.HandleAsync(command, commandCollector, durableClient, null, log ?? NullLogger.Instance)
.HandleAsync(command, commandCollector, null, log ?? NullLogger.Instance)
.ConfigureAwait(false);

if (!commandResult.RuntimeStatus.IsFinal())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

namespace TeamCloud.Orchestrator.Command.Activities;

public sealed class CommandCollectActivity
public sealed class CommandEnqueueActivity
{
[FunctionName(nameof(CommandCollectActivity))]
[FunctionName(nameof(CommandEnqueueActivity))]
public async Task RunActivity(
[ActivityTrigger] IDurableActivityContext activityContext,
[Queue(CommandHandler.ProcessorQueue)] IAsyncCollector<ICommand> commandCollector,
Expand All @@ -40,7 +40,7 @@ await commandCollector
}
catch (Exception exc)
{
log.LogError(exc, $"Failed to collect command: {exc.Message}");
log.LogError(exc, $"Failed to enqeueu command: {exc.Message}");

throw exc.AsSerializable();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/

using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using TeamCloud.Serialization;

namespace TeamCloud.Orchestrator.Command.Activities;

public sealed class CommandStatusActivity
{
[FunctionName(nameof(CommandStatusActivity))]
public Task<DurableOrchestrationStatus> RunActivity(
[ActivityTrigger] IDurableActivityContext activityContext,
[DurableClient] IDurableClient orchestrationClient,
ILogger log)
{
if (activityContext is null)
throw new ArgumentNullException(nameof(activityContext));

if (orchestrationClient is null)
throw new ArgumentNullException(nameof(orchestrationClient));

if (log is null)
throw new ArgumentNullException(nameof(log));

try
{
var input = activityContext.GetInput<Input>();

return orchestrationClient
.GetStatusAsync(input.CommandId.ToString(), showHistory: input.ShowHistory, showHistoryOutput: input.ShowHistoryOutput, showInput: input.ShowInput);
}
catch (Exception exc)
{
log.LogError(exc, $"Failed to enqeueu command: {exc.Message}");

throw exc.AsSerializable();
}
}

internal struct Input
{
public Guid CommandId { get; set; }

public bool ShowHistory { get; set; }

public bool ShowHistoryOutput { get; set; }

public bool ShowInput { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
using TeamCloud.Serialization;

namespace TeamCloud.Orchestrator.Command.Activities;

public sealed class CommandTerminateActivity
{
[FunctionName(nameof(CommandTerminateActivity))]
public async Task RunActivity(
[ActivityTrigger] IDurableActivityContext activityContext,
[DurableClient] IDurableClient orchestrationClient,
ILogger log)
{
if (activityContext is null)
throw new ArgumentNullException(nameof(activityContext));

if (orchestrationClient is null)
throw new ArgumentNullException(nameof(orchestrationClient));

if (log is null)
throw new ArgumentNullException(nameof(log));

try
{
var input = activityContext.GetInput<Input>();

await orchestrationClient
.TerminateAsync(input.CommandId.ToString(), input.Reason ?? string.Empty)
.ConfigureAwait(false);
}
catch (Exception exc)
{
log.LogError(exc, $"Failed to enqeueu command: {exc.Message}");

throw exc.AsSerializable();
}
}

internal struct Input
{
public Guid CommandId { get; set; }

public string Reason { get; set; }
}
}
18 changes: 13 additions & 5 deletions src/TeamCloud.Orchestrator/Command/CommandCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ public sealed class CommandCollector : IAsyncCollector<ICommand>
private readonly ICommand commandContext;
private readonly IDurableOrchestrationContext orchestrationContext;

public CommandCollector(IAsyncCollector<ICommand> collector, ICommand commandContext = null, IDurableOrchestrationContext orchestrationContext = null)
public CommandCollector(IAsyncCollector<ICommand> collector, ICommand commandContext = null)
{
this.collector = collector ?? throw new ArgumentNullException(nameof(collector));
this.commandContext = commandContext;
this.orchestrationContext = orchestrationContext;
}
public CommandCollector(IDurableOrchestrationContext orchestrationContext, ICommand commandContext = null)
{
this.orchestrationContext = orchestrationContext ?? throw new ArgumentNullException(nameof(orchestrationContext));
this.commandContext = commandContext;
}

public async Task AddAsync(ICommand item, CancellationToken cancellationToken = default)
Expand All @@ -33,18 +37,22 @@ public async Task AddAsync(ICommand item, CancellationToken cancellationToken =

item.ParentId = commandContext?.CommandId ?? Guid.Empty;

if (orchestrationContext is null)
if (collector is not null)
{
await collector
.AddAsync(item, cancellationToken)
.ConfigureAwait(false);
}
else
else if (orchestrationContext is not null)
{
await orchestrationContext
.CallActivityAsync(nameof(CommandCollectActivity), new CommandCollectActivity.Input() { Command = item })
.CallActivityAsync(nameof(CommandEnqueueActivity), new CommandEnqueueActivity.Input() { Command = item })
.ConfigureAwait(true);
}
else
{
throw new NotSupportedException();
}
}

public Task FlushAsync(CancellationToken cancellationToken = default)
Expand Down
20 changes: 20 additions & 0 deletions src/TeamCloud.Orchestrator/Command/CommandExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,32 @@
using System;
using System.Threading.Tasks;
using TeamCloud.Model.Commands.Core;
using TeamCloud.Model.Data;
using TeamCloud.Orchestrator.Command.Activities;
using TeamCloud.Serialization;

namespace TeamCloud.Orchestrator.Command;

internal static class CommandExtensions
{
internal static Task TerminateCommandAsync(this IDurableOrchestrationContext orchestrationContext, ComponentTask componentTask, string reason = null)
=> orchestrationContext.TerminateCommandAsync(Guid.Parse(componentTask.Id), reason);

internal static Task TerminateCommandAsync(this IDurableOrchestrationContext orchestrationContext, ICommand command, string reason = null)
=> orchestrationContext.TerminateCommandAsync(command.CommandId, reason);

internal static Task TerminateCommandAsync(this IDurableOrchestrationContext orchestrationContext, Guid commandId, string reason = null)
=> orchestrationContext.CallActivityAsync(nameof(CommandTerminateActivity), new CommandTerminateActivity.Input() { CommandId = commandId, Reason = reason });

internal static Task<DurableOrchestrationStatus> GetCommandStatusAsync(this IDurableOrchestrationContext orchestrationContext, ComponentTask componentTask, bool showHistory = false, bool showHistoryOutput = false, bool showInput = true)
=> orchestrationContext.GetCommandStatusAsync(Guid.Parse(componentTask.Id), showHistory, showHistoryOutput, showInput);

internal static Task<DurableOrchestrationStatus> GetCommandStatusAsync(this IDurableOrchestrationContext orchestrationContext, ICommand command, bool showHistory = false, bool showHistoryOutput = false, bool showInput = true)
=> orchestrationContext.GetCommandStatusAsync(command.CommandId, showHistory, showHistoryOutput, showInput);

internal static Task<DurableOrchestrationStatus> GetCommandStatusAsync(this IDurableOrchestrationContext orchestrationContext, Guid commandId, bool showHistory = false, bool showHistoryOutput = false, bool showInput = true)
=> orchestrationContext.CallActivityAsync<DurableOrchestrationStatus>(nameof(CommandStatusActivity), new CommandStatusActivity.Input() { CommandId = commandId, ShowHistory = showHistory, ShowHistoryOutput = showHistoryOutput, ShowInput = showInput });

internal static async Task<ICommand> GetCommandAsync(this IDurableClient durableClient, Guid commandId)
{
if (durableClient is null)
Expand Down
36 changes: 23 additions & 13 deletions src/TeamCloud.Orchestrator/Command/CommandHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,41 @@
*/

using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading.Tasks;
using Jose;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.Services.Common;
using TeamCloud.Model.Commands.Core;

namespace TeamCloud.Orchestrator.Command;

public abstract class CommandHandler<TCommand> : CommandHandler, ICommandHandler<TCommand>
where TCommand : class, ICommand
{
public abstract Task<ICommandResult> HandleAsync(TCommand command, IAsyncCollector<ICommand> commandQueue, IDurableClient orchestrationClient, IDurableOrchestrationContext orchestrationContext, ILogger log);
public abstract Task<ICommandResult> HandleAsync(TCommand command, IAsyncCollector<ICommand> commandQueue, IDurableOrchestrationContext orchestrationContext, ILogger log);
}

public abstract class CommandHandler : ICommandHandler
{
private static readonly ConcurrentDictionary<Type, ConcurrentDictionary<Type, MethodInfo>> HandleMethodCache = new ConcurrentDictionary<Type, ConcurrentDictionary<Type, MethodInfo>>();

private MethodInfo GetHandleMethod(ICommand command) => HandleMethodCache
.GetOrAdd(GetType(), _ => new ConcurrentDictionary<Type, MethodInfo>())
.GetOrAdd(command.GetType(), commandType =>
{
var handlerInterface = typeof(ICommandHandler<>)
.MakeGenericType(commandType);
if (handlerInterface.IsAssignableFrom(GetType()))
return handlerInterface.GetMethod(nameof(HandleAsync), new Type[] { command.GetType(), typeof(IAsyncCollector<ICommand>), typeof(IDurableOrchestrationContext), typeof(ILogger) });
return null;
});

public const string ProcessorQueue = "command-processor";
public const string MonitorQueue = "command-monitor";

Expand All @@ -30,25 +49,16 @@ public virtual bool CanHandle(ICommand command)
if (command is null)
throw new ArgumentNullException(nameof(command));

return typeof(ICommandHandler<>)
.MakeGenericType(command.GetType())
.IsAssignableFrom(GetType());
return GetHandleMethod(command) is not null;
}

public virtual Task<ICommandResult> HandleAsync(ICommand command, IAsyncCollector<ICommand> commandQueue, IDurableClient orchestrationClient, IDurableOrchestrationContext orchestrationContext, ILogger log)
public virtual Task<ICommandResult> HandleAsync(ICommand command, IAsyncCollector<ICommand> commandQueue, IDurableOrchestrationContext orchestrationContext, ILogger log)
{
if (command is null)
throw new ArgumentNullException(nameof(command));

if (CanHandle(command))
{
var handleMethod = typeof(ICommandHandler<>)
.MakeGenericType(command.GetType())
.GetMethod(nameof(HandleAsync), new Type[] { command.GetType(), typeof(IAsyncCollector<ICommand>), typeof(IDurableClient), typeof(IDurableOrchestrationContext), typeof(ILogger) });

return (Task<ICommandResult>)handleMethod
.Invoke(this, new object[] { command, commandQueue, orchestrationClient, orchestrationContext, log });
}
return (Task<ICommandResult>)GetHandleMethod(command).Invoke(this, new object[] { command, commandQueue, orchestrationContext, log });

throw new NotImplementedException($"Missing orchestrator command handler implementation ICommandHandler<{command.GetTypeName(prettyPrint: true)}> at {GetType()}");
}
Expand Down
7 changes: 1 addition & 6 deletions src/TeamCloud.Orchestrator/Command/CommandOrchestration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,8 @@ public CommandOrchestration(ICommandHandler[] commandHandlers)
[FunctionName(nameof(CommandOrchestration))]
public async Task Execute(
[OrchestrationTrigger] IDurableOrchestrationContext orchestratorContext,
[DurableClient] IDurableClient orchestratorClient,
[Queue(CommandHandler.ProcessorQueue)] IAsyncCollector<ICommand> commandQueue,
ILogger log)
{
if (orchestratorClient is null)
throw new ArgumentNullException(nameof(orchestratorClient));

if (orchestratorContext is null)
throw new ArgumentNullException(nameof(orchestratorContext));

Expand All @@ -60,7 +55,7 @@ await orchestratorContext
.ConfigureAwait(true);

commandResult = await commandHandler
.HandleAsync(command, new CommandCollector(commandQueue, command, orchestratorContext), orchestratorClient, orchestratorContext, log)
.HandleAsync(command, new CommandCollector(orchestratorContext, command), orchestratorContext, log)
.ConfigureAwait(true);

if (commandResult is null)
Expand Down
Loading

0 comments on commit d644a83

Please sign in to comment.