fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java
@@ -294,62 +294,69 @@
}

@SuppressWarnings({"PMD.CloseResource", "java:S2095"})
int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey) throws IOException {
int deleteDocument(Tuple groupingKey, Integer partitionId, Tuple primaryKey, boolean forceDeleteDocument) throws IOException {
final long startTime = System.nanoTime();
final IndexWriter indexWriter = directoryManager.getIndexWriter(groupingKey, partitionId);
@Nullable final LucenePrimaryKeySegmentIndex segmentIndex = directoryManager.getDirectory(groupingKey, partitionId).getPrimaryKeySegmentIndex();

if (segmentIndex != null) {
final DirectoryReader directoryReader = directoryManager.getWriterReader(groupingKey, partitionId);
final LucenePrimaryKeySegmentIndex.DocumentIndexEntry documentIndexEntry = segmentIndex.findDocument(directoryReader, primaryKey);
if (documentIndexEntry != null) {
state.context.ensureActive().clear(documentIndexEntry.entryKey); // TODO: Only if valid?
long valid = indexWriter.tryDeleteDocument(documentIndexEntry.indexReader, documentIndexEntry.docId);
if (valid > 0) {
state.context.record(LuceneEvents.Events.LUCENE_DELETE_DOCUMENT_BY_PRIMARY_KEY, System.nanoTime() - startTime);
return 1;
} else if (LOG.isDebugEnabled()) {
LOG.debug(KeyValueLogMessage.of("try delete document failed",
LuceneLogMessageKeys.GROUP, groupingKey,
LuceneLogMessageKeys.INDEX_PARTITION, partitionId,
LuceneLogMessageKeys.SEGMENT, documentIndexEntry.segmentName,
LuceneLogMessageKeys.DOC_ID, documentIndexEntry.docId,
LuceneLogMessageKeys.PRIMARY_KEY, primaryKey));
}
} else if (LOG.isDebugEnabled()) {
LOG.debug(KeyValueLogMessage.of("primary key segment index entry not found",
LuceneLogMessageKeys.GROUP, groupingKey,
LuceneLogMessageKeys.INDEX_PARTITION, partitionId,
LuceneLogMessageKeys.PRIMARY_KEY, primaryKey,
LuceneLogMessageKeys.SEGMENTS, segmentIndex.findSegments(primaryKey)));
} else {
if (forceDeleteDocument) {
// On force deletion, clear the segment index entries for this primary key even though no document entry was found
segmentIndex.clearForPrimaryKey(primaryKey);
}
if (LOG.isDebugEnabled()) {
LOG.debug(KeyValueLogMessage.of("primary key segment index entry not found for deletion",
LuceneLogMessageKeys.GROUP, groupingKey,
LuceneLogMessageKeys.INDEX_PARTITION, partitionId,
LuceneLogMessageKeys.PRIMARY_KEY, primaryKey,
LuceneLogMessageKeys.SEGMENTS, segmentIndex.findSegments(primaryKey),
LuceneLogMessageKeys.FORCE_DELETE, forceDeleteDocument));
}
}
}
Query query;
// null format means don't use BinaryPoint for the index primary key
if (keySerializer.hasFormat()) {
try {
byte[][] binaryPoint = keySerializer.asFormattedBinaryPoint(primaryKey);
query = BinaryPoint.newRangeQuery(PRIMARY_KEY_BINARY_POINT_NAME, binaryPoint, binaryPoint);
} catch (RecordCoreFormatException ex) {
// this can happen on format mismatch or encoding error
// fallback to the old way (less efficient)
query = SortedDocValuesField.newSlowExactQuery(PRIMARY_KEY_SEARCH_NAME, new BytesRef(keySerializer.asPackedByteArray(primaryKey)));
logSerializationError("Failed to delete using BinaryPoint encoded ID: {}", ex.getMessage());
}
} else {
// fallback to the old way (less efficient)
query = SortedDocValuesField.newSlowExactQuery(PRIMARY_KEY_SEARCH_NAME, new BytesRef(keySerializer.asPackedByteArray(primaryKey)));
}

indexWriter.deleteDocuments(query);
LuceneEvents.Events event = state.store.isIndexWriteOnly(state.index) ?
LuceneEvents.Events.LUCENE_DELETE_DOCUMENT_BY_QUERY_IN_WRITE_ONLY_MODE :
LuceneEvents.Events.LUCENE_DELETE_DOCUMENT_BY_QUERY;
state.context.record(event, System.nanoTime() - startTime);

// if we delete by query, we aren't certain whether the document was actually deleted (if, for instance, it wasn't in Lucene
// to begin with)
return 0;

Teamscale finding (fdb.teamscale.io) for fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java#L298-L359:
This method is a bit lengthy. Consider shortening it, e.g. by extracting code blocks into separate methods. https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3705%2Fohadzeliger%2Fmuitl-update-lucene%3AHEAD&id=CDCD060C53A1C4C6AA90433D6B824B04
}
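For illustration, a minimal sketch of the two query shapes used in the fallback path above, assuming only lucene-core on the classpath. The field names and the packed key layout are placeholders, not the constants used by the maintainer; the real code derives both from the key serializer.

import org.apache.lucene.document.BinaryPoint;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;

public final class PrimaryKeyDeleteQuerySketch {
    // Illustrative field names only; the index maintainer uses its own constants.
    private static final String PK_POINT_FIELD = "pk_point";
    private static final String PK_DOC_VALUES_FIELD = "pk_dv";

    // Preferred path: an exact match expressed as a degenerate BinaryPoint range
    // (lower bound == upper bound), one fixed-width byte[] per key component.
    static Query byBinaryPoint(byte[][] formattedKey) {
        return BinaryPoint.newRangeQuery(PK_POINT_FIELD, formattedKey, formattedKey);
    }

    // Fallback path: a slower exact query against the sorted doc values,
    // used when the key cannot be formatted into fixed-width point dimensions.
    static Query byDocValues(byte[] packedKey) {
        return SortedDocValuesField.newSlowExactQuery(PK_DOC_VALUES_FIELD, new BytesRef(packedKey));
    }

    public static void main(String[] args) {
        byte[][] formatted = { {0, 0, 0, 1}, {0, 0, 0, 42} }; // two 4-byte dimensions
        byte[] packed = {1, 42};
        System.out.println(byBinaryPoint(formatted));
        System.out.println(byDocValues(packed));
    }
}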

@Override
@@ -476,7 +483,8 @@
// delete old
return AsyncUtil.whenAll(oldRecordFields.keySet().stream().map(t -> {
try {
return tryDelete(Objects.requireNonNull(oldRecord), t);
// This is an update, so the old document is known to exist; force the delete even if it is not found in the segment index
return tryDelete(Objects.requireNonNull(oldRecord), t, true);
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", Objects.requireNonNull(oldRecord).getPrimaryKey());
}
@@ -500,7 +508,7 @@
}

/**
* convenience wrapper that calls {@link #tryDelete(FDBIndexableRecord, Tuple)} only if the index is in
* convenience wrapper that calls {@link #tryDelete(FDBIndexableRecord, Tuple, boolean)} only if the index is in
* {@code WriteOnly} mode.
* This is usually needed when a record is inserted during an index build, but that
* record had already been added due to an explicit update earlier.
@@ -509,40 +517,49 @@
* @param groupingKey grouping key
* @param <M> message
* @return count of deleted docs
* @throws IOException propagated by {@link #tryDelete(FDBIndexableRecord, Tuple)}
* @throws IOException propagated by {@link #tryDelete(FDBIndexableRecord, Tuple, boolean)}
*/
private <M extends Message> CompletableFuture<Integer> tryDeleteInWriteOnlyMode(@Nonnull FDBIndexableRecord<M> record,
@Nonnull Tuple groupingKey) throws IOException {
if (!state.store.isIndexWriteOnly(state.index)) {
// no op
return CompletableFuture.completedFuture(0);
}
return tryDelete(record, groupingKey);
return tryDelete(record, groupingKey, false);
}

/**
* Delete a given record if it is indexed.
* The record may not necessarily exist in the index, or it may need to be deleted by query ({@link #deleteDocument(Tuple, Integer, Tuple)}).
* The record may not necessarily exist in the index, or it may need to be deleted by query ({@link #deleteDocument(Tuple, Integer, Tuple, boolean)}).
* Note that there are cases where the document is in an interim state and cannot be found in the segment index
* even though it is known to exist. For example, when a second update is made to the same document within the
* same transaction, the document has already been removed from the index (in the NRT cache) but not yet flushed,
* so we should still clear the segment index and decrement the partition count. A subsequent flush of the
* IndexWriter brings the state back to consistency.
*
* @param record record to be deleted
* @param groupingKey grouping key
* @param forceDeleteRecord force the delete of a record that is known to exist, even if it does not show up in the segment index
* @param <M> record message
* @return count of deleted docs: 1 indicates that the record has been deleted, 0 means that either no record was deleted or it was deleted by
* query.
* @throws IOException propagated from {@link #deleteDocument(Tuple, Integer, Tuple)}
* @throws IOException propagated from {@link #deleteDocument(Tuple, Integer, Tuple, boolean)}
*/
private <M extends Message> CompletableFuture<Integer> tryDelete(@Nonnull FDBIndexableRecord<M> record,
@Nonnull Tuple groupingKey) throws IOException {
@Nonnull Tuple groupingKey,
boolean forceDeleteRecord) throws IOException {
// non-partitioned
if (!partitioner.isPartitioningEnabled()) {
return CompletableFuture.completedFuture(deleteDocument(groupingKey, null, record.getPrimaryKey()));
final int countDeleted = deleteDocument(groupingKey, null, record.getPrimaryKey(), forceDeleteRecord);
return CompletableFuture.completedFuture(adjustCountDeleted(forceDeleteRecord, countDeleted));
}

// partitioned
return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> {
if (partitionInfo != null) {
try {
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey());
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey(), forceDeleteRecord);
countDeleted = adjustCountDeleted(forceDeleteRecord, countDeleted);
if (countDeleted > 0) {
partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted);
}
@@ -555,6 +572,15 @@
});
}

private static int adjustCountDeleted(final boolean forceDeleteRecord, int countDeleted) {
if ((countDeleted == 0) && forceDeleteRecord) {
// In this case, the document was not found via the segment index, but we know it exists,
// so report the delete-by-query as having deleted it.
countDeleted = 1;
}
return countDeleted;
}
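As a reading aid, here is a dependency-free sketch of the delete decision this change introduces. It is not the maintainer's API: the segment-index lookup, the tryDeleteDocument call, the segment-index clear, and the delete-by-query are reduced to callbacks so the force-delete accounting described in the tryDelete javadoc can be seen in isolation.

import java.util.Optional;
import java.util.function.BooleanSupplier;

public final class ForceDeleteFlowSketch {

    /** Models how many documents the delete reports, mirroring deleteDocument plus adjustCountDeleted. */
    static int deleteDocumentModel(Optional<Integer> segmentIndexDocId,
                                   boolean forceDelete,
                                   Runnable clearSegmentIndexForKey,
                                   BooleanSupplier tryDeleteByDocId,
                                   Runnable deleteByQuery) {
        if (segmentIndexDocId.isPresent()) {
            if (tryDeleteByDocId.getAsBoolean()) {
                return 1; // deleted directly via the primary key segment index
            }
        } else if (forceDelete) {
            // The caller knows the document exists (e.g. a second update in the same
            // transaction, with the doc only in the NRT cache): clear stale entries anyway.
            clearSegmentIndexForKey.run();
        }
        // Fall back to delete-by-query; we cannot tell whether anything actually matched.
        deleteByQuery.run();
        // Force mode reports the document as deleted so partition counts stay correct.
        return forceDelete ? 1 : 0;
    }

    public static void main(String[] args) {
        int reported = deleteDocumentModel(Optional.empty(), true,
                () -> System.out.println("cleared segment index entries"),
                () -> false,
                () -> System.out.println("deleted by query"));
        System.out.println("reported deleted: " + reported);
    }
}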

private FieldType getTextFieldType(LuceneDocumentFromRecord.DocumentField field) {
FieldType ft = new FieldType();

@@ -90,7 +90,7 @@ public enum LuceneLogMessageKeys {
TOTAL_COUNT,
PRIMARY_KEY,
SEGMENTS,
;
FORCE_DELETE;

private final String logKey;

@@ -1058,7 +1058,7 @@ private CompletableFuture<Integer> moveDocsFromPartition(@Nonnull final LuceneRe
// situation with the partition metadata keys.
records.forEach(r -> {
try {
indexMaintainer.deleteDocument(groupingKey, partitionInfo.getId(), r.getPrimaryKey());
indexMaintainer.deleteDocument(groupingKey, partitionInfo.getId(), r.getPrimaryKey(), false);
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException(e.getMessage(), e);
}
@@ -72,6 +72,14 @@ public interface LucenePrimaryKeySegmentIndex {
*/
void clearForSegment(String segmentName) throws IOException;

/**
* Clear all entries for the given primary key.
* Note that this operation clears all entries for the given
* primary key, regardless of how many (if any) are present.
* @param primaryKey the record primary key to clear
*/
void clearForPrimaryKey(Tuple primaryKey);

/**
* Result of {@link #findDocument}.
*/
@@ -323,6 +323,11 @@ public void clearForSegment(final String segmentName) {
// which does not appear compatible with AgilityContext
}

@Override
public void clearForPrimaryKey(final Tuple primaryKey) {
throw new UnsupportedOperationException("Clear for primary key not implemented");
}


/**
* Get the primary key byte array from a document's stored fields.
@@ -185,4 +185,10 @@ public void clearForSegment(final String segmentName) throws IOException {
directory.getAgilityContext().clear(Range.startsWith(entryKey));
}
}

@Override
public void clearForPrimaryKey(@Nonnull Tuple primaryKey) {
Subspace keySubspace = subspace.subspace(primaryKey);
directory.getAgilityContext().clear(Range.startsWith(keySubspace.pack()));
}
}
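The implementation above clears everything under the primary key's subspace, which is why the interface can promise removal of all entries regardless of how many are present. A small sketch of the range such a clear covers, assuming the fdb-java client on the classpath; the subspace prefix and key values are made up, and no database is opened.

import com.apple.foundationdb.Range;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.ByteArrayUtil;
import com.apple.foundationdb.tuple.Tuple;

public final class ClearForPrimaryKeySketch {
    public static void main(String[] args) {
        // Hypothetical segment-index subspace; the real one is derived from the index directory.
        Subspace segmentIndexSubspace = new Subspace(Tuple.from("lucene", "pk-index"));

        // All entries for a primary key live under a single key prefix...
        Tuple primaryKey = Tuple.from(1066L, "doc-17");
        Subspace keySubspace = segmentIndexSubspace.subspace(primaryKey);

        // ...so one range clear removes them all, however many there are.
        Range toClear = Range.startsWith(keySubspace.pack());
        System.out.println("clear begin: " + ByteArrayUtil.printable(toClear.begin));
        System.out.println("clear end:   " + ByteArrayUtil.printable(toClear.end));
    }
}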
fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
@@ -55,6 +55,7 @@
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.util.LoggableKeysAndValues;
import com.apple.test.ParameterizedTestUtils;
import com.apple.test.RandomizedTestUtils;
import com.apple.test.SuperSlow;
import com.apple.test.Tags;
@@ -900,6 +901,100 @@
assertThat(partitionCounts, Matchers.contains(5, 3, 4)));
}

static Stream<Arguments> multiUpdate() {
return ParameterizedTestUtils.cartesianProduct(
ParameterizedTestUtils.booleans("isSynthetic"),
ParameterizedTestUtils.booleans("isGrouped"),
Stream.of(0, 10),
Stream.of(0, 1, 10),
Stream.of(5365));
}

@ParameterizedTest
@MethodSource("multiUpdate")
void multipleUpdatesSingleRecordInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException {
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
.setIsGrouped(isGrouped)
.setIsSynthetic(isSynthetic)
.setPrimaryKeySegmentIndexEnabled(true)
.setPartitionHighWatermark(highWatermark)
.build();

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();

// save a single record in a single group
try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);
dataModel.saveRecord(store, 1);
commit(context);
}

try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);
for (int i = 0 ; i < updateCount ; i++) {
dataModel.recordsUnderTest().forEach(rec -> rec.updateOtherValue(store).join());
}
commit(context);
}
dataModel.validate(() -> openContext(contextProps));
if (highWatermark > 0) {
// Only one record in the partition
dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) ->
assertThat(partitionCounts, Matchers.contains(1)));
}

explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
dataModel.validate(() -> openContext(contextProps));
}

@ParameterizedTest
@MethodSource("multiUpdate")
void multipleUpdatesMultiRecordInTransaction(boolean isSynthetic, boolean isGrouped, int highWatermark, int updateCount, long seed) throws IOException {
final int documentCount = 25;
final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager)
.setIsGrouped(isGrouped)
.setIsSynthetic(isSynthetic)
.setPrimaryKeySegmentIndexEnabled(true)
.setPartitionHighWatermark(highWatermark)
.build();

final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder()
.addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2)
.addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0
.build();

// save records
try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);

Teamscale finding (fdb.teamscale.io) for fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java#L971:
[New] The initial value of variable `store` is never read. https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3705%2Fohadzeliger%2Fmuitl-update-lucene%3AHEAD&id=BFC001269381DBDF8F0A766037DAB534
dataModel.saveRecordsToAllGroups(documentCount, context);
commit(context);
}

try (FDBRecordContext context = openContext(contextProps)) {
final FDBRecordStore store = dataModel.createOrOpenRecordStore(context);
dataModel.sampleRecordsUnderTest().forEach(rec -> {
for (int i = 0; i < updateCount; i++) {
// update some documents multiple times
rec.updateOtherValue(store).join();
}
});
commit(context);
}
explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);

if (highWatermark > 0) {
// ensure each partition has all records
dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) ->
assertThat(partitionCounts.stream().mapToInt(i -> i).sum(), Matchers.equalTo(documentCount)));
}

explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup);
dataModel.validate(() -> openContext(contextProps));
}

static Stream<Arguments> changingEncryptionKey() {
return Stream.concat(Stream.of(Arguments.of(true, true, 288513),
Arguments.of(false, false, 792025)),