diff --git a/src/main/java/apoc/broker/BrokerExceptionHandler.java b/src/main/java/apoc/broker/BrokerExceptionHandler.java
new file mode 100644
index 00000000..20475c46
--- /dev/null
+++ b/src/main/java/apoc/broker/BrokerExceptionHandler.java
@@ -0,0 +1,222 @@
+package apoc.broker;
+
+import apoc.broker.exception.BrokerConnectionInitializationException;
+import apoc.broker.exception.BrokerConnectionRecoveryException;
+import apoc.broker.exception.BrokerConnectionUnknownException;
+import apoc.broker.exception.BrokerDisconnectedException;
+import apoc.broker.exception.BrokerLoggerException;
+import apoc.broker.exception.BrokerReceiveException;
+import apoc.broker.exception.BrokerResendDisabledException;
+import apoc.broker.exception.BrokerRuntimeException;
+import apoc.broker.exception.BrokerSendException;
+import org.neo4j.logging.Log;
+
+/**
+ * @author alexanderiudice
+ * @since 2020.02.20
+ *
+ * This class is used to log out errors and then instantiate a corresponding Broker exception
+ */
+public class BrokerExceptionHandler
+{
+ static Log log;
+
+ private BrokerExceptionHandler()
+ {
+ }
+
+ public static BrokerDisconnectedException brokerDisconnectedException( String msg )
+ {
+ return brokerDisconnectedException( msg, null );
+ }
+
+ public static BrokerConnectionUnknownException brokerConnectionUnknownException( String msg )
+ {
+ return brokerConnectionUnknownException( msg, null );
+ }
+
+ public static BrokerRuntimeException brokerRuntimeException( String msg )
+ {
+ return brokerRuntimeException( msg, null );
+ }
+
+ public static BrokerSendException brokerSendException( String msg )
+ {
+ return brokerSendException( msg, null );
+ }
+
+ public static BrokerLoggerException brokerLoggerException( String msg )
+ {
+ return brokerLoggerException( msg, null );
+ }
+
+ public static BrokerResendDisabledException brokerResendDisabledException( String msg )
+ {
+ return brokerResendDisabledException( msg, null );
+ }
+
+ public static BrokerConnectionRecoveryException brokerConnectionRecoveryException( String msg )
+ {
+ return brokerConnectionRecoveryException( msg, null );
+ }
+
+ public static BrokerConnectionInitializationException brokerConnectionInitializationException( String msg )
+ {
+ return brokerConnectionInitializationException( msg, null );
+ }
+
+ public static BrokerReceiveException brokerReceiveException( String msg )
+ {
+ return brokerReceiveException( msg, null );
+ }
+
+ public static BrokerDisconnectedException brokerDisconnectedException( String msg, Throwable e )
+ {
+ BrokerDisconnectedException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerDisconnectedException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerDisconnectedException( msg );
+ log.error( brokerException.getMessage() );
+ }
+ return brokerException;
+ }
+
+ public static BrokerConnectionUnknownException brokerConnectionUnknownException( String msg, Throwable e )
+ {
+ BrokerConnectionUnknownException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerConnectionUnknownException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerConnectionUnknownException( msg );
+ log.error( brokerException.getMessage() );
+ }
+ return brokerException;
+ }
+
+ public static BrokerRuntimeException brokerRuntimeException( String msg, Throwable e )
+ {
+ BrokerRuntimeException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerRuntimeException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerRuntimeException( msg );
+ log.error( brokerException.getMessage() );
+ }
+ return brokerException;
+ }
+
+ public static BrokerSendException brokerSendException( String msg, Throwable e )
+ {
+ BrokerSendException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerSendException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerSendException( msg );
+ log.error( brokerException.getMessage() );
+ }
+
+ return brokerException;
+ }
+
+ public static BrokerLoggerException brokerLoggerException( String msg, Throwable e )
+ {
+ BrokerLoggerException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerLoggerException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerLoggerException( msg );
+ log.error( brokerException.getMessage() );
+ }
+
+ return brokerException;
+ }
+
+ public static BrokerResendDisabledException brokerResendDisabledException( String msg, Throwable e )
+ {
+ BrokerResendDisabledException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerResendDisabledException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerResendDisabledException( msg );
+ log.error( brokerException.getMessage() );
+ }
+
+ return brokerException;
+ }
+
+ public static BrokerConnectionRecoveryException brokerConnectionRecoveryException( String msg, Throwable e )
+ {
+ BrokerConnectionRecoveryException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerConnectionRecoveryException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerConnectionRecoveryException( msg );
+ log.error( brokerException.getMessage() );
+ }
+
+ return brokerException;
+ }
+
+ public static BrokerConnectionInitializationException brokerConnectionInitializationException( String msg, Throwable e )
+ {
+ BrokerConnectionInitializationException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerConnectionInitializationException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerConnectionInitializationException( msg );
+ log.error( brokerException.getMessage() );
+ }
+
+ return brokerException;
+ }
+
+ public static BrokerReceiveException brokerReceiveException( String msg, Throwable e )
+ {
+ BrokerReceiveException brokerException;
+ if ( e != null )
+ {
+ brokerException = new BrokerReceiveException( msg, e );
+ log.error( brokerException.getMessage(), e );
+ }
+ else
+ {
+ brokerException = new BrokerReceiveException( msg );
+ log.error( brokerException.getMessage() );
+ }
+
+ return brokerException;
+ }
+}
diff --git a/src/main/java/apoc/broker/BrokerHandler.java b/src/main/java/apoc/broker/BrokerHandler.java
index a1dc7e64..e9fbdf36 100644
--- a/src/main/java/apoc/broker/BrokerHandler.java
+++ b/src/main/java/apoc/broker/BrokerHandler.java
@@ -2,9 +2,14 @@
import apoc.ApocConfig;
import apoc.Pools;
+import apoc.broker.exception.BrokerDisconnectedException;
+import apoc.broker.exception.BrokerResendDisabledException;
+import apoc.broker.exception.BrokerRuntimeException;
+import apoc.broker.exception.BrokerSendException;
import apoc.broker.logging.BrokerLogManager;
import apoc.broker.logging.BrokerLogger;
import apoc.result.MapResult;
+import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ImmutableConfiguration;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
@@ -12,11 +17,14 @@
import org.neo4j.logging.Log;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -52,6 +60,7 @@ public BrokerHandler( GraphDatabaseAPI db, Log log, ApocConfig apocConfig, Pools
this.neo4jLog = log;
this.entireConfiguration = apocConfig.getConfig();
this.pools = pools;
+ BrokerExceptionHandler.log = log;
}
@Override
@@ -114,7 +123,7 @@ public void start()
}
catch ( Exception e )
{
- neo4jLog.error( "Hit an error while trying to reconnect to dead-on-arrival connections. Error: " + e.getMessage() );
+ BrokerExceptionHandler.brokerRuntimeException( "Unable to reconnect to dead-on-arrival connections. Error: " + e.getMessage(), e );
}
try
@@ -123,7 +132,7 @@ public void start()
}
catch ( Exception e )
{
- neo4jLog.error( "Hit an error trying to resend messages to healthy connections. Error: " + e.getMessage() );
+ BrokerExceptionHandler.brokerRuntimeException( "Unable to resend messages to healthy connections.", e );
}
}
}
@@ -164,7 +173,7 @@ public Stream sendMessageToBrokerConnection( String connection, M
{
if ( !brokerConnection.isConnected() )
{
- throw new Exception( "Broker Connection '" + connection + "' is not connected." );
+ throw BrokerExceptionHandler.brokerDisconnectedException( "Broker Connection '" + connection + "' is not connected to its broker." );
}
brokerConnection.checkConnectionHealth();
@@ -179,16 +188,36 @@ public Stream sendMessageToBrokerConnection( String connection, M
}
catch ( Exception e )
{
+ BrokerSendException brokerSendException;
+ if ( e instanceof BrokerDisconnectedException )
+ {
+ // No need to log out stacktrace
+ brokerSendException = BrokerExceptionHandler.brokerSendException( "Unable to send message to connection '" + connection + "'. Error: " + e.getMessage() );
+ }
+ else
+ {
+ brokerSendException =
+ BrokerExceptionHandler.brokerSendException( "Unable to send message to connection '" + connection + "'. Error: " + e.getMessage(), e );
+ }
- neo4jLog.error( "Unable to send message to connection '" + connection + "'. Error: " + e.getMessage() );
if ( loggingEnabled )
{
- BrokerLogManager.getBrokerLogger( connection ).error( new BrokerLogger.LogLine.LogEntry( connection, message, configuration ) );
- brokerConnection.setConnected( false );
- reconnectAndResendAsync( connection );
+ try
+ {
+ BrokerLogManager.getBrokerLogger( connection ).error( new BrokerLogger.LogLine.LogEntry( connection, message, configuration ) );
+ }
+ catch ( BrokerRuntimeException | JsonProcessingException jpe )
+ {
+ throw BrokerExceptionHandler.brokerRuntimeException( "BrokerLogger was unable to persist unsent message to retry logs.", jpe );
+ }
+ finally
+ {
+ brokerConnection.setConnected( false );
+ reconnectAndResendAsync( connection );
+ }
}
+ throw brokerSendException;
}
- throw new RuntimeException( "Unable to send message to connection '" + connection + "'." );
}
public Stream receiveMessageFromBrokerConnection( String connection, Map configuration ) throws IOException
@@ -238,33 +267,37 @@ public Stream checkReconnect( String connectionName )
private void resendMessagesForHealthyConnections() throws Exception
{
+ List thrownExceptions = new ArrayList<>();
BrokerLogManager.streamBrokerLogInfo().forEach( logInfo -> {
- resendMessagesForConnection( logInfo.getBrokerName() );
+ try
+ {
+ resendMessagesForConnection( logInfo.getBrokerName() );
+ }
+ catch ( BrokerResendDisabledException | RuntimeException e )
+ {
+ thrownExceptions.add( e );
+ }
} );
+
+ if ( !thrownExceptions.isEmpty() )
+ {
+ throw new BrokerRuntimeException( "Errors resending messages on initialization. Exceptions thrown: " +
+ thrownExceptions.stream().map( Throwable::getMessage ).collect( Collectors.joining( ",", "[", "]" ) ) );
+ }
}
- private void resendMessagesForConnection( String connectionName )
+ private void resendMessagesForConnection( String connectionName ) throws BrokerResendDisabledException
{
if ( loggingEnabled )
{
- try
+ if ( getConnection( connectionName ).isConnected() && BrokerLogManager.getBrokerLogger( connectionName ).calculateNumberOfLogEntries() > 0L )
{
- if ( getConnection( connectionName ).isConnected() && BrokerLogManager.getBrokerLogger( connectionName ).calculateNumberOfLogEntries() > 0L )
- {
- retryMessagesForConnectionAsync( connectionName );
- }
- }
- catch ( Exception e )
- {
- neo4jLog.error(
- "In 'resendMessagesForConnection'. Unable to either getConnection, calculate number of log entries, or retryMessagesForConnectionAsync." +
- " Error: " + e.getMessage() );
+ retryMessagesForConnectionAsync( connectionName );
}
}
else
{
- neo4jLog.error( "Broker logging must be enabled to resend messages." );
- throw new RuntimeException( "Broker logging must be enabled to resend messages." );
+ throw BrokerExceptionHandler.brokerResendDisabledException( "Broker logging must be enabled to resend messages." );
}
}
private void retryMessagesForConnectionAsync( String connectionName )
@@ -286,13 +319,16 @@ public void retryMessagesForConnectionAsync( String connectionName, Long numToSe
AtomicLong nextLinePointer = new AtomicLong( logInfo.getNextMessageToSend() );
AtomicLong numSent = new AtomicLong( 0 );
+ AtomicBoolean failedToSend = new AtomicBoolean( false );
+
try(Stream logEntryStream = BrokerLogger.streamLogLines( logInfo ).map( logLine -> logLine.getLogEntry() ))
{
for ( BrokerLogger.LogLine.LogEntry logEntry : logEntryStream.collect( Collectors.toList()) )
{
- neo4jLog.info( "APOC Broker: Resending message for '" + connectionName + "'." );
+ neo4jLog.debug( "APOC Broker: Resending message for '" + connectionName + "'." );
+
Boolean resent = resendBrokerMessage( logEntry.getConnectionName(), logEntry.getMessage(), logEntry.getConfiguration() );
if ( resent )
@@ -310,12 +346,15 @@ public void retryMessagesForConnectionAsync( String connectionName, Long numToSe
else
{
// Send unsuccessful. Break to stop sending messages.
+ failedToSend.set( true );
break;
}
}
- if ( numSent.get() > 0L )
+ if ( numSent.get() > 0L || failedToSend.get() )
{
+ neo4jLog.info( "APOC Broker: Resent " + numSent + " messages for '" + connectionName + "'." );
+
if ( nextLinePointer.get() == (BrokerLogManager.getBrokerLogger( connectionName ).calculateNumberOfLogEntries()) )
{
// All the messsages have been sent, reset the broker log.
@@ -335,31 +374,28 @@ public void retryMessagesForConnectionAsync( String connectionName, Long numToSe
}
}
}
- catch ( Exception e )
+ catch ( IOException e )
{
- neo4jLog.error( "Error in async execute 'retryMessagesForConnectionAsync'. Error: " + e.getMessage() );
+ BrokerExceptionHandler.brokerRuntimeException( "Error in async execute 'retryMessagesForConnectionAsync'. Error: " + e.getMessage(), e );
}
} );
}
}
catch ( Exception e )
{
- neo4jLog.error( "Error in method 'retryMessagesForConnectionAsync'. Error: " + e.getMessage() );
+ BrokerExceptionHandler.brokerRuntimeException( "Error in method 'retryMessagesForConnectionAsync'. Error: " + e.getMessage(), e );
}
}
private static Boolean resendBrokerMessage( String connection, Map message, Map configuration )
{
- if ( !doesExist( connection ) )
- {
- throw new RuntimeException( "Broker Exception. Connection '" + connection + "' is not a configured broker connection." );
- }
try
{
getConnection( connection ).send( message, configuration );
}
catch ( Exception e )
{
+ BrokerExceptionHandler.brokerSendException( "Broker Exception in 'resendBrokerMessage'. Unable to resend message to connection '" + connection + "'. Error: " + e.getMessage(), e );
return false;
}
return true;
diff --git a/src/main/java/apoc/broker/BrokerProcedures.java b/src/main/java/apoc/broker/BrokerProcedures.java
index b85cb1eb..83a5cdd3 100644
--- a/src/main/java/apoc/broker/BrokerProcedures.java
+++ b/src/main/java/apoc/broker/BrokerProcedures.java
@@ -108,11 +108,11 @@ public Stream list( )
return BrokerHandler.listConnections();
}
- private void checkIfExists( String connectionName ) throws IOException
+ private void checkIfExists( String connectionName )
{
if ( !doesExist( connectionName ) )
{
- throw new IOException( "Broker Exception. Connection '" + connectionName + "' is not a configured broker connection." );
+ throw BrokerExceptionHandler.brokerConnectionUnknownException( "Broker Exception. Connection '" + connectionName + "' is not a configured broker connection." );
}
}
}
diff --git a/src/main/java/apoc/broker/ConnectionFactory.java b/src/main/java/apoc/broker/ConnectionFactory.java
index a66cda8c..7ac95866 100644
--- a/src/main/java/apoc/broker/ConnectionFactory.java
+++ b/src/main/java/apoc/broker/ConnectionFactory.java
@@ -14,17 +14,17 @@ static BrokerConnection recreateConnection( BrokerConnection brokerConnection )
if ( brokerConnection instanceof RabbitMqConnectionFactory.RabbitMqConnection )
{
reconnect = new RabbitMqConnectionFactory.RabbitMqConnection( brokerConnection.getLog(), brokerConnection.getConnectionName(),
- brokerConnection.getConfiguration() );
+ brokerConnection.getConfiguration(), false );
}
else if ( brokerConnection instanceof SqsConnectionFactory.SqsConnection )
{
reconnect = new SqsConnectionFactory.SqsConnection( brokerConnection.getLog(), brokerConnection.getConnectionName(),
- brokerConnection.getConfiguration() );
+ brokerConnection.getConfiguration(), false );
}
else //if ( brokerConnection instanceof KafkaConnectionFactory.KafkaConnection )
{
reconnect = new KafkaConnectionFactory.KafkaConnection( brokerConnection.getLog(), brokerConnection.getConnectionName(),
- brokerConnection.getConfiguration() );
+ brokerConnection.getConfiguration(), false );
}
reconnect.checkConnectionHealth();
diff --git a/src/main/java/apoc/broker/ConnectionManager.java b/src/main/java/apoc/broker/ConnectionManager.java
index 1ad3fa12..8abe94d1 100644
--- a/src/main/java/apoc/broker/ConnectionManager.java
+++ b/src/main/java/apoc/broker/ConnectionManager.java
@@ -43,7 +43,7 @@ public static BrokerConnection getConnection( String connectionName )
}
catch ( NullPointerException e )
{
- throw new RuntimeException( "Tried to access non-existent connection '" + connectionName + "' in the brokerConnections map." );
+ throw BrokerExceptionHandler.brokerConnectionUnknownException( "Connection '" + connectionName + "' is not a configured broker connection." );
}
}
@@ -52,8 +52,7 @@ public static Boolean doesExist( String connectionName )
return brokerConnections.containsKey( connectionName );
}
- public static Set getConnectionNames()
- {
+ public static Set getConnectionNames(){
return brokerConnections.keySet();
}
diff --git a/src/main/java/apoc/broker/KafkaConnectionFactory.java b/src/main/java/apoc/broker/KafkaConnectionFactory.java
index 5a4d0f8b..27036172 100644
--- a/src/main/java/apoc/broker/KafkaConnectionFactory.java
+++ b/src/main/java/apoc/broker/KafkaConnectionFactory.java
@@ -49,6 +49,11 @@ public static class KafkaConnection implements BrokerConnection
private AtomicBoolean reconnecting = new AtomicBoolean( false );
public KafkaConnection( Log log, String connectionName, Map configuration )
+ {
+ this( log, connectionName, configuration, true );
+ }
+
+ public KafkaConnection( Log log, String connectionName, Map configuration, boolean verboseErrorLogging )
{
this.log = log;
this.connectionName = connectionName;
@@ -82,8 +87,11 @@ public KafkaConnection( Log log, String connectionName, Map confi
}
catch ( Exception e )
{
- this.log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
- throw e;
+ if ( verboseErrorLogging )
+ {
+ BrokerExceptionHandler.brokerConnectionInitializationException( "Failed to initialize Kafka connection '" + connectionName + "'.", e );
+ }
+ connected.set( false );
}
}
@@ -93,7 +101,7 @@ public Stream send( @Name( "message" ) Map message
// Topic and value are required
if ( !parameters.containsKey( "topic" ) )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'topic' in parameters missing" );
+ throw BrokerExceptionHandler.brokerSendException( "Broker Exception. Connection Name: " + connectionName + ". Error: 'topic' in parameters missing" );
}
String topic = (String) parameters.get( "topic" );
@@ -129,8 +137,8 @@ else if ( !key.isEmpty() )
}
catch ( Exception e )
{
- this.log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
- throw e;
+ throw BrokerExceptionHandler.brokerSendException( "Failed to send message to topic '" + topic + "'. Connection Name: " + connectionName + ".",
+ e );
}
@@ -146,7 +154,7 @@ public Stream receive( @Name( "configuration" ) Map
// Topic is required
if ( !configuration.containsKey( "topic" ) )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'topic' in configuration missing" );
+ throw BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ". Error: 'topic' in parameters missing" );
}
Integer pollSecondsDefault = this.pollSecondsDefault;
@@ -177,7 +185,7 @@ public Stream receive( @Name( "configuration" ) Map
}
catch ( Exception e )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
+ BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ".", e);
}
} );
@@ -185,11 +193,10 @@ public Stream receive( @Name( "configuration" ) Map
}
catch ( Exception e )
{
- this.log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
- throw e;
+ throw BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ".", e);
}
- return Arrays.stream( responseList.toArray( new BrokerResult[responseList.size()] ) );
+ return Arrays.stream( responseList.toArray( new BrokerResult[0] ) );
}
@Override
@@ -207,7 +214,7 @@ public void checkConnectionHealth() throws Exception
}
catch ( Exception e )
{
- throw e;
+ throw BrokerExceptionHandler.brokerRuntimeException( "Kafka Producer for connection '" + connectionName + "' failed healthcheck.", e );
}
try
{
@@ -216,7 +223,7 @@ public void checkConnectionHealth() throws Exception
}
catch ( Exception e )
{
- throw e;
+ throw BrokerExceptionHandler.brokerRuntimeException( "Kafka Consumer for connection '" + connectionName + "' failed healthcheck.", e );
}
}
diff --git a/src/main/java/apoc/broker/RabbitMqConnectionFactory.java b/src/main/java/apoc/broker/RabbitMqConnectionFactory.java
index 1ae3a124..84fc9ac0 100644
--- a/src/main/java/apoc/broker/RabbitMqConnectionFactory.java
+++ b/src/main/java/apoc/broker/RabbitMqConnectionFactory.java
@@ -16,11 +16,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.BIND;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.CACHE_QUEUE_BINDING;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.CHECK_KNOWN_BINDING;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.CHECK_KNOWN_EXCHANGE;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.CHECK_KNOWN_QUEUE;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.DECLARE_AND_CACHE_EXCHANGE;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.DECLARE_AND_CACHE_QUEUE;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.END;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.ERROR;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.PUBLISH;
+import static apoc.broker.RabbitMqConnectionFactory.SendState.START;
+
/**
* @author alexanderiudice
*/
@@ -48,10 +58,22 @@ public static class RabbitMqConnection implements BrokerConnection
private AtomicBoolean connected = new AtomicBoolean( false );
private AtomicBoolean reconnecting = new AtomicBoolean( false );
- // exchange -> List of routingKeys
- private Map> bindingsCache = new HashMap<>();
+ /**
+ * List of known exchanges. Serves as a cache of already declared and initialized exchanges to reduce overhead.
+ */
+ private List knownExchanges = new ArrayList<>();
+
+ /**
+ * Map of known queues to their bindings. Serves as a cache of already declared and initialized queues and bindings to reduce overhead.
+ */
+ private Map>> queueBindingsCache = new HashMap<>();
public RabbitMqConnection( Log log, String connectionName, Map configuration )
+ {
+ this( log, connectionName, configuration, true );
+ }
+
+ public RabbitMqConnection( Log log, String connectionName, Map configuration, boolean verboseErrorLogging )
{
this.log = log;
this.connectionName = connectionName;
@@ -71,7 +93,10 @@ public RabbitMqConnection( Log log, String connectionName, Map co
}
catch ( Exception e )
{
- this.log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
+ if ( verboseErrorLogging )
+ {
+ BrokerExceptionHandler.brokerConnectionInitializationException( "Failed to initialize RabbitMQ connection '" + connectionName + "'.", e );
+ }
connected.set( false );
}
}
@@ -81,11 +106,11 @@ public Stream send( @Name( "message" ) Map message
{
if ( !configuration.containsKey( "exchangeName" ) )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'exchangeName' in parameters missing" );
+ throw BrokerExceptionHandler.brokerSendException( "Broker Exception. Connection Name: " + connectionName + ". Error: 'exchangeName' in parameters missing" );
}
if ( !configuration.containsKey( "routingKey" ) )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'routingKey' in parameters missing" );
+ throw BrokerExceptionHandler.brokerSendException( "Broker Exception. Connection Name: " + connectionName + ". Error: 'routingKey' in parameters missing" );
}
String exchangeName = (String) configuration.get( "exchangeName" );
@@ -97,102 +122,191 @@ public Stream send( @Name( "message" ) Map message
Map properties = (Map) configuration.getOrDefault( "amqpProperties", Collections.emptyMap() );
AMQP.BasicProperties basicProperties = basicPropertiesMapper( properties );
- // (1) Queue checking and creation
- // NOTE: If a queueName is included then it will always create the binding!
- // For optimization remove queueName if RabbitMQ queues/bindings already all setup.
- Boolean setBindingForQueue = false;
+ // Get queue name
String queueName = (String) configuration.getOrDefault( "queueName", "" );
- if ( !queueName.isEmpty() )
- {
- try
- {
- channel.queueDeclarePassive( queueName );
- }
- catch ( IOException e )
- {
- recoverFromChannelError();
-
- log.info( "Queue '" + queueName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." );
- // Queue does not exist so create one and setBindingForQueue = true
- setBindingForQueue = true;
+ SendState state = START;
+ String errorStateMessage = "[RabbitMQ State Machine Error] ";
- // Check for config
- Map queueConfiguration =
- (Map) configuration.getOrDefault( "queueConfiguration", Collections.emptyMap() );
+ // Finite state machine to control program flow. Creates queue/exchange/binding if needed.
+ while ( state != END )
+ {
+ switch ( state )
+ {
+ case START:
+ if ( queueName.isEmpty() )
+ {
+ state = CHECK_KNOWN_EXCHANGE;
+ }
+ else
+ {
+ state = CHECK_KNOWN_QUEUE;
+ }
+ break;
+ case CHECK_KNOWN_EXCHANGE:
+ if ( knownExchanges.contains( exchangeName ) )
+ {
+ if ( queueName.isEmpty() )
+ {
+ state = PUBLISH;
+ }
+ else
+ {
+ state = CACHE_QUEUE_BINDING;
+ }
+ }
+ else
+ {
+ state = DECLARE_AND_CACHE_EXCHANGE;
+ }
+ break;
+ case DECLARE_AND_CACHE_EXCHANGE:
+ try
+ {
+ channel.exchangeDeclarePassive( exchangeName );
+ }
+ catch ( IOException e )
+ {
+ recoverFromChannelError();
- Boolean queueDurable = (Boolean) queueConfiguration.getOrDefault( "durable", true );
- Boolean queueExclusive = (Boolean) queueConfiguration.getOrDefault( "exclusive", false );
- Boolean queueAutoDelete = (Boolean) queueConfiguration.getOrDefault( "autoDelete", false );
- Map queueArguments =
- (Map) queueConfiguration.getOrDefault( "arguments", Collections.emptyMap() );
+ log.info( "Exchange '" + exchangeName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." );
- // Declare
- channel.queueDeclare( queueName, queueDurable, queueExclusive, queueAutoDelete, queueArguments );
- }
+ Map channelConfiguration =
+ (Map) configuration.getOrDefault( "channelConfiguration", Collections.emptyMap() );
+ String channelType = (String) channelConfiguration.getOrDefault( "type", "topic" );
+ Boolean channelDurable = (Boolean) channelConfiguration.getOrDefault( "durable", true );
+ Boolean channelAutoDelete = (Boolean) channelConfiguration.getOrDefault( "autoDelete", false );
+ Map channelArguments =
+ (Map) channelConfiguration.getOrDefault( "arguments", Collections.emptyMap() );
- // If we already know the exchange then just create the binding
- if ( isKnownBinding( exchangeName, routingKey ) )
- {
- channel.queueBind( queueName, exchangeName, routingKey );
- setBindingForQueue = false;
- }
- else if ( isKnownExchange( exchangeName ) )
- {
- bindingsCache.get( exchangeName ).add( routingKey );
- channel.queueBind( queueName, exchangeName, routingKey );
- setBindingForQueue = false;
- }
+ // Declare channel
+ channel.exchangeDeclare( exchangeName, channelType, channelDurable, channelAutoDelete, channelArguments );
+ }
+ // cache exchangeName
+ knownExchanges.add( exchangeName );
+ if ( queueName.isEmpty() )
+ {
+ state = PUBLISH;
+ }
+ else
+ {
+ state = CACHE_QUEUE_BINDING;
+ }
+ break;
+ case BIND:
+ try
+ {
+ channel.queueBind( queueName, exchangeName, routingKey );
+ }
+ catch ( Exception e )
+ {
+ throw BrokerExceptionHandler.brokerRuntimeException( "Unable to bind exchange and routing-key <" + exchangeName + "," + routingKey + "> to queue '" + queueName + "' ", e );
+ }
+ state = PUBLISH;
+ break;
+ case CACHE_QUEUE_BINDING:
+ // Guaranteed that is not known in this state
+ Map> bindingMap;
+ if ( queueBindingsCache.containsKey( queueName ) )
+ {
+ bindingMap = queueBindingsCache.get( queueName );
+ }
+ else
+ {
+ bindingMap = new HashMap<>();
+ }
- }
+ if ( bindingMap.containsKey( exchangeName ) )
+ {
+ if ( bindingMap.get( exchangeName ).contains( routingKey ) )
+ {
+ errorStateMessage += "Entered state '" + CACHE_QUEUE_BINDING + "' but queue '" + queueName + "' already has binding pair <" + exchangeName + "," + routingKey + ">";
+ state = ERROR;
+ break;
+ }
- // (2) Send message if exchange is known. Create exchange if unknown.
- if ( isKnownBinding( exchangeName, routingKey ) )
- {
- // Send Message
- channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
- }
- else if ( isKnownExchange( exchangeName ) )
- {
- bindingsCache.get( exchangeName ).add( routingKey );
- channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
- }
- else
- {
- // Check if exchange exists
- try
- {
- channel.exchangeDeclarePassive( exchangeName );
- }
- catch ( IOException e )
- {
- recoverFromChannelError();
+ bindingMap.get( exchangeName ).add( routingKey );
+ }
+ else
+ {
+ bindingMap.put( exchangeName, new ArrayList<>( Collections.singletonList( routingKey ) ) );
+ }
+ // cache
+ queueBindingsCache.put( queueName, bindingMap );
+ state = BIND;
+ break;
+ case CHECK_KNOWN_QUEUE:
+ if ( queueBindingsCache.containsKey( queueName ) )
+ {
+ state = CHECK_KNOWN_BINDING;
+ }
+ else
+ {
+ state = DECLARE_AND_CACHE_QUEUE;
+ }
+ break;
+ case DECLARE_AND_CACHE_QUEUE:
+ try
+ {
+ channel.queueDeclarePassive( queueName );
+ }
+ catch ( IOException e )
+ {
+ recoverFromChannelError();
- log.info( "Exchange '" + exchangeName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." );
+ log.info( "Queue '" + queueName + "' does not exist for RabbitMQ connection '" + connectionName + "'. Creating it now." );
- Map channelConfiguration =
- (Map) configuration.getOrDefault( "channelConfiguration", Collections.emptyMap() );
- String channelType = (String) channelConfiguration.getOrDefault( "type", "topic" );
- Boolean channelDurable = (Boolean) channelConfiguration.getOrDefault( "durable", true );
- Boolean channelAutoDelete = (Boolean) channelConfiguration.getOrDefault( "autoDelete", false );
- Map channelArguments =
- (Map) channelConfiguration.getOrDefault( "arguments", Collections.emptyMap() );
+ // Check for config
+ Map queueConfiguration =
+ (Map) configuration.getOrDefault( "queueConfiguration", Collections.emptyMap() );
- // Declare channel
- channel.exchangeDeclare( exchangeName, channelType, channelDurable, channelAutoDelete, channelArguments );
- }
+ Boolean queueDurable = (Boolean) queueConfiguration.getOrDefault( "durable", true );
+ Boolean queueExclusive = (Boolean) queueConfiguration.getOrDefault( "exclusive", false );
+ Boolean queueAutoDelete = (Boolean) queueConfiguration.getOrDefault( "autoDelete", false );
+ Map queueArguments =
+ (Map) queueConfiguration.getOrDefault( "arguments", Collections.emptyMap() );
- // Add it to the cache
- bindingsCache.put( exchangeName, Arrays.asList( routingKey ) );
+ // Declare
+ channel.queueDeclare( queueName, queueDurable, queueExclusive, queueAutoDelete, queueArguments );
+ }
- // Check if queue was set up as well, so we can add the binding now that the exchange/routingKey is known
- if ( setBindingForQueue )
- {
- channel.queueBind( queueName, exchangeName, routingKey );
+ queueBindingsCache.put( queueName, new HashMap<>() );
+ state = CHECK_KNOWN_EXCHANGE;
+ break;
+ case CHECK_KNOWN_BINDING:
+ // queue should be known
+ if ( !queueBindingsCache.containsKey( queueName ) )
+ {
+ errorStateMessage += "Entered state '" + CHECK_KNOWN_BINDING.name() + "' but queueBindingsCache does not contain key '" + queueName + "'.";
+ state = ERROR;
+ break;
+ }
+ Map> bindings = queueBindingsCache.get( queueName );
+ if ( bindings.containsKey( exchangeName ) && bindings.get( exchangeName ).contains( routingKey ) )
+ {
+ state = PUBLISH;
+ break;
+ }
+ // Either the exchange or routingKey was not found
+ state = CHECK_KNOWN_EXCHANGE;
+ break;
+ case PUBLISH:
+ try
+ {
+ channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
+ }
+ catch ( Exception e )
+ {
+ throw BrokerExceptionHandler.brokerSendException( "Failed to publish message to exchange '" + exchangeName + "'.", e );
+ }
+ state = END;
+ break;
+ case ERROR:
+ throw BrokerExceptionHandler.brokerRuntimeException( errorStateMessage );
+ case END:
+ // Should not get here
+ break;
}
-
- // finally send the message
- channel.basicPublish( exchangeName, routingKey, basicProperties, objectMapper.writeValueAsBytes( message ) );
}
return Stream.of( new BrokerMessage( connectionName, message, configuration ) );
@@ -203,7 +317,7 @@ public Stream receive( @Name( "configuration" ) Map
{
if ( !configuration.containsKey( "queueName" ) )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'queueName' in parameters missing" );
+ throw BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ". Error: 'queueName' in parameters missing" );
}
Long pollRecordsMax = Long.parseLong( maxPollRecordsDefault );
@@ -233,7 +347,7 @@ public Stream receive( @Name( "configuration" ) Map
GetResponse message = channel.basicGet( (String) configuration.get( "queueName" ), false );
if ( message == null )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Message retrieved is null. Possibly no messages in the '" +
+ BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ". Message retrieved is null. Possibly no messages in the '" +
configuration.get( "queueName" ) + "' queue." );
break;
}
@@ -252,13 +366,12 @@ public Stream receive( @Name( "configuration" ) Map
}
catch ( Exception e )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Exception when trying to get a message from the '" +
- configuration.get( "queueName" ) + "' queue." );
- throw e;
+ throw BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ". Exception when trying to get a message from the '" +
+ configuration.get( "queueName" ) + "' queue.", e );
}
}
- return Arrays.stream( messageMap.toArray( new BrokerResult[messageMap.size()] ) );
+ return Arrays.stream( messageMap.toArray( new BrokerResult[0] ) );
}
@Override
@@ -266,15 +379,18 @@ public void stop()
{
try
{
- if ( channel.isOpen() )
+ if ( channel != null && channel.isOpen() )
{
channel.close();
}
- connection.close();
+ if ( connection != null && connection.isOpen() )
+ {
+ connection.close();
+ }
}
catch ( Exception e )
{
- log.error( "Broker Exception. Failed to stop(). Connection Name: " + connectionName + ". Error: " + e.toString() );
+ BrokerExceptionHandler.brokerRuntimeException( "Broker Exception. Failed to stop(). Connection Name: " + connectionName + ". Error: " + e.toString(), e );
}
}
@@ -283,35 +399,13 @@ public void checkConnectionHealth() throws Exception
{
if ( connection == null || !connection.isOpen() )
{
- if ( connected.get() )
- {
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Connection lost. Attempting to reestablish the connection." );
- }
- throw new RuntimeException( "RabbitMQ connection for '" + connectionName + "' has closed." );
+ throw BrokerExceptionHandler.brokerRuntimeException( "RabbitMQ connection '" + connectionName + "' failed healthcheck." );
}
if ( channel == null || !channel.isOpen() )
{
- if ( connected.get() )
- {
- log.error( "Broker Exception. Connection Name: " + connectionName + ". RabbitMQ channel lost. Attempting to create new channel." );
- }
- throw new RuntimeException( "RabbitMQ channel for '" + connectionName + "' has closed." );
- }
- }
-
- private boolean isKnownExchange( String exchange )
- {
- return bindingsCache.containsKey( exchange );
- }
-
- private boolean isKnownBinding( String exchange, String routingKey )
- {
- if ( !bindingsCache.containsKey( exchange ) )
- {
- return false;
+ throw BrokerExceptionHandler.brokerRuntimeException( "RabbitMQ channel for '" + connectionName + "' failed healthcheck." );
}
- return bindingsCache.get( exchange ).contains( routingKey );
}
private void recoverFromChannelError( )
@@ -326,8 +420,7 @@ private void recoverFromChannelError( )
}
catch ( Exception e )
{
- log.error( "Failed to recover from channel error. Error: " + e.getMessage() );
- throw new RuntimeException( "Failed to recover from channel error. Error: " + e.getMessage() );
+ throw BrokerExceptionHandler.brokerConnectionRecoveryException( "Failed to recover from channel error.", e );
}
}
@@ -398,7 +491,7 @@ private AMQP.BasicProperties basicPropertiesMapper( Map propertie
}
catch ( Exception e )
{
- throw new RuntimeException( "Invalid 'timestamp' in the RabbitMQ 'properties' configuration." );
+ throw BrokerExceptionHandler.brokerRuntimeException( "Invalid 'timestamp' in the RabbitMQ 'properties' configuration." );
}
}
if ( propertiesMap.containsKey( "type" ) )
@@ -446,4 +539,20 @@ public void setReconnecting( Boolean reconnecting )
this.reconnecting.getAndSet( reconnecting );
}
}
+
+ // States for the Finite State Machine
+ enum SendState
+ {
+ START,
+ CHECK_KNOWN_EXCHANGE,
+ DECLARE_AND_CACHE_EXCHANGE,
+ BIND,
+ CACHE_QUEUE_BINDING,
+ CHECK_KNOWN_QUEUE,
+ DECLARE_AND_CACHE_QUEUE,
+ CHECK_KNOWN_BINDING,
+ PUBLISH,
+ END,
+ ERROR;
+ }
}
diff --git a/src/main/java/apoc/broker/SqsConnectionFactory.java b/src/main/java/apoc/broker/SqsConnectionFactory.java
index 22ec62aa..f7253022 100644
--- a/src/main/java/apoc/broker/SqsConnectionFactory.java
+++ b/src/main/java/apoc/broker/SqsConnectionFactory.java
@@ -48,25 +48,32 @@ public static class SqsConnection implements BrokerConnection
private AtomicBoolean reconnecting = new AtomicBoolean( false );
public SqsConnection( Log log, String connectionName, Map configuration )
+ {
+ this( log, connectionName, configuration, true );
+ }
+
+ public SqsConnection( Log log, String connectionName, Map configuration, boolean verboseErrorLogging )
{
this.log = log;
this.connectionName = connectionName;
this.configuration = configuration;
- connected.set( true );
-
try
{
amazonSQS = AmazonSQSClientBuilder.standard().withCredentials( new AWSStaticCredentialsProvider(
new BasicAWSCredentials( (String) configuration.get( "access.key.id" ), (String) configuration.get( "secret.key.id" ) ) ) ).withRegion(
(String) configuration.get( "region" ) ).build();
+
+ connected.set( true );
}
catch ( Exception e )
{
+ if ( verboseErrorLogging )
+ {
+ BrokerExceptionHandler.brokerConnectionInitializationException( "Failed to initialize SQS connection '" + connectionName + "'.", e );
+ }
connected.set( false );
}
-
-
}
@Override
@@ -74,7 +81,7 @@ public Stream send( @Name( "message" ) Map message
{
if ( !configuration.containsKey( "queueName" ) )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: 'queueName' in parameters missing" );
+ throw BrokerExceptionHandler.brokerSendException( "Broker Exception. Connection Name: " + connectionName + ". Error: 'queueName' in parameters missing" );
}
String queueName = (String) configuration.get( "queueName" );
@@ -82,11 +89,19 @@ public Stream send( @Name( "message" ) Map message
if ( doesQueueExistInRegion( queueName, region ) )
{
- amazonSQS.sendMessage( new SendMessageRequest().withQueueUrl( queueName ).withMessageBody( objectMapper.writeValueAsString( message ) ) );
+ try
+ {
+ amazonSQS.sendMessage( new SendMessageRequest().withQueueUrl( queueName ).withMessageBody( objectMapper.writeValueAsString( message ) ) );
+ }
+ catch ( Exception e )
+ {
+ throw BrokerExceptionHandler.brokerSendException( "Encountered error while sending SQS message for connection '" + connectionName + "'.",
+ e );
+ }
}
else
{
- throw new RuntimeException(
+ throw BrokerExceptionHandler.brokerSendException(
"Broker Exception. Connection Name: " + connectionName + ". Error: SQS queue '" + queueName + "' does not exist in region '" + region +
"'." );
}
@@ -145,23 +160,23 @@ public Stream receive( @Name( "configuration" ) Map
}
else
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". No messages received from SQS queue '" + queueName +
+ BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ". No messages received from SQS queue '" + queueName +
"' in region '" + region + "'." );
}
}
catch ( Exception e )
{
- log.error( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
+ BrokerExceptionHandler.brokerReceiveException( "Broker Exception. Connection Name: " + connectionName + ". Error: " + e.toString() );
}
}
else
{
- log.error(
+ BrokerExceptionHandler.brokerReceiveException(
"Broker Exception. Connection Name: " + connectionName + ". Error: SQS queue '" + queueName + "' does not exist in region '" + region +
"'." );
}
- return Arrays.stream( responseList.toArray( new BrokerResult[responseList.size()] ) );
+ return Arrays.stream( responseList.toArray( new BrokerResult[0] ) );
}
@Override
@@ -169,11 +184,14 @@ public void stop()
{
try
{
- amazonSQS.shutdown();
+ if ( amazonSQS != null )
+ {
+ amazonSQS.shutdown();
+ }
}
catch ( Exception e )
{
- log.error( "Broker Exception. Failed to stop(). Connection Name: " + connectionName + ". Error: " + e.toString() );
+ BrokerExceptionHandler.brokerRuntimeException( "Broker Exception. Failed to stop(). Connection Name: " + connectionName + ".", e );
}
}
@@ -237,12 +255,13 @@ public void checkConnectionHealth() throws Exception
try
{
amazonSQS.listQueues();
- }catch (Exception e )
+ }
+ catch ( Exception e )
{
amazonSQS = AmazonSQSClientBuilder.standard().withCredentials( new AWSStaticCredentialsProvider(
new BasicAWSCredentials( (String) configuration.get( "access.key.id" ), (String) configuration.get( "secret.key.id" ) ) ) ).withRegion(
(String) configuration.get( "region" ) ).build();
- throw e;
+ throw BrokerExceptionHandler.brokerRuntimeException( "SQS connection '" + connectionName + "' failed healthcheck.", e );
}
}
}
diff --git a/src/main/java/apoc/broker/exception/BrokerConnectionInitializationException.java b/src/main/java/apoc/broker/exception/BrokerConnectionInitializationException.java
new file mode 100644
index 00000000..8f891c88
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerConnectionInitializationException.java
@@ -0,0 +1,14 @@
+package apoc.broker.exception;
+
+public class BrokerConnectionInitializationException extends RuntimeException
+{
+ public BrokerConnectionInitializationException( String message )
+ {
+ super( "[BrokerConnectionInitializationException] " + message );
+ }
+
+ public BrokerConnectionInitializationException( String message, Throwable cause )
+ {
+ super( "[BrokerConnectionInitializationException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerConnectionRecoveryException.java b/src/main/java/apoc/broker/exception/BrokerConnectionRecoveryException.java
new file mode 100644
index 00000000..5a45ecb6
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerConnectionRecoveryException.java
@@ -0,0 +1,14 @@
+package apoc.broker.exception;
+
+public class BrokerConnectionRecoveryException extends RuntimeException
+{
+ public BrokerConnectionRecoveryException( String message )
+ {
+ super( "[BrokerConnectionRecoveryException] " + message );
+ }
+
+ public BrokerConnectionRecoveryException( String message, Throwable cause )
+ {
+ super( "[BrokerConnectionRecoveryException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerConnectionUnknownException.java b/src/main/java/apoc/broker/exception/BrokerConnectionUnknownException.java
new file mode 100644
index 00000000..8dd6d5cf
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerConnectionUnknownException.java
@@ -0,0 +1,14 @@
+package apoc.broker.exception;
+
+public class BrokerConnectionUnknownException extends RuntimeException
+{
+ public BrokerConnectionUnknownException( String message )
+ {
+ super( "[BrokerConnectionUnknownException] " + message );
+ }
+
+ public BrokerConnectionUnknownException( String message, Throwable cause )
+ {
+ super( "[BrokerConnectionUnknownException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerDisconnectedException.java b/src/main/java/apoc/broker/exception/BrokerDisconnectedException.java
new file mode 100644
index 00000000..8e5c14f0
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerDisconnectedException.java
@@ -0,0 +1,16 @@
+package apoc.broker.exception;
+
+import java.io.IOException;
+
+public class BrokerDisconnectedException extends IOException
+{
+ public BrokerDisconnectedException( String message )
+ {
+ super( "[BrokerDisconnectedException] " + message );
+ }
+
+ public BrokerDisconnectedException( String message, Throwable cause )
+ {
+ super( "[BrokerDisconnectedException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerLoggerException.java b/src/main/java/apoc/broker/exception/BrokerLoggerException.java
new file mode 100644
index 00000000..b5138f12
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerLoggerException.java
@@ -0,0 +1,15 @@
+package apoc.broker.exception;
+
+public class BrokerLoggerException extends RuntimeException
+{
+
+ public BrokerLoggerException( String message )
+ {
+ super( "[BrokerLoggerException] " + message );
+ }
+
+ public BrokerLoggerException( String message, Throwable cause )
+ {
+ super( "[BrokerLoggerException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerReceiveException.java b/src/main/java/apoc/broker/exception/BrokerReceiveException.java
new file mode 100644
index 00000000..9ee3e598
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerReceiveException.java
@@ -0,0 +1,14 @@
+package apoc.broker.exception;
+
+public class BrokerReceiveException extends RuntimeException
+{
+ public BrokerReceiveException( String message )
+ {
+ super( "[BrokerReceiveException] " + message );
+ }
+
+ public BrokerReceiveException( String message, Throwable cause )
+ {
+ super( "[BrokerReceiveException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerResendDisabledException.java b/src/main/java/apoc/broker/exception/BrokerResendDisabledException.java
new file mode 100644
index 00000000..423dc097
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerResendDisabledException.java
@@ -0,0 +1,16 @@
+package apoc.broker.exception;
+
+import java.io.IOException;
+
+public class BrokerResendDisabledException extends IOException
+{
+ public BrokerResendDisabledException( String message )
+ {
+ super( "[BrokerResendDisabledException] " + message );
+ }
+
+ public BrokerResendDisabledException( String message, Throwable cause )
+ {
+ super( "[BrokerResendDisabledException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerRuntimeException.java b/src/main/java/apoc/broker/exception/BrokerRuntimeException.java
new file mode 100644
index 00000000..e60c6767
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerRuntimeException.java
@@ -0,0 +1,14 @@
+package apoc.broker.exception;
+
+public class BrokerRuntimeException extends RuntimeException
+{
+ public BrokerRuntimeException( String message )
+ {
+ super( "[BrokerRuntimeException] " + message );
+ }
+
+ public BrokerRuntimeException( String message, Throwable cause )
+ {
+ super( "[BrokerRuntimeException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/exception/BrokerSendException.java b/src/main/java/apoc/broker/exception/BrokerSendException.java
new file mode 100644
index 00000000..c68aa2f0
--- /dev/null
+++ b/src/main/java/apoc/broker/exception/BrokerSendException.java
@@ -0,0 +1,14 @@
+package apoc.broker.exception;
+
+public class BrokerSendException extends RuntimeException
+{
+ public BrokerSendException( String message )
+ {
+ super( "[BrokerSendException] " + message );
+ }
+
+ public BrokerSendException( String message, Throwable cause )
+ {
+ super( "[BrokerSendException] " + message, cause );
+ }
+}
diff --git a/src/main/java/apoc/broker/logging/BrokerLogManager.java b/src/main/java/apoc/broker/logging/BrokerLogManager.java
index 9b57bdd2..5d79f835 100644
--- a/src/main/java/apoc/broker/logging/BrokerLogManager.java
+++ b/src/main/java/apoc/broker/logging/BrokerLogManager.java
@@ -1,5 +1,7 @@
package apoc.broker.logging;
+import apoc.broker.BrokerExceptionHandler;
+import apoc.broker.exception.BrokerRuntimeException;
import apoc.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -12,6 +14,7 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -79,7 +82,7 @@ public static void initializeBrokerLogManager( GraphDatabaseAPI api, String dirP
}
catch ( Exception e )
{
- new RuntimeException( "Unable to create 'brokers.log' log file. Exception: " + e.getMessage() );
+ BrokerExceptionHandler.brokerLoggerException( "Unable to create 'brokers.log' log file. Exception: " + e.getMessage() );
}
nameToLogMap = new ConcurrentHashMap<>( );
@@ -99,7 +102,7 @@ public static void initializeBrokerLogManager( GraphDatabaseAPI api, String dirP
}
catch ( Exception e )
{
- new RuntimeException( "Unable to create '" + name + ".log' log file." );
+ BrokerExceptionHandler.brokerLoggerException( "Unable to create '" + name + ".log' log file." );
}
} );
}
@@ -115,7 +118,7 @@ public static Stream streamLogLines() throws Exception
* @return
* @throws Exception
*/
- public static Stream readBrokerLogLine(String connectionName) throws Exception
+ public static Stream readBrokerLogLine(String connectionName) throws IOException
{
return Files.lines( Paths.get( brokerLog.getPath() ) ).map( LogLine::new ).map( LogLine::getLogInfo ).filter(
logInfo -> logInfo.getBrokerName().equals( connectionName ) );
@@ -165,7 +168,7 @@ public static void updateNextMessageToSend(String connectionName, Long messagePo
}
catch ( Exception e )
{
- throw new RuntimeException( "Failure to update the logInfo for connection '" + connectionName + "'." );
+ throw BrokerExceptionHandler.brokerLoggerException( "Failure to update the logInfo for connection '" + connectionName + "'.", e );
}
} );
}
@@ -182,6 +185,7 @@ public static void updateNextMessageToSend(String connectionName, Long messagePo
{
org.apache.commons.io.FileUtils.deleteQuietly( tmpFile );
}
+ BrokerExceptionHandler.brokerLoggerException( "Error in 'updateNextMessageToSend'.", e );
}
}
}
@@ -192,8 +196,12 @@ public static void resetBrokerLogger(String connectionName)
nameToLogMap.get( connectionName ).resetFile();
}
- public static BrokerLogger getBrokerLogger(String connectionName)
+ public static BrokerLogger getBrokerLogger(String connectionName) throws BrokerRuntimeException
{
+ if (!nameToLogMap.containsKey( connectionName ))
+ {
+ throw BrokerExceptionHandler.brokerLoggerException( "BrokerLogManager does not have a logger for connection '" + connectionName + "'." );
+ }
return nameToLogMap.get( connectionName );
}
@@ -272,7 +280,7 @@ public LogLine( String logLine )
}
catch ( Exception e )
{
- logInfo = new LogInfo( );
+ logInfo = new LogInfo();
}
}
@@ -287,7 +295,7 @@ public String getLogString()
}
catch ( Exception e )
{
- throw new RuntimeException( "Unable to write LogEntry as String" );
+ throw BrokerExceptionHandler.brokerLoggerException( "Unable to write LogEntry as String", e );
}
return result;
}
diff --git a/src/main/java/apoc/broker/logging/BrokerLogger.java b/src/main/java/apoc/broker/logging/BrokerLogger.java
index e21ce534..3b7cbd5a 100644
--- a/src/main/java/apoc/broker/logging/BrokerLogger.java
+++ b/src/main/java/apoc/broker/logging/BrokerLogger.java
@@ -1,19 +1,19 @@
package apoc.broker.logging;
+import apoc.broker.BrokerExceptionHandler;
import apoc.util.JsonUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
@@ -119,7 +119,7 @@ public String getLogString()
}
catch ( Exception e )
{
- throw new RuntimeException( "Unable to write LogEntry as String" );
+ throw BrokerExceptionHandler.brokerLoggerException( "Unable to write LogEntry as String", e );
}
return result;
}
@@ -267,7 +267,7 @@ public BrokerLogger( GraphDatabaseAPI api, String dirPath, String connectionName
}
catch ( Exception e )
{
- throw new RuntimeException( "APOC Broker Exception. Logger failed to initialize." );
+ throw BrokerExceptionHandler.brokerLoggerException( "Logger failed to initialize.", e);
}
}
@@ -278,7 +278,7 @@ public Stream streamStartingFrom(Long lineNumber ) throws Exce
}
catch ( Exception e )
{
- throw new RuntimeException( "Could not start streaming from line number " + lineNumber + "." );
+ throw BrokerExceptionHandler.brokerLoggerException( "Could not start streaming from line number " + lineNumber + ".", e );
}
}
@@ -300,17 +300,21 @@ public Stream> batchConnectionMessages( String connection
* @return
* @throws Exception
*/
- public static Stream streamLogLines( BrokerLogManager.LogLine.LogInfo logInfo ) throws Exception
+ public static Stream streamLogLines( BrokerLogManager.LogLine.LogInfo logInfo ) throws IOException
{
return Files.lines( Paths.get( logInfo.getFilePath())).skip( logInfo.getNextMessageToSend()).map( LogLine::new );
}
- public Long calculateNumberOfLogEntries() throws Exception
+ public Long calculateNumberOfLogEntries()
{
- try(Stream lines = Files.lines( Paths.get( logFile.getPath() ) ))
+ try ( Stream lines = Files.lines( Paths.get( logFile.getPath() ) ) )
{
return lines.count();
}
+ catch ( Exception e )
+ {
+ throw BrokerExceptionHandler.brokerLoggerException( "Unable to calculate the number of log entries for logFile '" + logFile.getPath() + "'.", e );
+ }
}
public void resetFile()
@@ -335,6 +339,7 @@ public void resetFile()
}
catch ( Exception e )
{
+ throw BrokerExceptionHandler.brokerLoggerException("Logger failed to reset log file. Error: " + e.getMessage(), e );
}
}
}
@@ -344,22 +349,22 @@ public Boolean IsAtThreshold()
return (numLogEntries.get() > retryThreshold);
}
- public void info( LogLine.LogEntry logEntry ) throws Exception
+ public void info( LogLine.LogEntry logEntry ) throws JsonProcessingException
{
info( OBJECT_MAPPER.writeValueAsString( logEntry ) );
}
- public void warn( LogLine.LogEntry logEntry ) throws Exception
+ public void warn( LogLine.LogEntry logEntry ) throws JsonProcessingException
{
warn( OBJECT_MAPPER.writeValueAsString( logEntry ) );
}
- public void debug( LogLine.LogEntry logEntry ) throws Exception
+ public void debug( LogLine.LogEntry logEntry ) throws JsonProcessingException
{
debug( OBJECT_MAPPER.writeValueAsString( logEntry ) );
}
- public void error( LogLine.LogEntry logEntry ) throws Exception
+ public void error( LogLine.LogEntry logEntry ) throws JsonProcessingException
{
error( OBJECT_MAPPER.writeValueAsString( logEntry ) );
}