diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml
index b82583bee..b4ea3dc7b 100644
--- a/.github/workflows/itests.yml
+++ b/.github/workflows/itests.yml
@@ -41,7 +41,7 @@ jobs:
install-version: '7.0.x'
env:
NUPKG_OUTDIR: bin/Release/nugets
- GOVER: 1.17
+ GOVER: 1.19
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
@@ -49,7 +49,7 @@ jobs:
DAPR_RUNTIME_VER: 1.8.0
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF: ''
- DAPR_REF: ''
+ DAPR_REF: '25213183ab8fd20f85f74591ee6ec3a00822adf7'
steps:
- name: Set up Dapr CLI
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}
diff --git a/src/Dapr.Actors/Client/ActorProxyFactory.cs b/src/Dapr.Actors/Client/ActorProxyFactory.cs
index 9fd5edddb..2893d76fb 100644
--- a/src/Dapr.Actors/Client/ActorProxyFactory.cs
+++ b/src/Dapr.Actors/Client/ActorProxyFactory.cs
@@ -17,7 +17,8 @@ namespace Dapr.Actors.Client
using System.Net.Http;
using Dapr.Actors.Builder;
using Dapr.Actors.Communication.Client;
-
+ using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+ using Grpc.Net.Client;
///
/// Represents a factory class to create a proxy to the remote actor objects.
///
@@ -66,7 +67,25 @@ public ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions op
options ??= this.DefaultOptions;
var actorProxy = new ActorProxy();
- var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
+ IDaprInteractor daprInteractor;
+ if (options.UseGrpc) {
+ var grpcEndpoint = new Uri(options.GrpcEndpoint);
+ if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https")
+ {
+ throw new InvalidOperationException("The gRPC endpoint must use http or https.");
+ }
+
+ if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp))
+ {
+ // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel.
+ AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+ }
+ var channel = GrpcChannel.ForAddress(options.GrpcEndpoint, options.GrpcChannelOptions);
+ var client = new Autogenerated.Dapr.DaprClient(channel);
+ daprInteractor = new DaprGrpcInteractor(channel, client, options.DaprApiToken);
+ } else {
+ daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
+ }
var nonRemotingClient = new ActorNonRemotingClient(daprInteractor);
actorProxy.Initialize(nonRemotingClient, actorId, actorType, options);
@@ -77,8 +96,25 @@ public ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions op
public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string actorType, ActorProxyOptions options = null)
{
options ??= this.DefaultOptions;
+ IDaprInteractor daprInteractor;
+ if (options.UseGrpc) {
+ var grpcEndpoint = new Uri(options.GrpcEndpoint);
+ if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https")
+ {
+ throw new InvalidOperationException("The gRPC endpoint must use http or https.");
+ }
- var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
+ if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp))
+ {
+ // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel.
+ AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+ }
+ var channel = GrpcChannel.ForAddress(options.GrpcEndpoint, options.GrpcChannelOptions);
+ var client = new Autogenerated.Dapr.DaprClient(channel);
+ daprInteractor = new DaprGrpcInteractor(channel, client, options.DaprApiToken);
+ } else {
+ daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
+ }
var remotingClient = new ActorRemotingClient(daprInteractor);
var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType);
var actorProxy = proxyGenerator.CreateActorProxy();
diff --git a/src/Dapr.Actors/Client/ActorProxyOptions.cs b/src/Dapr.Actors/Client/ActorProxyOptions.cs
index 808605c70..dc7213358 100644
--- a/src/Dapr.Actors/Client/ActorProxyOptions.cs
+++ b/src/Dapr.Actors/Client/ActorProxyOptions.cs
@@ -15,6 +15,7 @@ namespace Dapr.Actors.Client
{
using System;
using System.Text.Json;
+ using Grpc.Net.Client;
///
/// The class containing customizable options for how the Actor Proxy is initialized.
@@ -58,9 +59,29 @@ public JsonSerializerOptions JsonSerializerOptions
///
public string HttpEndpoint { get; set; } = DaprDefaults.GetDefaultHttpEndpoint();
+ ///
+ /// Gets or sets the Grpc endpoint URI used to communicate with the Dapr sidecar.
+ ///
+ ///
+ /// The URI endpoint to use for Grpc calls to the Dapr runtime. The default value will be
+ /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the
+ /// DAPR_GRPC_PORT environment variable.
+ ///
+ ///
+ public string GrpcEndpoint { get; set; } = DaprDefaults.GetDefaultGrpcEndpoint();
+
///
/// The timeout allowed for an actor request. Can be set to System.Threading.Timeout.InfiniteTimeSpan to disable any timeouts.
///
public TimeSpan? RequestTimeout { get; set; } = null;
+ ///
+ /// Option to use GRPC or HTTP
+ ///
+ public bool UseGrpc { get; set; } = false;
+
+ ///
+ /// Options for grpc channel
+ ///
+ public GrpcChannelOptions GrpcChannelOptions { get; set; } = new GrpcChannelOptions(){ThrowOperationCanceledOnCancellation = true,};
}
}
diff --git a/src/Dapr.Actors/DaprGrpcInteractor.cs b/src/Dapr.Actors/DaprGrpcInteractor.cs
new file mode 100644
index 000000000..bd3799722
--- /dev/null
+++ b/src/Dapr.Actors/DaprGrpcInteractor.cs
@@ -0,0 +1,427 @@
+// ------------------------------------------------------------------------
+// Copyright 2023 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr.Actors
+{
+ using System.Collections.Generic;
+ using System.Globalization;
+ using System.IO;
+ using System.Linq;
+ using System.Text;
+ using System.Text.Json;
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Dapr.Actors.Communication;
+ using Dapr.Actors.Resources;
+ using System.Xml;
+ using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+ using Grpc.Core;
+ using Google.Protobuf;
+ using Dapr.Actors.Runtime;
+ using Grpc.Net.Client;
+ using Google.Protobuf.WellKnownTypes;
+
+ ///
+ /// Class to interact with Dapr runtime over grpc.
+ ///
+ internal class DaprGrpcInteractor : IDaprInteractor
+ {
+ private readonly JsonSerializerOptions jsonSerializerOptions = JsonSerializerDefaults.Web;
+ private bool disposed;
+ private string daprApiToken;
+ private readonly Autogenerated.Dapr.DaprClient client;
+ internal Autogenerated.Dapr.DaprClient Client => client;
+ private readonly GrpcChannel channel;
+
+ private const string EXCEPTION_HEADER_TAG = "b:KeyValueOfstringbase64Binary";
+
+ public DaprGrpcInteractor(
+ GrpcChannel channel,
+ Autogenerated.Dapr.DaprClient inner,
+ string apiToken)
+ {
+ this.channel = channel;
+ this.client = inner;
+ this.daprApiToken = apiToken;
+ }
+
+ public async Task GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default)
+ {
+ var request = new Autogenerated.GetActorStateRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Key = keyName,
+ };
+ var options = CreateCallOptions(cancellationToken);
+
+ Autogenerated.GetActorStateResponse response = new Autogenerated.GetActorStateResponse();
+ try
+ {
+ response = await client.GetActorStateAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("GetActorState operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ return response.Data.ToStringUtf8();
+ }
+
+ public async Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default, List grpcData = default)
+ {
+ var request = new Autogenerated.ExecuteActorStateTransactionRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ };
+ request.Operations.AddRange(grpcData);
+ var options = CreateCallOptions(cancellationToken);
+
+ try
+ {
+ await client.ExecuteActorStateTransactionAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("SaveStateTransactionallyAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ }
+
+ public async Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, IActorStateSerializer actorStateSerializer, JsonSerializerOptions jsonSerializerOptions, CancellationToken cancellationToken = default)
+ {
+ List grpcOps = new List();
+
+ foreach (var stateChange in stateChanges)
+ {
+ var operation = this.GetDaprStateOperation(stateChange.ChangeKind);
+ var op = new Autogenerated.TransactionalActorStateOperation()
+ {
+ OperationType = operation,
+ };
+
+ switch (stateChange.ChangeKind)
+ {
+ case StateChangeKind.Remove:
+ break;
+ case StateChangeKind.Add:
+ case StateChangeKind.Update:
+ op.Key = stateChange.StateName;
+ if (actorStateSerializer != null)
+ {
+
+ var buffer = ByteString.CopyFrom(actorStateSerializer.Serialize(stateChange.Type, stateChange.Value));
+ op.Value = new Any()
+ {
+ Value = buffer,
+ };
+ }
+ else
+ {
+ var buffer = ByteString.CopyFrom(JsonSerializer.SerializeToUtf8Bytes(stateChange.Value, stateChange.Type, jsonSerializerOptions));
+ op.Value = new Any()
+ {
+ Value = buffer,
+ };
+
+ }
+ break;
+ default:
+ break;
+ }
+ grpcOps.Add(op);
+ }
+
+ await this.SaveStateTransactionallyAsync(actorType, actorId, null, cancellationToken, grpcOps);
+ }
+
+ public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default)
+ {
+ var requestMessageHeader = remotingRequestRequestMessage.GetHeader();
+
+ var actorId = requestMessageHeader.ActorId.ToString();
+ var methodName = requestMessageHeader.MethodName;
+ var actorType = requestMessageHeader.ActorType;
+ var interfaceId = requestMessageHeader.InterfaceId;
+
+ var serializedHeader = serializersManager.GetHeaderSerializer()
+ .SerializeRequestHeader(remotingRequestRequestMessage.GetHeader());
+
+ var msgBodySeriaizer = serializersManager.GetRequestMessageBodySerializer(interfaceId);
+ var serializedMsgBody = msgBodySeriaizer.Serialize(remotingRequestRequestMessage.GetBody());
+
+ var request = new Autogenerated.InvokeActorRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Method = methodName,
+ };
+
+ if (serializedMsgBody != null)
+ {
+ request.Data = ByteString.CopyFrom(serializedMsgBody);
+ }
+
+ var options = CreateCallOptions(cancellationToken);
+
+ request.Metadata.Add(Constants.RequestHeaderName, Encoding.UTF8.GetString(serializedHeader, 0, serializedHeader.Length));
+
+ var reentrancyId = ActorReentrancyContextAccessor.ReentrancyContext;
+ if (reentrancyId != null)
+ {
+ request.Metadata.Add(Constants.ReentrancyRequestHeaderName, reentrancyId);
+ }
+
+ Autogenerated.InvokeActorResponse response = new Autogenerated.InvokeActorResponse();
+ try
+ {
+ response = await client.InvokeActorAsync(request, options);
+
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("InvokeActorAsync operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+
+ IActorResponseMessageHeader actorResponseMessageHeader = null;
+ IActorResponseMessageBody actorResponseMessageBody = null;
+ if (response != null)
+ {
+ var responseMessageBody = new MemoryStream(response.Data.ToArray());
+
+ var responseBodySerializer = serializersManager.GetResponseMessageBodySerializer(interfaceId);
+ try {
+ actorResponseMessageBody = responseBodySerializer.Deserialize(responseMessageBody);
+ }
+ catch
+ {
+ var isDeserialzied =
+ ActorInvokeException.ToException(
+ responseMessageBody,
+ out var remoteMethodException);
+ if (isDeserialzied)
+ {
+ throw new ActorMethodInvocationException(
+ "Remote Actor Method Exception, DETAILS: " + remoteMethodException.Message,
+ remoteMethodException,
+ false /* non transient */);
+ }
+ else
+ {
+ throw new ActorInvokeException(remoteMethodException.GetType().FullName, string.Format(
+ CultureInfo.InvariantCulture,
+ SR.ErrorDeserializationFailure,
+ remoteMethodException.ToString()));
+ }
+ }
+
+ }
+
+ return new ActorResponseMessage(actorResponseMessageHeader, actorResponseMessageBody);
+ }
+
+ private string GetExceptionDetails(string header) {
+ XmlDocument xmlHeader = new XmlDocument();
+ xmlHeader.LoadXml(header);
+ XmlNodeList exceptionValueXML = xmlHeader.GetElementsByTagName(EXCEPTION_HEADER_TAG);
+ string exceptionDetails = "";
+ if (exceptionValueXML != null && exceptionValueXML.Item(1) != null)
+ {
+ exceptionDetails = exceptionValueXML.Item(1).LastChild.InnerText;
+ }
+ var base64EncodedBytes = System.Convert.FromBase64String(exceptionDetails);
+ return Encoding.UTF8.GetString(base64EncodedBytes);
+ }
+
+ public async Task InvokeActorMethodWithoutRemotingAsync(string actorType, string actorId, string methodName, string jsonPayload, CancellationToken cancellationToken = default)
+ {
+ var request = new Autogenerated.InvokeActorRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Method = methodName,
+ };
+
+ if (jsonPayload != null)
+ {
+ request.Data = ByteString.CopyFromUtf8(jsonPayload);
+ }
+
+ var options = CreateCallOptions(cancellationToken);
+
+ var reentrancyId = ActorReentrancyContextAccessor.ReentrancyContext;
+ if (reentrancyId != null)
+ {
+ options.Headers.Add(Constants.ReentrancyRequestHeaderName, reentrancyId);
+ }
+
+ Autogenerated.InvokeActorResponse response = new Autogenerated.InvokeActorResponse();
+ try
+ {
+ response = await client.InvokeActorAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("InvokeActor operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ return new MemoryStream(response.Data.ToArray());
+ }
+
+ public async Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default)
+ {
+
+ var reminderdata = await ReminderInfo.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(data)));
+
+ var request = new Autogenerated.RegisterActorReminderRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Name = reminderName,
+ DueTime = reminderdata.DueTime != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(reminderdata.DueTime) : "",
+ Ttl = reminderdata.Ttl != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(reminderdata.Ttl) : "",
+ Period = reminderdata.Period != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(reminderdata.Period) : "",
+ Data = ByteString.CopyFrom(reminderdata.Data),
+ };
+ var options = CreateCallOptions(cancellationToken);
+
+ try
+ {
+ await client.RegisterActorReminderAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("RegisterReminde operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ }
+
+ public async Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
+ {
+
+ var request = new Autogenerated.UnregisterActorReminderRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Name = reminderName,
+ };
+ var options = CreateCallOptions(cancellationToken);
+
+ try
+ {
+ await client.UnregisterActorReminderAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("UnregisterReminder operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ }
+
+ #pragma warning disable 0618
+ public async Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default)
+ {
+ var timerdata = JsonSerializer.Deserialize(data, jsonSerializerOptions);
+
+ var request = new Autogenerated.RegisterActorTimerRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Name = timerName,
+ DueTime = timerdata.DueTime != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(timerdata.DueTime) : "",
+ Ttl = timerdata.Ttl != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(timerdata.Ttl) : "",
+ Period = timerdata.Period != null ? ConverterUtils.ConvertTimeSpanValueInDaprFormat(timerdata.Period) : "",
+ Data = ByteString.CopyFrom(timerdata.Data),
+ Callback = timerdata.Callback
+ };
+ var options = CreateCallOptions(cancellationToken);
+
+ try
+ {
+ await client.RegisterActorTimerAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("RegisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ }
+ #pragma warning restore 0618
+
+ public async Task UnregisterTimerAsync(string actorType, string actorId, string timerName, CancellationToken cancellationToken = default)
+ {
+ var request = new Autogenerated.UnregisterActorTimerRequest()
+ {
+ ActorId = actorId,
+ ActorType = actorType,
+ Name = timerName,
+ };
+ var options = CreateCallOptions(cancellationToken);
+
+ try
+ {
+ await client.UnregisterActorTimerAsync(request, options);
+ }
+ catch (RpcException ex)
+ {
+ throw new DaprApiException("UnregisterActorTimer operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
+ }
+ }
+
+ ///
+ /// Disposes resources.
+ ///
+ /// False values indicates the method is being called by the runtime, true value indicates the method is called by the user code.
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!this.disposed)
+ {
+ if (disposing)
+ {
+ this.channel.Dispose();
+ }
+
+ this.disposed = true;
+ }
+ }
+
+ private CallOptions CreateCallOptions(CancellationToken cancellationToken)
+ {
+ var options = new CallOptions(headers: new Metadata(), cancellationToken: cancellationToken);
+
+ // add token for dapr api token based authentication
+ if (this.daprApiToken is not null)
+ {
+ options.Headers.Add("dapr-api-token", this.daprApiToken);
+ }
+
+ return options;
+ }
+
+ private string GetDaprStateOperation(StateChangeKind changeKind)
+ {
+ var operation = string.Empty;
+
+ switch (changeKind)
+ {
+ case StateChangeKind.Remove:
+ operation = "delete";
+ break;
+ case StateChangeKind.Add:
+ case StateChangeKind.Update:
+ operation = "upsert";
+ break;
+ default:
+ break;
+ }
+
+ return operation;
+ }
+
+ }
+}
diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs
index df5207f4e..9530e2ab1 100644
--- a/src/Dapr.Actors/DaprHttpInteractor.cs
+++ b/src/Dapr.Actors/DaprHttpInteractor.cs
@@ -28,6 +28,8 @@ namespace Dapr.Actors
using Dapr.Actors.Communication;
using Dapr.Actors.Resources;
using System.Xml;
+ using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+ using Dapr.Actors.Runtime;
///
/// Class to interact with Dapr runtime over http.
@@ -75,7 +77,7 @@ HttpRequestMessage RequestFunc()
return stringResponse;
}
- public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default)
+ public Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default, List grpcData = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorStateRelativeUrlFormat, actorType, actorId);
@@ -93,6 +95,74 @@ HttpRequestMessage RequestFunc()
return this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
}
+ public async Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, IActorStateSerializer actorStateSerializer, JsonSerializerOptions jsonSerializerOptions, CancellationToken cancellationToken = default)
+ {
+ // Transactional state update request body:
+ /*
+ [
+ {
+ "operation": "upsert",
+ "request": {
+ "key": "key1",
+ "value": "myData"
+ }
+ },
+ {
+ "operation": "delete",
+ "request": {
+ "key": "key2"
+ }
+ }
+ ]
+ */
+ using var stream = new MemoryStream();
+ using var writer = new Utf8JsonWriter(stream);
+ writer.WriteStartArray();
+ foreach (var stateChange in stateChanges)
+ {
+ writer.WriteStartObject();
+ var operation = GetDaprStateOperation(stateChange.ChangeKind);
+ writer.WriteString("operation", operation);
+
+ // write the requestProperty
+ writer.WritePropertyName("request");
+ writer.WriteStartObject(); // start object for request property
+ switch (stateChange.ChangeKind)
+ {
+ case StateChangeKind.Remove:
+ writer.WriteString("key", stateChange.StateName);
+ break;
+ case StateChangeKind.Add:
+ case StateChangeKind.Update:
+ writer.WriteString("key", stateChange.StateName);
+
+ // perform default json serialization if custom serializer was not provided.
+ if (actorStateSerializer != null)
+ {
+ var buffer = actorStateSerializer.Serialize(stateChange.Type, stateChange.Value);
+ writer.WriteBase64String("value", buffer);
+ }
+ else
+ {
+ writer.WritePropertyName("value");
+ JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions);
+ }
+ break;
+ default:
+ break;
+ }
+
+ writer.WriteEndObject(); // end object for request property
+ writer.WriteEndObject();
+ }
+
+ writer.WriteEndArray();
+
+ await writer.FlushAsync();
+ var content = Encoding.UTF8.GetString(stream.ToArray());
+ await this.SaveStateTransactionallyAsync(actorType, actorId, content, cancellationToken);
+ }
+
public async Task InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default)
{
var requestMessageHeader = remotingRequestRequestMessage.GetHeader();
@@ -472,6 +542,25 @@ private HttpClient CreateHttpClient()
{
return new HttpClient(this.handler, false);
}
+ private string GetDaprStateOperation(StateChangeKind changeKind)
+ {
+ var operation = string.Empty;
+
+ switch (changeKind)
+ {
+ case StateChangeKind.Remove:
+ operation = "delete";
+ break;
+ case StateChangeKind.Add:
+ case StateChangeKind.Update:
+ operation = "upsert";
+ break;
+ default:
+ break;
+ }
+
+ return operation;
+ }
private void AddDaprApiTokenHeader(HttpRequestMessage request)
{
diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs
index 04eb66de9..9b09efa72 100644
--- a/src/Dapr.Actors/IDaprInteractor.cs
+++ b/src/Dapr.Actors/IDaprInteractor.cs
@@ -17,6 +17,10 @@ namespace Dapr.Actors
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors.Communication;
+ using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+ using System.Collections.Generic;
+ using Dapr.Actors.Runtime;
+ using System.Text.Json;
///
/// Interface for interacting with Dapr runtime.
@@ -40,9 +44,10 @@ internal interface IDaprInteractor
/// Type of actor.
/// ActorId.
/// JSON data with state changes as per the Dapr spec for transaction state update.
+ /// GRPC data with state changes as per the Dapr spec for transaction state update.
/// Cancels the operation.
/// A task that represents the asynchronous operation.
- Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default);
+ Task SaveStateTransactionallyAsync(string actorType, string actorId, string data, CancellationToken cancellationToken = default, List grpcData = default);
///
/// Saves a state to Dapr.
@@ -95,6 +100,18 @@ internal interface IDaprInteractor
/// A representing the result of the asynchronous operation.
Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default);
+ ///
+ /// Parses state changes.
+ ///
+ /// Type of actor.
+ /// ActorId.
+ /// StateChanges.
+ /// StateChanges.
+ /// StateChanges.
+ /// Cancels the operation.
+ /// A representing the result of the asynchronous operation.
+ Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, IActorStateSerializer actorStateSerializer, JsonSerializerOptions jsonSerializerOptions, CancellationToken cancellationToken = default);
+
///
/// Unegisters a timer.
///
diff --git a/src/Dapr.Actors/Runtime/ActorRuntime.cs b/src/Dapr.Actors/Runtime/ActorRuntime.cs
index 8d2ae0cab..95a68a35f 100644
--- a/src/Dapr.Actors/Runtime/ActorRuntime.cs
+++ b/src/Dapr.Actors/Runtime/ActorRuntime.cs
@@ -21,6 +21,8 @@
using System.Threading.Tasks;
using Dapr.Actors.Client;
using Microsoft.Extensions.Logging;
+using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+using Grpc.Net.Client;
namespace Dapr.Actors.Runtime
{
@@ -50,7 +52,25 @@ internal ActorRuntime(ActorRuntimeOptions options, ILoggerFactory loggerFactory,
// Revisit this if actor initialization becomes a significant source of delay for large projects.
foreach (var actor in options.Actors)
{
- var daprInteractor = new DaprHttpInteractor(clientHandler: null, httpEndpoint: options.HttpEndpoint, apiToken: options.DaprApiToken, requestTimeout: null);
+ IDaprInteractor daprInteractor;
+ if (options.UseGrpc) {
+ var grpcEndpoint = new Uri(options.GrpcEndpoint);
+ if (grpcEndpoint.Scheme != "http" && grpcEndpoint.Scheme != "https")
+ {
+ throw new InvalidOperationException("The gRPC endpoint must use http or https.");
+ }
+
+ if (grpcEndpoint.Scheme.Equals(Uri.UriSchemeHttp))
+ {
+ // Set correct switch to maksecure gRPC service calls. This switch must be set before creating the GrpcChannel.
+ AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+ }
+ var channel = GrpcChannel.ForAddress(options.GrpcEndpoint, options.GrpcChannelOptions);
+ var client = new Autogenerated.Dapr.DaprClient(channel);
+ daprInteractor = new DaprGrpcInteractor(channel, client, options.DaprApiToken);
+ } else {
+ daprInteractor = new DaprHttpInteractor(null, options.HttpEndpoint, options.DaprApiToken, null);
+ }
this.actorManagers[actor.Type.ActorTypeName] = new ActorManager(
actor,
actor.Activator ?? this.activatorFactory.CreateActivator(actor.Type),
diff --git a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs
index 3f4a6df88..91a9fbbbb 100644
--- a/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs
+++ b/src/Dapr.Actors/Runtime/ActorRuntimeOptions.cs
@@ -13,6 +13,7 @@
using System;
using System.Text.Json;
+using Grpc.Net.Client;
namespace Dapr.Actors.Runtime
{
@@ -225,5 +226,26 @@ public int? RemindersStoragePartitions
///
///
public string HttpEndpoint { get; set; } = DaprDefaults.GetDefaultHttpEndpoint();
+
+ ///
+ /// Gets or sets the Grpc endpoint URI used to communicate with the Dapr sidecar.
+ ///
+ ///
+ /// The URI endpoint to use for Grpc calls to the Dapr runtime. The default value will be
+ /// http://127.0.0.1:DAPR_GRPC_PORT where DAPR_GRPC_PORT represents the value of the
+ /// DAPR_GRPC_PORT environment variable.
+ ///
+ ///
+ public string GrpcEndpoint { get; set; } = DaprDefaults.GetDefaultGrpcEndpoint();
+
+ ///
+ /// Option to use GRPC or HTTP
+ ///
+ public bool UseGrpc { get; set; } = false;
+
+ ///
+ /// Options for grpc channel
+ ///
+ public GrpcChannelOptions GrpcChannelOptions { get; set; } = new GrpcChannelOptions(){ThrowOperationCanceledOnCancellation = true,};
}
}
diff --git a/src/Dapr.Actors/Runtime/DaprStateProvider.cs b/src/Dapr.Actors/Runtime/DaprStateProvider.cs
index ae86fb28b..3f62f604f 100644
--- a/src/Dapr.Actors/Runtime/DaprStateProvider.cs
+++ b/src/Dapr.Actors/Runtime/DaprStateProvider.cs
@@ -20,6 +20,9 @@ namespace Dapr.Actors.Runtime
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
+ using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
+ using Google.Protobuf;
+ using Google.Protobuf.WellKnownTypes;
///
/// State Provider to interact with Dapr runtime.
@@ -77,95 +80,8 @@ public async Task ContainsStateAsync(string actorType, string actorId, str
public async Task SaveStateAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default)
{
- await this.DoStateChangesTransactionallyAsync(actorType, actorId, stateChanges, cancellationToken);
+ await this.daprInteractor.DoStateChangesTransactionallyAsync(actorType, actorId, stateChanges, this.actorStateSerializer, this.jsonSerializerOptions, cancellationToken);
}
- private async Task DoStateChangesTransactionallyAsync(string actorType, string actorId, IReadOnlyCollection stateChanges, CancellationToken cancellationToken = default)
- {
- // Transactional state update request body:
- /*
- [
- {
- "operation": "upsert",
- "request": {
- "key": "key1",
- "value": "myData"
- }
- },
- {
- "operation": "delete",
- "request": {
- "key": "key2"
- }
- }
- ]
- */
- using var stream = new MemoryStream();
- using var writer = new Utf8JsonWriter(stream);
- writer.WriteStartArray();
- foreach (var stateChange in stateChanges)
- {
- writer.WriteStartObject();
- var operation = this.GetDaprStateOperation(stateChange.ChangeKind);
- writer.WriteString("operation", operation);
-
- // write the requestProperty
- writer.WritePropertyName("request");
- writer.WriteStartObject(); // start object for request property
- switch (stateChange.ChangeKind)
- {
- case StateChangeKind.Remove:
- writer.WriteString("key", stateChange.StateName);
- break;
- case StateChangeKind.Add:
- case StateChangeKind.Update:
- writer.WriteString("key", stateChange.StateName);
-
- // perform default json serialization if custom serializer was not provided.
- if (this.actorStateSerializer != null)
- {
- var buffer = this.actorStateSerializer.Serialize(stateChange.Type, stateChange.Value);
- writer.WriteBase64String("value", buffer);
- }
- else
- {
- writer.WritePropertyName("value");
- JsonSerializer.Serialize(writer, stateChange.Value, stateChange.Type, jsonSerializerOptions);
- }
- break;
- default:
- break;
- }
-
- writer.WriteEndObject(); // end object for request property
- writer.WriteEndObject();
- }
-
- writer.WriteEndArray();
-
- await writer.FlushAsync();
- var content = Encoding.UTF8.GetString(stream.ToArray());
- await this.daprInteractor.SaveStateTransactionallyAsync(actorType, actorId, content, cancellationToken);
- }
-
- private string GetDaprStateOperation(StateChangeKind changeKind)
- {
- var operation = string.Empty;
-
- switch (changeKind)
- {
- case StateChangeKind.Remove:
- operation = "delete";
- break;
- case StateChangeKind.Add:
- case StateChangeKind.Update:
- operation = "upsert";
- break;
- default:
- break;
- }
-
- return operation;
- }
}
}
diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto
index b5bd00db0..877fd2605 100644
--- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto
+++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto
@@ -504,6 +504,7 @@ message InvokeActorRequest {
string actor_id = 2;
string method = 3;
bytes data = 4;
+ map metadata = 5;
}
// InvokeActorResponse is the method that returns an actor invocation response.
diff --git a/test/Dapr.Actors.Test/ApiTokenTests.cs b/test/Dapr.Actors.Test/ApiTokenTests.cs
index 29ee955c7..bc9f1dccc 100644
--- a/test/Dapr.Actors.Test/ApiTokenTests.cs
+++ b/test/Dapr.Actors.Test/ApiTokenTests.cs
@@ -1,4 +1,4 @@
-// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@ public async Task CreateProxyWithRemoting_WithApiToken()
var options = new ActorProxyOptions
{
DaprApiToken = "test_token",
+ UseGrpc = false,
};
var request = await client.CaptureHttpRequestAsync(async handler =>
@@ -77,6 +78,7 @@ public async Task CreateProxyWithNoRemoting_WithApiToken()
var options = new ActorProxyOptions
{
DaprApiToken = "test_token",
+ UseGrpc = false,
};
var request = await client.CaptureHttpRequestAsync(async handler =>
@@ -99,9 +101,14 @@ public async Task CreateProxyWithNoRemoting_WithNoApiToken()
var actorId = new ActorId("abc");
+ var options = new ActorProxyOptions
+ {
+ UseGrpc = false,
+ };
+
var request = await client.CaptureHttpRequestAsync(async handler =>
{
- var factory = new ActorProxyFactory(null, handler);
+ var factory = new ActorProxyFactory(options, handler);
var proxy = factory.Create(actorId, "TestActor");
await proxy.InvokeMethodAsync("SetCountAsync", 1, new CancellationToken());
});
diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs
index 986a2c4f0..ee58800f5 100644
--- a/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs
+++ b/test/Dapr.E2E.Test/Actors/E2ETests.ExceptionTests.cs
@@ -34,5 +34,18 @@ public async Task ActorCanProvideExceptionDetails()
Assert.Contains("ExceptionExample", ex.Message);
Assert.Contains("32", ex.Message);
}
+
+ [Fact]
+ public async Task ActorCanProvideExceptionDetailsGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var actorIds = new ActorId(Guid.NewGuid().ToString());
+
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ExceptionActor");
+ await WaitForActorRuntimeAsync(proxy, cts.Token);
+ ActorMethodInvocationException ex = await Assert.ThrowsAsync(async () => await proxy.ExceptionExample());
+ Assert.Contains("Remote Actor Method Exception", ex.Message);
+ }
+
}
}
\ No newline at end of file
diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs
index 30b19a450..1534a6b2e 100644
--- a/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs
+++ b/test/Dapr.E2E.Test/Actors/E2ETests.ReentrantTests.cs
@@ -27,7 +27,10 @@ public class ReentrantTests : DaprTestAppLifecycle
{
private static readonly int NumCalls = 10;
private readonly Lazy proxyFactory;
+ private readonly Lazy proxyFactoryGrpc;
private IActorProxyFactory ProxyFactory => this.HttpEndpoint == null ? null : this.proxyFactory.Value;
+ private IActorProxyFactory ProxyFactoryGrpc => this.GrpcEndpoint == null ? null : this.proxyFactoryGrpc.Value;
+
public ReentrantTests(ITestOutputHelper output, DaprTestAppFixture fixture) : base(output, fixture)
{
@@ -41,8 +44,15 @@ public ReentrantTests(ITestOutputHelper output, DaprTestAppFixture fixture) : ba
this.proxyFactory = new Lazy(() =>
{
Debug.Assert(this.HttpEndpoint != null);
- return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, });
+ return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint});
});
+
+ this.proxyFactoryGrpc = new Lazy(() =>
+ {
+ Debug.Assert(this.GrpcEndpoint != null);
+ return new ActorProxyFactory(new ActorProxyOptions() { HttpEndpoint = this.HttpEndpoint, GrpcEndpoint = this.GrpcEndpoint, UseGrpc = true,});
+ });
+
}
[Fact]
@@ -75,5 +85,37 @@ public async Task ActorCanPerformReentrantCalls()
}
}
}
+
+ [Fact]
+ public async Task ActorCanPerformReentrantCallsGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ReentrantActor");
+
+ await ActorRuntimeChecker.WaitForActorRuntimeAsync(this.AppId, this.Output, proxy, cts.Token);
+
+ await proxy.ReentrantCall(new ReentrantCallOptions(){ CallsRemaining = NumCalls, });
+ var records = new List();
+ for (int i = 0; i < NumCalls; i++)
+ {
+ var state = await proxy.GetState(i);
+ records.AddRange(state.Records);
+ }
+
+ var enterRecords = records.FindAll(record => record.IsEnter);
+ var exitRecords = records.FindAll(record => !record.IsEnter);
+
+ this.Output.WriteLine($"Got {records.Count} records.");
+ Assert.True(records.Count == NumCalls * 2);
+ for (int i = 0; i < NumCalls; i++)
+ {
+ for (int j = 0; j < NumCalls; j++)
+ {
+ // Assert all the enters happen before the exits.
+ Assert.True(enterRecords[i].Timestamp < exitRecords[j].Timestamp);
+ }
+ }
+ }
+
}
}
diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs
index 12d6c7365..676f43b26 100644
--- a/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs
+++ b/test/Dapr.E2E.Test/Actors/E2ETests.Regression762Tests.cs
@@ -112,5 +112,97 @@ public async Task ActorSuccessfullyClearsStateAfterErrorWithoutRemoting()
var resp = await proxy.InvokeMethodAsync("GetState", key);
Assert.Equal("Real value", resp);
}
+
+ [Fact]
+ public async Task ActorSuccessfullyClearsStateAfterErrorWithRemotingGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "Regression762Actor");
+
+ await WaitForActorRuntimeAsync(proxy, cts.Token);
+
+ var key = Guid.NewGuid().ToString();
+ var throwingCall = new StateCall
+ {
+ Key = key,
+ Value = "Throw value",
+ Operation = "ThrowException"
+ };
+
+ var setCall = new StateCall()
+ {
+ Key = key,
+ Value = "Real value",
+ Operation = "SetState"
+ };
+
+ var savingCall = new StateCall()
+ {
+ Operation = "SaveState"
+ };
+
+ // We attempt to delete it on the unlikely chance it's already there.
+ await proxy.RemoveState(throwingCall.Key);
+
+ // Initiate a call that will set the state, then throw.
+ await Assert.ThrowsAsync(async () => await proxy.SaveState(throwingCall));
+
+ // Save the state and assert that the old value was not persisted.
+ await proxy.SaveState(savingCall);
+ var errorResp = await proxy.GetState(key);
+ Assert.Equal(string.Empty, errorResp);
+
+ // Persist normally and ensure it works.
+ await proxy.SaveState(setCall);
+ var resp = await proxy.GetState(key);
+ Assert.Equal("Real value", resp);
+ }
+
+ [Fact]
+ public async Task ActorSuccessfullyClearsStateAfterErrorWithoutRemotingGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var pingProxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "Regression762Actor");
+ var proxy = this.ProxyFactoryGrpc.Create(ActorId.CreateRandom(), "Regression762Actor");
+
+ await WaitForActorRuntimeAsync(pingProxy, cts.Token);
+
+ var key = Guid.NewGuid().ToString();
+ var throwingCall = new StateCall
+ {
+ Key = key,
+ Value = "Throw value",
+ Operation = "ThrowException"
+ };
+
+ var setCall = new StateCall()
+ {
+ Key = key,
+ Value = "Real value",
+ Operation = "SetState"
+ };
+
+ var savingCall = new StateCall()
+ {
+ Operation = "SaveState"
+ };
+
+ // We attempt to delete it on the unlikely chance it's already there.
+ await proxy.InvokeMethodAsync("RemoveState", throwingCall.Key);
+
+ // Initiate a call that will set the state, then throw.
+ await Assert.ThrowsAsync(async () => await proxy.InvokeMethodAsync("SaveState", throwingCall));
+
+ // Save the state and assert that the old value was not persisted.
+ await proxy.InvokeMethodAsync("SaveState", savingCall);
+ var errorResp = await proxy.InvokeMethodAsync("GetState", key);
+ Assert.Equal(string.Empty, errorResp);
+
+ // Persist normally and ensure it works.
+ await proxy.InvokeMethodAsync("SaveState", setCall);
+ var resp = await proxy.InvokeMethodAsync("GetState", key);
+ Assert.Equal("Real value", resp);
+ }
+
}
}
diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs
index 626de8c9f..ba4c5c841 100644
--- a/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs
+++ b/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs
@@ -70,5 +70,56 @@ public async Task ActorCanStartReminderWithTtl()
Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Reminder may not have triggered.");
Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Reminder triggered too recently. {DateTime.Now} - {state.Timestamp}");
}
+
+ [Fact]
+ public async Task ActorCanStartAndStopReminderGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ReminderActor");
+
+ await WaitForActorRuntimeAsync(proxy, cts.Token);
+
+ // Start reminder, to count up to 10
+ await proxy.StartReminder(new StartReminderOptions(){ Total = 10, });
+
+ State state;
+ while (true)
+ {
+ cts.Token.ThrowIfCancellationRequested();
+
+ state = await proxy.GetState();
+ this.Output.WriteLine($"Got Count: {state.Count} IsReminderRunning: {state.IsReminderRunning}");
+ if (!state.IsReminderRunning)
+ {
+ break;
+ }
+ }
+
+ // Should count up to exactly 10
+ Assert.Equal(10, state.Count);
+ }
+
+ [Fact]
+ public async Task ActorCanStartReminderWithTtlGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "ReminderActor");
+
+ await WaitForActorRuntimeAsync(proxy, cts.Token);
+
+ // Reminder that should fire 3 times (at 0, 1, and 2 seconds)
+ await proxy.StartReminderWithTtl(TimeSpan.FromSeconds(2));
+
+ // Record the start time and wait for longer than the reminder should exist for.
+ var start = DateTime.Now;
+ await Task.Delay(TimeSpan.FromSeconds(5));
+
+ var state = await proxy.GetState();
+
+ // Make sure the reminder has fired and that it didn't fire within the past second since it should have expired.
+ Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Reminder may not have triggered.");
+ Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Reminder triggered too recently. {DateTime.Now} - {state.Timestamp}");
+ }
+
}
}
diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs
index d9aba3863..fafc79ced 100644
--- a/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs
+++ b/test/Dapr.E2E.Test/Actors/E2ETests.TimerTests.cs
@@ -70,5 +70,55 @@ public async Task ActorCanStartTimerWithTtl()
Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Timer may not have fired.");
Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Timer fired too recently. {DateTime.Now} - {state.Timestamp}");
}
+
+ [Fact]
+ public async Task ActorCanStartAndStopTimerGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "TimerActor");
+
+ await WaitForActorRuntimeAsync(proxy, cts.Token);
+
+ // Start timer, to count up to 10
+ await proxy.StartTimer(new StartTimerOptions(){ Total = 10, });
+
+ State state;
+ while (true)
+ {
+ cts.Token.ThrowIfCancellationRequested();
+
+ state = await proxy.GetState();
+ this.Output.WriteLine($"Got Count: {state.Count} IsTimerRunning: {state.IsTimerRunning}");
+ if (!state.IsTimerRunning)
+ {
+ break;
+ }
+ }
+
+ // Should count up to exactly 10
+ Assert.Equal(10, state.Count);
+ }
+
+ [Fact]
+ public async Task ActorCanStartTimerWithTtlGrpc()
+ {
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+ var proxy = this.ProxyFactoryGrpc.CreateActorProxy(ActorId.CreateRandom(), "TimerActor");
+
+ await WaitForActorRuntimeAsync(proxy, cts.Token);
+
+ // Reminder that should fire 3 times (at 0, 1, and 2 seconds)
+ await proxy.StartTimerWithTtl(TimeSpan.FromSeconds(2));
+
+ // Record the start time and wait for longer than the reminder should exist for.
+ var start = DateTime.Now;
+ await Task.Delay(TimeSpan.FromSeconds(5));
+
+ var state = await proxy.GetState();
+
+ // Make sure the reminder has fired and that it didn't fire within the past second since it should have expired.
+ Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Timer may not have fired.");
+ Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Timer fired too recently. {DateTime.Now} - {state.Timestamp}");
+ }
}
}
diff --git a/test/Dapr.E2E.Test/E2ETests.cs b/test/Dapr.E2E.Test/E2ETests.cs
index 94ebbf3df..69d016c05 100644
--- a/test/Dapr.E2E.Test/E2ETests.cs
+++ b/test/Dapr.E2E.Test/E2ETests.cs
@@ -29,6 +29,7 @@ namespace Dapr.E2E.Test
public partial class E2ETests : IClassFixture, IAsyncLifetime
{
private readonly Lazy proxyFactory;
+ private readonly Lazy proxyFactoryGrpc;
private readonly DaprTestAppFixture fixture;
private DaprTestAppFixture.State state;
@@ -42,6 +43,12 @@ public E2ETests(ITestOutputHelper output, DaprTestAppFixture fixture)
Debug.Assert(this.HttpEndpoint != null);
return new ActorProxyFactory(new ActorProxyOptions(){ HttpEndpoint = this.HttpEndpoint, });
});
+
+ this.proxyFactoryGrpc = new Lazy(() =>
+ {
+ Debug.Assert(this.GrpcEndpoint != null);
+ return new ActorProxyFactory(new ActorProxyOptions(){ GrpcEndpoint = this.GrpcEndpoint, UseGrpc = true, });
+ });
}
protected ITestOutputHelper Output { get; }
@@ -61,6 +68,8 @@ public E2ETests(ITestOutputHelper output, DaprTestAppFixture fixture)
public IActorProxyFactory ProxyFactory => this.HttpEndpoint == null ? null : this.proxyFactory.Value;
+ public IActorProxyFactory ProxyFactoryGrpc => this.GrpcEndpoint == null ? null : this.proxyFactoryGrpc.Value;
+
public async Task InitializeAsync()
{
this.state = await this.fixture.StartAsync(this.Output, this.Configuration);
@@ -94,4 +103,4 @@ protected async Task WaitForActorRuntimeAsync(IPingActor proxy, CancellationToke
await ActorRuntimeChecker.WaitForActorRuntimeAsync(this.AppId, this.Output, proxy, cancellationToken);
}
}
-}
+}
\ No newline at end of file