diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md
index fc2fcd361f497..0dbcd653f3231 100644
--- a/CHANGELOG-3.0.md
+++ b/CHANGELOG-3.0.md
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
 - Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
 - Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
+- Add cluster and index level settings to limit the total primary shards per node and per index ([#17295](https://github.com/opensearch-project/OpenSearch/pull/17295))
 - Add execution_hint to cardinality aggregator request (#[17312](https://github.com/opensearch-project/OpenSearch/pull/17312))
 - Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java
new file mode 100644
index 0000000000000..fdc6a7e6b96b2
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderIT.java
@@ -0,0 +1,305 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation.decider;
+
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.routing.IndexRoutingTable;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
+import org.opensearch.cluster.routing.RoutingNode;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.ShardRoutingState;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
+import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
+public class ShardsLimitAllocationDeciderIT extends OpenSearchIntegTestCase {
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder().put(super.nodeSettings(nodeOrdinal)).build();
+    }
+
+    public void testClusterWideShardsLimit() {
+        // Set the cluster-wide shard limit to 4
+        updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 4);
+
+        // Create the first two indices with 3 shards and 1 replica each
+        createIndex("test1", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
+        createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
+
+        // Create the third index with 2 shards and 1 replica
+        createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
+
+        // Wait for the shard limit to be applied
+        try {
+            assertBusy(() -> {
+                ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+                // Check total number of shards
+                assertEquals(16, state.getRoutingTable().allShards().size());
+
+                // Check number of unassigned shards
+                int unassignedShards = state.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size();
+                assertEquals(4, unassignedShards);
+
+                // Check shard distribution across nodes
+                for (RoutingNode routingNode : state.getRoutingNodes()) {
+                    assertTrue("Node exceeds shard limit", routingNode.numberOfOwningShards() <= 4);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        // Additional assertions to verify shard distribution
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        int totalAssignedShards = 0;
+        for (RoutingNode routingNode : state.getRoutingNodes()) {
+            totalAssignedShards += routingNode.numberOfOwningShards();
+        }
+        assertEquals("Total assigned shards should be 12", 12, totalAssignedShards);
+    }
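+
+    // A worked check of the numbers asserted above (illustrative): the three indices create
+    // (3 + 3 + 2) shard IDs x 2 copies (primary + replica) = 16 shard copies in total, while
+    // the per-node limit allows at most 3 nodes x 4 shards = 12 assigned copies, leaving
+    // 16 - 12 = 4 copies unassigned.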
+
+    public void testIndexSpecificShardLimit() {
+        // Set the index-specific shard limit to 2 for the first index only
+        Settings indexSettingsWithLimit = Settings.builder()
+            .put(SETTING_NUMBER_OF_SHARDS, 4)
+            .put(SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2)
+            .build();
+
+        Settings indexSettingsWithoutLimit = Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build();
+
+        // Create the first index with 4 shards, 1 replica, and the index-specific limit
+        createIndex("test1", indexSettingsWithLimit);
+
+        // Create the second index with 4 shards and 1 replica, without the index-specific limit
+        createIndex("test2", indexSettingsWithoutLimit);
+
+        // Create the third index with 3 shards and 1 replica, without the index-specific limit
+        createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
+
+        try {
+            // Wait for the shard limit to be applied
+            assertBusy(() -> {
+                ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+                // Check total number of shards
+                assertEquals(22, state.getRoutingTable().allShards().size());
+
+                // Check total number of assigned and unassigned shards
+                int totalAssignedShards = 0;
+                int totalUnassignedShards = 0;
+                Map<String, Integer> unassignedShardsByIndex = new HashMap<>();
+
+                for (IndexRoutingTable indexRoutingTable : state.getRoutingTable()) {
+                    String index = indexRoutingTable.getIndex().getName();
+                    int indexUnassignedShards = 0;
+
+                    for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
+                        for (ShardRouting shardRouting : shardRoutingTable) {
+                            if (shardRouting.unassigned()) {
+                                totalUnassignedShards++;
+                                indexUnassignedShards++;
+                            } else {
+                                totalAssignedShards++;
+                            }
+                        }
+                    }
+
+                    unassignedShardsByIndex.put(index, indexUnassignedShards);
+                }
+
+                assertEquals("Total assigned shards should be 20", 20, totalAssignedShards);
+                assertEquals("Total unassigned shards should be 2", 2, totalUnassignedShards);
+
+                // Check unassigned shards for each index
+                assertEquals("test1 should have 2 unassigned shards", 2, unassignedShardsByIndex.get("test1").intValue());
+                assertEquals("test2 should have 0 unassigned shards", 0, unassignedShardsByIndex.get("test2").intValue());
+                assertEquals("test3 should have 0 unassigned shards", 0, unassignedShardsByIndex.get("test3").intValue());
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
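+
+    // A worked check (illustrative): (4 + 4 + 3) shard IDs x 2 copies = 22 in total. Only
+    // test1 is limited, to 2 shards per node; with 3 nodes it can place at most 3 x 2 = 6 of
+    // its 8 copies, so 2 of its copies stay unassigned while test2 and test3 assign fully.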
+
+    public void testCombinedClusterAndIndexSpecificShardLimits() {
+        // Set the cluster-wide shard limit to 6
+        updateClusterSetting(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 6);
+
+        // Create the first index with 3 shards, 1 replica, and index-specific limit of 1
+        Settings indexSettingsWithLimit = Settings.builder()
+            .put(SETTING_NUMBER_OF_SHARDS, 3)
+            .put(SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
+            .build();
+        createIndex("test1", indexSettingsWithLimit);
+
+        // Create the second index with 4 shards and 1 replica
+        createIndex("test2", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 4).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
+
+        // Create the third index with 3 shards and 1 replica
+        createIndex("test3", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 3).put(SETTING_NUMBER_OF_REPLICAS, 1).build());
+
+        try {
+            assertBusy(() -> {
+                ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+                // Check total number of shards
+                assertEquals("Total shards should be 20", 20, state.getRoutingTable().allShards().size());
+
+                int totalAssignedShards = 0;
+                int totalUnassignedShards = 0;
+                Map<String, Integer> unassignedShardsByIndex = new HashMap<>();
+                Map<String, Integer> nodeShardCounts = new HashMap<>();
+                Map<String, Set<String>> indexShardsPerNode = new HashMap<>();
+
+                for (RoutingNode routingNode : state.getRoutingNodes()) {
+                    String nodeName = routingNode.node().getName();
+                    nodeShardCounts.put(nodeName, routingNode.numberOfOwningShards());
+                    indexShardsPerNode.put(nodeName, new HashSet<>());
+
+                    for (ShardRouting shardRouting : routingNode) {
+                        indexShardsPerNode.get(nodeName).add(shardRouting.getIndexName());
+                    }
+                }
+
+                for (IndexRoutingTable indexRoutingTable : state.getRoutingTable()) {
+                    String index = indexRoutingTable.getIndex().getName();
+                    int indexUnassignedShards = 0;
+
+                    for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
+                        for (ShardRouting shardRouting : shardRoutingTable) {
+                            if (shardRouting.unassigned()) {
+                                totalUnassignedShards++;
+                                indexUnassignedShards++;
+                            } else {
+                                totalAssignedShards++;
+                            }
+                        }
+                    }
+
+                    unassignedShardsByIndex.put(index, indexUnassignedShards);
+                }
+
+                assertEquals("Total assigned shards should be 17", 17, totalAssignedShards);
+                assertEquals("Total unassigned shards should be 3", 3, totalUnassignedShards);
+                assertEquals("test1 should have 3 unassigned shards", 3, unassignedShardsByIndex.get("test1").intValue());
+                assertEquals("test2 should have 0 unassigned shards", 0, unassignedShardsByIndex.getOrDefault("test2", 0).intValue());
+                assertEquals("test3 should have 0 unassigned shards", 0, unassignedShardsByIndex.getOrDefault("test3", 0).intValue());
+
+                // Check shard distribution across nodes
+                List<Integer> shardCounts = new ArrayList<>(nodeShardCounts.values());
+                Collections.sort(shardCounts, Collections.reverseOrder());
+                assertEquals("Two nodes should have 6 shards", 6, shardCounts.get(0).intValue());
+                assertEquals("Two nodes should have 6 shards", 6, shardCounts.get(1).intValue());
+                assertEquals("One node should have 5 shards", 5, shardCounts.get(2).intValue());
+
+                // Check that all nodes have only one shard of the first index
+                for (Set<String> indexesOnNode : indexShardsPerNode.values()) {
+                    assertTrue("Each node should have a shard from test1", indexesOnNode.contains("test1"));
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
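+
+    // A worked check (illustrative): (3 + 4 + 3) shard IDs x 2 copies = 20 in total. test1
+    // may place only 1 shard per node, so 3 of its 6 copies are assigned; the remaining 17
+    // assigned copies fit under the cluster-wide limit of 6 per node (3 x 6 = 18), giving
+    // the observed 6/6/5 distribution.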
+
+    /**
+     * Integration test to verify the behavior of INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
+     * in a non-remote store environment.
+     *
+     * Scenario:
+     * An end-user attempts to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
+     * on a cluster where remote store is not enabled.
+     *
+     * Expected Outcome:
+     * The system should reject the index creation request and throw an appropriate exception,
+     * indicating that this setting is only applicable for remote store enabled clusters.
+     */
+    public void testIndexTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
+        // Attempt to create an index with INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
+        Settings indexSettings = Settings.builder()
+            .put(SETTING_NUMBER_OF_SHARDS, 3)
+            .put(SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
+            .build();
+
+        // Assert that creating the index throws an exception
+        IllegalArgumentException exception = expectThrows(
+            IllegalArgumentException.class,
+            () -> { createIndex("test_index", indexSettings); }
+        );
+
+        // Verify the exception message
+        assertTrue(
+            "Exception should mention that the setting requires remote store",
+            exception.getMessage()
+                .contains(
+                    "Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters"
+                )
+        );
+    }
+
+    /**
+     * Integration test to verify the behavior of CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
+     * in a non-remote store environment.
+     *
+     * Scenario:
+     * An end-user attempts to update CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING
+     * on a cluster where remote store is not enabled.
+     *
+     * Expected Outcome:
+     * The system should reject the settings update and throw an appropriate exception,
+     * indicating that this setting is only applicable for remote store enabled clusters.
+     */
+    public void testClusterTotalPrimaryShardsPerNodeSettingWithoutRemoteStore() {
+        IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
+            updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1);
+        });
+
+        // Verify the exception message
+        assertTrue(
+            "Exception should mention that the setting requires remote store",
+            exception.getMessage()
+                .contains(
+                    "Setting [cluster.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters"
+                )
+        );
+
+        // By contrast, the plain INDEX_TOTAL_SHARDS_PER_NODE_SETTING does not require remote
+        // store, so creating an index with it should still succeed
+        Settings indexSettings = Settings.builder()
+            .put(SETTING_NUMBER_OF_SHARDS, 3)
+            .put(SETTING_NUMBER_OF_REPLICAS, 1)
+            .put(INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1)
+            .build();
+
+        createIndex("test_index", indexSettings);
+    }
+
+    private void updateClusterSetting(String setting, int value) {
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(setting, value)).get();
+    }
+}
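The remote-store tests that follow build on RemoteStoreBaseIntegTestCase. Judging by the inline comments, its remoteStoreIndexSettings(int, int) helper appears to take the replica count first and the shard count second, so remoteStoreIndexSettings(0, 4) describes an index with 4 primaries and no replicas. A hand-rolled sketch of the shard-count portion (illustrative; the real helper presumably also layers on the base remote-store settings) would be:

    Settings equivalent = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) // first argument: replicas
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 4)   // second argument: shards
        .build();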
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderRemoteStoreEnabledIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderRemoteStoreEnabledIT.java
new file mode 100644
index 0000000000000..401db7790de92
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderRemoteStoreEnabledIT.java
@@ -0,0 +1,248 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation.decider;
+
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.routing.IndexShardRoutingTable;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class ShardsLimitAllocationDeciderRemoteStoreEnabledIT extends RemoteStoreBaseIntegTestCase {
+    @Before
+    public void setup() {
+        setupCustomCluster();
+    }
+
+    private void setupCustomCluster() {
+        // Start cluster manager node first
+        String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
+        // Start data nodes
+        List<String> dataNodes = internalCluster().startDataOnlyNodes(3);
+        // Wait for green cluster state
+        ensureGreen();
+    }
+
+    public void testIndexPrimaryShardLimit() throws Exception {
+        // Create first index with primary shard limit
+        Settings firstIndexSettings = Settings.builder()
+            .put(remoteStoreIndexSettings(0, 4)) // 4 shards, 0 replicas
+            .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
+            .build();
+
+        // Create first index
+        createIndex("test1", firstIndexSettings);
+
+        // Create second index
+        createIndex("test2", remoteStoreIndexSettings(0, 4));
+
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+            // Check total number of shards (8 total: 4 from each index)
+            assertEquals("Total shards should be 8", 8, state.getRoutingTable().allShards().size());
+
+            // Count assigned and unassigned shards for test1
+            int test1AssignedShards = 0;
+            int test1UnassignedShards = 0;
+            Map<String, Integer> nodePrimaryCount = new HashMap<>();
+
+            // Check test1 shard distribution
+            for (IndexShardRoutingTable shardRouting : state.routingTable().index("test1")) {
+                for (ShardRouting shard : shardRouting) {
+                    if (shard.assignedToNode()) {
+                        test1AssignedShards++;
+                        // Count primaries per node for test1
+                        String nodeId = shard.currentNodeId();
+                        nodePrimaryCount.merge(nodeId, 1, Integer::sum);
+                    } else {
+                        test1UnassignedShards++;
+                    }
+                }
+            }
+
+            // Check test2 shard assignment
+            int test2UnassignedShards = 0;
+            for (IndexShardRoutingTable shardRouting : state.routingTable().index("test2")) {
+                for (ShardRouting shard : shardRouting) {
+                    if (!shard.assignedToNode()) {
+                        test2UnassignedShards++;
+                    }
+                }
+            }
+
+            // Assertions
+            assertEquals("test1 should have 3 assigned shards", 3, test1AssignedShards);
+            assertEquals("test1 should have 1 unassigned shard", 1, test1UnassignedShards);
+            assertEquals("test2 should have no unassigned shards", 0, test2UnassignedShards);
+
+            // Verify no node has more than one primary shard of test1
+            for (Integer count : nodePrimaryCount.values()) {
+                assertTrue("No node should have more than 1 primary shard of test1", count <= 1);
+            }
+        });
+    }
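+
+    // A worked check (illustrative): test1 has 4 primaries and no replicas, but may place at
+    // most 1 primary per node; with 3 data nodes only 3 of its 4 primaries can be assigned,
+    // leaving 1 unassigned. test2 carries no per-index limit and assigns all 4 primaries.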
+
+    public void testClusterPrimaryShardLimits() throws Exception {
+        // Update cluster setting to limit primary shards per node
+        updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1);
+
+        // Create index with 4 shards and 1 replica
+        createIndex("test1", remoteStoreIndexSettings(1, 4));
+
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+            // Check total number of shards (8 total: 4 primaries + 4 replicas)
+            assertEquals("Total shards should be 8", 8, state.getRoutingTable().allShards().size());
+
+            // Count assigned and unassigned shards for test1
+            int assignedShards = 0;
+            int unassignedShards = 0;
+            int unassignedPrimaries = 0;
+            int unassignedReplicas = 0;
+            Map<String, Integer> nodePrimaryCount = new HashMap<>();
+
+            // Check shard distribution
+            for (IndexShardRoutingTable shardRouting : state.routingTable().index("test1")) {
+                for (ShardRouting shard : shardRouting) {
+                    if (shard.assignedToNode()) {
+                        assignedShards++;
+                        if (shard.primary()) {
+                            // Count primaries per node
+                            String nodeId = shard.currentNodeId();
+                            nodePrimaryCount.merge(nodeId, 1, Integer::sum);
+                        }
+                    } else {
+                        unassignedShards++;
+                        if (shard.primary()) {
+                            unassignedPrimaries++;
+                        } else {
+                            unassignedReplicas++;
+                        }
+                    }
+                }
+            }
+
+            // Assertions
+            assertEquals("Should have 6 assigned shards", 6, assignedShards);
+            assertEquals("Should have 2 unassigned shards", 2, unassignedShards);
+            assertEquals("Should have 1 unassigned primary", 1, unassignedPrimaries);
+            assertEquals("Should have 1 unassigned replica", 1, unassignedReplicas);
+
+            // Verify no node has more than one primary shard
+            for (Integer count : nodePrimaryCount.values()) {
+                assertTrue("No node should have more than 1 primary shard", count <= 1);
+            }
+        });
+    }
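+
+    // A worked check (illustrative): with a cluster-wide limit of 1 primary per node, only 3
+    // of the 4 primaries fit on 3 nodes. The replica of the unassigned primary cannot be
+    // allocated either, since a replica is only assigned once its primary is active, which
+    // yields 6 assigned and 2 unassigned copies (1 primary + 1 replica).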
+
+    public void testCombinedIndexAndClusterPrimaryShardLimits() throws Exception {
+        // Set cluster-wide primary shard limit to 3
+        updateClusterSetting(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 3);
+
+        // Create first index with index-level primary shard limit
+        Settings firstIndexSettings = Settings.builder()
+            .put(remoteStoreIndexSettings(1, 4)) // 4 shards, 1 replica
+            .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
+            .build();
+
+        // Create first index
+        createIndex("test1", firstIndexSettings);
+
+        // Create second index with no index-level limits
+        createIndex("test2", remoteStoreIndexSettings(1, 4)); // 4 shards, 1 replica
+
+        assertBusy(() -> {
+            ClusterState state = client().admin().cluster().prepareState().get().getState();
+
+            // Check total number of shards (16 total: 8 from each index - 4 primaries + 4 replicas each)
+            assertEquals("Total shards should be 16", 16, state.getRoutingTable().allShards().size());
+
+            // Count assigned and unassigned shards for both indices
+            int totalAssignedShards = 0;
+            int test1UnassignedPrimaries = 0;
+            int test1UnassignedReplicas = 0;
+            int test2UnassignedShards = 0;
+            Map<String, Integer> nodePrimaryCount = new HashMap<>();
+
+            // Check test1 shard distribution
+            for (IndexShardRoutingTable shardRouting : state.routingTable().index("test1")) {
+                for (ShardRouting shard : shardRouting) {
+                    if (shard.assignedToNode()) {
+                        totalAssignedShards++;
+                        if (shard.primary()) {
+                            String nodeId = shard.currentNodeId();
+                            nodePrimaryCount.merge(nodeId, 1, Integer::sum);
+                        }
+                    } else {
+                        if (shard.primary()) {
+                            test1UnassignedPrimaries++;
+                        } else {
+                            test1UnassignedReplicas++;
+                        }
+                    }
+                }
+            }
+
+            // Check test2 shard distribution
+            for (IndexShardRoutingTable shardRouting : state.routingTable().index("test2")) {
+                for (ShardRouting shard : shardRouting) {
+                    if (shard.assignedToNode()) {
+                        totalAssignedShards++;
+                        if (shard.primary()) {
+                            String nodeId = shard.currentNodeId();
+                            nodePrimaryCount.merge(nodeId, 1, Integer::sum);
+                        }
+                    } else {
+                        test2UnassignedShards++;
+                    }
+                }
+            }
+
+            // Assertions
+            assertEquals("Should have 14 assigned shards", 14, totalAssignedShards);
+            assertEquals("Should have 1 unassigned primary in test1", 1, test1UnassignedPrimaries);
+            assertEquals("Should have 1 unassigned replica in test1", 1, test1UnassignedReplicas);
+            assertEquals("Should have no unassigned shards in test2", 0, test2UnassignedShards);
+
+            // Verify no node has more than one primary shard for test1 (the counts must be
+            // aggregated across all of test1's shards, not per shard-routing table)
+            Map<String, Integer> test1NodePrimaryCount = new HashMap<>();
+            for (IndexShardRoutingTable shardRouting : state.routingTable().index("test1")) {
+                for (ShardRouting shard : shardRouting) {
+                    if (shard.assignedToNode() && shard.primary()) {
+                        test1NodePrimaryCount.merge(shard.currentNodeId(), 1, Integer::sum);
+                    }
+                }
+            }
+            for (Integer count : test1NodePrimaryCount.values()) {
+                assertTrue("No node should have more than 1 primary shard of test1", count <= 1);
+            }
+
+            // Verify no node has more than three primary shards total (cluster-wide limit)
+            for (Integer count : nodePrimaryCount.values()) {
+                assertTrue("No node should have more than 3 primary shards total", count <= 3);
+            }
+        });
+    }
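+
+    // A worked check (illustrative): test1 assigns 3 of 4 primaries (index limit of 1 per
+    // node), and the replica of its unassigned primary also stays unassigned, so 6 of its 8
+    // copies are assigned; test2 assigns all 8 of its copies. The 7 assigned primaries spread
+    // over 3 nodes also respect the cluster-wide limit of 3 primaries per node, for 14
+    // assigned copies in total.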
+
+    private void updateClusterSetting(String setting, int value) {
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put(setting, value)).get();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
index 3988d50b2ce1e..60c04d5a620f8 100644
--- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
+++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
@@ -64,6 +64,7 @@
 import java.io.IOException;

+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
 import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;

 /**
@@ -257,6 +258,7 @@ public void onFailure(String source, Exception e) {
             @Override
             public ClusterState execute(final ClusterState currentState) {
+                validateClusterTotalPrimaryShardsPerNodeSetting(currentState, request);
                 boolean isCompatibilityModeChanging = validateCompatibilityModeSettingRequest(request, state);
                 ClusterState clusterState = updater.updateSettings(
                     currentState,
@@ -324,4 +326,34 @@ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) {
             );
         }
     }
+
+    private void validateClusterTotalPrimaryShardsPerNodeSetting(ClusterState currentState, ClusterUpdateSettingsRequest request) {
+        if (request.transientSettings().hasValue(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())
+            || request.persistentSettings().hasValue(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())) {
+
+            Settings settings = Settings.builder().put(request.transientSettings()).put(request.persistentSettings()).build();
+
+            int newValue = CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings);
+
+            // If default value (-1), no validation needed
+            if (newValue == -1) {
+                return;
+            }
+
+            // Check current state
+            boolean allNodesRemoteStoreEnabled = currentState.nodes()
+                .getNodes()
+                .values()
+                .stream()
+                .allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode());
+
+            if (!allNodesRemoteStoreEnabled) {
+                throw new IllegalArgumentException(
+                    "Setting ["
+                        + CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey()
+                        + "] can only be used with remote store enabled clusters"
+                );
+            }
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
index d50192f106cfe..fe81895fa375e 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
@@ -817,6 +817,7 @@ public void validate(final String value, final Map<Setting<?>, Object> settings)
     private final boolean isRemoteSnapshot;
     private final int indexTotalShardsPerNodeLimit;
+    private final int indexTotalPrimaryShardsPerNodeLimit;
     private final boolean isAppendOnlyIndex;

     private final Context context;
@@ -849,6 +850,7 @@ private IndexMetadata(
         final Map<String, RolloverInfo> rolloverInfos,
         final boolean isSystem,
         final int indexTotalShardsPerNodeLimit,
+        final int indexTotalPrimaryShardsPerNodeLimit,
         boolean isAppendOnlyIndex,
         final Context context
     ) {
@@ -887,6 +889,7 @@ private IndexMetadata(
         this.isSystem = isSystem;
         this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
         this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit;
+        this.indexTotalPrimaryShardsPerNodeLimit = indexTotalPrimaryShardsPerNodeLimit;
         this.isAppendOnlyIndex = isAppendOnlyIndex;
         this.context = context;
         assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
@@ -1069,6 +1072,10 @@ public int getIndexTotalShardsPerNodeLimit() {
         return this.indexTotalShardsPerNodeLimit;
     }

+    public int getIndexTotalPrimaryShardsPerNodeLimit() {
+        return this.indexTotalPrimaryShardsPerNodeLimit;
+    }
+
     public boolean isAppendOnlyIndex() {
         return this.isAppendOnlyIndex;
     }
@@ -1867,6 +1874,9 @@ public IndexMetadata build() {
             }

             final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
+            final int indexTotalPrimaryShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(
+                settings
+            );
             final boolean isAppendOnlyIndex = INDEX_APPEND_ONLY_ENABLED_SETTING.get(settings);

             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
@@ -1899,6 +1909,7 @@
                 rolloverInfos,
                 isSystem,
                 indexTotalShardsPerNodeLimit,
+                indexTotalPrimaryShardsPerNodeLimit,
                 isAppendOnlyIndex,
                 context
             );
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java
index a2e1ca440512d..a81fe01f0e7f4 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java
@@ -155,6 +155,7 @@
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
 import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
 import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
 import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
 import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
 import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
@@ -1094,6 +1095,7 @@ static Settings aggregateIndexSettings(
         if (FeatureFlags.isEnabled(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING)) {
             validateSearchOnlyReplicasSettings(indexSettings);
         }
+        validateIndexTotalPrimaryShardsPerNodeSetting(indexSettings);
         return indexSettings;
     }
@@ -1844,6 +1846,28 @@ public static void validateRefreshIntervalSettings(Settings requestSettings, Clu
         }
     }

+    /**
+     * Validates {@code index.routing.allocation.total_primary_shards_per_node} is only set for remote store enabled clusters
+     */
+    // TODO : Update this check for SegRep to DocRep migration on need basis
+    public static void validateIndexTotalPrimaryShardsPerNodeSetting(Settings indexSettings) {
+        // Get the setting value
+        int indexPrimaryShardsPerNode = INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings);
+
+        // If default value (-1), no validation needed
+        if (indexPrimaryShardsPerNode == -1) {
+            return;
+        }
+
+        // Check if remote store is enabled
+        boolean isRemoteStoreEnabled = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings);
+        if (!isRemoteStoreEnabled) {
+            throw new IllegalArgumentException(
+                "Setting [" + INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey() + "] can only be used with remote store enabled clusters"
+            );
+        }
+    }
+
     /**
      * Validates {@code index.translog.durability} is not async if the {@code cluster.remote_store.index.restrict.async-durability} is set to true.
      *
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java
index 5d20388b74e1f..b032ade720612 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java
@@ -101,6 +101,7 @@
 import java.util.stream.Collectors;

 import static org.opensearch.cluster.metadata.MetadataCreateDataStreamService.validateTimestampFieldMapping;
+import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateIndexTotalPrimaryShardsPerNodeSetting;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogFlushIntervalSettingsForCompositeIndex;
 import static org.opensearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME;
@@ -1642,6 +1643,9 @@ private void validate(String name, @Nullable Settings settings, List<String> ind
             validateRefreshIntervalSettings(settings, clusterService.getClusterSettings());
             validateTranslogFlushIntervalSettingsForCompositeIndex(settings, clusterService.getClusterSettings());
             validateTranslogDurabilitySettingsInTemplate(settings, clusterService.getClusterSettings());
+
+            // validate index total primary shards per node setting
+            validateIndexTotalPrimaryShardsPerNodeSetting(settings);
         }

         if (indexPatterns.stream().anyMatch(Regex::isMatchAllPattern)) {
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
index a35af0e607c31..eb10fd5d04288 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java
@@ -78,6 +78,7 @@
 import static org.opensearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
 import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
+import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateIndexTotalPrimaryShardsPerNodeSetting;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateOverlap;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateRefreshIntervalSettings;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.validateTranslogDurabilitySettings;
@@ -139,6 +140,7 @@ public void updateSettings(
         validateRefreshIntervalSettings(normalizedSettings, clusterService.getClusterSettings());
         validateTranslogDurabilitySettings(normalizedSettings, clusterService.getClusterSettings(), clusterService.getSettings());
+        validateIndexTotalPrimaryShardsPerNodeSetting(normalizedSettings);
         final int defaultReplicaCount = clusterService.getClusterSettings().get(Metadata.DEFAULT_REPLICA_COUNT_SETTING);

         Settings.Builder settingsForClosedIndices = Settings.builder();
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
index 24c3077960444..15f1b99ac2754 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNode.java
@@ -124,6 +124,62 @@ public Iterator<ShardRouting> iterator() {
                 Collections.unmodifiableCollection(this.shardTuple.v2().values()).stream()
             ).iterator();
         }
+
+        public int numberOfPrimaryShards() {
+            return this.shardTuple.v1().size();
+        }
     }

+    static class RelocatingShardsBucket {
+        private final LinkedHashSet<ShardRouting> relocatingShards;
+        private final LinkedHashSet<ShardRouting> relocatingPrimaryShards;
+
+        RelocatingShardsBucket() {
+            relocatingShards = new LinkedHashSet<>();
+            relocatingPrimaryShards = new LinkedHashSet<>();
+        }
+
+        public boolean add(ShardRouting shard) {
+            boolean res = relocatingShards.add(shard);
+            if (shard.primary()) {
+                relocatingPrimaryShards.add(shard);
+            }
+            return res;
+        }
+
+        public boolean remove(ShardRouting shard) {
+            boolean res = relocatingShards.remove(shard);
+            relocatingPrimaryShards.remove(shard);
+            return res;
+        }
+
+        public int size() {
+            return relocatingShards.size();
+        }
+
+        public int primarySize() {
+            return relocatingPrimaryShards.size();
+        }
+
+        public Set<ShardRouting> getRelocatingShards() {
+            return Collections.unmodifiableSet(relocatingShards);
+        }
+
+        public Set<ShardRouting> getRelocatingPrimaryShards() {
+            return Collections.unmodifiableSet(relocatingPrimaryShards);
+        }
+
+        public List<ShardRouting> getRelocatingShardsList() {
+            return new ArrayList<>(relocatingShards);
+        }
+
+        // For assertions/verification
+        public boolean invariant() {
+            assert relocatingShards.containsAll(relocatingPrimaryShards);
+            assert relocatingPrimaryShards.stream().allMatch(ShardRouting::primary);
+            assert relocatingPrimaryShards.size() == relocatingShards.stream().filter(ShardRouting::primary).count();
+            return true;
+        }
+    }

     private final String nodeId;
@@ -132,9 +188,9 @@

     private final BucketedShards shards;

-    private final LinkedHashSet<ShardRouting> initializingShards;
+    private final RelocatingShardsBucket relocatingShardsBucket;

-    private final LinkedHashSet<ShardRouting> relocatingShards;
+    private final LinkedHashSet<ShardRouting> initializingShards;

     private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;

@@ -144,7 +200,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRouti
         final LinkedHashMap<ShardId, ShardRouting> primaryShards = new LinkedHashMap<>();
         final LinkedHashMap<ShardId, ShardRouting> replicaShards = new LinkedHashMap<>();
         this.shards = new BucketedShards(primaryShards, replicaShards);
-        this.relocatingShards = new LinkedHashSet<>();
+        this.relocatingShardsBucket = new RelocatingShardsBucket();
         this.initializingShards = new LinkedHashSet<>();
         this.shardsByIndex = new LinkedHashMap<>();

@@ -152,7 +208,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shardRouti
             if (shardRouting.initializing()) {
                 initializingShards.add(shardRouting);
             } else if (shardRouting.relocating()) {
-                relocatingShards.add(shardRouting);
+                relocatingShardsBucket.add(shardRouting);
             }
             shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
@@ -231,7 +287,7 @@ void add(ShardRouting shard) {
         if (shard.initializing()) {
             initializingShards.add(shard);
         } else if (shard.relocating()) {
-            relocatingShards.add(shard);
+            relocatingShardsBucket.add(shard);
         }
         shardsByIndex.computeIfAbsent(shard.index(), k -> new LinkedHashSet<>()).add(shard);
         assert invariant();
@@ -251,7 +307,7 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
             boolean exist = initializingShards.remove(oldShard);
             assert exist : "expected shard " + oldShard + " to exist in initializingShards";
         } else if (oldShard.relocating()) {
-            boolean exist = relocatingShards.remove(oldShard);
+            boolean exist = relocatingShardsBucket.remove(oldShard);
             assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
         }
         shardsByIndex.get(oldShard.index()).remove(oldShard);
@@ -261,7 +317,7 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
         if (newShard.initializing()) {
             initializingShards.add(newShard);
         } else if (newShard.relocating()) {
-            relocatingShards.add(newShard);
+            relocatingShardsBucket.add(newShard);
         }
         shardsByIndex.computeIfAbsent(newShard.index(), k -> new LinkedHashSet<>()).add(newShard);
         assert invariant();
@@ -275,7 +331,7 @@ void remove(ShardRouting shard) {
             boolean exist = initializingShards.remove(shard);
             assert exist : "expected shard " + shard + " to exist in initializingShards";
         } else if (shard.relocating()) {
-            boolean exist = relocatingShards.remove(shard);
+            boolean exist = relocatingShardsBucket.remove(shard);
             assert exist : "expected shard " + shard + " to exist in relocatingShards";
         }
         shardsByIndex.get(shard.index()).remove(shard);
@@ -295,7 +351,7 @@ public int numberOfShardsWithState(ShardRoutingState... states) {
             if (states[0] == ShardRoutingState.INITIALIZING) {
                 return initializingShards.size();
             } else if (states[0] == ShardRoutingState.RELOCATING) {
-                return relocatingShards.size();
+                return relocatingShardsBucket.size();
             }
         }

@@ -320,7 +376,7 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
             if (states[0] == ShardRoutingState.INITIALIZING) {
                 return new ArrayList<>(initializingShards);
             } else if (states[0] == ShardRoutingState.RELOCATING) {
-                return new ArrayList<>(relocatingShards);
+                return relocatingShardsBucket.getRelocatingShardsList();
             }
         }

@@ -354,7 +410,7 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta
             }
             return shards;
         } else if (states[0] == ShardRoutingState.RELOCATING) {
-            for (ShardRouting shardEntry : relocatingShards) {
+            for (ShardRouting shardEntry : relocatingShardsBucket.getRelocatingShards()) {
                 if (shardEntry.getIndexName().equals(index) == false) {
                     continue;
                 }
@@ -381,7 +437,11 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta
      * The number of shards on this node that will not be eventually relocated.
      */
     public int numberOfOwningShards() {
-        return shards.size() - relocatingShards.size();
+        return shards.size() - relocatingShardsBucket.size();
+    }
+
+    public int numberOfOwningPrimaryShards() {
+        return shards.numberOfPrimaryShards() - relocatingShardsBucket.primarySize();
     }

     public int numberOfOwningShardsForIndex(final Index index) {
@@ -393,6 +453,20 @@ public int numberOfOwningShardsForIndex(final Index index) {
         }
     }

+    public int numberOfOwningPrimaryShardsForIndex(final Index index) {
+        final LinkedHashSet<ShardRouting> shardRoutings = shardsByIndex.get(index);
+        if (shardRoutings == null) {
+            return 0;
+        } else {
+            return Math.toIntExact(
+                shardRoutings.stream()
+                    .filter(sr -> sr.relocating() == false)
+                    .filter(ShardRouting::primary) // count only primary shards
+                    .count()
+            );
+        }
+    }
+
     public String prettyPrint() {
         StringBuilder sb = new StringBuilder();
         sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
@@ -441,8 +515,19 @@ private boolean invariant() {
         Collection<ShardRouting> shardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false)
             .filter(ShardRouting::relocating)
             .collect(Collectors.toList());
-        assert relocatingShards.size() == shardRoutingsRelocating.size();
-        assert relocatingShards.containsAll(shardRoutingsRelocating);
+        assert relocatingShardsBucket.getRelocatingShards().size() == shardRoutingsRelocating.size();
+        assert relocatingShardsBucket.getRelocatingShards().containsAll(shardRoutingsRelocating);
+
+        // relocatingPrimaryShards must be consistent with primary shards that are relocating
+        Collection<ShardRouting> primaryShardRoutingsRelocating = StreamSupport.stream(shards.spliterator(), false)
+            .filter(ShardRouting::relocating)
+            .filter(ShardRouting::primary)
+            .collect(Collectors.toList());
+        assert relocatingShardsBucket.getRelocatingPrimaryShards().size() == primaryShardRoutingsRelocating.size();
+        assert relocatingShardsBucket.getRelocatingPrimaryShards().containsAll(primaryShardRoutingsRelocating);
+
+        // relocatingPrimaryShards and relocatingShards should be consistent
+        assert relocatingShardsBucket.invariant();

         final Map<Index, Set<ShardRouting>> shardRoutingsByIndex = StreamSupport.stream(shards.spliterator(), false)
             .collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
index 6f211f370de95..ad77aed4e4fd5 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java
@@ -32,6 +32,7 @@

 package org.opensearch.cluster.routing.allocation.decider;

+import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.routing.RoutingNode;
 import org.opensearch.cluster.routing.ShardRouting;
 import org.opensearch.cluster.routing.ShardRoutingState;
@@ -46,13 +47,14 @@
 /**
 * This {@link AllocationDecider} limits the number of shards per node on a per
 * index or node-wide basis. The allocator prevents a single node from holding more
- * than {@code index.routing.allocation.total_shards_per_node} per index and
- * {@code cluster.routing.allocation.total_shards_per_node} globally during the allocation
+ * than {@code index.routing.allocation.total_shards_per_node} per index, {@code index.routing.allocation.total_primary_shards_per_node} per index,
+ * {@code cluster.routing.allocation.total_shards_per_node} globally and
+ * {@code cluster.routing.allocation.total_primary_shards_per_node} globally during the allocation
 * process. The limits of this decider can be changed in real-time via the
 * index settings API.
 * <p>
- * If {@code index.routing.allocation.total_shards_per_node} is reset to a negative value shards
- * per index are unlimited per node. Shards currently in the
+ * If {@code index.routing.allocation.total_shards_per_node} or {@code index.routing.allocation.total_primary_shards_per_node} is reset
+ * to a negative value, shards per index or primary shards per index, respectively, are unlimited per node. Shards currently in the
 * {@link ShardRoutingState#RELOCATING relocating} state are ignored by this
 * {@link AllocationDecider} until the shard changed its state to either
 * {@link ShardRoutingState#STARTED started},
@@ -70,6 +72,7 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {

     public static final String NAME = "shards_limit";

     private volatile int clusterShardLimit;
+    private volatile int clusterPrimaryShardLimit;

     /**
      * Controls the maximum number of shards per index on a single OpenSearch
@@ -84,7 +87,19 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
     );

     /**
-     * Controls the maximum number of shards per node on a global level.
+     * Controls the maximum number of primary shards per index on a single OpenSearch
+     * node for segment replication enabled indices. Negative values are interpreted as unlimited.
+     */
+    public static final Setting<Integer> INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING = Setting.intSetting(
+        "index.routing.allocation.total_primary_shards_per_node",
+        -1,
+        -1,
+        Property.Dynamic,
+        Property.IndexScope
+    );
+
+    /**
+     * Controls the maximum number of shards per node on a cluster level.
      * Negative values are interpreted as unlimited.
      */
     public static final Setting<Integer> CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING = Setting.intSetting(
@@ -95,18 +110,36 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
         Property.NodeScope
     );

+    /**
+     * Controls the maximum number of primary shards per node on a cluster level.
+     * Negative values are interpreted as unlimited.
+     */
+    public static final Setting<Integer> CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING = Setting.intSetting(
+        "cluster.routing.allocation.total_primary_shards_per_node",
+        -1,
+        -1,
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     private final Settings settings;

     public ShardsLimitAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
         this.settings = settings;
         this.clusterShardLimit = CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
+        this.clusterPrimaryShardLimit = CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings);
         clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, this::setClusterShardLimit);
+        clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING, this::setClusterPrimaryShardLimit);
     }

     private void setClusterShardLimit(int clusterShardLimit) {
         this.clusterShardLimit = clusterShardLimit;
     }

+    private void setClusterPrimaryShardLimit(int clusterPrimaryShardLimit) {
+        this.clusterPrimaryShardLimit = clusterPrimaryShardLimit;
+    }
+
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         return doDecide(shardRouting, node, allocation, (count, limit) -> count >= limit);
@@ -115,7 +148,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
     @Override
     public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         return doDecide(shardRouting, node, allocation, (count, limit) -> count > limit);
-
     }

     private Decision doDecide(
@@ -124,18 +156,22 @@ private Decision doDecide(
         RoutingAllocation allocation,
         BiPredicate<Integer, Integer> decider
     ) {
-        final int indexShardLimit = allocation.metadata().getIndexSafe(shardRouting.index()).getIndexTotalShardsPerNodeLimit();
+        IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
+        final int indexShardLimit = indexMetadata.getIndexTotalShardsPerNodeLimit();
+        final int indexPrimaryShardLimit = indexMetadata.getIndexTotalPrimaryShardsPerNodeLimit();
         // Capture the limit here in case it changes during this method's
         // execution
         final int clusterShardLimit = this.clusterShardLimit;
-
-        if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
+        final int clusterPrimaryShardLimit = this.clusterPrimaryShardLimit;
+        if (indexShardLimit <= 0 && indexPrimaryShardLimit <= 0 && clusterShardLimit <= 0 && clusterPrimaryShardLimit <= 0) {
             return allocation.decision(
                 Decision.YES,
                 NAME,
-                "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
+                "total shard limits are disabled: [index: %d, index primary: %d, cluster: %d, cluster primary: %d] <= 0",
                 indexShardLimit,
-                clusterShardLimit
+                indexPrimaryShardLimit,
+                clusterShardLimit,
+                clusterPrimaryShardLimit
             );
         }

@@ -151,6 +187,19 @@ private Decision doDecide(
                 clusterShardLimit
             );
         }
+        if (shardRouting.primary() && clusterPrimaryShardLimit > 0) {
+            final int nodePrimaryShardCount = node.numberOfOwningPrimaryShards();
+            if (decider.test(nodePrimaryShardCount, clusterPrimaryShardLimit)) {
+                return allocation.decision(
+                    Decision.NO,
+                    NAME,
+                    "too many primary shards [%d] allocated to this node, cluster setting [%s=%d]",
+                    nodePrimaryShardCount,
+                    CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(),
+                    clusterPrimaryShardLimit
+                );
+            }
+        }
         if (indexShardLimit > 0) {
             final int indexShardCount = node.numberOfOwningShardsForIndex(shardRouting.index());
             if (decider.test(indexShardCount, indexShardLimit)) {
@@ -165,13 +214,29 @@ private Decision doDecide(
                 );
             }
         }
+        if (indexPrimaryShardLimit > 0 && shardRouting.primary()) {
+            final int indexPrimaryShardCount = node.numberOfOwningPrimaryShardsForIndex(shardRouting.index());
+            if (decider.test(indexPrimaryShardCount, indexPrimaryShardLimit)) {
+                return allocation.decision(
+                    Decision.NO,
+                    NAME,
+                    "too many primary shards [%d] allocated to this node for index [%s], index setting [%s=%d]",
+                    indexPrimaryShardCount,
+                    shardRouting.getIndexName(),
+                    INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(),
+                    indexPrimaryShardLimit
+                );
+            }
+        }
         return allocation.decision(
             Decision.YES,
             NAME,
-            "the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]",
+            "the shard count [%d] for this node is under the index limit [%d], index primary limit [%d], cluster level node limit [%d] and cluster level primary node limit [%d]",
             nodeShardCount,
             indexShardLimit,
-            clusterShardLimit
+            indexPrimaryShardLimit,
+            clusterShardLimit,
+            clusterPrimaryShardLimit
         );
     }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index d204c383524c2..b4b85e0a9d367 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -433,6 +433,7 @@ public void apply(Settings value, Settings current, Settings previous) {
             SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
             TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
             ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
+            ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING,
             NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING,
             HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING,
             HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING,
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 0e21104fb6426..f76b6cbb38bab 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -166,6 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         IndexSettings.MAX_SLICES_PER_PIT,
         IndexSettings.MAX_REGEX_LENGTH_SETTING,
         ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
+        ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING,
         IndexSettings.INDEX_GC_DELETES_SETTING,
         IndexSettings.INDEX_SOFT_DELETES_SETTING,
         IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
index cc35426ee15b8..dfe3928ac37f3 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
@@ -155,6 +155,7 @@
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
 import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
 import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
 import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
 import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY;
@@ -2548,6 +2549,96 @@ public void testApplyContextWithSettingsOverlap() throws IOException {
         });
     }

+    public void testIndexTotalPrimaryShardsPerNodeSettingValidationWithRemoteStore() {
+        // Test case where setting is used with remote store enabled (should succeed)
+        Settings settings = Settings.builder().build();
+        request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+
+        final Settings.Builder requestSettings = Settings.builder()
+            // Enable remote store
+            .put(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.getKey(), true)
+            // Set primary shards per node to valid value
+            .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 2)
+            .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT.toString());
+
+        request.settings(requestSettings.build());
+
+        Settings indexSettings = aggregateIndexSettings(
+            ClusterState.EMPTY_STATE,
+            request,
+            Settings.EMPTY,
+            null,
+            settings,
+            IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+            randomShardLimitService(),
+            Collections.emptySet(),
+            clusterSettings
+        );
+
+        // Verify that the value is the same as set earlier and validation was successful
+        assertEquals(Integer.valueOf(2), INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings));
+    }
+
+    public void testIndexTotalPrimaryShardsPerNodeSettingValidationWithoutRemoteStore() {
+        // Test case where setting is used without remote store (should fail)
+        Settings settings = Settings.builder().build();
+        request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+
+        final Settings.Builder requestSettings = Settings.builder()
+            // Remote store not enabled
+            .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 2)
+            .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT.toString());
+
+        request.settings(requestSettings.build());
+
+        // Expect IllegalArgumentException
+        IllegalArgumentException exception = expectThrows(
+            IllegalArgumentException.class,
+            () -> aggregateIndexSettings(
+                ClusterState.EMPTY_STATE,
+                request,
+                Settings.EMPTY,
+                null,
+                settings,
+                IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+                randomShardLimitService(),
+                Collections.emptySet(),
+                clusterSettings
+            )
+        );
+
+        // Verify error message
+        assertEquals(
+            "Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters",
+            exception.getMessage()
+        );
+    }
+
+    public void testIndexTotalPrimaryShardsPerNodeSettingValidationWithDefaultValue() {
+        // Test case with default value (-1) without remote store (should succeed)
+        Settings settings = Settings.builder().build();
+        request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
+
+        final Settings.Builder requestSettings = Settings.builder().put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), -1);
+
+        request.settings(requestSettings.build());
+
+        Settings indexSettings = aggregateIndexSettings(
+            ClusterState.EMPTY_STATE,
+            request,
+            Settings.EMPTY,
+            null,
+            settings,
+            IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
+            randomShardLimitService(),
+            Collections.emptySet(),
+            clusterSettings
+        );
+
+        // Verify that default value passes validation
+        assertEquals(Integer.valueOf(-1), INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(indexSettings));
+    }
+
     private IndexTemplateMetadata addMatchingTemplate(Consumer<IndexTemplateMetadata.Builder> configurator) {
         IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*");
         configurator.accept(builder);
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
index 05ae67d10f4cb..795d1713772c2 100644
--- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
+++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java
@@ -69,6 +69,7 @@
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.InvalidIndexTemplateException;
 import org.opensearch.indices.SystemIndices;
+import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.test.OpenSearchSingleNodeTestCase;
 import org.opensearch.threadpool.ThreadPool;
@@ -92,6 +93,7 @@
 import static java.util.Collections.singletonList;
 import static org.opensearch.cluster.applicationtemplates.ClusterStateSystemTemplateLoader.TEMPLATE_LOADER_IDENTIFIER;
 import static org.opensearch.cluster.applicationtemplates.SystemTemplateMetadata.fromComponentTemplateInfo;
+import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
 import static org.opensearch.common.settings.Settings.builder;
 import static org.opensearch.common.util.concurrent.ThreadContext.ACTION_ORIGIN_TRANSIENT_NAME;
 import static org.opensearch.env.Environment.PATH_HOME_SETTING;
@@ -2440,6 +2442,23 @@ public void testMaxTranslogFlushSizeWithCompositeIndex() {
         assertThat(throwables.get(0), instanceOf(IllegalArgumentException.class));
     }

+    public void testIndexPrimaryShardsSetting() {
+        Settings clusterSettings = Settings.builder().build();
+        PutRequest request = new PutRequest("test", "test_index_primary_shard_constraint");
+        request.patterns(singletonList("test_shards_wait*"));
+        Settings.Builder settingsBuilder = builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
+            .put(INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 2)
+            .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT.toString());
+        request.settings(settingsBuilder.build());
+        List<Throwable> throwables = putTemplate(xContentRegistry(), request, clusterSettings);
+        assertThat(throwables.get(0), instanceOf(IllegalArgumentException.class));
+        assertEquals(
+            "Setting [index.routing.allocation.total_primary_shards_per_node] can only be used with remote store enabled clusters",
+            throwables.get(0).getMessage()
+        );
+    }
+
     private static List<Throwable> putTemplate(NamedXContentRegistry xContentRegistry, PutRequest request) {
         return putTemplate(xContentRegistry, request, Settings.EMPTY);
     }
diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java
index cc4f2e510cb31..c78e5582155d1 100644
--- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java
+++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodeTests.java
@@ -165,6 +165,32 @@ public void testNumberOfOwningShards() {
         assertThat(routingNode.numberOfOwningShards(), equalTo(2));
     }
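+
+    // Only started primaries owned by the node should count: a relocating primary is leaving
+    // the node and a replica is never a primary, so of the four shards added below exactly
+    // two (test1 shard 0 and test3 shard 1) are owning primaries.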
+ public void testNumberOfOwningPrimaryShards() { + final ShardRouting test1Shard0 = TestShardRouting.newShardRouting("test1", 0, "node-1", true, ShardRoutingState.STARTED); + final ShardRouting test2Shard0 = TestShardRouting.newShardRouting( + "test2", + 0, + "node-1", + "node-2", + false, + ShardRoutingState.RELOCATING + ); + final ShardRouting test3Shard0 = TestShardRouting.newShardRouting( + "test3", + 0, + "node-1", + "node-2", + true, + ShardRoutingState.RELOCATING + ); + final ShardRouting test3Shard1 = TestShardRouting.newShardRouting("test3", 1, "node-1", true, ShardRoutingState.STARTED); + routingNode.add(test1Shard0); + routingNode.add(test2Shard0); + routingNode.add(test3Shard0); + routingNode.add(test3Shard1); + assertThat(routingNode.numberOfOwningPrimaryShards(), equalTo(2)); + } + public void testNumberOfOwningShardsForIndex() { final ShardRouting test1Shard0 = TestShardRouting.newShardRouting("test1", 0, "node-1", false, ShardRoutingState.STARTED); final ShardRouting test2Shard0 = TestShardRouting.newShardRouting( @@ -183,4 +209,33 @@ public void testNumberOfOwningShardsForIndex() { assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test3", IndexMetadata.INDEX_UUID_NA_VALUE)), equalTo(0)); } + public void testNumberOfOwningPrimaryShardsForIndex() { + final ShardRouting test1Shard0 = TestShardRouting.newShardRouting("test1", 0, "node-1", true, ShardRoutingState.STARTED); + final ShardRouting test2Shard0 = TestShardRouting.newShardRouting( + "test2", + 0, + "node-1", + "node-2", + false, + ShardRoutingState.RELOCATING + ); + final ShardRouting test3Shard0 = TestShardRouting.newShardRouting( + "test3", + 0, + "node-1", + "node-2", + true, + ShardRoutingState.RELOCATING + ); + final ShardRouting test3Shard1 = TestShardRouting.newShardRouting("test3", 1, "node-1", true, ShardRoutingState.STARTED); + routingNode.add(test1Shard0); + routingNode.add(test2Shard0); + routingNode.add(test3Shard0); + routingNode.add(test3Shard1); + assertThat(routingNode.numberOfOwningPrimaryShardsForIndex(new Index("test", IndexMetadata.INDEX_UUID_NA_VALUE)), equalTo(0)); + assertThat(routingNode.numberOfOwningPrimaryShardsForIndex(new Index("test1", IndexMetadata.INDEX_UUID_NA_VALUE)), equalTo(1)); + assertThat(routingNode.numberOfOwningPrimaryShardsForIndex(new Index("test2", IndexMetadata.INDEX_UUID_NA_VALUE)), equalTo(0)); + assertThat(routingNode.numberOfOwningPrimaryShardsForIndex(new Index("test3", IndexMetadata.INDEX_UUID_NA_VALUE)), equalTo(1)); + } + } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java new file mode 100644 index 0000000000000..ffc42d11d3696 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDeciderTests.java @@ -0,0 +1,349 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchTestCase; + +import static org.opensearch.cluster.routing.allocation.decider.Decision.Type.NO; +import static org.opensearch.cluster.routing.allocation.decider.Decision.Type.YES; + +public class ShardsLimitAllocationDeciderTests extends OpenSearchTestCase { + + public void testWithNoLimit() { + Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ShardsLimitAllocationDecider decider = new ShardsLimitAllocationDecider(settings, clusterSettings); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(0)) + .build(); + + // Create a RoutingTable with shards 0 and 1 initialized on node1, and shard 2 unassigned + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test").getIndex()); + + // Shard 0 and 1: STARTED on node1 + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED)); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 1, "node1", null, true, ShardRoutingState.STARTED)); + + // Shard 2: Unassigned + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + RoutingTable routingTable = routingTableBuilder.build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0); + allocation.debugDecision(true); + + ShardRouting shard1 = routingTable.index("test").shard(0).primaryShard(); + ShardRouting shard2 = routingTable.index("test").shard(1).primaryShard(); + ShardRouting shard3 = routingTable.index("test").shard(2).primaryShard(); + + // Test allocation decisions + assertEquals(YES, decider.canAllocate(shard3, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(YES, decider.canRemain(shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(YES, decider.canAllocate(shard3, clusterState.getRoutingNodes().node("node2"), allocation).type()); + } + + public 
void testClusterShardLimit() { + Settings settings = Settings.builder().put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ShardsLimitAllocationDecider decider = new ShardsLimitAllocationDecider(settings, clusterSettings); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(0)) + .build(); + + // Create a RoutingTable with shards 0 and 1 initialized on node1, and shard 2 unassigned + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test").getIndex()); + + // Shard 0 and 1: STARTED on node1 + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 0, "node1", null, true, ShardRoutingState.STARTED)); + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 1, "node1", null, true, ShardRoutingState.STARTED)); + + // Shard 2: Unassigned + indexRoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + + routingTableBuilder.add(indexRoutingTableBuilder.build()); + RoutingTable routingTable = routingTableBuilder.build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0); + allocation.debugDecision(true); + + ShardRouting shard1 = routingTable.index("test").shard(0).primaryShard(); + ShardRouting shard2 = routingTable.index("test").shard(1).primaryShard(); + ShardRouting shard3 = routingTable.index("test").shard(2).primaryShard(); + + // Test allocation decisions + assertEquals(NO, decider.canAllocate(shard3, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(YES, decider.canRemain(shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(YES, decider.canAllocate(shard3, clusterState.getRoutingNodes().node("node2"), allocation).type()); + } + + public void testClusterPrimaryShardLimit() { + Settings settings = Settings.builder() + .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 2) + .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 3) + .build(); + + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ShardsLimitAllocationDecider decider = new ShardsLimitAllocationDecider(settings, clusterSettings); + + // Create metadata for two indices + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(0)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1)) + .build(); + + // Create routing table + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + // Setup routing for test1 (3 primaries) + IndexRoutingTable.Builder test1RoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test1").getIndex()); + + // test1: First primary 
on node1 + test1RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test1", 0, "node1", null, true, ShardRoutingState.STARTED)); + + // test1: Second primary on node2 + test1RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test1", 1, "node2", null, true, ShardRoutingState.STARTED)); + + // test1: Third primary unassigned + test1RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test1", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + + // Setup routing for test2 (2 primaries, 1 replica) + IndexRoutingTable.Builder test2RoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test2").getIndex()); + + // test2: First primary on node1 + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 0, "node1", null, true, ShardRoutingState.STARTED)); + + // test2: Second primary on node2 + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 1, "node2", null, true, ShardRoutingState.STARTED)); + + // test2: First replica on node2 + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 0, "node2", null, false, ShardRoutingState.STARTED)); + // test2: Second replica unassigned + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 1, null, null, false, ShardRoutingState.UNASSIGNED)); + + routingTableBuilder.add(test1RoutingTableBuilder.build()); + routingTableBuilder.add(test2RoutingTableBuilder.build()); + RoutingTable routingTable = routingTableBuilder.build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0); + allocation.debugDecision(true); + + // Get shards for testing + ShardRouting test1Shard1 = routingTable.index("test1").shard(0).primaryShard(); + ShardRouting test1Shard3 = routingTable.index("test1").shard(2).primaryShard(); + ShardRouting test2Replica2 = routingTable.index("test2").shard(1).replicaShards().get(0); + + // Test allocation decisions + // Cannot allocate third primary to node1 (would exceed primary shard limit) + assertEquals(NO, decider.canAllocate(test1Shard3, clusterState.getRoutingNodes().node("node1"), allocation).type()); + + // Cannot allocate third primary to node2 (would exceed primary shard limit) + assertEquals(NO, decider.canAllocate(test1Shard3, clusterState.getRoutingNodes().node("node2"), allocation).type()); + + // Can allocate second replica to node1 (within total shard limit) + assertEquals(YES, decider.canAllocate(test2Replica2, clusterState.getRoutingNodes().node("node1"), allocation).type()); + + // Cannot allocate second replica to node2 (would exceed total shard limit) + assertEquals(NO, decider.canAllocate(test2Replica2, clusterState.getRoutingNodes().node("node2"), allocation).type()); + + // Existing primary can remain + assertEquals(YES, decider.canRemain(test1Shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + + } + + public void testIndexShardLimit() { + Settings clusterSettings = Settings.builder() + .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 2) + .build(); + ClusterSettings clusterSettingsObject = new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + 
ShardsLimitAllocationDecider decider = new ShardsLimitAllocationDecider(clusterSettings, clusterSettingsObject); + + // Create index settings with INDEX_TOTAL_SHARDS_PER_NODE_SETTING and version + Settings indexSettings = Settings.builder() + .put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), 1) // Set index-level limit to 1 + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .build(); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(indexSettings).numberOfShards(3).numberOfReplicas(0)) + .put(IndexMetadata.builder("test2").settings(indexSettings).numberOfShards(3).numberOfReplicas(0)) + .build(); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + // Set up routing table for test1 + IndexRoutingTable.Builder test1RoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test1").getIndex()); + test1RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test1", 0, "node1", null, true, ShardRoutingState.STARTED)); + test1RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test1", 1, null, null, true, ShardRoutingState.UNASSIGNED)); + test1RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test1", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + routingTableBuilder.add(test1RoutingTableBuilder.build()); + + // Set up routing table for test2 + IndexRoutingTable.Builder test2RoutingTableBuilder = IndexRoutingTable.builder(metadata.index("test2").getIndex()); + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 0, "node2", null, true, ShardRoutingState.STARTED)); + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 1, null, null, true, ShardRoutingState.UNASSIGNED)); + test2RoutingTableBuilder.addShard(TestShardRouting.newShardRouting("test2", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + routingTableBuilder.add(test2RoutingTableBuilder.build()); + + RoutingTable routingTable = routingTableBuilder.build(); + + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .build(); + + RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0); + allocation.debugDecision(true); + + // Test allocation decisions + ShardRouting test1Shard1 = routingTable.index("test1").shard(1).primaryShard(); + ShardRouting test1Shard2 = routingTable.index("test1").shard(2).primaryShard(); + ShardRouting test2Shard1 = routingTable.index("test2").shard(1).primaryShard(); + ShardRouting test2Shard2 = routingTable.index("test2").shard(2).primaryShard(); + + assertEquals(NO, decider.canAllocate(test1Shard2, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(YES, decider.canRemain(test1Shard1, clusterState.getRoutingNodes().node("node1"), allocation).type()); + assertEquals(YES, decider.canAllocate(test1Shard2, clusterState.getRoutingNodes().node("node2"), allocation).type()); + assertEquals(NO, decider.canAllocate(test2Shard2, clusterState.getRoutingNodes().node("node2"), allocation).type()); + assertEquals(YES, decider.canRemain(test2Shard1, clusterState.getRoutingNodes().node("node2"), allocation).type()); + assertEquals(YES, decider.canAllocate(test2Shard2, clusterState.getRoutingNodes().node("node1"), 
allocation).type()); + } + + public void testIndexPrimaryShardLimit() { + Settings clusterSettings = Settings.builder() + .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), -1) + .build(); + ClusterSettings clusterSettingsObject = new ClusterSettings(clusterSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ShardsLimitAllocationDecider decider = new ShardsLimitAllocationDecider(clusterSettings, clusterSettingsObject); + + // Create index settings for three indices + Settings indexSettingsTest1 = Settings.builder() + .put(ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT.toString()) + .build(); + + Settings indexSettingsTest2 = Settings.builder() + .put(ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 2) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .build(); + + Settings indexSettingsTest3 = Settings.builder() + .put(ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .build(); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(indexSettingsTest1).numberOfShards(3).numberOfReplicas(0)) + .put(IndexMetadata.builder("test2").settings(indexSettingsTest2).numberOfShards(3).numberOfReplicas(0)) + .put(IndexMetadata.builder("test3").settings(indexSettingsTest3).numberOfShards(3).numberOfReplicas(0)) + .build(); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + + // Set up routing table for test1 + IndexRoutingTable.Builder test1Builder = IndexRoutingTable.builder(metadata.index("test1").getIndex()); + test1Builder.addShard(TestShardRouting.newShardRouting("test1", 0, "node1", null, true, ShardRoutingState.STARTED)); + test1Builder.addShard(TestShardRouting.newShardRouting("test1", 1, "node2", null, true, ShardRoutingState.STARTED)); + test1Builder.addShard(TestShardRouting.newShardRouting("test1", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + routingTableBuilder.add(test1Builder.build()); + + // Set up routing table for test2 + IndexRoutingTable.Builder test2Builder = IndexRoutingTable.builder(metadata.index("test2").getIndex()); + test2Builder.addShard(TestShardRouting.newShardRouting("test2", 0, "node1", null, true, ShardRoutingState.STARTED)); + test2Builder.addShard(TestShardRouting.newShardRouting("test2", 1, "node2", null, true, ShardRoutingState.STARTED)); + test2Builder.addShard(TestShardRouting.newShardRouting("test2", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + routingTableBuilder.add(test2Builder.build()); + + // Set up routing table for test3 + IndexRoutingTable.Builder test3Builder = IndexRoutingTable.builder(metadata.index("test3").getIndex()); + test3Builder.addShard(TestShardRouting.newShardRouting("test3", 0, "node1", null, true, ShardRoutingState.STARTED)); + test3Builder.addShard(TestShardRouting.newShardRouting("test3", 1, "node2", null, true, ShardRoutingState.STARTED)); + test3Builder.addShard(TestShardRouting.newShardRouting("test3", 2, null, null, true, ShardRoutingState.UNASSIGNED)); + routingTableBuilder.add(test3Builder.build()); + + RoutingTable routingTable = routingTableBuilder.build(); + + ClusterState clusterState = 
ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+            .metadata(metadata)
+            .routingTable(routingTable)
+            .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
+            .build();
+
+        RoutingAllocation allocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, null, 0);
+        allocation.debugDecision(true);
+
+        // Get unassigned shards for testing
+        ShardRouting test1Shard2 = routingTable.index("test1").shard(2).primaryShard();
+        ShardRouting test2Shard2 = routingTable.index("test2").shard(2).primaryShard();
+        ShardRouting test3Shard2 = routingTable.index("test3").shard(2).primaryShard();
+
+        // Test assertions
+        // Cannot assign the 3rd primary of test1 to node1: test1 caps primaries at 1 per node
+        assertEquals(NO, decider.canAllocate(test1Shard2, clusterState.getRoutingNodes().node("node1"), allocation).type());
+        // Cannot assign the 3rd primary of test3 to node2: test3 caps primaries at 1 per node
+        assertEquals(NO, decider.canAllocate(test3Shard2, clusterState.getRoutingNodes().node("node2"), allocation).type());
+        // Can assign the 3rd primary of test2 to either node: test2 allows 2 primaries per node
+        assertEquals(YES, decider.canAllocate(test2Shard2, clusterState.getRoutingNodes().node("node1"), allocation).type());
+        assertEquals(YES, decider.canAllocate(test2Shard2, clusterState.getRoutingNodes().node("node2"), allocation).type());
+    }
+
+    private DiscoveryNode newNode(String nodeId) {
+        return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), Version.CURRENT);
+    }
+}
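
For reviewers, a minimal usage sketch of the new limits via the standard settings-update APIs. This is not part of the change: the index name "my-index" is illustrative, and it assumes both new settings are dynamic like the existing total_shards_per_node limits; note from MetadataIndexTemplateServiceTests above that the index-level primary limit is only accepted on remote-store-enabled clusters.

    // Cluster-wide cap: at most 2 primary shards per node across all indices.
    client().admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(
            Settings.builder().put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 2)
        )
        .get();

    // Per-index cap: at most 1 primary shard of "my-index" per node (remote-store clusters only).
    client().admin()
        .indices()
        .prepareUpdateSettings("my-index")
        .setSettings(
            Settings.builder().put(ShardsLimitAllocationDecider.INDEX_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(), 1)
        )
        .get();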