Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.connection;

import com.mongodb.internal.VisibleForTesting;
import org.bson.BsonBinaryWriter;
import org.bson.BsonElement;
import org.bson.FieldNameValidator;
Expand Down Expand Up @@ -60,8 +61,8 @@ String getFirstSequenceId() {
String getSecondSequenceId() {
return secondSequenceId;
}

protected abstract EncodeDocumentsResult encodeDocuments(WritersProviderAndLimitsChecker writersProviderAndLimitsChecker);
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PROTECTED)
public abstract EncodeDocumentsResult encodeDocuments(WritersProviderAndLimitsChecker writersProviderAndLimitsChecker);

/**
* @see #tryWrite(WriteAction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,33 +772,24 @@ ClientBulkWriteResult build(@Nullable final MongoException topLevelError, final
deletedCount += response.getNDeleted();
Map<Integer, BsonValue> insertModelDocumentIds = batchResult.getInsertModelDocumentIds();
for (BsonDocument individualOperationResponse : response.getCursorExhaust()) {
int okFieldValue = individualOperationResponse.getNumber("ok").intValue();
if (okFieldValue == 1 && !verboseResultsSetting) {
//TODO-JAVA-6002 Previously, assertTrue(verboseResultsSetting) was used when okStatus == 1 because the server
// was not supposed to return successful operation results in the cursor when verboseResultsSetting is false.
// Due to server bug SERVER-113344, these unexpected results must be ignored until we stop supporting server
// versions affected by this bug. When that happens, restore assertTrue(verboseResultsSetting).
continue;
}
int individualOperationIndexInBatch = individualOperationResponse.getInt32("idx").getValue();
int writeModelIndex = batchStartModelIndex + individualOperationIndexInBatch;
if (individualOperationResponse.getNumber("ok").intValue() == 1) {
assertTrue(verboseResultsSetting);
AbstractClientNamespacedWriteModel writeModel = getNamespacedModel(models, writeModelIndex);
if (writeModel instanceof ConcreteClientNamespacedInsertOneModel) {
insertResults.put(
writeModelIndex,
new ConcreteClientInsertOneResult(insertModelDocumentIds.get(individualOperationIndexInBatch)));
} else if (writeModel instanceof ConcreteClientNamespacedUpdateOneModel
|| writeModel instanceof ConcreteClientNamespacedUpdateManyModel
|| writeModel instanceof ConcreteClientNamespacedReplaceOneModel) {
BsonDocument upsertedIdDocument = individualOperationResponse.getDocument("upserted", null);
updateResults.put(
writeModelIndex,
new ConcreteClientUpdateResult(
individualOperationResponse.getInt32("n").getValue(),
individualOperationResponse.getInt32("nModified", new BsonInt32(0)).getValue(),
upsertedIdDocument == null ? null : upsertedIdDocument.get("_id")));
} else if (writeModel instanceof ConcreteClientNamespacedDeleteOneModel
|| writeModel instanceof ConcreteClientNamespacedDeleteManyModel) {
deleteResults.put(
writeModelIndex,
new ConcreteClientDeleteResult(individualOperationResponse.getInt32("n").getValue()));
} else {
fail(writeModel.getClass().toString());
}
if (okFieldValue == 1) {
collectSuccessfulIndividualOperationResult(
individualOperationResponse,
writeModelIndex,
individualOperationIndexInBatch, insertResults,
insertModelDocumentIds,
updateResults,
deleteResults);
} else {
batchResultsHaveInfoAboutSuccessfulIndividualOperations = batchResultsHaveInfoAboutSuccessfulIndividualOperations
|| (orderedSetting && individualOperationIndexInBatch > 0);
Expand Down Expand Up @@ -838,6 +829,42 @@ ClientBulkWriteResult build(@Nullable final MongoException topLevelError, final
}
}

private void collectSuccessfulIndividualOperationResult(final BsonDocument individualOperationResponse,
final int writeModelIndex,
final int individualOperationIndexInBatch,
final Map<Integer, ClientInsertOneResult> insertResults,
final Map<Integer, BsonValue> insertModelDocumentIds,
final Map<Integer, ClientUpdateResult> updateResults,
final Map<Integer, ClientDeleteResult> deleteResults) {
AbstractClientNamespacedWriteModel writeModel = getNamespacedModel(models, writeModelIndex);
if (writeModel instanceof ConcreteClientNamespacedInsertOneModel) {
insertResults.put(
writeModelIndex,
new ConcreteClientInsertOneResult(insertModelDocumentIds.get(individualOperationIndexInBatch)));
} else if (writeModel instanceof ConcreteClientNamespacedUpdateOneModel
|| writeModel instanceof ConcreteClientNamespacedUpdateManyModel
|| writeModel instanceof ConcreteClientNamespacedReplaceOneModel) {
BsonDocument upsertedIdDocument = individualOperationResponse.getDocument("upserted", null);
updateResults.put(
writeModelIndex,
new ConcreteClientUpdateResult(
individualOperationResponse.getInt32("n").getValue(),
//TODO-JAVA-6005 Previously, we did not provide a default value of 0 because the
// server was supposed to return nModified as 0 when no documents were changed.
// Due to server bug SERVER-113026, we must provide a default of 0 until we stop supporting
// server versions affected by this bug. When that happens, remove the default value for nModified.
individualOperationResponse.getInt32("nModified", new BsonInt32(0)).getValue(),
upsertedIdDocument == null ? null : upsertedIdDocument.get("_id")));
} else if (writeModel instanceof ConcreteClientNamespacedDeleteOneModel
|| writeModel instanceof ConcreteClientNamespacedDeleteManyModel) {
deleteResults.put(
writeModelIndex,
new ConcreteClientDeleteResult(individualOperationResponse.getInt32("n").getValue()));
} else {
fail(writeModel.getClass().toString());
}
}

void onNewServerAddress(final ServerAddress serverAddress) {
this.serverAddress = serverAddress;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.operation;

import com.mongodb.ClusterFixture;
import com.mongodb.MongoNamespace;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedReplaceOneModel;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.connection.ClusterId;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerConnectionState;
import com.mongodb.connection.ServerDescription;
import com.mongodb.connection.ServerId;
import com.mongodb.connection.ServerType;
import com.mongodb.internal.binding.ConnectionSource;
import com.mongodb.internal.binding.ReadWriteBinding;
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.connection.DualMessageSequences;
import com.mongodb.internal.connection.OperationContext;
import org.bson.BsonBinaryWriter;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
import org.bson.io.BasicOutputBuffer;
import org.bson.json.JsonReader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.List;

import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
import static com.mongodb.client.model.bulk.ClientReplaceOneOptions.clientReplaceOneOptions;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ClientBulkWriteOperationTest {
private static final MongoNamespace NAMESPACE = new MongoNamespace("testDb.testCol");
@Mock(answer = Answers.RETURNS_SMART_NULLS)
private Connection connection;
@Mock(answer = Answers.RETURNS_SMART_NULLS)
private ConnectionSource connectionSource;
@Mock(answer = Answers.RETURNS_SMART_NULLS)
private ReadWriteBinding binding;

@BeforeEach
void setUp() {
when(connection.getDescription()).thenReturn(new ConnectionDescription(new ServerId(new ClusterId("test"), new ServerAddress())));
when(connectionSource.getConnection(any(OperationContext.class))).thenReturn(connection);
when(connectionSource.getServerDescription()).thenReturn(ServerDescription.builder().address(new ServerAddress())
.state(ServerConnectionState.CONNECTED)
.type(ServerType.STANDALONE)
.build());
when(binding.getWriteConnectionSource(any(OperationContext.class))).thenReturn(connectionSource);
}


/**
* This test exists due to SERVER-113344 bug.
*/
//TODO-JAVA-6002
@Test
void shouldIgnoreSuccessfulCursorResultWhenVerboseResultIsFalse() {
//given
mockCommandExecutionResult(
"{'cursor': {"
+ " 'id': NumberLong(0),"
+ " 'firstBatch': [ { 'ok': 1, 'idx': 0, 'n': 1, 'upserted': { '_id': 1 } } ],"
+ " 'ns': 'admin.$cmd.bulkWrite'"
+ "},"
+ " 'nErrors': 0,"
+ " 'nInserted': 0,"
+ " 'nMatched': 0,"
+ " 'nModified': 0,"
+ " 'nUpserted': 1,"
+ " 'nDeleted': 0,"
+ " 'ok': 1"
+ "}"
);
ClientBulkWriteOptions options = ClientBulkWriteOptions.clientBulkWriteOptions()
.ordered(false).verboseResults(false);
List<ClientNamespacedReplaceOneModel> clientNamespacedReplaceOneModels = singletonList(ClientNamespacedWriteModel.replaceOne(
NAMESPACE,
Filters.empty(),
new Document(),
clientReplaceOneOptions().upsert(true)
));
ClientBulkWriteOperation op = new ClientBulkWriteOperation(
clientNamespacedReplaceOneModels,
options,
WriteConcern.ACKNOWLEDGED,
false,
getDefaultCodecRegistry());
//when
ClientBulkWriteResult result = op.execute(binding, ClusterFixture.OPERATION_CONTEXT);

//then
assertEquals(0, result.getInsertedCount());
assertEquals(1, result.getUpsertedCount());
assertEquals(0, result.getMatchedCount());
assertEquals(0, result.getModifiedCount());
assertEquals(0, result.getDeletedCount());
assertFalse(result.getVerboseResults().isPresent());
}

