@@ -767,45 +767,64 @@ public void recoverExchange(RecordedExchange x, boolean retry) {
767767 }
768768 }
769769
770-
770+ /**
771+ * Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
772+ * @param oldName queue name
773+ * @param q recorded queue
774+ * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
775+ */
771776 public void recoverQueue (final String oldName , RecordedQueue q , boolean retry ) {
772777 try {
773- if (topologyRecoveryFilter .filterQueue (q )) {
774- LOGGER .debug ("Recovering {}" , q );
775- if (retry ) {
776- final RecordedQueue entity = q ;
777- q = (RecordedQueue ) wrapRetryIfNecessary (q , () -> {
778- entity .recover ();
779- return null ;
780- }).getRecordedEntity ();
781- } else {
782- q .recover ();
783- }
784- String newName = q .getName ();
785- if (!oldName .equals (newName )) {
786- // make sure server-named queues are re-added with
787- // their new names. MK.
788- synchronized (this .recordedQueues ) {
789- this .propagateQueueNameChangeToBindings (oldName , newName );
790- this .propagateQueueNameChangeToConsumers (oldName , newName );
791- // bug26552:
792- // remove old name after we've updated the bindings and consumers,
793- deleteRecordedQueue (oldName );
794- this .recordedQueues .put (newName , q );
795- }
796- }
797- for (QueueRecoveryListener qrl : Utility .copy (this .queueRecoveryListeners )) {
798- qrl .queueRecovered (oldName , newName );
799- }
800- LOGGER .debug ("{} has recovered" , q );
801- }
778+ internalRecoverQueue (oldName , q , retry );
802779 } catch (Exception cause ) {
803780 final String message = "Caught an exception while recovering queue " + oldName +
804781 ": " + cause .getMessage ();
805782 TopologyRecoveryException e = new TopologyRecoveryException (message , cause , q );
806783 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , q .getDelegateChannel (), e );
807784 }
808785 }
786+
787+ /**
788+ * Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
789+ * @param oldName queue name
790+ * @param q recorded queue
791+ * @throws Exception if an error occurs recovering the queue
792+ */
793+ void recoverQueue (final String oldName , RecordedQueue q ) throws Exception {
794+ internalRecoverQueue (oldName , q , false );
795+ }
796+
797+ private void internalRecoverQueue (final String oldName , RecordedQueue q , boolean retry ) throws Exception {
798+ if (topologyRecoveryFilter .filterQueue (q )) {
799+ LOGGER .debug ("Recovering {}" , q );
800+ if (retry ) {
801+ final RecordedQueue entity = q ;
802+ q = (RecordedQueue ) wrapRetryIfNecessary (q , () -> {
803+ entity .recover ();
804+ return null ;
805+ }).getRecordedEntity ();
806+ } else {
807+ q .recover ();
808+ }
809+ String newName = q .getName ();
810+ if (!oldName .equals (newName )) {
811+ // make sure queues are re-added with
812+ // their new names, if applicable. MK.
813+ synchronized (this .recordedQueues ) {
814+ this .propagateQueueNameChangeToBindings (oldName , newName );
815+ this .propagateQueueNameChangeToConsumers (oldName , newName );
816+ // bug26552:
817+ // remove old name after we've updated the bindings and consumers,
818+ deleteRecordedQueue (oldName );
819+ this .recordedQueues .put (newName , q );
820+ }
821+ }
822+ for (QueueRecoveryListener qrl : Utility .copy (this .queueRecoveryListeners )) {
823+ qrl .queueRecovered (oldName , newName );
824+ }
825+ LOGGER .debug ("{} has recovered" , q );
826+ }
827+ }
809828
810829 public void recoverBinding (RecordedBinding b , boolean retry ) {
811830 try {
@@ -829,41 +848,61 @@ public void recoverBinding(RecordedBinding b, boolean retry) {
829848 }
830849 }
831850
851+ /**
852+ * Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
853+ * @param tag consumer tag
854+ * @param consumer recorded consumer
855+ * @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
856+ */
832857 public void recoverConsumer (final String tag , RecordedConsumer consumer , boolean retry ) {
833858 try {
834- if (this .topologyRecoveryFilter .filterConsumer (consumer )) {
835- LOGGER .debug ("Recovering {}" , consumer );
836- String newTag = null ;
837- if (retry ) {
838- final RecordedConsumer entity = consumer ;
839- RetryResult retryResult = wrapRetryIfNecessary (consumer , entity ::recover );
840- consumer = (RecordedConsumer ) retryResult .getRecordedEntity ();
841- newTag = (String ) retryResult .getResult ();
842- } else {
843- newTag = consumer .recover ();
844- }
845-
846- // make sure server-generated tags are re-added. MK.
847- if (tag != null && !tag .equals (newTag )) {
848- synchronized (this .consumers ) {
849- this .consumers .remove (tag );
850- this .consumers .put (newTag , consumer );
851- }
852- consumer .getChannel ().updateConsumerTag (tag , newTag );
853- }
854-
855- for (ConsumerRecoveryListener crl : Utility .copy (this .consumerRecoveryListeners )) {
856- crl .consumerRecovered (tag , newTag );
857- }
858- LOGGER .debug ("{} has recovered" , consumer );
859- }
859+ internalRecoverConsumer (tag , consumer , retry );
860860 } catch (Exception cause ) {
861861 final String message = "Caught an exception while recovering consumer " + tag +
862862 ": " + cause .getMessage ();
863863 TopologyRecoveryException e = new TopologyRecoveryException (message , cause , consumer );
864864 this .getExceptionHandler ().handleTopologyRecoveryException (delegate , consumer .getDelegateChannel (), e );
865865 }
866866 }
867+
868+ /**
869+ * Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
870+ * @param tag consumer tag
871+ * @param consumer recorded consumer
872+ * @throws Exception if an error occurs recovering the consumer
873+ */
874+ void recoverConsumer (final String tag , RecordedConsumer consumer ) throws Exception {
875+ internalRecoverConsumer (tag , consumer , false );
876+ }
877+
878+ private void internalRecoverConsumer (final String tag , RecordedConsumer consumer , boolean retry ) throws Exception {
879+ if (this .topologyRecoveryFilter .filterConsumer (consumer )) {
880+ LOGGER .debug ("Recovering {}" , consumer );
881+ String newTag = null ;
882+ if (retry ) {
883+ final RecordedConsumer entity = consumer ;
884+ RetryResult retryResult = wrapRetryIfNecessary (consumer , entity ::recover );
885+ consumer = (RecordedConsumer ) retryResult .getRecordedEntity ();
886+ newTag = (String ) retryResult .getResult ();
887+ } else {
888+ newTag = consumer .recover ();
889+ }
890+
891+ // make sure server-generated tags are re-added. MK.
892+ if (tag != null && !tag .equals (newTag )) {
893+ synchronized (this .consumers ) {
894+ this .consumers .remove (tag );
895+ this .consumers .put (newTag , consumer );
896+ }
897+ consumer .getChannel ().updateConsumerTag (tag , newTag );
898+ }
899+
900+ for (ConsumerRecoveryListener crl : Utility .copy (this .consumerRecoveryListeners )) {
901+ crl .consumerRecovered (tag , newTag );
902+ }
903+ LOGGER .debug ("{} has recovered" , consumer );
904+ }
905+ }
867906
868907 private <T > RetryResult wrapRetryIfNecessary (RecordedEntity entity , Callable <T > recoveryAction ) throws Exception {
869908 if (this .retryHandler == null ) {
0 commit comments