Skip to content

Commit 125042f

Browse files
authored
Merge pull request #115 from Particular/imessagesession-support
2 parents 0611a51 + ae88465 commit 125042f

File tree

7 files changed

+319
-12
lines changed

7 files changed

+319
-12
lines changed

src/NServiceBus.AzureFunctions.ServiceBus/FunctionEndpoint.cs

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private async Task InitializeEndpointIfNecessary(FunctionExecutionContext execut
109109
LogManager.GetLogger("Previews").Info(
110110
"NServiceBus.AzureFunctions.ServiceBus is a preview package. Preview packages are licensed separately from the rest of the Particular Software platform and have different support guarantees. You can view the license at https://particular.net/eula/previews and the support policy at https://docs.particular.net/previews/support-policy. Customer adoption drives whether NServiceBus.AzureFunctions.ServiceBus will be incorporated into the Particular Software platform. Let us know you are using it, if you haven't already, by emailing us at [email protected].");
111111

112-
var endpoint = await endpointFactory(executionContext).ConfigureAwait(false);
112+
endpoint = await endpointFactory(executionContext).ConfigureAwait(false);
113113

114114
pipeline = configuration.PipelineInvoker;
115115
}
@@ -121,6 +121,107 @@ private async Task InitializeEndpointIfNecessary(FunctionExecutionContext execut
121121
}
122122
}
123123

124+
/// <inheritdoc />
125+
public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
126+
{
127+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
128+
129+
await endpoint.Send(message, options).ConfigureAwait(false);
130+
}
131+
132+
/// <inheritdoc />
133+
public Task Send(object message, ExecutionContext executionContext, ILogger functionsLogger = null)
134+
{
135+
return Send(message, new SendOptions(), executionContext, functionsLogger);
136+
}
137+
138+
/// <inheritdoc />
139+
public async Task Send<T>(Action<T> messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
140+
{
141+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
142+
143+
await endpoint.Send(messageConstructor, options).ConfigureAwait(false);
144+
}
145+
146+
/// <inheritdoc />
147+
public Task Send<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null)
148+
{
149+
return Send(messageConstructor, new SendOptions(), executionContext, functionsLogger);
150+
}
151+
152+
/// <inheritdoc />
153+
public async Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
154+
{
155+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
156+
157+
await endpoint.Publish(message, options).ConfigureAwait(false);
158+
}
159+
160+
/// <inheritdoc />
161+
public async Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
162+
{
163+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
164+
165+
await endpoint.Publish(messageConstructor, options).ConfigureAwait(false);
166+
}
167+
168+
/// <inheritdoc />
169+
public async Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null)
170+
{
171+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
172+
173+
await endpoint.Publish(message).ConfigureAwait(false);
174+
}
175+
176+
/// <inheritdoc />
177+
public async Task Publish<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null)
178+
{
179+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
180+
181+
await endpoint.Publish(messageConstructor).ConfigureAwait(false);
182+
}
183+
184+
/// <inheritdoc />
185+
public async Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
186+
{
187+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
188+
189+
await endpoint.Subscribe(eventType, options).ConfigureAwait(false);
190+
}
191+
192+
/// <inheritdoc />
193+
public async Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null)
194+
{
195+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
196+
197+
await endpoint.Subscribe(eventType).ConfigureAwait(false);
198+
}
199+
200+
/// <inheritdoc />
201+
public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
202+
{
203+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
204+
205+
await endpoint.Unsubscribe(eventType, options).ConfigureAwait(false);
206+
}
207+
208+
/// <inheritdoc />
209+
public async Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null)
210+
{
211+
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
212+
213+
await endpoint.Unsubscribe(eventType).ConfigureAwait(false);
214+
}
215+
216+
private async Task InitializeEndpointUsedOutsideHandlerIfNecessary(ExecutionContext executionContext, ILogger functionsLogger)
217+
{
218+
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
219+
220+
var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);
221+
222+
await InitializeEndpointIfNecessary(functionExecutionContext).ConfigureAwait(false);
223+
}
224+
124225
internal static void LoadAssemblies(string assemblyDirectory)
125226
{
126227
var binFiles = Directory.EnumerateFiles(
@@ -189,5 +290,6 @@ static bool IsRuntimeAssembly(byte[] publicKeyToken)
189290
private ServiceBusTriggeredEndpointConfiguration configuration;
190291

191292
PipelineInvoker pipeline;
293+
private IEndpointInstance endpoint;
192294
}
193295
}

