diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java index c2f95910355e..e890ff12b1c1 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java @@ -24,6 +24,7 @@ import com.google.common.primitives.Ints; import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; +import org.apache.cassandra.index.IndexIsBuildingException; import org.apache.cassandra.index.IndexNotAvailableException; import org.apache.cassandra.index.sai.utils.AbortedOperationException; import org.apache.cassandra.io.IVersionedSerializer; @@ -45,7 +46,8 @@ public enum RequestFailureReason // We should add new codes in HCD (which do not exist in Apache Cassandra) only with big numbers, to avoid conflicts UNKNOWN_COLUMN (500), UNKNOWN_TABLE (501), - REMOTE_STORAGE_FAILURE (502); + REMOTE_STORAGE_FAILURE (502), + INDEX_BUILD_IN_PROGRESS (503); public static final Serializer serializer = new Serializer(); @@ -82,6 +84,7 @@ public int codeForNativeProtocol() exceptionToReasonMap.put(IndexNotAvailableException.class, INDEX_NOT_AVAILABLE); exceptionToReasonMap.put(UnknownColumnException.class, UNKNOWN_COLUMN); exceptionToReasonMap.put(UnknownTableException.class, UNKNOWN_TABLE); + exceptionToReasonMap.put(IndexIsBuildingException.class, INDEX_BUILD_IN_PROGRESS); if (exceptionToReasonMap.size() != reasons.length-2) throw new RuntimeException("A new RequestFailureReasons was probably added and you may need to update the exceptionToReasonMap"); diff --git a/src/java/org/apache/cassandra/index/IndexIsBuildingException.java b/src/java/org/apache/cassandra/index/IndexIsBuildingException.java new file mode 100644 index 000000000000..b68e13d0aa18 --- /dev/null +++ b/src/java/org/apache/cassandra/index/IndexIsBuildingException.java @@ -0,0 +1,36 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index; + +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.exceptions.UncheckedInternalRequestExecutionException; + +/** + * Thrown if a secondary index is not currently available because it is building. + */ +public final class IndexIsBuildingException extends UncheckedInternalRequestExecutionException +{ + /** + * Creates a new IndexIsBuildingException for the specified index. + * @param index the index + */ + public IndexIsBuildingException(Index index) + { + super(RequestFailureReason.INDEX_BUILD_IN_PROGRESS, + String.format("The secondary index '%s' is not yet available as it is building", index.getIndexMetadata().name)); + } +} diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index b1930e741804..71e10ce193c9 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -357,7 +357,8 @@ public synchronized Future addIndex(IndexMetadata indexDef, boolean isNewCF) /** * Throws an {@link IndexNotAvailableException} if any of the indexes in the specified {@link Index.QueryPlan} is - * not queryable, as it's defined by {@link #isIndexQueryable(Index)}. + * not queryable, as it's defined by {@link #isIndexQueryable(Index)}. If the reason for the index to be not available + * is that it's building, it will throw an {@link IndexIsBuildingException}. * * @param queryPlan a query plan * @throws IndexNotAvailableException if the query plan has any index that is not queryable @@ -366,6 +367,9 @@ public void checkQueryability(Index.QueryPlan queryPlan) { for (Index index : queryPlan.getIndexes()) { + if (isIndexBuilding(index)) + throw new IndexIsBuildingException(index); + if (!isIndexQueryable(index)) throw new IndexNotAvailableException(index); } @@ -404,6 +408,18 @@ public boolean isIndexWritable(Index index) return writableIndexes.containsKey(index.getIndexMetadata().name); } + /** + * Checks if the specified index has any running build task. + * + * @param index the index + * @return {@code true} if the index is building, {@code false} otherwise + */ + @VisibleForTesting + public synchronized boolean isIndexBuilding(Index index) + { + return isIndexBuilding(index.getIndexMetadata().name); + } + /** * Checks if the specified index has any running build task. * @@ -1816,13 +1832,18 @@ public static void shutdownAndWait(long timeout, TimeUnit units) throws Interrup */ public static > E filterForQuery(E liveEndpoints, Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel level) { + Map indexStatusMap = new HashMap<>(); + E queryableEndpoints = liveEndpoints.filter(replica -> { for (Index index : indexQueryPlan.getIndexes()) { Index.Status status = getIndexStatus(replica.endpoint(), keyspace.getName(), index.getIndexMetadata().name); if (!index.isQueryable(status)) + { + indexStatusMap.put(replica.endpoint(), status); return false; + } } return true; @@ -1840,7 +1861,13 @@ public static > E filterForQuery(E liveEndpoints, Keyspac { Map failureReasons = new HashMap<>(); liveEndpoints.without(queryableEndpoints.endpoints()) - .forEach(replica -> failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_NOT_AVAILABLE)); + .forEach(replica -> { + Index.Status status = indexStatusMap.get(replica.endpoint()); + if (status == Index.Status.FULL_REBUILD_STARTED) + failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_BUILD_IN_PROGRESS); + else + failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_NOT_AVAILABLE); + }); throw new ReadFailureException(level, filtered, required, false, failureReasons); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java index 5f5a4918b95f..8b618da9b2bb 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/sai/IndexAvailabilityTest.java @@ -50,6 +50,7 @@ import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.test.sai.SAIUtil.waitForIndexQueryable; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -169,6 +170,90 @@ public void testSkipNonQueryableNodeN1Rf1() throws Exception shouldSkipNonQueryableNode(1, Collections.singletonList(1)); } + @Test + public void testIndexExceptionsTwoIndexesOn3NodeCluster() throws Exception + { + try (Cluster cluster = init(Cluster.build(3) + .withConfig(config -> config.with(GOSSIP) + .with(NETWORK)) + .start())) + { + String ks2 = "ks2"; + String cf1 = "cf1"; + String index1 = "cf1_idx1"; + String index2 = "cf1_idx2"; + String index3 = "cf2_idx1"; + + // Create keyspace, table with correct column types + cluster.schemaChange(String.format(CREATE_KEYSPACE, ks2, 2)); + cluster.schemaChange("CREATE TABLE " + ks2 + "." + cf1 + " (pk int PRIMARY KEY, v1 int, v2 int)"); + executeOnAllCoordinators(cluster, + "SELECT pk FROM " + ks2 + "." + cf1 + " WHERE v1=0 AND v2=0 ALLOW FILTERING", + ConsistencyLevel.LOCAL_QUORUM, + 0); + executeOnAllCoordinators(cluster, + "SELECT pk FROM " + ks2 + "." + cf1 + " WHERE v2=0 ALLOW FILTERING", + ConsistencyLevel.LOCAL_QUORUM, + 0); + executeOnAllCoordinators(cluster, + "SELECT pk FROM " + ks2 + "." + cf1 + " WHERE v1=0 ALLOW FILTERING", + ConsistencyLevel.LOCAL_QUORUM, + 0); + + cluster.schemaChange(String.format(CREATE_INDEX, index1, ks2, cf1, "v1")); + cluster.schemaChange(String.format(CREATE_INDEX, index2, ks2, cf1, "v2")); + cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index1, node), Index.Status.BUILD_SUCCEEDED)); + waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(2), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(2), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(2), Index.Status.BUILD_SUCCEEDED); + waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED); + + // Mark only index2 as building on node3, leave index1 in BUILD_SUCCEEDED state + markIndexBuilding(cluster.get(3), ks2, cf1, index2); + cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index2, node), Index.Status.FULL_REBUILD_STARTED)); + waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED); + waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED); + waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED); + + assertThatThrownBy(() -> + executeOnAllCoordinators(cluster, + "SELECT pk FROM " + ks2 + '.' + cf1 + " WHERE v1=0 AND v2=0", + ConsistencyLevel.LOCAL_QUORUM, + 0)) + .hasMessageContaining("Operation failed - received 1 responses and 1 failures: INDEX_BUILD_IN_PROGRESS"); + + // Mark only index2 as failing on node2, leave index1 in BUILD_SUCCEEDED state + markIndexBuilding(cluster.get(2), ks2, cf1, index2); + cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index2, node), Index.Status.BUILD_FAILED)); + + assertThatThrownBy(() -> + executeOnAllCoordinators(cluster, + "SELECT pk FROM " + ks2 + '.' + cf1 + " WHERE v1=0 AND v2=0", + ConsistencyLevel.LOCAL_QUORUM, + 0)) + .hasMessageContaining("Operation failed - received 1 responses and 1 failures: INDEX_BUILD_IN_PROGRESS"); + + // Mark only index2 as failing on node1, leave index1 in BUILD_SUCCEEDED state + markIndexNonQueryable(cluster.get(1), ks2, cf1, index2); + cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index2, node), Index.Status.BUILD_FAILED)); + waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.BUILD_FAILED); + waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(1), Index.Status.BUILD_FAILED); + waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(1), Index.Status.BUILD_FAILED); + + assertThatThrownBy(() -> + executeOnAllCoordinators(cluster, + "SELECT pk FROM " + ks2 + '.' + cf1 + " WHERE v1=0 AND v2=0", + ConsistencyLevel.LOCAL_QUORUM, + 0)) + .hasMessageMatching("^Operation failed - received 0 responses and 2 failures: INDEX_BUILD_IN_PROGRESS from .+, INDEX_NOT_AVAILABLE from .+$"); + } + } + private void shouldSkipNonQueryableNode(int nodes, List... nonQueryableNodesList) throws Exception { try (Cluster cluster = init(Cluster.build(nodes) diff --git a/test/unit/org/apache/cassandra/exceptions/RequestFailureReasonTest.java b/test/unit/org/apache/cassandra/exceptions/RequestFailureReasonTest.java index 1de16b5de5f2..07afe2d06adb 100644 --- a/test/unit/org/apache/cassandra/exceptions/RequestFailureReasonTest.java +++ b/test/unit/org/apache/cassandra/exceptions/RequestFailureReasonTest.java @@ -33,7 +33,8 @@ public class RequestFailureReasonTest { 6, "INDEX_NOT_AVAILABLE" }, { 500, "UNKNOWN_COLUMN" }, { 501, "UNKNOWN_TABLE" }, - { 502, "REMOTE_STORAGE_FAILURE" } + { 502, "REMOTE_STORAGE_FAILURE" }, + { 503, "INDEX_BUILD_IN_PROGRESS" } }; @Test public void testEnumCodesAndNames() @@ -61,6 +62,7 @@ public void testFromCode() assertEquals(RequestFailureReason.UNKNOWN_COLUMN, RequestFailureReason.fromCode(500)); assertEquals(RequestFailureReason.UNKNOWN_TABLE, RequestFailureReason.fromCode(501)); assertEquals(RequestFailureReason.REMOTE_STORAGE_FAILURE, RequestFailureReason.fromCode(502)); + assertEquals(RequestFailureReason.INDEX_BUILD_IN_PROGRESS, RequestFailureReason.fromCode(503)); // Test invalid codes assertEquals(RequestFailureReason.UNKNOWN, RequestFailureReason.fromCode(200)); diff --git a/test/unit/org/apache/cassandra/index/sai/cql/AllowFilteringTest.java b/test/unit/org/apache/cassandra/index/sai/cql/AllowFilteringTest.java index 181f74de95f4..61bb4a7a746c 100644 --- a/test/unit/org/apache/cassandra/index/sai/cql/AllowFilteringTest.java +++ b/test/unit/org/apache/cassandra/index/sai/cql/AllowFilteringTest.java @@ -17,12 +17,17 @@ */ package org.apache.cassandra.index.sai.cql; +import org.apache.cassandra.index.IndexIsBuildingException; +import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.inject.Injections; +import org.apache.cassandra.inject.InvokePointBuilder; import org.junit.Test; import org.apache.cassandra.cql3.restrictions.StatementRestrictions; import org.apache.cassandra.index.sai.SAITester; import org.apache.cassandra.index.sai.StorageAttachedIndex; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertNotNull; /** @@ -514,4 +519,31 @@ public void testMapRangeQueries() // Show that we're now able to execute the query. assertRows(execute("SELECT partition FROM %s WHERE item_cost['apple'] < 3 AND item_cost['apple'] > 1"), row(0)); } + + private Injections.Barrier blockIndexBuild = Injections.newBarrier("block_index_build", 2, false) + .add(InvokePointBuilder.newInvokePoint().onClass(StorageAttachedIndex.class) + .onMethod("startInitialBuild")) + .build(); + + @Test + public void testAllowFilteringDuringIndexBuild() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)"); + Injections.inject(blockIndexBuild); + String idx = createIndexAsync(String.format("CREATE CUSTOM INDEX ON %%s(v) USING '%s'", StorageAttachedIndex.class.getName())); + + assertThatThrownBy(() -> execute("SELECT * FROM %s WHERE v=0")) + .hasMessage("The secondary index '" + idx + "' is not yet available as it is building") + .isInstanceOf(IndexIsBuildingException.class); + + assertThatThrownBy(() -> execute("SELECT * FROM %s WHERE v=0 ALLOW FILTERING")) + .hasMessage("The secondary index '" + idx + "' is not yet available as it is building") + .isInstanceOf(IndexIsBuildingException.class); + + blockIndexBuild.countDown(); + blockIndexBuild.disable(); + waitForIndexQueryable(idx); + execute("SELECT * FROM %s WHERE v=0"); + execute("SELECT * FROM %s WHERE v=0 ALLOW FILTERING"); + } }