Skip to content

Commit

Permalink
CNDB-12620: Add new reason RequestFailureReason.INDEX_BUILD_IN_PROGRESS
Browse files Browse the repository at this point in the history
  • Loading branch information
ekaterinadimitrova2 committed Jan 28, 2025
1 parent 52a28ac commit b7abb57
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.primitives.Ints;

import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.index.IndexIsBuildingException;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.index.sai.utils.AbortedOperationException;
import org.apache.cassandra.io.IVersionedSerializer;
Expand All @@ -45,7 +46,8 @@ public enum RequestFailureReason
// We should add new codes in HCD (which do not exist in Apache Cassandra) only with big numbers, to avoid conflicts
UNKNOWN_COLUMN (500),
UNKNOWN_TABLE (501),
REMOTE_STORAGE_FAILURE (502);
REMOTE_STORAGE_FAILURE (502),
INDEX_BUILD_IN_PROGRESS (503);

public static final Serializer serializer = new Serializer();

Expand Down Expand Up @@ -82,6 +84,7 @@ public int codeForNativeProtocol()
exceptionToReasonMap.put(IndexNotAvailableException.class, INDEX_NOT_AVAILABLE);
exceptionToReasonMap.put(UnknownColumnException.class, UNKNOWN_COLUMN);
exceptionToReasonMap.put(UnknownTableException.class, UNKNOWN_TABLE);
exceptionToReasonMap.put(IndexIsBuildingException.class, INDEX_BUILD_IN_PROGRESS);

if (exceptionToReasonMap.size() != reasons.length-2)
throw new RuntimeException("A new RequestFailureReasons was probably added and you may need to update the exceptionToReasonMap");
Expand Down
36 changes: 36 additions & 0 deletions src/java/org/apache/cassandra/index/IndexIsBuildingException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.index;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.exceptions.UncheckedInternalRequestExecutionException;

/**
* Thrown if a secondary index is not currently available because it is building.
*/
public final class IndexIsBuildingException extends UncheckedInternalRequestExecutionException
{
/**
* Creates a new <code>IndexIsBuildingException</code> for the specified index.
* @param index the index
*/
public IndexIsBuildingException(Index index)
{
super(RequestFailureReason.INDEX_BUILD_IN_PROGRESS,
String.format("The secondary index '%s' is not yet available as it is building", index.getIndexMetadata().name));
}
}
31 changes: 29 additions & 2 deletions src/java/org/apache/cassandra/index/SecondaryIndexManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ public synchronized Future<?> addIndex(IndexMetadata indexDef, boolean isNewCF)

/**
* Throws an {@link IndexNotAvailableException} if any of the indexes in the specified {@link Index.QueryPlan} is
* not queryable, as it's defined by {@link #isIndexQueryable(Index)}.
* not queryable, as it's defined by {@link #isIndexQueryable(Index)}. If the reason for the index to be not available
* is that it's building, it will throw an {@link IndexIsBuildingException}.
*
* @param queryPlan a query plan
* @throws IndexNotAvailableException if the query plan has any index that is not queryable
Expand All @@ -366,6 +367,9 @@ public void checkQueryability(Index.QueryPlan queryPlan)
{
for (Index index : queryPlan.getIndexes())
{
if (isIndexBuilding(index))
throw new IndexIsBuildingException(index);

if (!isIndexQueryable(index))
throw new IndexNotAvailableException(index);
}
Expand Down Expand Up @@ -404,6 +408,18 @@ public boolean isIndexWritable(Index index)
return writableIndexes.containsKey(index.getIndexMetadata().name);
}

/**
* Checks if the specified index has any running build task.
*
* @param index the index
* @return {@code true} if the index is building, {@code false} otherwise
*/
@VisibleForTesting
public synchronized boolean isIndexBuilding(Index index)
{
return isIndexBuilding(index.getIndexMetadata().name);
}