/**
* This test exists due to SERVER-113026 bug.
*/
//TODO-JAVA-6005
@Test
void shouldUseDefaultNumberOfModifiedDocumentsWhenMissingInCursor() {
//given
mockCommandExecutionResult("{"
+ " cursor: {"
+ " id: NumberLong(0),"
+ " firstBatch: [ {"
+ " 'ok': 1.0,"
+ " 'idx': 0,"
+ " 'n': 1,"
//nMofified field is missing here
+ " 'upserted': {"
+ " '_id': 1"
+ " }"
+ " }],"
+ " ns: 'admin.$cmd.bulkWrite'"
+ " },"
+ " nErrors: 0,"
+ " nInserted: 1,"
+ " nMatched: 0,"
+ " nModified: 0,"
+ " nUpserted: 1,"
+ " nDeleted: 0,"
+ " ok: 1"
+ "}");
ClientBulkWriteOptions options = ClientBulkWriteOptions.clientBulkWriteOptions()
.ordered(false).verboseResults(true);
List<ClientNamespacedReplaceOneModel> clientNamespacedReplaceOneModels = singletonList(ClientNamespacedWriteModel.replaceOne(
NAMESPACE,
Filters.empty(),
new Document(),
clientReplaceOneOptions().upsert(true)
));
ClientBulkWriteOperation op = new ClientBulkWriteOperation(
clientNamespacedReplaceOneModels,
options,
WriteConcern.ACKNOWLEDGED,
false,
getDefaultCodecRegistry());
//when
ClientBulkWriteResult result = op.execute(binding, ClusterFixture.OPERATION_CONTEXT);

//then
assertEquals(1, result.getInsertedCount());
assertEquals(1, result.getUpsertedCount());
assertEquals(0, result.getMatchedCount());
assertEquals(0, result.getModifiedCount());
assertEquals(0, result.getDeletedCount());
assertTrue(result.getVerboseResults().isPresent());
}

private void mockCommandExecutionResult(final String serverResponse) {
when(connection.command(
anyString(),
any(BsonDocument.class),
any(),
isNull(),
any(),
any(OperationContext.class),
anyBoolean(),
any(DualMessageSequences.class))
).thenAnswer(invocationOnMock -> {
DualMessageSequences dualMessageSequences = invocationOnMock.getArgument(7);
dualMessageSequences.encodeDocuments(write -> {
write.doAndGetBatchCount(new BsonBinaryWriter(new BasicOutputBuffer()), new BsonBinaryWriter(new BasicOutputBuffer()));
return DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
});
return toBsonDocument(serverResponse);
});
}

private static BsonDocument toBsonDocument(final String serverResponse) {
Codec<BsonDocument> bsonDocumentCodec =
CommandResultDocumentCodec.create(getDefaultCodecRegistry().get(BsonDocument.class), CommandBatchCursorHelper.FIRST_BATCH);
return bsonDocumentCodec.decode(new JsonReader(serverResponse), DecoderContext.builder().build());
}
}