src/NServiceBus.AzureFunctions.ServiceBus/IFunctionEndpoint.cs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace NServiceBus
1+
using System;
2+
3+
namespace NServiceBus
24
{
35
using System.Threading.Tasks;
46
using Microsoft.Azure.ServiceBus;
@@ -15,5 +17,67 @@ public interface IFunctionEndpoint
1517
/// Processes a message received from an AzureServiceBus trigger using the NServiceBus message pipeline.
1618
/// </summary>
1719
Task Process(Message message, ExecutionContext executionContext, ILogger functionsLogger = null);
20+
21+
/// <summary>
22+
/// Sends the provided message.
23+
/// </summary>
24+
Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);
25+
26+
/// <summary>
27+
/// Sends the provided message.
28+
/// </summary>
29+
Task Send(object message, ExecutionContext executionContext, ILogger functionsLogger = null);
30+
31+
/// <summary>
32+
/// Instantiates a message of type T and sends it.
33+
/// </summary>
34+
Task Send<T>(Action<T> messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);
35+
36+
/// <summary>
37+
/// Instantiates a message of type T and sends it.
38+
/// </summary>
39+
Task Send<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null);
40+
41+
/// <summary>
42+
/// Publish the message to subscribers.
43+
/// </summary>
44+
Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);
45+
46+
/// <summary>
47+
/// Instantiates a message of type T and publishes it.
48+
/// </summary>
49+
Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);
50+
51+
/// <summary>
52+
/// Instantiates a message of type T and publishes it.
53+
/// </summary>
54+
Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null);
55+
56+
/// <summary>
57+
/// Instantiates a message of type T and publishes it.
58+
/// </summary>
59+
Task Publish<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null);
60+
61+
/// <summary>
62+
/// Subscribes to receive published messages of the specified type.
63+
/// This method is only necessary if you turned off auto-subscribe.
64+
/// </summary>
65+
Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);
66+
67+
/// <summary>
68+
/// Subscribes to receive published messages of the specified type.
69+
/// This method is only necessary if you turned off auto-subscribe.
70+
/// </summary>
71+
Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null);
72+
73+
/// <summary>
74+
/// Unsubscribes to receive published messages of the specified type.
75+
/// </summary>
76+
Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null);
77+
78+
/// <summary>
79+
/// Unsubscribes to receive published messages of the specified type.
80+
/// </summary>
81+
Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null);
1882
}
1983
}

src/NServiceBus.AzureFunctions.ServiceBus/NServiceBus.AzureFunctions.ServiceBus.csproj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="[1.1, 2)" />
11-
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="[2.2.0, 3)" />
12-
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="[1.5.0, 2)" />
13-
<PackageReference Include="NServiceBus.Extensions.DependencyInjection" Version="[1.0,2)" />
14-
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="[4.1.0, 5.0.0)" />
10+
<PackageReference Include="Microsoft.Azure.Functions.Extensions" Version="1.1" />
11+
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="2.2.0" />
12+
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="1.8.0" />
13+
<PackageReference Include="NServiceBus.Extensions.DependencyInjection" Version="1.0.1" />
14+
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="4.2.0" />
1515
<PackageReference Include="Particular.CodeRules" Version="0.7.0" PrivateAssets="All" />
1616
<PackageReference Include="Particular.Packaging" Version="0.8.0" PrivateAssets="All" />
1717
</ItemGroup>

