Skip to content

Commit d1db420

Browse files
committed
Merge branch 'development'
2 parents e1d56ca + 702afea commit d1db420

File tree

9 files changed

+145
-16
lines changed

9 files changed

+145
-16
lines changed

README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,80 @@ sleep( 1000 );
266266
processor.stop();
267267
```
268268

269+
## Dead Letter
270+
271+
When dead lettering a message, you can provide a description and reason, as well as a struct of properties to modify.
272+
273+
```js
274+
275+
var message = receiver.receiveMessage( 2 );
276+
message.deadLetter( deadLetterErrorDescription="dead letter desc", deadLetterReason="dead letter reason" );
277+
```
278+
279+
To read from the dead letter sub queue, build a receiver like so:
280+
```js
281+
var dlReceiver = sbClient.buildReceiver(
282+
queueName='new-orders',
283+
deadLetter=true,
284+
receiveMode='PEEK_LOCK'
285+
);
286+
```
287+
288+
To process multiple messages the dead letter sub queue, build a Processor like so:
289+
```js
290+
var processor = sbClient.buildProcessor(
291+
queueName='new-orders',
292+
onMessage=function( message ){
293+
// process the message
294+
}
295+
deadLetter=true
296+
);
297+
```
298+
299+
To read from the transfer dead letter sub queue, use this flag:
300+
```js
301+
var dlReceiver = sbClient.buildReceiver(
302+
queueName='new-orders',
303+
transferDeadLetter=true,
304+
receiveMode='PEEK_LOCK'
305+
);
306+
```
307+
308+
To process multiple messages the transfer dead letter sub queue, build a Processor like so:
309+
```js
310+
var processor = sbClient.buildProcessor(
311+
queueName='new-orders',
312+
onMessage=function( message ){
313+
// process the message
314+
}
315+
transferDeadLetter=true
316+
);
317+
```
318+
319+
320+
## Scheduled Messages
321+
322+
To have a message obey a delay before being enqueued, you can use the `scheduledEnqueueTime` or the `scheduledDelaySeconds` argument. You can also set the `scheduledEnqueueTime` property on the message, but these arguments to the `sendMessage()` method are more convenient and just set the property for you.
323+
324+
* `scheduledEnqueueTime()` - This accepts a specific UTC time. Pass in a `java.time.OffsetDateTime` instance, a CFML date object, or a string parseable by `OffsetDateTime.parse()`.
325+
* `scheduledDelaySeconds()` - Pass in a single integer which represent a number of seconds from right now. We will calculate the UTC timestamp for you.
326+
327+
This code sends a message in 15 seconds. This call completes right away, and the delay is handled inside of Azure.
328+
```js
329+
sender.sendMessage(
330+
message=messageBody,
331+
scheduledDelaySeconds=15
332+
);
333+
```
334+
335+
This code sends a message in 15 seconds, but by creating a UTC timestamp 15 seconds from now.
336+
```js
337+
var fifteenSecondsFromNowUTC = createObject("java", "java.time.OffsetDateTime")
338+
.now( createObject("java", "java.time.ZoneOffset" ).UTC )
339+
.plusSeconds(secondsToWait);
340+
341+
sender.sendMessage(
342+
message=messageBody,
343+
scheduledEnqueueTime=fifteenSecondsFromNowUTC
344+
);
345+
```

box.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name":"Azure Service Bus SDK",
33
"author":"Brad Wood <[email protected]>",
4-
"version":"1.1.0",
4+
"version":"1.1.1",
55
"location":"https://downloads.ortussolutions.com/ortussolutions/servicebussdk/@build.version@/[email protected]@.zip",
66
"slug":"servicebussdk",
77
"type":"modules",

models/Client.cfc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ component accessors=true singleton ThreadSafe {
181181
* https://javadoc.io/doc/com.azure/azure-messaging-servicebus/7.17.12/com/azure/messaging/servicebus/ServiceBusClientBuilder.ServiceBusProcessorClientBuilder.html
182182
* Creates a new ServiceBusProcessorClientBuilder
183183
*/
184-
// TODO: sub queue, subscription name, maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
184+
// TODO: maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
185185
function buildProcessor(
186186
String queueName='',
187187
String topicName='',
@@ -192,7 +192,11 @@ component accessors=true singleton ThreadSafe {
192192
Function onMessage,
193193
Function onError,
194194
boolean autoStart=false,
195-
String fullyQualifiedNamespace=settings.fullyQualifiedNamespace
195+
String fullyQualifiedNamespace=settings.fullyQualifiedNamespace,
196+
Boolean deadLetter=false,
197+
Boolean transferDeadLetter=false,
198+
String subscriptionName
199+
196200
) {
197201
return registerProcessor(
198202
wirebox.getInstance( 'Processor@ServiceBusSDK', { SBClient : this, processorProperties : arguments } )
@@ -203,15 +207,18 @@ component accessors=true singleton ThreadSafe {
203207
* https://javadoc.io/static/com.azure/azure-messaging-servicebus/7.17.12/com/azure/messaging/servicebus/ServiceBusClientBuilder.ServiceBusReceiverClientBuilder.html
204208
* Creates a new ServiceBusReceiverClientBuilder
205209
*/
206-
// TODO: sub queue, subscription name, maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
210+
// TODO: maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)
207211
function buildReceiver(
208212
String queueName='',
209213
String topicName='',
210214
Boolean autoComplete=true,
211215
Numeric prefetchCount,
212216
String receiveMode='RECEIVE_AND_DELETE',
213217
Boolean async=false,
214-
String fullyQualifiedNamespace=settings.fullyQualifiedNamespace
218+
String fullyQualifiedNamespace=settings.fullyQualifiedNamespace,
219+
Boolean deadLetter=false,
220+
Boolean transferDeadLetter=false,
221+
String subscriptionName
215222
) {
216223
return registerReceiver(
217224
wirebox.getInstance( 'Receiver@ServiceBusSDK', { SBClient : this, receiverProperties : arguments } )

models/Message.cfc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ component accessors='true' {
104104
* This is only available if this message came from a processor or receiver which is
105105
* in PEEK_LOCK mode and autocomplete has not been disabled.
106106
*
107-
* TODO: DeadLetterOptions
108-
*
109107
*/
110108
function deadLetter(){
111109
throw( "message has no assocated receiver context and cannot be dead lettered." );

models/Processor.cfc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,32 @@ component accessors=true ThreadSafe {
3232
if( processorProperties.queueName.isEmpty() && processorProperties.topicName.isEmpty() ) {
3333
throw( message='You must specify either a queueName or topicName to build a processor.' );
3434
}
35+
36+
// Validate that if topicName is provided, subscriptionName is also provided
37+
if( !processorProperties.topicName.isEmpty() &&
38+
( isNull( processorProperties.subscriptionName ) || processorProperties.subscriptionName.isEmpty() ) ) {
39+
throw( message='You must specify a subscriptionName when using a topicName to build a processor.' );
40+
}
41+
3542
var processorBuilder = SBClient.newClientBuilder( processorProperties.fullyQualifiedNamespace )
3643
.processor();
3744

3845
if( !processorProperties.queueName.isEmpty() ) {
3946
processorBuilder.queueName( processorProperties.queueName );
4047
} else if( !processorProperties.topicName.isEmpty() ) {
4148
processorBuilder.topicName( processorProperties.topicName );
49+
processorBuilder.subscriptionName( processorProperties.subscriptionName );
50+
}
51+
52+
// Add sub queue support (dead letter or transfer dead letter)
53+
if( !isNull( processorProperties.deadLetter ) && processorProperties.deadLetter ) {
54+
processorBuilder.subQueue(
55+
createObject( 'java', 'com.azure.messaging.servicebus.models.SubQueue' ).DEAD_LETTER_QUEUE
56+
);
57+
} else if( !isNull( processorProperties.transferDeadLetter ) && processorProperties.transferDeadLetter ) {
58+
processorBuilder.subQueue(
59+
createObject( 'java', 'com.azure.messaging.servicebus.models.SubQueue' ).TRANSFER_DEAD_LETTER_QUEUE
60+
);
4261
}
4362

4463
if( !isNull( processorProperties.prefetchCount ) && isNumeric( processorProperties.prefetchCount ) ) {

models/ProcessorMessage.cfc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ component accessors='true' extends='Message' {
4646
* This is only available if this message came from a processor or receiver which is
4747
* in PEEK_LOCK mode and autocomplete has not been disabled.
4848
*
49-
* TODO: DeadLetterOptions
50-
*
5149
*/
5250
function deadLetter( String deadLetterErrorDescription, String deadLetterReason, Struct propertiesToModify ){
5351
jMessageContext.deadLetter( buildDeadLetterOptions( argumentCollection=arguments ) );

models/Receiver.cfc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,9 +226,7 @@ component accessors=true ThreadSafe {
226226
/**
227227
* Dead letter the message. This will move the message to the dead letter queue.
228228
* This is only available if the receiver is in PEEK_LOCK mode and autocomplete has not been disabled.
229-
*
230-
* TODO: DeadLetterOptions
231-
*
229+
*
232230
* @message The message to dead letter.
233231
*/
234232
function deadLetter( required message, String deadLetterErrorDescription, String deadLetterReason, Struct propertiesToModify ){

models/ReceiverMessage.cfc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ component accessors='true' extends='Message' {
4545
* This is only available if this message came from a processor or receiver which is
4646
* in PEEK_LOCK mode and autocomplete has not been disabled.
4747
*
48-
* TODO: DeadLetterOptions
49-
*
5048
*/
5149
function deadLetter( String deadLetterErrorDescription, String deadLetterReason, Struct propertiesToModify ){
5250
arguments.message = this;

test-harness/tests/specs/ServiceBusTests.cfc

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@
424424

425425
});
426426

427-
it( 'can deadLetter from sub queue', function(){
427+
it( 'can deadLetter from sub queue via Receiver', function(){
428428
var sbClient = getSBClient();
429429

430430
var sender = sbClient.buildSender(
@@ -438,7 +438,7 @@
438438
);
439439

440440
var message = receiver.receiveMessage( 2 );
441-
var seqNum = message.deadLetter( deadLetterErrorDescription="dead letter desc", deadLetterReason="dead letter reason" );
441+
message.deadLetter( deadLetterErrorDescription="dead letter desc", deadLetterReason="dead letter reason" );
442442

443443
var dlReceiver = sbClient.buildReceiver(
444444
queueName='new-orders',
@@ -450,6 +450,40 @@
450450
expect( dlMessage ).notToBeNull();
451451
});
452452

453+
it( 'can deadLetter from sub queue via Processor', function(){
454+
var sbClient = getSBClient();
455+
456+
var sender = sbClient.buildSender(
457+
queueName='new-orders'
458+
);
459+
sender.sendMessage( { orderId=12345, customerName='John Doe' } );
460+
461+
var receiver = sbClient.buildReceiver(
462+
queueName='new-orders',
463+
receiveMode='PEEK_LOCK'
464+
);
465+
466+
var message = receiver.receiveMessage( 2 );
467+
message.deadLetter( deadLetterErrorDescription="dead letter desc", deadLetterReason="dead letter reason" );
468+
469+
var sbClient = getSBClient();
470+
var deadLettersProcessed = 0;
471+
var processor = sbClient.buildProcessor(
472+
queueName='new-orders',
473+
onMessage=function( message ){
474+
createObject('java', 'java.lang.System').out.println( 'Received message: ' & serializeJSON( message.getBody() ) );
475+
deadLettersProcessed++;
476+
},
477+
maxConcurrentCalls=1,
478+
deadLetter=true
479+
);
480+
createObject('java', 'java.lang.System').out.println( 'Starting processor threads...' );
481+
processor.start();
482+
sleep( 1000 );
483+
processor.stop();
484+
expect( deadLettersProcessed ).toBeGTE( 1 );
485+
});
486+
453487
});
454488

455489
});

0 commit comments

Comments
 (0)