Skip to content

Commit 0628cc5

Browse files
committed
Add publisher confirms tutorial using ConfirmationChannel
The new `ConfirmationChannel` API introduced in rabbitmq/rabbitmq-java-client#1824 provides asynchronous publisher confirmation tracking with a `CompletableFuture`-based API, rate limiting, and message correlation support. This change adds `PublisherConfirmsAsync.java` to demonstrate the `ConfirmationChannel` API. The tutorial shows how to create a `ConfirmationChannel` wrapper with rate limiting, publish messages asynchronously with correlation context, and wait for all confirmations using `CompletableFuture.allOf()`. Depends on rabbitmq/rabbitmq-java-client#1824
1 parent dd83d6d commit 0628cc5

File tree

2 files changed

+112
-0
lines changed

2 files changed

+112
-0
lines changed

java/PublisherConfirmsAsync.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
import com.rabbitmq.client.*;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
21+
22+
/**
23+
* Example demonstrating asynchronous publisher confirmations with ConfirmationChannel.
24+
*/
25+
public class PublisherConfirmsAsync {
26+
27+
public static void main(String[] args) throws Exception {
28+
ConnectionFactory factory = new ConnectionFactory();
29+
factory.setHost("localhost");
30+
31+
try (Connection connection = factory.newConnection();
32+
Channel channel = connection.createChannel()) {
33+
34+
// Create ConfirmationChannel with optional rate limiting
35+
RateLimiter rateLimiter = new ThrottlingRateLimiter(100); // Max 100 in-flight
36+
ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, rateLimiter);
37+
38+
String queueName = confirmChannel.queueDeclare().getQueue();
39+
40+
// Collect futures for all publishes
41+
List<CompletableFuture<String>> futures = new ArrayList<>();
42+
43+
// Publish messages asynchronously with context for correlation
44+
for (int i = 0; i < 10; i++) {
45+
String messageId = "msg-" + i;
46+
String message = "Message " + i;
47+
48+
CompletableFuture<String> future = confirmChannel.basicPublishAsync(
49+
"",
50+
queueName,
51+
MessageProperties.PERSISTENT_TEXT_PLAIN,
52+
message.getBytes(),
53+
messageId // Context parameter for correlation
54+
);
55+
56+
// Handle confirmation
57+
future.thenAccept(id -> System.out.println("Confirmed: " + id))
58+
.exceptionally(ex -> {
59+
if (ex.getCause() instanceof PublishException) {
60+
PublishException pe = (PublishException) ex.getCause();
61+
System.err.println("Failed: " + pe.getContext() + " - " + ex.getMessage());
62+
}
63+
return null;
64+
});
65+
66+
futures.add(future);
67+
}
68+
69+
// Wait for all confirmations
70+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
71+
System.out.println("All messages published and confirmed");
72+
}
73+
}
74+
}

java/run-with-dev-client.sh

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/bin/bash
2+
# Run a Java tutorial example with the development version of rabbitmq-java-client
3+
4+
set -o errexit
5+
set -o nounset
6+
set -o pipefail
7+
8+
if (( $# == 0 ))
9+
then
10+
echo "Usage: $0 <ClassName> [args...]" >&2
11+
echo "Example: $0 PublisherConfirmsAsync" >&2
12+
exit 1
13+
fi
14+
15+
declare -r class_name="$1"
16+
shift
17+
18+
declare -r client_dir="../../rabbitmq-java-client"
19+
declare -r client_jar="$client_dir"/target/amqp-client-*-SNAPSHOT.jar
20+
21+
if [[ ! -f "$client_jar" ]]
22+
then
23+
echo "Building rabbitmq-java-client..." >&2
24+
(cd "$client_dir" && ./mvnw clean package -DskipTests)
25+
fi
26+
27+
echo "Getting compile classpath..." >&2
28+
29+
compile_cp=$(cd "$client_dir" && ./mvnw dependency:build-classpath -DincludeScope=compile 2>&1 | grep -E "^/.*\.jar" | head -1)
30+
readonly compile_cp
31+
32+
declare -r full_cp=".:$client_jar:$compile_cp"
33+
34+
echo "Compiling ${class_name}.java..." >&2
35+
javac -cp "$full_cp" "${class_name}.java"
36+
37+
echo "Running $class_name..." >&2
38+
java -cp "$full_cp" "$class_name" "$@"

0 commit comments

Comments
 (0)