src/ServiceBus.Tests/ApprovalFiles/APIApprovals.Approve.approved.txt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,18 @@ namespace NServiceBus
66
protected System.Func<NServiceBus.FunctionExecutionContext, string> AssemblyDirectoryResolver;
77
public FunctionEndpoint(System.Func<NServiceBus.FunctionExecutionContext, NServiceBus.ServiceBusTriggeredEndpointConfiguration> configurationFactory) { }
88
public System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
9+
public System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
10+
public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
11+
public System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
12+
public System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
13+
public System.Threading.Tasks.Task Send(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
14+
public System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
15+
public System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
16+
public System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
17+
public System.Threading.Tasks.Task Subscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
18+
public System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
19+
public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
20+
public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null) { }
921
}
1022
public class FunctionExecutionContext
1123
{
@@ -20,6 +32,18 @@ namespace NServiceBus
2032
public interface IFunctionEndpoint
2133
{
2234
System.Threading.Tasks.Task Process(Microsoft.Azure.ServiceBus.Message message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
35+
System.Threading.Tasks.Task Publish(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
36+
System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
37+
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
38+
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, NServiceBus.PublishOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
39+
System.Threading.Tasks.Task Send(object message, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
40+
System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
41+
System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
42+
System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, NServiceBus.SendOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
43+
System.Threading.Tasks.Task Subscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
44+
System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
45+
System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
46+
System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Microsoft.Azure.WebJobs.ExecutionContext executionContext, Microsoft.Extensions.Logging.ILogger functionsLogger = null);
2347
}
2448
public class ServiceBusTriggeredEndpointConfiguration
2549
{

src/ServiceBus.Tests/DefaultEndpoint.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,21 @@ public Task<EndpointConfiguration> GetConfiguration(
3131

3232
var transport = configuration.UseTransport<AzureServiceBusTransport>();
3333
transport.ConnectionString(Environment.GetEnvironmentVariable(ServiceBusTriggeredEndpointConfiguration.DefaultServiceBusConnectionName));
34-
transport.RuleNameShortener(x => x
35-
.Replace(typeof(DefaultEndpoint).Namespace, string.Empty)
36-
.Replace("+", string.Empty));
34+
transport.SubscriptionRuleNamingConvention(type =>
35+
{
36+
if (type.FullName.Length <= 50)
37+
{
38+
return type.FullName;
39+
}
40+
41+
return type.Name;
42+
});
3743

3844
configuration.UseSerialization<NewtonsoftSerializer>();
3945

4046
configurationBuilderCustomization(configuration);
4147

4248
return Task.FromResult(configuration);
4349
}
44-
45-
4650
}
4751
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
namespace ServiceBus.Tests
2+
{
3+
using System.Threading.Tasks;
4+
using NServiceBus;
5+
using NServiceBus.AcceptanceTesting;
6+
using NUnit.Framework;
7+
8+
public class When_publishing_event_from_function_outside_handler
9+
{
10+
[Test]
11+
public async Task Should_publish_to_subscribers()
12+
{
13+
var context = await Scenario.Define<Context>()
14+
.WithEndpoint<SubscriberEndpoint>(b =>
15+
b.When(async session => await session.Publish(new TestEvent())))
16+
.Done(c => c.EventReceived)
17+
.Run();
18+
19+
Assert.IsTrue(context.EventReceived);
20+
}
21+
22+
class Context : ScenarioContext
23+
{
24+
public bool EventReceived { get; set; }
25+
}
26+
27+
class SubscriberEndpoint : EndpointConfigurationBuilder
28+
{
29+
public SubscriberEndpoint()
30+
{
31+
EndpointSetup<DefaultEndpoint>();
32+
}
33+
34+
public class EventHandler : IHandleMessages<TestEvent>
35+
{
36+
Context testContext;
37+
38+
public EventHandler(Context testContext)
39+
{
40+
this.testContext = testContext;
41+
}
42+
43+
public Task Handle(TestEvent message, IMessageHandlerContext context)
44+
{
45+
testContext.EventReceived = true;
46+
return Task.CompletedTask;
47+
}
48+
}
49+
}
50+
51+
class TestEvent : IEvent
52+
{
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)