/**
* Checks if the specified index has any running build task.
*
Expand Down Expand Up @@ -1816,13 +1832,18 @@ public static void shutdownAndWait(long timeout, TimeUnit units) throws Interrup
*/
public static <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspace keyspace, Index.QueryPlan indexQueryPlan, ConsistencyLevel level)
{
Map<InetAddressAndPort, Index.Status> indexStatusMap = new HashMap<>();

E queryableEndpoints = liveEndpoints.filter(replica -> {

for (Index index : indexQueryPlan.getIndexes())
{
Index.Status status = getIndexStatus(replica.endpoint(), keyspace.getName(), index.getIndexMetadata().name);
if (!index.isQueryable(status))
{
indexStatusMap.put(replica.endpoint(), status);
return false;
}
}

return true;
Expand All @@ -1840,7 +1861,13 @@ public static <E extends Endpoints<E>> E filterForQuery(E liveEndpoints, Keyspac
{
Map<InetAddressAndPort, RequestFailureReason> failureReasons = new HashMap<>();
liveEndpoints.without(queryableEndpoints.endpoints())
.forEach(replica -> failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_NOT_AVAILABLE));
.forEach(replica -> {
Index.Status status = indexStatusMap.get(replica.endpoint());
if (status == Index.Status.FULL_REBUILD_STARTED)
failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_BUILD_IN_PROGRESS);
else
failureReasons.put(replica.endpoint(), RequestFailureReason.INDEX_NOT_AVAILABLE);
});

throw new ReadFailureException(level, filtered, required, false, failureReasons);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.test.sai.SAIUtil.waitForIndexQueryable;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -169,6 +170,90 @@ public void testSkipNonQueryableNodeN1Rf1() throws Exception
shouldSkipNonQueryableNode(1, Collections.singletonList(1));
}

@Test
public void testIndexExceptionsTwoIndexesOn3NodeCluster() throws Exception
{
try (Cluster cluster = init(Cluster.build(3)
.withConfig(config -> config.with(GOSSIP)
.with(NETWORK))
.start()))
{
String ks2 = "ks2";
String cf1 = "cf1";
String index1 = "cf1_idx1";
String index2 = "cf1_idx2";
String index3 = "cf2_idx1";

// Create keyspace, table with correct column types
cluster.schemaChange(String.format(CREATE_KEYSPACE, ks2, 2));
cluster.schemaChange("CREATE TABLE " + ks2 + "." + cf1 + " (pk int PRIMARY KEY, v1 int, v2 int)");
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + "." + cf1 + " WHERE v1=0 AND v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
0);
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + "." + cf1 + " WHERE v2=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
0);
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + "." + cf1 + " WHERE v1=0 ALLOW FILTERING",
ConsistencyLevel.LOCAL_QUORUM,
0);

cluster.schemaChange(String.format(CREATE_INDEX, index1, ks2, cf1, "v1"));
cluster.schemaChange(String.format(CREATE_INDEX, index2, ks2, cf1, "v2"));
cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index1, node), Index.Status.BUILD_SUCCEEDED));
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(2), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(2), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(2), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(1), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(1), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(2), Index.Status.BUILD_SUCCEEDED);
waitForIndexingStatus(cluster.get(3), ks2, index1, cluster.get(3), Index.Status.BUILD_SUCCEEDED);

// Mark only index2 as building on node3, leave index1 in BUILD_SUCCEEDED state
markIndexBuilding(cluster.get(3), ks2, cf1, index2);
cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index2, node), Index.Status.FULL_REBUILD_STARTED));
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(3), Index.Status.FULL_REBUILD_STARTED);

assertThatThrownBy(() ->
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + cf1 + " WHERE v1=0 AND v2=0",
ConsistencyLevel.LOCAL_QUORUM,
0))
.hasMessageContaining("Operation failed - received 1 responses and 1 failures: INDEX_BUILD_IN_PROGRESS");

// Mark only index2 as failing on node2, leave index1 in BUILD_SUCCEEDED state
markIndexBuilding(cluster.get(2), ks2, cf1, index2);
cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index2, node), Index.Status.BUILD_FAILED));

assertThatThrownBy(() ->
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + cf1 + " WHERE v1=0 AND v2=0",
ConsistencyLevel.LOCAL_QUORUM,
0))
.hasMessageContaining("Operation failed - received 1 responses and 1 failures: INDEX_BUILD_IN_PROGRESS");

