diff --git a/CHANGELOG.md b/CHANGELOG.md index f387cd22a26a5..126f4e66b8d96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) - Update script supports java.lang.String.sha1() and java.lang.String.sha256() methods ([#16923](https://github.com/opensearch-project/OpenSearch/pull/16923)) - Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)). +- Add support for append only indices([#17039](https://github.com/opensearch-project/OpenSearch/pull/17039)) - Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)). - Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678)) - Introduce framework for auxiliary transports and an experimental gRPC transport plugin ([#16534](https://github.com/opensearch-project/OpenSearch/pull/16534)) diff --git a/libs/core/src/main/java/org/opensearch/core/index/AppendOnlyIndexOperationRetryException.java b/libs/core/src/main/java/org/opensearch/core/index/AppendOnlyIndexOperationRetryException.java new file mode 100644 index 0000000000000..231e1da7ff487 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/index/AppendOnlyIndexOperationRetryException.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.core.index; + +import org.opensearch.OpenSearchException; + +/** + * This exception indicates that retry has been made during indexing for AppendOnly index. If the response of any + * indexing request contains this Exception in the response, we do not need to add a translog entry for this request. + * + * @opensearch.internal + */ +public class AppendOnlyIndexOperationRetryException extends OpenSearchException { + public AppendOnlyIndexOperationRetryException(String message) { + super(message); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/bulk/AppendOnlyIndicesIT.java b/server/src/internalClusterTest/java/org/opensearch/action/bulk/AppendOnlyIndicesIT.java new file mode 100644 index 0000000000000..55c18b94f5486 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/bulk/AppendOnlyIndicesIT.java @@ -0,0 +1,219 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.bulk; + +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.ingest.IngestTestPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.ConnectTransportException; +import org.opensearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.containsString; + +public class AppendOnlyIndicesIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(IngestTestPlugin.class, MockTransportService.TestPlugin.class); + } + + public void testIndexDocumentWithACustomDocIdForAppendOnlyIndices() throws Exception { + Client client = internalCluster().coordOnlyNodeClient(); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true) + ) + ); + ensureGreen("index"); + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setId(Integer.toString(0)).setSource(doc)); + + BulkResponse response = bulkBuilder.get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString( + "Operation [INDEX] is not allowed with a custom document id 0 as setting `" + + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey() + + "` is enabled for this index: index;" + ) + ); + } + + public void testUpdateDeleteDocumentForAppendOnlyIndices() throws Exception { + Client client = internalCluster().coordOnlyNodeClient(); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true) + ) + ); + ensureGreen("index"); + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setSource(doc)); + + bulkBuilder.get(); + BulkResponse response = client().prepareBulk().add(client().prepareUpdate("index", "0").setDoc("foo", "updated")).get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString( + "Operation [UPDATE] is not allowed as setting `" + + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey() + + "` is enabled for this index" + ) + ); + + response = client().prepareBulk().add(client().prepareDelete("index", "0")).get(); + assertThat( + response.getItems()[0].getFailureMessage(), + containsString( + "Operation [DELETE] is not allowed as setting `" + + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey() + + "` is enabled for this index" + ) + ); + } + + public void testRetryForAppendOnlyIndices() throws Exception { + final AtomicBoolean exceptionThrown = new AtomicBoolean(false); + int numDocs = scaledRandomIntBetween(100, 1000); + Client client = internalCluster().coordOnlyNodeClient(); + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get(); + NodeStats unluckyNode = randomFrom( + nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode()).collect(Collectors.toList()) + ); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true) + ) + ); + ensureGreen("index"); + logger.info("unlucky node: {}", unluckyNode.getNode()); + // create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry. + for (NodeStats dataNode : nodeStats.getNodes()) { + if (exceptionThrown.get()) { + break; + } + + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + dataNode.getNode().getName() + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), + (connection, requestId, action, request, options) -> { + connection.sendRequest(requestId, action, request, options); + if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) { + logger.debug("Throw ConnectTransportException"); + throw new ConnectTransportException(connection.getNode(), action); + } + } + ); + } + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + for (int i = 0; i < numDocs; i++) { + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setSource(doc)); + } + + BulkResponse response = bulkBuilder.get(); + for (BulkItemResponse singleIndexResponse : response.getItems()) { + // Retry will not create a new version. + assertThat(singleIndexResponse.getVersion(), equalTo(1L)); + } + } + + public void testNodeReboot() throws Exception { + int numDocs = scaledRandomIntBetween(100, 1000); + Client client = internalCluster().coordOnlyNodeClient(); + assertAcked( + client().admin() + .indices() + .prepareCreate("index") + .setSettings( + Settings.builder() + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey(), true) + ) + ); + + ensureGreen("index"); + + BulkRequestBuilder bulkBuilder = client.prepareBulk(); + + for (int i = 0; i < numDocs; i++) { + XContentBuilder doc = null; + doc = jsonBuilder().startObject().field("foo", "bar").endObject(); + bulkBuilder.add(client.prepareIndex("index").setSource(doc)); + } + + BulkResponse response = bulkBuilder.get(); + assertFalse(response.hasFailures()); + internalCluster().restartRandomDataNode(); + ensureGreen("index"); + refresh(); + SearchResponse searchResponse = client().prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .setIndices("index") + .setSize(numDocs) + .get(); + + assertBusy(() -> { assertHitCount(searchResponse, numDocs); }, 20L, TimeUnit.SECONDS); + + } +} diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index 4e770f5851bc6..08373481d5711 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -38,6 +38,7 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.TransportWriteAction; +import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog; @@ -297,20 +298,36 @@ public void markOperationAsExecuted(Engine.Result result) { locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation()); break; case FAILURE: - executionResult = new BulkItemResponse( - current.id(), - docWriteRequest.opType(), - // Make sure to use request.index() here, if you - // use docWriteRequest.index() it will use the - // concrete index instead of an alias if used! - new BulkItemResponse.Failure( - request.index(), - docWriteRequest.id(), - result.getFailure(), + if (result.getFailure() instanceof AppendOnlyIndexOperationRetryException) { + Engine.IndexResult indexResult = (Engine.IndexResult) result; + DocWriteResponse indexResponse = new IndexResponse( + primary.shardId(), + requestToExecute.id(), result.getSeqNo(), - result.getTerm() - ) - ); + result.getTerm(), + indexResult.getVersion(), + indexResult.isCreated() + ); + + executionResult = new BulkItemResponse(current.id(), current.request().opType(), indexResponse); + // set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though. + executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo()); + } else { + executionResult = new BulkItemResponse( + current.id(), + docWriteRequest.opType(), + // Make sure to use request.index() here, if you + // use docWriteRequest.index() it will use the + // concrete index instead of an alias if used! + new BulkItemResponse.Failure( + request.index(), + docWriteRequest.id(), + result.getFailure(), + result.getSeqNo(), + result.getTerm() + ) + ); + } break; default: throw new AssertionError("unknown result type for " + getCurrentItem() + ": " + result.getResultType()); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 493c80757d668..b409c55181418 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -67,6 +67,7 @@ import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.ValidationException; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; @@ -545,12 +546,17 @@ protected void doRun() { if (docWriteRequest == null) { continue; } + if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, i, metadata)) { continue; } if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) { continue; } + if (addFailureIfAppendOnlyIndexAndOpsDeleteOrUpdate(docWriteRequest, i, concreteIndices, metadata)) { + continue; + } + Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest); try { // The ConcreteIndices#resolveIfAbsent(...) method validates via IndexNameExpressionResolver whether @@ -754,6 +760,47 @@ public void onTimeout(TimeValue timeout) { }); } + private boolean addFailureIfAppendOnlyIndexAndOpsDeleteOrUpdate( + DocWriteRequest request, + int idx, + final ConcreteIndices concreteIndices, + Metadata metadata + ) { + Index concreteIndex = concreteIndices.resolveIfAbsent(request); + final IndexMetadata indexMetadata = metadata.index(concreteIndex); + if (indexMetadata.isAppendOnlyIndex()) { + if ((request.opType() == DocWriteRequest.OpType.UPDATE || request.opType() == DocWriteRequest.OpType.DELETE)) { + ValidationException exception = new ValidationException(); + exception.addValidationError( + "Operation [" + + request.opType() + + "] is not allowed as setting `" + + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey() + + "` is enabled for this index: " + + request.index() + ); + addFailure(request, idx, exception); + return true; + } else if (request.id() != null && request.opType() == DocWriteRequest.OpType.INDEX) { + ValidationException exception = new ValidationException(); + exception.addValidationError( + "Operation [" + + request.opType() + + "] is not allowed with a custom document id " + + request.id() + + " as setting `" + + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING.getKey() + + "` is enabled for this index: " + + request.index() + ); + addFailure(request, idx, exception); + return true; + } + } + + return false; + } + private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest request, int idx, final Metadata metadata) { if (request.isRequireAlias() && (metadata.hasAlias(request.index()) == false)) { Exception exception = new IndexNotFoundException( diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index c2867c4d82e3e..5dc34f5178e86 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -350,6 +350,7 @@ public Iterator> settings() { ); public static final String SETTING_REMOTE_STORE_ENABLED = "index.remote_store.enabled"; + public static final String SETTING_INDEX_APPEND_ONLY_ENABLED = "index.append_only.enabled"; public static final String SETTING_REMOTE_SEGMENT_STORE_REPOSITORY = "index.remote_store.segment.repository"; @@ -392,6 +393,16 @@ public Iterator> settings() { Property.Dynamic ); + /** + * Used to specify if the index data should be persisted in the remote store. + */ + public static final Setting INDEX_APPEND_ONLY_ENABLED_SETTING = Setting.boolSetting( + SETTING_INDEX_APPEND_ONLY_ENABLED, + false, + Property.IndexScope, + Property.Final + ); + /** * Used to specify remote store repository to use for this index. */ @@ -723,6 +734,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final boolean isRemoteSnapshot; private final int indexTotalShardsPerNodeLimit; + private final boolean isAppendOnlyIndex; private final Context context; @@ -754,6 +766,7 @@ private IndexMetadata( final Map rolloverInfos, final boolean isSystem, final int indexTotalShardsPerNodeLimit, + boolean isAppendOnlyIndex, final Context context ) { @@ -791,6 +804,7 @@ private IndexMetadata( this.isSystem = isSystem; this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit; + this.isAppendOnlyIndex = isAppendOnlyIndex; this.context = context; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -953,6 +967,10 @@ public int getIndexTotalShardsPerNodeLimit() { return this.indexTotalShardsPerNodeLimit; } + public boolean isAppendOnlyIndex() { + return this.isAppendOnlyIndex; + } + @Nullable public DiscoveryNodeFilters requireFilters() { return requireFilters; @@ -1766,6 +1784,7 @@ public IndexMetadata build() { } final int indexTotalShardsPerNodeLimit = ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(settings); + final boolean isAppendOnlyIndex = INDEX_APPEND_ONLY_ENABLED_SETTING.get(settings); final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); @@ -1797,6 +1816,7 @@ public IndexMetadata build() { rolloverInfos, isSystem, indexTotalShardsPerNodeLimit, + isAppendOnlyIndex, context ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index bddbe963e8013..45ada33ba9ca9 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -109,6 +109,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_FORMAT_SETTING, IndexMetadata.INDEX_HIDDEN_SETTING, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING, + IndexMetadata.INDEX_APPEND_ONLY_ENABLED_SETTING, SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING, SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING, SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING, diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index fea66037772e6..e93f8e6cdb1c7 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -89,6 +89,7 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.Assertions; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; @@ -1023,19 +1024,21 @@ public IndexResult index(Index index) throws IOException { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { location = translogManager.add(new Translog.Index(index, indexResult)); - } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no - final NoOp noOp = new NoOp( - indexResult.getSeqNo(), - index.primaryTerm(), - index.origin(), - index.startTime(), - indexResult.getFailure().toString() - ); - location = innerNoOp(noOp).getTranslogLocation(); - } else { - location = null; - } + } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO + && indexResult.getFailure() != null + && !(indexResult.getFailure() instanceof AppendOnlyIndexOperationRetryException)) { + // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no + final NoOp noOp = new NoOp( + indexResult.getSeqNo(), + index.primaryTerm(), + index.origin(), + index.startTime(), + indexResult.getFailure().toString() + ); + location = innerNoOp(noOp).getTranslogLocation(); + } else { + location = null; + } indexResult.setTranslogLocation(location); } if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) { @@ -1046,7 +1049,9 @@ public IndexResult index(Index index) throws IOException { ); } localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo()); - if (indexResult.getTranslogLocation() == null) { + if (indexResult.getTranslogLocation() == null + && !(indexResult.getFailure() != null + && (indexResult.getFailure() instanceof AppendOnlyIndexOperationRetryException))) { // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO; localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo()); @@ -1140,7 +1145,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { } else { versionMap.enforceSafeAccess(); // resolves incoming version - final VersionValue versionValue = resolveDocVersion(index, index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); + final VersionValue versionValue = resolveDocVersion(index, true); final long currentVersion; final boolean currentNotFoundOrDeleted; if (versionValue == null) { @@ -1183,6 +1188,15 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); if (reserveError != null) { plan = IndexingStrategy.failAsTooManyDocs(reserveError); + } else if (currentVersion >= 1 && engineConfig.getIndexSettings().getIndexMetadata().isAppendOnlyIndex()) { + // Retry happens for indexing requests for append only indices, since we are rejecting update requests + // at Transport layer itself. So for any retry, we are reconstructing response from already indexed + // document version for append only index. + AppendOnlyIndexOperationRetryException retryException = new AppendOnlyIndexOperationRetryException( + "Indexing operation retried for append only indices" + ); + final IndexResult result = new IndexResult(retryException, currentVersion, versionValue.term, versionValue.seqNo); + plan = IndexingStrategy.failAsIndexAppendOnly(result, currentVersion, 0); } else { plan = IndexingStrategy.processNormally( currentNotFoundOrDeleted, @@ -1374,6 +1388,10 @@ static IndexingStrategy failAsTooManyDocs(Exception e) { final IndexResult result = new IndexResult(e, Versions.NOT_FOUND); return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result); } + + static IndexingStrategy failAsIndexAppendOnly(IndexResult result, long versionForIndexing, int reservedDocs) { + return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, result); + } } /** diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java index de096aee45bf9..9745203e91586 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkPrimaryExecutionContextTests.java @@ -40,6 +40,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; +import org.opensearch.core.index.AppendOnlyIndexOperationRetryException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; @@ -79,6 +80,35 @@ public void testAbortedSkipped() { assertThat(visitedRequests, equalTo(nonAbortedRequests)); } + public void testAppendOnlyIndexOperationRetryException() { + BulkShardRequest shardRequest = generateRandomRequest(); + + final IndexShard primary = mock(IndexShard.class); + when(primary.shardId()).thenReturn(shardRequest.shardId()); + ArrayList> nonAbortedRequests = new ArrayList<>(); + for (BulkItemRequest request : shardRequest.items()) { + if (randomBoolean()) { + request.abort("index", new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices")); + } else { + nonAbortedRequests.add(request.request()); + } + } + + ArrayList> visitedRequests = new ArrayList<>(); + for (BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(shardRequest, primary); context + .hasMoreOperationsToExecute();) { + visitedRequests.add(context.getCurrent()); + context.setRequestToExecute(context.getCurrent()); + // using failures prevents caring about types + context.markOperationAsExecuted( + new Engine.IndexResult(new AppendOnlyIndexOperationRetryException("Indexing operation retried for append only indices"), 1) + ); + context.markAsCompleted(context.getExecutionResult()); + } + + assertThat(visitedRequests, equalTo(nonAbortedRequests)); + } + private BulkShardRequest generateRandomRequest() { BulkItemRequest[] items = new BulkItemRequest[randomInt(20)]; for (int i = 0; i < items.length; i++) {