diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 271e9302a3d74d..a26eefa0179a5c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1948,4 +1948,91 @@ private Constants() { * AWS S3 PutObject API Documentation */ public static final String IF_NONE_MATCH_STAR = "*"; + + // ==================== AWS Client Shared Thread Pool Configuration =========== + // These settings control whether AWS SDK clients share a thread pool + // instead of each client creating its own. This prevents thread leaks + // when many S3A filesystem instances are created. + + /** + * Default thread pool size for AWS client shared thread pools: {@value}. + */ + public static final int AWS_CLIENT_SHARED_THREADPOOL_SIZE_DEFAULT = 5; + + /** + * Default keepalive in seconds for AWS client shared thread pool: {@value}. + */ + public static final int AWS_CLIENT_SHARED_THREADPOOL_KEEPALIVE_DEFAULT = 60; + + /** + * Enable shared thread pool for AWS S3 sync client (disabled unless set to true): {@value}. + */ + public static final String AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED = + "fs.s3a.aws.s3.client.shared.threadpool.enabled"; + + /** + * Thread pool size for AWS S3 sync client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE = + "fs.s3a.aws.s3.client.shared.threadpool.size"; + + /** + * Keepalive in seconds for AWS S3 sync client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE = + "fs.s3a.aws.s3.client.shared.threadpool.keepalive.seconds"; + + /** + * Enable shared thread pool for AWS S3 async client (disabled unless set to true): {@value}.
+ */ + public static final String AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_ENABLED = + "fs.s3a.aws.s3.async.client.shared.threadpool.enabled"; + + /** + * Thread pool size for AWS S3 async client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_SIZE = + "fs.s3a.aws.s3.async.client.shared.threadpool.size"; + + /** + * Keepalive in seconds for AWS S3 async client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_KEEPALIVE = + "fs.s3a.aws.s3.async.client.shared.threadpool.keepalive.seconds"; + + /** + * Enable shared thread pool for AWS STS client (disabled unless set to true): {@value}. + */ + public static final String AWS_STS_CLIENT_SHARED_THREADPOOL_ENABLED = + "fs.s3a.aws.sts.client.shared.threadpool.enabled"; + + /** + * Thread pool size for AWS STS client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_STS_CLIENT_SHARED_THREADPOOL_SIZE = + "fs.s3a.aws.sts.client.shared.threadpool.size"; + + /** + * Keepalive in seconds for AWS STS client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_STS_CLIENT_SHARED_THREADPOOL_KEEPALIVE = + "fs.s3a.aws.sts.client.shared.threadpool.keepalive.seconds"; + + /** + * Enable shared thread pool for AWS KMS client (disabled unless set to true): {@value}. + */ + public static final String AWS_KMS_CLIENT_SHARED_THREADPOOL_ENABLED = + "fs.s3a.aws.kms.client.shared.threadpool.enabled"; + + /** + * Thread pool size for AWS KMS client shared thread pool; must be positive: {@value}. + */ + public static final String AWS_KMS_CLIENT_SHARED_THREADPOOL_SIZE = + "fs.s3a.aws.kms.client.shared.threadpool.size"; + + /** + * Keepalive in seconds for AWS KMS client shared thread pool; must be positive: {@value}.
+ */ + public static final String AWS_KMS_CLIENT_SHARED_THREADPOOL_KEEPALIVE = + "fs.s3a.aws.kms.client.shared.threadpool.keepalive.seconds"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index 41e904ec9de1bb..b0106286fbadb5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -21,11 +21,13 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.ScheduledExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.s3a.impl.AWSClientConfig; +import org.apache.hadoop.fs.s3a.impl.LazySharedThreadPoolHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,17 +65,23 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_KEEPALIVE; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE; +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED; import static
org.apache.hadoop.fs.s3a.Constants.AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; +import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3; import static org.apache.hadoop.fs.s3a.Constants.CENTRAL_ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_CLASS_NAME; import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.HTTP_SIGNER_ENABLED_DEFAULT; -import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_SECURE_CONNECTIONS; import static org.apache.hadoop.fs.s3a.Constants.SECURE_CONNECTIONS; -import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_S3; import static org.apache.hadoop.fs.s3a.auth.SignerFactory.createHttpSigner; import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.REQUESTER_PAYS_HEADER; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AUTH_SCHEME_AWS_SIGV_4; @@ -97,6 +105,26 @@ public class DefaultS3ClientFactory extends Configured private static final Pattern VPC_ENDPOINT_PATTERN = Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)\\.vpce\\.amazonaws\\.(?:com|com\\.cn)$"); + /** + * Shared executor for S3 sync clients; created lazily, and only when enabled. + */ + private static final LazySharedThreadPoolHolder S3_SYNC_EXECUTOR = + new LazySharedThreadPoolHolder( + AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED, + AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE, + AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE, + "s3a-s3-sync-scheduler"); + + /** + * Shared executor for S3 async clients; created lazily, and only when enabled.
+ */ + private static final LazySharedThreadPoolHolder S3_ASYNC_EXECUTOR = + new LazySharedThreadPoolHolder( + AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_ENABLED, + AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_SIZE, + AWS_S3_ASYNC_CLIENT_SHARED_THREADPOOL_KEEPALIVE, + "s3a-s3-async-scheduler"); + /** * Subclasses refer to this. */ @@ -235,8 +263,10 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build .pathStyleAccessEnabled(parameters.isPathStyleAccess()) .build(); - final ClientOverrideConfiguration.Builder override = - createClientOverrideConfiguration(parameters, conf); + final ClientOverrideConfiguration.Builder override = createClientOverrideConfiguration( + parameters, + conf, + builder instanceof S3AsyncClientBuilder); S3BaseClientBuilder<BuilderT, ClientT> s3BaseClientBuilder = builder .overrideConfiguration(override.build()) @@ -265,13 +295,14 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build * Create an override configuration for an S3 client. * @param parameters parameter object * @param conf configuration object - * @throws IOException any IOE raised, or translated exception - * @throws RuntimeException some failures creating an http signer + * @param isAsync true for async client, false for sync client * @return the override configuration * @throws IOException any IOE raised, or translated exception + * @throws RuntimeException some failures creating an http signer */ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration( - S3ClientCreationParameters parameters, Configuration conf) throws IOException { + S3ClientCreationParameters parameters, Configuration conf, boolean isAsync) + throws IOException { final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder = AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3); @@ -302,6 +333,13 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration( final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build()); + ScheduledExecutorService executor = isAsync + ? S3_ASYNC_EXECUTOR.get(conf) + : S3_SYNC_EXECUTOR.get(conf); + if (executor != null) { + clientOverrideConfigBuilder.scheduledExecutorService(executor); + } + return clientOverrideConfigBuilder; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java index ab151744969695..703302ab20c1ed 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -35,6 +36,7 @@ import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import software.amazon.awssdk.services.sts.model.Credentials; import software.amazon.awssdk.services.sts.model.GetSessionTokenRequest; import org.apache.hadoop.fs.s3a.impl.AWSClientConfig; +import org.apache.hadoop.fs.s3a.impl.LazySharedThreadPoolHolder; import org.apache.hadoop.util.Preconditions; @@ -51,6 +53,9 @@ import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.Constants.AWS_SERVICE_IDENTIFIER_STS; +import static org.apache.hadoop.fs.s3a.Constants.AWS_STS_CLIENT_SHARED_THREADPOOL_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_STS_CLIENT_SHARED_THREADPOOL_KEEPALIVE; +import static org.apache.hadoop.fs.s3a.Constants.AWS_STS_CLIENT_SHARED_THREADPOOL_SIZE; import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*; /** @@ -63,6 +68,16 @@ public
class STSClientFactory { private static final Logger LOG = LoggerFactory.getLogger(STSClientFactory.class); + /** + * Shared executor for STS clients; created lazily, and only when enabled. + */ + private static final LazySharedThreadPoolHolder STS_EXECUTOR = + new LazySharedThreadPoolHolder( + AWS_STS_CLIENT_SHARED_THREADPOOL_ENABLED, + AWS_STS_CLIENT_SHARED_THREADPOOL_SIZE, + AWS_STS_CLIENT_SHARED_THREADPOOL_KEEPALIVE, + "s3a-sts-scheduler"); + /** * Create the builder ready for any final configuration options. * Picks up connection settings from the Hadoop configuration, including @@ -139,6 +154,10 @@ public static StsClientBuilder builder(final AwsCredentialsProvider credentials, final ProxyConfiguration proxyConfig = AWSClientConfig.createProxyConfiguration(conf, bucket); clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build()); + ScheduledExecutorService executor = STS_EXECUTOR.get(conf); + if (executor != null) { + clientOverrideConfigBuilder.scheduledExecutorService(executor); + } httpClientBuilder.proxyConfiguration(proxyConfig); stsClientBuilder.httpClientBuilder(httpClientBuilder) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java index 3c2756dfb058a7..3b1090ade31f1b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.net.URI; +import java.util.concurrent.ScheduledExecutorService; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.kms.KmsClientBuilder; @@ -39,6 +41,9 @@ import org.apache.hadoop.util.ReflectionUtils; import
org.apache.hadoop.util.functional.LazyAtomicReference; +import static org.apache.hadoop.fs.s3a.Constants.AWS_KMS_CLIENT_SHARED_THREADPOOL_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.AWS_KMS_CLIENT_SHARED_THREADPOOL_KEEPALIVE; +import static org.apache.hadoop.fs.s3a.Constants.AWS_KMS_CLIENT_SHARED_THREADPOOL_SIZE; import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable; /** @@ -46,6 +51,16 @@ */ public class EncryptionS3ClientFactory extends DefaultS3ClientFactory { + /** + * Shared executor for KMS clients; created lazily, and only when enabled. + */ + private static final LazySharedThreadPoolHolder KMS_EXECUTOR = + new LazySharedThreadPoolHolder( + AWS_KMS_CLIENT_SHARED_THREADPOOL_ENABLED, + AWS_KMS_CLIENT_SHARED_THREADPOOL_SIZE, + AWS_KMS_CLIENT_SHARED_THREADPOOL_KEEPALIVE, + "s3a-kms-scheduler"); + /** * Encryption client class name. * value: {@value} @@ -201,6 +216,13 @@ private S3Client createS3EncryptionClient(S3ClientCreationParameters parameters) private Keyring createKmsKeyring(S3ClientCreationParameters parameters, CSEMaterials cseMaterials) { KmsClientBuilder kmsClientBuilder = KmsClient.builder(); + ScheduledExecutorService executor = KMS_EXECUTOR.get(cseMaterials.getConf()); + if (executor != null) { + kmsClientBuilder.overrideConfiguration( + ClientOverrideConfiguration.builder() + .scheduledExecutorService(executor) + .build()); + } if (parameters.getCredentialSet() != null) { kmsClientBuilder.credentialsProvider(parameters.getCredentialSet()); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LazySharedThreadPoolHolder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LazySharedThreadPoolHolder.java new file mode 100644 index 00000000000000..92025fa141a712 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/LazySharedThreadPoolHolder.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.AWS_CLIENT_SHARED_THREADPOOL_KEEPALIVE_DEFAULT;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_CLIENT_SHARED_THREADPOOL_SIZE_DEFAULT;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Thread-safe holder for a lazily initialized shared ScheduledExecutorService.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LazySharedThreadPoolHolder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LazySharedThreadPoolHolder.class);
+
+  private final String enabledKey;
+  private final String sizeKey;
+  private final String keepAliveKey;
+  private final String namePrefix;
+
+  private ScheduledExecutorService executor; // null until created; guarded by this
+
+  /**
+   * Create a holder for a lazy shared thread pool.
+   * @param enabledKey config key to enable the shared pool
+   * @param sizeKey config key for pool size
+   * @param keepAliveKey config key for thread keep-alive in seconds
+   * @param namePrefix thread name prefix for debugging
+   */
+  public LazySharedThreadPoolHolder(String enabledKey, String sizeKey,
+      String keepAliveKey, String namePrefix) {
+    this.enabledKey = enabledKey;
+    this.sizeKey = sizeKey;
+    this.keepAliveKey = keepAliveKey;
+    this.namePrefix = namePrefix;
+  }
+
+  /**
+   * Get the shared executor, creating it on the first call whose configuration enables it.
+   * @param conf configuration; enabled is checked on every call, size/keepalive only at creation
+   * @return the executor, or null if not enabled
+   * @throws IllegalArgumentException if the configured size or keepalive is not positive
+   */
+  public synchronized ScheduledExecutorService get(Configuration conf) {
+    if (!conf.getBoolean(enabledKey, false)) {
+      return null;
+    }
+    if (executor == null) {
+      int poolSize = conf.getInt(sizeKey, AWS_CLIENT_SHARED_THREADPOOL_SIZE_DEFAULT);
+      int keepAlive = conf.getInt(keepAliveKey, AWS_CLIENT_SHARED_THREADPOOL_KEEPALIVE_DEFAULT);
+      checkArgument(poolSize > 0,
+          "Value of %s must be positive, got: %s", sizeKey, poolSize);
+      checkArgument(keepAlive > 0,
+          "Value of %s must be positive, got: %s", keepAliveKey, keepAlive);
+      executor = createScheduledExecutor(namePrefix, poolSize, keepAlive);
+    }
+    return executor;
+  }
+
+  /**
+   * Create a scheduled executor of daemon threads with idle thread timeout.
+   * @param namePrefix thread name prefix for debugging; a per-pool thread index is appended
+   * @param poolSize core pool size
+   * @param keepAliveSeconds keepalive time in seconds
+   * @return the executor
+   */
+  public static ScheduledExecutorService createScheduledExecutor(
+      String namePrefix, int poolSize, int keepAliveSeconds) {
+    AtomicInteger threadIndex = new AtomicInteger();
+    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(poolSize,
+        runnable -> {
+          Thread t = new Thread(runnable, namePrefix + "-" + threadIndex.incrementAndGet());
+          t.setDaemon(true);
+          return t;
+        });
+    pool.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
+    pool.allowCoreThreadTimeOut(true);
+    LOG.debug("Created shared executor '{}' with pool size {} and keepalive {}s",
+        namePrefix, poolSize, keepAliveSeconds);
+    return pool;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSharedScheduledExecutor.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSharedScheduledExecutor.java
new file mode 100644
index 00000000000000..b2c4a645a208a2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestSharedScheduledExecutor.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.LazySharedThreadPoolHolder;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE;
+
+/**
+ * Tests for {@link LazySharedThreadPoolHolder} and the scheduled executors it creates.
+ */
+public class TestSharedScheduledExecutor {
+
+  @Test
+  public void testLazyHolderDisabledByDefault() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-disabled");
+    Configuration conf = new Configuration();
+    ScheduledExecutorService executor = holder.get(conf);
+    Assertions.assertThat(executor)
+        .as("Executor should be null when disabled")
+        .isNull();
+  }
+
+  @Test
+  public void testCreateScheduledExecutorConfiguration() {
+    ScheduledExecutorService executor =
+        LazySharedThreadPoolHolder.createScheduledExecutor("test-scheduler", 10, 30);
+    Assertions.assertThat(executor)
+        .as("Executor should be a ScheduledThreadPoolExecutor")
+        .isInstanceOf(ScheduledThreadPoolExecutor.class);
+
+    ScheduledThreadPoolExecutor poolExecutor = (ScheduledThreadPoolExecutor) executor;
+    Assertions.assertThat(poolExecutor.getCorePoolSize())
+        .as("Core pool size should be 10")
+        .isEqualTo(10);
+    Assertions.assertThat(poolExecutor.allowsCoreThreadTimeOut())
+        .as("Core threads should be allowed to time out")
+        .isTrue();
+
+    executor.shutdown();
+  }
+
+  @Test
+  public void testCreateScheduledExecutorThreadsAreDaemon() throws Exception {
+    ScheduledExecutorService executor =
+        LazySharedThreadPoolHolder.createScheduledExecutor("test-daemon", 5, 60);
+    final boolean[] isDaemon = new boolean[1];
+    executor.submit(() -> {
+      isDaemon[0] = Thread.currentThread().isDaemon();
+    }).get();
+    Assertions.assertThat(isDaemon[0])
+        .as("Executor threads should be daemon threads")
+        .isTrue();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testCreateScheduledExecutorThreadName() throws Exception {
+    ScheduledExecutorService executor =
+        LazySharedThreadPoolHolder.createScheduledExecutor("custom-prefix", 5, 60);
+    final String[] threadName = new String[1];
+    executor.submit(() -> {
+      threadName[0] = Thread.currentThread().getName();
+    }).get();
+    Assertions.assertThat(threadName[0])
+        .as("Thread name should match custom prefix")
+        .startsWith("custom-prefix");
+    executor.shutdown();
+  }
+
+  @Test
+  public void testLazyHolderRejectsNegativePoolSize() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-pool");
+    Configuration conf = new Configuration();
+    conf.setBoolean("test.enabled", true);
+    conf.setInt("test.size", -1);
+    conf.setInt("test.keepalive", 60);
+    Assertions.assertThatThrownBy(() -> holder.get(conf))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("test.size")
+        .hasMessageContaining("must be positive");
+  }
+
+  @Test
+  public void testLazyHolderRejectsZeroPoolSize() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-pool");
+    Configuration conf = new Configuration();
+    conf.setBoolean("test.enabled", true);
+    conf.setInt("test.size", 0);
+    conf.setInt("test.keepalive", 60);
+    Assertions.assertThatThrownBy(() -> holder.get(conf))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("test.size")
+        .hasMessageContaining("must be positive");
+  }
+
+  @Test
+  public void testLazyHolderRejectsNegativeKeepAlive() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-pool");
+    Configuration conf = new Configuration();
+    conf.setBoolean("test.enabled", true);
+    conf.setInt("test.size", 5);
+    conf.setInt("test.keepalive", -1);
+    Assertions.assertThatThrownBy(() -> holder.get(conf))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("test.keepalive")
+        .hasMessageContaining("must be positive");
+  }
+
+  @Test
+  public void testLazyHolderRejectsZeroKeepAlive() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-pool");
+    Configuration conf = new Configuration();
+    conf.setBoolean("test.enabled", true);
+    conf.setInt("test.size", 5);
+    conf.setInt("test.keepalive", 0);
+    Assertions.assertThatThrownBy(() -> holder.get(conf))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("test.keepalive")
+        .hasMessageContaining("must be positive");
+  }
+
+  @Test
+  public void testLazyHolderAcceptsValidConfig() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-valid");
+    Configuration conf = new Configuration();
+    conf.setBoolean("test.enabled", true);
+    conf.setInt("test.size", 5);
+    conf.setInt("test.keepalive", 30);
+    ScheduledExecutorService executor = holder.get(conf);
+    Assertions.assertThat(executor)
+        .as("Executor should be created with valid config")
+        .isNotNull();
+    executor.shutdown();
+  }
+
+  /**
+   * Count threads matching the given prefix.
+   * @param prefix thread name prefix to match
+   * @return count of matching threads
+   */
+  private int countThreadsWithPrefix(String prefix) {
+    int count = 0;
+    for (Thread t : Thread.getAllStackTraces().keySet()) {
+      if (t.getName().startsWith(prefix)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Test that without shared pool, each holder creates its own threads.
+   * This demonstrates the thread growth problem.
+   */
+  @Test
+  public void testWithoutSharedPoolThreadsGrow() throws Exception {
+    final String prefix = "test-growth-";
+    final int poolSize = 3;
+    final int numHolders = 5;
+    List<ScheduledExecutorService> executors = new ArrayList<>();
+
+    int initialCount = countThreadsWithPrefix(prefix);
+
+    for (int i = 0; i < numHolders; i++) {
+      ScheduledExecutorService executor =
+          LazySharedThreadPoolHolder.createScheduledExecutor(prefix + i, poolSize, 60);
+      executors.add(executor);
+      executor.submit(() -> {}).get();
+    }
+
+    int afterCount = countThreadsWithPrefix(prefix);
+    int newThreads = afterCount - initialCount;
+
+    Assertions.assertThat(newThreads)
+        .as("Without shared pool, thread count should grow with each executor")
+        .isGreaterThanOrEqualTo(numHolders);
+
+    for (ScheduledExecutorService executor : executors) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test that with shared pool enabled, thread count is bounded.
+   * This demonstrates the fix for thread leak.
+   */
+  @Test
+  public void testWithSharedPoolThreadCountBounded() throws Exception {
+    final String prefix = "test-shared-";
+    final int poolSize = 5;
+    final int numCalls = 10;
+
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED,
+        AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE,
+        AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE,
+        prefix);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(AWS_S3_CLIENT_SHARED_THREADPOOL_ENABLED, true);
+    conf.setInt(AWS_S3_CLIENT_SHARED_THREADPOOL_SIZE, poolSize);
+    conf.setInt(AWS_S3_CLIENT_SHARED_THREADPOOL_KEEPALIVE, 60);
+
+    int initialCount = countThreadsWithPrefix(prefix);
+
+    for (int i = 0; i < numCalls; i++) {
+      ScheduledExecutorService executor = holder.get(conf);
+      Assertions.assertThat(executor)
+          .as("Should return same executor instance")
+          .isNotNull().isSameAs(holder.get(conf));
+      executor.submit(() -> {}).get();
+    }
+
+    int afterCount = countThreadsWithPrefix(prefix);
+    int newThreads = afterCount - initialCount;
+
+    Assertions.assertThat(newThreads)
+        .as("With shared pool, thread count should be bounded by pool size")
+        .isLessThanOrEqualTo(poolSize);
+
+    holder.get(conf).shutdown();
+  }
+
+  /**
+   * Test that holder returns the same executor instance on repeated calls.
+   */
+  @Test
+  public void testHolderReturnsSameInstance() {
+    LazySharedThreadPoolHolder holder = new LazySharedThreadPoolHolder(
+        "test.enabled", "test.size", "test.keepalive", "test-same");
+    Configuration conf = new Configuration();
+    conf.setBoolean("test.enabled", true);
+    conf.setInt("test.size", 5);
+    conf.setInt("test.keepalive", 60);
+
+    ScheduledExecutorService first = holder.get(conf);
+    ScheduledExecutorService second = holder.get(conf);
+    ScheduledExecutorService third = holder.get(conf);
+
+    Assertions.assertThat(first)
+        .as("Holder should return same instance on repeated calls")
+        .isSameAs(second)
+        .isSameAs(third);
+
+    first.shutdown();
+  }
+}