diff --git a/docs/using-the-jdbc-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md b/docs/using-the-jdbc-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md index 0e13859f2..031409c8a 100644 --- a/docs/using-the-jdbc-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md +++ b/docs/using-the-jdbc-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md @@ -8,7 +8,7 @@ The read/write splitting plugin is not loaded by default. To load the plugin, in ``` final Properties properties = new Properties(); -properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover,efm"); +properties.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,failover2,efm2"); ``` If you would like to use the read/write splitting plugin without the failover plugin, make sure you have the `readWriteSplitting` plugin in the `wrapperPlugins` property, and that the failover plugin is not part of it. @@ -30,7 +30,10 @@ The read/write splitting plugin is not currently supported for non-Aurora cluste > [!WARNING]\ > If internal connection pools are enabled, database passwords may not be verified with every connection request. The initial connection request for each database instance in the cluster will verify the password, but subsequent requests may return a cached pool connection without re-verifying the password. This behavior is inherent to the nature of connection pools in general and not a bug with the driver. `ConnectionProviderManager.releaseResources` can be called to close all pools and remove all cached pool connections. See [InternalConnectionPoolPasswordWarning.java](../../../examples/AWSDriverExample/src/main/java/software/amazon/InternalConnectionPoolPasswordWarning.java) for more details. -Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. 
Future calls to `setReadOnly `on the same `Connection` object will not require opening a new physical connection. However, calling `setReadOnly(true)` for the first time on a new `Connection` object will require the plugin to establish another new physical connection to a reader. If your application frequently calls `setReadOnly`, you can enable internal connection pooling to improve performance. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the read/write plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
+Whenever `setReadOnly(true)` is first called on a `Connection` object, the read/write plugin will internally open a new physical connection to a reader. After this first call, the physical reader connection will be cached for the given `Connection`. By default, this cached connection will never expire, meaning all subsequent `setReadOnly(true)` calls on the same `Connection` object will keep reusing the same reader connection.
+If your application frequently calls `setReadOnly`, this may have a performance impact. There are two ways to improve performance:
+1. You can enable internal connection pooling. When enabled, the wrapper driver will maintain an internal connection pool for each instance in the cluster. This allows the read/write splitting plugin to reuse connections that were established by `setReadOnly` calls on previous `Connection` objects.
+2. You can set the [`cachedReaderKeepAliveTimeoutMs` connection parameter](#reader-keep-alive-timeout), which sets an expiration time on the cached reader connection. When `setReadOnly(true)` is called and the reader connection has expired, the plugin will create a new reader connection using the specified [reader selection strategy](#reader-selection).

> [!NOTE]\
> Initial connections to a cluster URL will not be pooled.
The driver does not pool cluster URLs because it can be problematic to pool a URL that resolves to different instances over time. The main benefit of internal connection pools is when setReadOnly is called. When setReadOnly is called (regardless of the initial connection URL), an internal pool will be created for the writer/reader that the plugin switches to and connections for that instance can be reused in the future.
@@ -87,6 +90,16 @@ To indicate which selection strategy to use, the `readerHostSelectorStrategy` co
props.setProperty(ReadWriteSplittingPlugin.READER_HOST_SELECTOR_STRATEGY.name, "leastConnections");
```

+## Reader keep-alive timeout
+If no connection pool is used, reader connections created by calls to `setReadOnly(true)` will be cached for the entire lifetime of the `Connection` object. This may have a negative performance impact if your application makes frequent calls to `setReadOnly(true)`, as all read traffic from that `Connection` is directed to a single reader instance.
+To improve performance, you can specify a timeout for the cached reader connection using `cachedReaderKeepAliveTimeoutMs`. Once the reader connection has expired, the next call to `setReadOnly(true)` will create a new reader connection determined by the reader host selection strategy.
+
+```java
+final Properties properties = new Properties();
+properties.setProperty("cachedReaderKeepAliveTimeoutMs", "600000");
+```
+> [!NOTE]\
+> If a connection pool is used, this setting is ignored and the lifespan of the cached reader connection will be handled by the connection pool instead.
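The keep-alive semantics described in this section can be sketched as a small standalone class. `ExpiringRef` below is an illustrative stand-in, not the driver's internal implementation: a cached value carries an absolute `System.nanoTime()` deadline, and a keep-alive of 0 (the default) disables expiration entirely.

```java
import java.util.concurrent.TimeUnit;

// Simplified, standalone sketch of the reader keep-alive behavior described above.
// ExpiringRef is a hypothetical stand-in for the driver's internal cache entry.
public class ReaderKeepAliveSketch {

  static final class ExpiringRef<V> {
    final V value;
    final long deadlineNanos; // absolute System.nanoTime() deadline; <= 0 means "never expires"

    ExpiringRef(final V value, final long keepAliveMs) {
      this.value = value;
      // A keep-alive of 0 (the default) means the cached value never expires.
      this.deadlineNanos = keepAliveMs > 0
          ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(keepAliveMs)
          : 0;
    }

    boolean isExpired() {
      return deadlineNanos > 0 && System.nanoTime() > deadlineNanos;
    }

    // Returns null once expired, signalling the caller to select a new reader.
    V get() {
      return isExpired() ? null : value;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // cachedReaderKeepAliveTimeoutMs = 0: the cached reader never expires.
    ExpiringRef<String> forever = new ExpiringRef<>("reader-1", 0);

    // A 50 ms keep-alive for demonstration (the docs example above uses 600000 ms).
    ExpiringRef<String> shortLived = new ExpiringRef<>("reader-2", 50);

    System.out.println(forever.get());    // reader-1
    System.out.println(shortLived.get()); // reader-2

    Thread.sleep(100); // wait past the 50 ms deadline

    System.out.println(forever.get());    // still reader-1
    System.out.println(shortLived.get()); // null: a new reader would be selected
  }
}
```

With a connection pool enabled this logic would not apply, since the pool governs connection lifetimes instead.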
## Limitations
diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java
index 8355c9de3..ec9bfed99 100644
--- a/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java
+++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPlugin.java
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.AwsWrapperProperty;
-import software.amazon.jdbc.ConnectionProviderManager;
import software.amazon.jdbc.HostListProviderService;
import software.amazon.jdbc.HostRole;
import software.amazon.jdbc.HostSpec;
@@ -35,11 +35,11 @@ import software.amazon.jdbc.NodeChangeOptions;
import software.amazon.jdbc.OldConnectionSuggestedAction;
import software.amazon.jdbc.PluginService;
-import software.amazon.jdbc.PooledConnectionProvider;
import software.amazon.jdbc.PropertyDefinition;
import software.amazon.jdbc.cleanup.CanReleaseResources;
import software.amazon.jdbc.plugin.AbstractConnectionPlugin;
import software.amazon.jdbc.plugin.failover.FailoverSQLException;
+import software.amazon.jdbc.util.CacheItem;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.SqlState;
import software.amazon.jdbc.util.Utils;
@@ -68,10 +68,10 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
  private volatile boolean inReadWriteSplit = false;
  private HostListProviderService hostListProviderService;
  private Connection writerConnection;
-  private Connection readerConnection;
  private HostSpec readerHostSpec;
  private boolean isReaderConnFromInternalPool;
  private boolean isWriterConnFromInternalPool;
+  private CacheItem<Connection> readerConnection;
public static final AwsWrapperProperty READER_HOST_SELECTOR_STRATEGY = new AwsWrapperProperty(
@@ -79,6 +79,13 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
          "random",
          "The strategy that should be used to select a new reader host.");

+  public static final AwsWrapperProperty CACHED_READER_KEEP_ALIVE_TIMEOUT =
+      new AwsWrapperProperty(
+          "cachedReaderKeepAliveTimeoutMs",
+          "0",
+          "The time in milliseconds to keep a reader connection alive in the cache. "
+              + "Default value 0 means the driver will keep reusing the same cached reader connection.");
+
  static {
    PropertyDefinition.registerPluginProperties(ReadWriteSplittingPlugin.class);
  }
@@ -101,7 +108,9 @@ public class ReadWriteSplittingPlugin extends AbstractConnectionPlugin
    this(pluginService, properties);
    this.hostListProviderService = hostListProviderService;
    this.writerConnection = writerConnection;
-    this.readerConnection = readerConnection;
+    final long keepAliveTimeoutMs = CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties);
+    this.readerConnection = new CacheItem<>(readerConnection,
+        keepAliveTimeoutMs > 0 ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(keepAliveTimeoutMs) : 0);
  }

  @Override
@@ -134,7 +141,7 @@ public Connection connect(
    if (!pluginService.acceptsStrategy(hostSpec.getRole(), this.readerSelectorStrategy)) {
      throw new UnsupportedOperationException(
          Messages.get("ReadWriteSplittingPlugin.unsupportedHostSpecSelectorStrategy",
-              new Object[] { this.readerSelectorStrategy }));
+              new Object[] {this.readerSelectorStrategy}));
    }

    final Connection currentConnection = connectFunc.call();
@@ -196,8 +203,9 @@ public T execute(
      if (this.writerConnection != null && !this.writerConnection.isClosed()) {
        this.writerConnection.clearWarnings();
      }
-      if (this.readerConnection != null && !this.readerConnection.isClosed()) {
-        this.readerConnection.clearWarnings();
+      final Connection cachedReaderConnection = this.readerConnection == null ? null : this.readerConnection.get();
+      if (isConnectionUsable(cachedReaderConnection)) {
+        cachedReaderConnection.clearWarnings();
      }
    } catch (final SQLException e) {
      throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e);
@@ -269,7 +276,8 @@
private void setWriterConnection(final Connection writerConnection, } private void setReaderConnection(final Connection conn, final HostSpec host) { - this.readerConnection = conn; + closeReaderConnectionIfIdle(this.readerConnection); + this.readerConnection = new CacheItem<>(conn, this.getKeepAliveTimeout(host)); this.readerHostSpec = host; LOGGER.finest( () -> Messages.get( @@ -306,7 +314,7 @@ void switchConnectionIfRequired(final boolean readOnly) throws SQLException { } catch (final SQLException e) { if (!isConnectionUsable(currentConnection)) { logAndThrowException( - Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", new Object[] { e.getMessage() }), + Messages.get("ReadWriteSplittingPlugin.errorSwitchingToReader", new Object[] {e.getMessage()}), SqlState.CONNECTION_UNABLE_TO_CONNECT, e); return; @@ -374,7 +382,7 @@ private void switchToWriterConnection( } if (this.isReaderConnFromInternalPool) { - this.closeConnectionIfIdle(this.readerConnection); + this.closeReaderConnectionIfIdle(this.readerConnection); } LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromReaderToWriter", @@ -407,15 +415,15 @@ private void switchToReaderConnection(final List hosts) if (this.readerHostSpec != null && !hosts.contains(this.readerHostSpec)) { // The old reader cannot be used anymore because it is no longer in the list of allowed hosts. 
- closeConnectionIfIdle(this.readerConnection); + closeReaderConnectionIfIdle(this.readerConnection); } this.inReadWriteSplit = true; - if (!isConnectionUsable(this.readerConnection)) { + if (this.readerConnection == null || !isConnectionUsable(this.readerConnection.get())) { initializeReaderConnection(hosts); } else { try { - switchCurrentConnectionTo(this.readerConnection, this.readerHostSpec); + switchCurrentConnectionTo(this.readerConnection.get(), this.readerHostSpec); LOGGER.finer(() -> Messages.get("ReadWriteSplittingPlugin.switchedFromWriterToReader", new Object[] {this.readerHostSpec.getUrl()})); } catch (SQLException e) { @@ -428,15 +436,13 @@ private void switchToReaderConnection(final List hosts) new Object[] {this.readerHostSpec.getUrl()})); } - this.readerConnection.close(); - this.readerConnection = null; - this.readerHostSpec = null; + closeReaderConnectionIfIdle(this.readerConnection); initializeReaderConnection(hosts); } } if (this.isWriterConnFromInternalPool) { - this.closeConnectionIfIdle(this.writerConnection); + this.closeWriterConnectionIfIdle(this.writerConnection); } } @@ -503,13 +509,22 @@ private void getNewReaderConnection() throws SQLException { () -> Messages.get("ReadWriteSplittingPlugin.successfullyConnectedToReader", new Object[] {finalReaderHost.getUrl()})); setReaderConnection(conn, readerHost); - switchCurrentConnectionTo(this.readerConnection, this.readerHostSpec); + switchCurrentConnectionTo(this.readerConnection.get(), this.readerHostSpec); } private boolean isConnectionUsable(final Connection connection) throws SQLException { return connection != null && !connection.isClosed(); } + private long getKeepAliveTimeout(final HostSpec host) { + if (this.pluginService.isPooledConnectionProvider(host, properties)) { + // Let the connection pool handle the lifetime of the reader connection. + return 0; + } + final long keepAliveMs = CACHED_READER_KEEP_ALIVE_TIMEOUT.getLong(properties); + return keepAliveMs > 0 ? 
            System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(keepAliveMs) : 0;
+  }
+
  @Override
  public void releaseResources() {
    closeIdleConnections();
@@ -517,25 +532,41 @@ public void releaseResources() {

  private void closeIdleConnections() {
    LOGGER.finest(() -> Messages.get("ReadWriteSplittingPlugin.closingInternalConnections"));
-    closeConnectionIfIdle(this.readerConnection);
-    closeConnectionIfIdle(this.writerConnection);
+    closeReaderConnectionIfIdle(this.readerConnection);
+    closeWriterConnectionIfIdle(this.writerConnection);
  }

-  void closeConnectionIfIdle(final Connection internalConnection) {
+  void closeReaderConnectionIfIdle(final CacheItem<Connection> readerConnection) {
+    if (readerConnection == null) {
+      return;
+    }
+
    final Connection currentConnection = this.pluginService.getCurrentConnection();
+    final Connection readerConnectionCache = readerConnection.get(true);
+    if (readerConnectionCache == currentConnection) {
+      // The cached reader is the connection currently in use; keep it open and cached.
+      return;
+    }
+
    try {
-      if (internalConnection != null
-          && internalConnection != currentConnection
-          && !internalConnection.isClosed()) {
-        internalConnection.close();
-        if (internalConnection == writerConnection) {
-          writerConnection = null;
-        }
+      if (isConnectionUsable(readerConnectionCache)) {
+        readerConnectionCache.close();
+      }
+    } catch (final SQLException e) {
+      // ignore
+    }

-        if (internalConnection == readerConnection) {
-          readerConnection = null;
-          readerHostSpec = null;
-        }
+    this.readerConnection = null;
+    this.readerHostSpec = null;
+  }
+
+  void closeWriterConnectionIfIdle(final Connection internalConnection) {
+    final Connection currentConnection = this.pluginService.getCurrentConnection();
+    try {
+      if (isConnectionUsable(internalConnection)
+          && internalConnection != currentConnection) {
+        internalConnection.close();
+        writerConnection = null;
      }
    } catch (final SQLException e) {
      // ignore
@@ -550,6 +577,6 @@ Connection getWriterConnection() {
  }

  Connection getReaderConnection() {
-    return this.readerConnection;
+    return this.readerConnection == null ?
        null : this.readerConnection.get();
  }
}
diff --git a/wrapper/src/main/java/software/amazon/jdbc/util/CacheItem.java b/wrapper/src/main/java/software/amazon/jdbc/util/CacheItem.java
new file mode 100644
index 000000000..31379014b
--- /dev/null
+++ b/wrapper/src/main/java/software/amazon/jdbc/util/CacheItem.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package software.amazon.jdbc.util;
+
+import java.util.Objects;
+
+public class CacheItem<V> {
+
+  final V item;
+  // Absolute expiration deadline based on System.nanoTime(); a value <= 0 means the item never expires.
+  final long expirationTime;
+
+  public CacheItem(final V item, final long expirationTime) {
+    this.item = item;
+    this.expirationTime = expirationTime;
+  }
+
+  public boolean isExpired() {
+    if (expirationTime <= 0) {
+      // No expiration time.
+      return false;
+    }
+    return System.nanoTime() > expirationTime;
+  }
+
+  public V get() {
+    return get(false);
+  }
+
+  public V get(final boolean returnExpired) {
+    return (this.isExpired() && !returnExpired) ?
null : item; + } + + @Override + public int hashCode() { + return Objects.hashCode(item); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof CacheItem)) { + return false; + } + CacheItem other = (CacheItem) obj; + return Objects.equals(this.item, other.item); + } + + @Override + public String toString() { + return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]"; + } +} diff --git a/wrapper/src/main/java/software/amazon/jdbc/util/CacheMap.java b/wrapper/src/main/java/software/amazon/jdbc/util/CacheMap.java index 7d2e2dbbb..f54cae72c 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/util/CacheMap.java +++ b/wrapper/src/main/java/software/amazon/jdbc/util/CacheMap.java @@ -93,50 +93,4 @@ protected void cleanUp() { }); } } - - static class CacheItem { - final V item; - final long expirationTime; - - public CacheItem(final V item, final long expirationTime) { - this.item = item; - this.expirationTime = expirationTime; - } - - boolean isExpired() { - return System.nanoTime() > expirationTime; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((item == null) ? 
0 : item.hashCode()); - return result; - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final CacheItem other = (CacheItem) obj; - if (item == null) { - return other.item == null; - } else { - return item.equals(other.item); - } - } - - @Override - public String toString() { - return "CacheItem [item=" + item + ", expirationTime=" + expirationTime + "]"; - } - } } diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPluginTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPluginTest.java index ff7282483..948f01661 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPluginTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/readwritesplitting/ReadWriteSplittingPluginTest.java @@ -45,7 +45,6 @@ import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import software.amazon.jdbc.HikariPooledConnectionProvider; import software.amazon.jdbc.HostListProviderService; import software.amazon.jdbc.HostRole; import software.amazon.jdbc.HostSpec; @@ -55,7 +54,6 @@ import software.amazon.jdbc.OldConnectionSuggestedAction; import software.amazon.jdbc.PluginService; import software.amazon.jdbc.PropertyDefinition; -import software.amazon.jdbc.dialect.Dialect; import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy; import software.amazon.jdbc.plugin.failover.FailoverSuccessSQLException; import software.amazon.jdbc.util.SqlState; @@ -96,7 +94,6 @@ public class ReadWriteSplittingPluginTest { @Mock private JdbcCallable mockConnectFunc; @Mock private JdbcCallable mockSqlFunction; @Mock private PluginService mockPluginService; - @Mock private Dialect mockDialect; @Mock private HostListProviderService 
mockHostListProviderService; @Mock private Connection mockWriterConn; @Mock private Connection mockNewWriterConn; @@ -356,6 +353,35 @@ public void testSetReadOnlyOnClosedConnection() throws SQLException { assertNull(plugin.getReaderConnection()); } + @Test + public void testSetReadOnly_readerExpires() throws SQLException, InterruptedException { + when(this.mockPluginService.connect(eq(readerHostSpec1), any(Properties.class), any())) + .thenReturn(mockReaderConn1) + .thenReturn(mockReaderConn2); + + final Properties propsWithExpirationTime = new Properties(); + propsWithExpirationTime.put("cachedReaderKeepAliveTimeoutMs", "5000"); + + final ReadWriteSplittingPlugin plugin = new ReadWriteSplittingPlugin( + mockPluginService, + propsWithExpirationTime); + + plugin.switchConnectionIfRequired(true); + assertEquals(mockReaderConn1, plugin.getReaderConnection()); + + Thread.sleep(1000); + + plugin.switchConnectionIfRequired(true); + // Ensure the cached reader connection hasn't changed yet since it hasn't expired. + assertEquals(mockReaderConn1, plugin.getReaderConnection()); + + Thread.sleep(6000); + plugin.switchConnectionIfRequired(true); + + // Ensure the cached reader connection has expired and updated. 
+ assertEquals(mockReaderConn2, plugin.getReaderConnection()); + } + @Test public void testExecute_failoverToNewWriter() throws SQLException { when(mockSqlFunction.call()).thenThrow(FailoverSuccessSQLException.class); @@ -581,7 +607,7 @@ public void testClosePooledReaderConnectionAfterSetReadOnly() throws SQLExceptio spyPlugin.switchConnectionIfRequired(true); spyPlugin.switchConnectionIfRequired(false); - verify(spyPlugin, times(1)).closeConnectionIfIdle(eq(mockReaderConn1)); + verify(spyPlugin, times(1)).closeReaderConnectionIfIdle(any()); } @Test @@ -608,7 +634,7 @@ public void testClosePooledWriterConnectionAfterSetReadOnly() throws SQLExceptio spyPlugin.switchConnectionIfRequired(false); spyPlugin.switchConnectionIfRequired(true); - verify(spyPlugin, times(1)).closeConnectionIfIdle(eq(mockWriterConn)); + verify(spyPlugin, times(1)).closeWriterConnectionIfIdle(eq(mockWriterConn)); } private static HikariConfig getHikariConfig(HostSpec hostSpec, Properties props) {