@@ -759,49 +759,68 @@ public void recoverExchange(RecordedExchange x, boolean retry) {
759759 }
760760 }
761761
762-
762+ /**
763+ * Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
764+ * @param oldName queue name
765+ * @param q recorded queue
766+ * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
767+ */
763768 public void recoverQueue (final String oldName , RecordedQueue q , boolean retry ) {
764769 try {
765- if (topologyRecoveryFilter .filterQueue (q )) {
766- LOGGER .debug ("Recovering {}" , q );
767- if (retry ) {
768- final RecordedQueue entity = q ;
769- q = (RecordedQueue ) wrapRetryIfNecessary (q , () -> {
770- entity .recover ();
771- return null ;
772- }).getRecordedEntity ();
773- } else {
774- q .recover ();
775- }
776- String newName = q .getName ();
777- if (!oldName .equals (newName )) {
778- // make sure server-named queues are re-added with
779- // their new names. MK.
780- synchronized (this .recordedQueues ) {
781- this .propagateQueueNameChangeToBindings (oldName , newName );
782- this .propagateQueueNameChangeToConsumers (oldName , newName );
783- // bug26552:
784- // remove old name after we've updated the bindings and consumers,
785- // plus only for server-named queues, both to make sure we don't lose
786- // anything to recover. MK.
787- if (q .isServerNamed ()) {
788- deleteRecordedQueue (oldName );
789- }
790- this .recordedQueues .put (newName , q );
791- }
792- }
793- for (QueueRecoveryListener qrl : Utility .copy (this .queueRecoveryListeners )) {
794- qrl .queueRecovered (oldName , newName );
795- }
796- LOGGER .debug ("{} has recovered" , q );
797- }
770+ internalRecoverQueue (oldName , q , retry );
798771 } catch (Exception cause ) {
799772 final String message = "Caught an exception while recovering queue " + oldName +
800773 ": " + cause .getMessage ();
801774 TopologyRecoveryException e = new TopologyRecoveryException (message , cause , q );
802775 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , q .getDelegateChannel (), e );
803776 }
804777 }
778+
779+ /**
780+ * Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
781+ * @param oldName queue name
782+ * @param q recorded queue
783+ * @throws Exception if an error occurs recovering the queue
784+ */
785+ void recoverQueue (final String oldName , RecordedQueue q ) throws Exception {
786+ internalRecoverQueue (oldName , q , false );
787+ }
788+
789+ private void internalRecoverQueue (final String oldName , RecordedQueue q , boolean retry ) throws Exception {
790+ if (topologyRecoveryFilter .filterQueue (q )) {
791+ LOGGER .debug ("Recovering {}" , q );
792+ if (retry ) {
793+ final RecordedQueue entity = q ;
794+ q = (RecordedQueue ) wrapRetryIfNecessary (q , () -> {
795+ entity .recover ();
796+ return null ;
797+ }).getRecordedEntity ();
798+ } else {
799+ q .recover ();
800+ }
801+ String newName = q .getName ();
802+ if (!oldName .equals (newName )) {
803+ // make sure server-named queues are re-added with
804+ // their new names. MK.
805+ synchronized (this .recordedQueues ) {
806+ this .propagateQueueNameChangeToBindings (oldName , newName );
807+ this .propagateQueueNameChangeToConsumers (oldName , newName );
808+ // bug26552:
809+ // remove old name after we've updated the bindings and consumers,
810+ // plus only for server-named queues, both to make sure we don't lose
811+ // anything to recover. MK.
812+ if (q .isServerNamed ()) {
813+ deleteRecordedQueue (oldName );
814+ }
815+ this .recordedQueues .put (newName , q );
816+ }
817+ }
818+ for (QueueRecoveryListener qrl : Utility .copy (this .queueRecoveryListeners )) {
819+ qrl .queueRecovered (oldName , newName );
820+ }
821+ LOGGER .debug ("{} has recovered" , q );
822+ }
823+ }
805824
806825 public void recoverBinding (RecordedBinding b , boolean retry ) {
807826 try {
@@ -825,41 +844,61 @@ public void recoverBinding(RecordedBinding b, boolean retry) {
825844 }
826845 }
827846
847+ /**
848+ * Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
849+ * @param tag consumer tag
850+ * @param consumer recorded consumer
851+ * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
852+ */
828853 public void recoverConsumer (final String tag , RecordedConsumer consumer , boolean retry ) {
829854 try {
830- if (this .topologyRecoveryFilter .filterConsumer (consumer )) {
831- LOGGER .debug ("Recovering {}" , consumer );
832- String newTag = null ;
833- if (retry ) {
834- final RecordedConsumer entity = consumer ;
835- RetryResult retryResult = wrapRetryIfNecessary (consumer , entity ::recover );
836- consumer = (RecordedConsumer ) retryResult .getRecordedEntity ();
837- newTag = (String ) retryResult .getResult ();
838- } else {
839- newTag = consumer .recover ();
840- }
841-
842- // make sure server-generated tags are re-added. MK.
843- if (tag != null && !tag .equals (newTag )) {
844- synchronized (this .consumers ) {
845- this .consumers .remove (tag );
846- this .consumers .put (newTag , consumer );
847- }
848- consumer .getChannel ().updateConsumerTag (tag , newTag );
849- }
850-
851- for (ConsumerRecoveryListener crl : Utility .copy (this .consumerRecoveryListeners )) {
852- crl .consumerRecovered (tag , newTag );
853- }
854- LOGGER .debug ("{} has recovered" , consumer );
855- }
855+ internalRecoverConsumer (tag , consumer , retry );
856856 } catch (Exception cause ) {
857857 final String message = "Caught an exception while recovering consumer " + tag +
858858 ": " + cause .getMessage ();
859859 TopologyRecoveryException e = new TopologyRecoveryException (message , cause , consumer );
860860 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , consumer .getDelegateChannel (), e );
861861 }
862862 }
863+
864+ /**
865+ * Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
866+ * @param tag consumer tag
867+ * @param consumer recorded consumer
868+ * @throws Exception if an error occurs recovering the consumer
869+ */
870+ void recoverConsumer (final String tag , RecordedConsumer consumer ) throws Exception {
871+ internalRecoverConsumer (tag , consumer , false );
872+ }
873+
874+ private void internalRecoverConsumer (final String tag , RecordedConsumer consumer , boolean retry ) throws Exception {
875+ if (this .topologyRecoveryFilter .filterConsumer (consumer )) {
876+ LOGGER .debug ("Recovering {}" , consumer );
877+ String newTag = null ;
878+ if (retry ) {
879+ final RecordedConsumer entity = consumer ;
880+ RetryResult retryResult = wrapRetryIfNecessary (consumer , entity ::recover );
881+ consumer = (RecordedConsumer ) retryResult .getRecordedEntity ();
882+ newTag = (String ) retryResult .getResult ();
883+ } else {
884+ newTag = consumer .recover ();
885+ }
886+
887+ // make sure server-generated tags are re-added. MK.
888+ if (tag != null && !tag .equals (newTag )) {
889+ synchronized (this .consumers ) {
890+ this .consumers .remove (tag );
891+ this .consumers .put (newTag , consumer );
892+ }
893+ consumer .getChannel ().updateConsumerTag (tag , newTag );
894+ }
895+
896+ for (ConsumerRecoveryListener crl : Utility .copy (this .consumerRecoveryListeners )) {
897+ crl .consumerRecovered (tag , newTag );
898+ }
899+ LOGGER .debug ("{} has recovered" , consumer );
900+ }
901+ }
863902
864903 private <T > RetryResult wrapRetryIfNecessary (RecordedEntity entity , Callable <T > recoveryAction ) throws Exception {
865904 if (this .retryHandler == null ) {
0 commit comments