// Mark only index2 as failing on node1, leave index1 in BUILD_SUCCEEDED state
markIndexNonQueryable(cluster.get(1), ks2, cf1, index2);
cluster.forEach(node -> expectedNodeIndexQueryability.put(NodeIndex.create(ks2, index2, node), Index.Status.BUILD_FAILED));
waitForIndexingStatus(cluster.get(2), ks2, index2, cluster.get(1), Index.Status.BUILD_FAILED);
waitForIndexingStatus(cluster.get(1), ks2, index2, cluster.get(1), Index.Status.BUILD_FAILED);
waitForIndexingStatus(cluster.get(3), ks2, index2, cluster.get(1), Index.Status.BUILD_FAILED);

assertThatThrownBy(() ->
executeOnAllCoordinators(cluster,
"SELECT pk FROM " + ks2 + '.' + cf1 + " WHERE v1=0 AND v2=0",
ConsistencyLevel.LOCAL_QUORUM,
0))
.hasMessageMatching("^Operation failed - received 0 responses and 2 failures: INDEX_BUILD_IN_PROGRESS from .+, INDEX_NOT_AVAILABLE from .+$");
}
}

private void shouldSkipNonQueryableNode(int nodes, List<Integer>... nonQueryableNodesList) throws Exception
{
try (Cluster cluster = init(Cluster.build(nodes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class RequestFailureReasonTest
{ 6, "INDEX_NOT_AVAILABLE" },
{ 500, "UNKNOWN_COLUMN" },
{ 501, "UNKNOWN_TABLE" },
{ 502, "REMOTE_STORAGE_FAILURE" }
{ 502, "REMOTE_STORAGE_FAILURE" },
{ 503, "INDEX_BUILD_IN_PROGRESS" }
};
@Test
public void testEnumCodesAndNames()
Expand Down Expand Up @@ -61,6 +62,7 @@ public void testFromCode()
assertEquals(RequestFailureReason.UNKNOWN_COLUMN, RequestFailureReason.fromCode(500));
assertEquals(RequestFailureReason.UNKNOWN_TABLE, RequestFailureReason.fromCode(501));
assertEquals(RequestFailureReason.REMOTE_STORAGE_FAILURE, RequestFailureReason.fromCode(502));
assertEquals(RequestFailureReason.INDEX_BUILD_IN_PROGRESS, RequestFailureReason.fromCode(503));

// Test invalid codes
assertEquals(RequestFailureReason.UNKNOWN, RequestFailureReason.fromCode(200));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
*/
package org.apache.cassandra.index.sai.cql;

import org.apache.cassandra.index.IndexIsBuildingException;
import org.apache.cassandra.index.IndexNotAvailableException;
import org.apache.cassandra.inject.Injections;
import org.apache.cassandra.inject.InvokePointBuilder;
import org.junit.Test;

import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.index.sai.SAITester;
import org.apache.cassandra.index.sai.StorageAttachedIndex;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertNotNull;

/**
Expand Down Expand Up @@ -514,4 +519,31 @@ public void testMapRangeQueries()
// Show that we're now able to execute the query.
assertRows(execute("SELECT partition FROM %s WHERE item_cost['apple'] < 3 AND item_cost['apple'] > 1"), row(0));
}

private Injections.Barrier blockIndexBuild = Injections.newBarrier("block_index_build", 2, false)
.add(InvokePointBuilder.newInvokePoint().onClass(StorageAttachedIndex.class)
.onMethod("startInitialBuild"))
.build();

@Test
public void testAllowFilteringDuringIndexBuild() throws Throwable
{
createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
Injections.inject(blockIndexBuild);
String idx = createIndexAsync(String.format("CREATE CUSTOM INDEX ON %%s(v) USING '%s'", StorageAttachedIndex.class.getName()));

assertThatThrownBy(() -> execute("SELECT * FROM %s WHERE v=0"))
.hasMessage("The secondary index '" + idx + "' is not yet available as it is building")
.isInstanceOf(IndexIsBuildingException.class);

assertThatThrownBy(() -> execute("SELECT * FROM %s WHERE v=0 ALLOW FILTERING"))
.hasMessage("The secondary index '" + idx + "' is not yet available as it is building")
.isInstanceOf(IndexIsBuildingException.class);

blockIndexBuild.countDown();
blockIndexBuild.disable();
waitForIndexQueryable(idx);
execute("SELECT * FROM %s WHERE v=0");
execute("SELECT * FROM %s WHERE v=0 ALLOW FILTERING");
}
}

0 comments on commit b7abb57

Please sign in to comment.