Skip to content

Commit 22eb035

Browse files
authored
Clone IndexInput when creating MemorySegmentPostingsVisitor (#129690)
1 parent 0e538bd commit 22eb035

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

server/src/main/java/org/elasticsearch/index/codec/vectors/DefaultIVFVectorsReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ NeighborQueue scorePostingLists(FieldInfo fieldInfo, KnnCollector knnCollector,
122122
PostingVisitor getPostingVisitor(FieldInfo fieldInfo, IndexInput indexInput, float[] target, IntPredicate needsScoring)
123123
throws IOException {
124124
FieldEntry entry = fields.get(fieldInfo.number);
125-
return new MemorySegmentPostingsVisitor(target, indexInput, entry, fieldInfo, needsScoring);
125+
return new MemorySegmentPostingsVisitor(target, indexInput.clone(), entry, fieldInfo, needsScoring);
126126
}
127127

128128
// TODO can we do this in off-heap blocks?

server/src/test/java/org/elasticsearch/index/codec/vectors/IVFVectorsFormatTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.IOException;
3535
import java.util.List;
3636
import java.util.Locale;
37+
import java.util.concurrent.atomic.AtomicBoolean;
3738

3839
import static java.lang.String.format;
3940
import static org.elasticsearch.index.codec.vectors.IVFVectorsFormat.MAX_VECTORS_PER_CLUSTER;
@@ -128,4 +129,49 @@ public void testSimpleOffHeapSize() throws IOException {
128129
}
129130
}
130131
}
132+
133+
// this is a modified version of lucene's TestSearchWithThreads test case
134+
public void testWithThreads() throws Exception {
135+
final int numThreads = random().nextInt(2, 5);
136+
final int numSearches = atLeast(100);
137+
final int numDocs = atLeast(1000);
138+
final int dimensions = random().nextInt(12, 500);
139+
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {
140+
for (int docCount = 0; docCount < numDocs; docCount++) {
141+
final Document doc = new Document();
142+
doc.add(new KnnFloatVectorField("f", randomVector(dimensions), VectorSimilarityFunction.EUCLIDEAN));
143+
w.addDocument(doc);
144+
}
145+
w.forceMerge(1);
146+
try (IndexReader reader = DirectoryReader.open(w)) {
147+
final AtomicBoolean failed = new AtomicBoolean();
148+
Thread[] threads = new Thread[numThreads];
149+
for (int threadID = 0; threadID < numThreads; threadID++) {
150+
threads[threadID] = new Thread(() -> {
151+
try {
152+
long totSearch = 0;
153+
for (; totSearch < numSearches && failed.get() == false; totSearch++) {
154+
float[] vector = randomVector(dimensions);
155+
LeafReader leafReader = getOnlyLeafReader(reader);
156+
leafReader.searchNearestVectors("f", vector, 10, leafReader.getLiveDocs(), Integer.MAX_VALUE);
157+
}
158+
assertTrue(totSearch > 0);
159+
} catch (Exception exc) {
160+
failed.set(true);
161+
throw new RuntimeException(exc);
162+
}
163+
});
164+
threads[threadID].setDaemon(true);
165+
}
166+
167+
for (Thread t : threads) {
168+
t.start();
169+
}
170+
171+
for (Thread t : threads) {
172+
t.join();
173+
}
174+
}
175+
}
176+
}
131177
}

0 commit comments

Comments
 (0)