diff --git a/java/PublisherConfirmsAsync.java b/java/PublisherConfirmsAsync.java new file mode 100644 index 00000000..ad596672 --- /dev/null +++ b/java/PublisherConfirmsAsync.java @@ -0,0 +1,74 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +import com.rabbitmq.client.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Example demonstrating asynchronous publisher confirmations with ConfirmationChannel. + */ +public class PublisherConfirmsAsync { + + public static void main(String[] args) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + + // Create ConfirmationChannel with optional rate limiting + RateLimiter rateLimiter = new ThrottlingRateLimiter(100); // Max 100 in-flight + ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, rateLimiter); + + String queueName = confirmChannel.queueDeclare().getQueue(); + + // Collect futures for all publishes + List> futures = new ArrayList<>(); + + // Publish messages asynchronously with context for correlation + for (int i = 0; i < 10; i++) { + String messageId = "msg-" + i; + String message = "Message " + i; + + CompletableFuture future = confirmChannel.basicPublishAsync( + "", + queueName, + MessageProperties.PERSISTENT_TEXT_PLAIN, + message.getBytes(), + messageId // Context parameter for correlation + ); + + // Handle confirmation + future.thenAccept(id -> System.out.println("Confirmed: " + id)) + .exceptionally(ex -> { + if (ex.getCause() instanceof PublishException) { + PublishException pe = (PublishException) ex.getCause(); + System.err.println("Failed: " + pe.getContext() + " - " + ex.getMessage()); + } + return null; + }); + + futures.add(future); + } + + // Wait for all confirmations + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + System.out.println("All messages published and confirmed"); + } + } +} diff --git a/java/run-with-dev-client.sh b/java/run-with-dev-client.sh new file mode 100755 index 00000000..e8d4d134 --- /dev/null +++ b/java/run-with-dev-client.sh @@ -0,0 +1,38 @@ +#!/bin/bash +# Run a Java tutorial example with the development version of rabbitmq-java-client + +set -o errexit +set -o nounset +set -o pipefail + +if (( $# == 0 )) +then + echo "Usage: $0 [args...]" >&2 + echo "Example: $0 PublisherConfirmsAsync" >&2 + exit 1 +fi + +declare -r class_name="$1" +shift + +declare -r client_dir="../../rabbitmq-java-client" +declare -r client_jar="$client_dir"/target/amqp-client-*-SNAPSHOT.jar + +if [[ ! -f "$client_jar" ]] +then + echo "Building rabbitmq-java-client..." >&2 + (cd "$client_dir" && ./mvnw clean package -DskipTests) +fi + +echo "Getting compile classpath..." >&2 + +compile_cp=$(cd "$client_dir" && ./mvnw dependency:build-classpath -DincludeScope=compile 2>&1 | grep -E "^/.*\.jar" | head -1) +readonly compile_cp + +declare -r full_cp=".:$client_jar:$compile_cp" + +echo "Compiling ${class_name}.java..." >&2 +javac -cp "$full_cp" "${class_name}.java" + +echo "Running $class_name..." >&2 +java -cp "$full_cp" "$class_name" "$@"