Skip to content

Commit 6c534fe

Browse files
committed
wip
1 parent 69fade7 commit 6c534fe

File tree

4 files changed

+767
-166
lines changed

4 files changed

+767
-166
lines changed

qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/diskimport/SectionCompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher<Tr
6060
this.source = source;
6161
this.listener = listener;
6262
this.baseFileName = baseFileName;
63-
this.bufferSize = bufferSize;
63+
this.bufferSize = Math.max(4 * 1024 * 1024, bufferSize);
6464
this.chunkSize = chunkSize;
6565
this.k = k;
6666
this.debugSleepKwayDict = debugSleepKwayDict;
Lines changed: 56 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
package com.the_qa_company.qendpoint.core.iterator.utils;
22

3-
import java.util.ArrayDeque;
3+
import java.io.IOException;
44
import java.util.Arrays;
55
import java.util.Comparator;
6-
import java.util.Deque;
76
import java.util.List;
8-
import java.util.concurrent.CompletableFuture;
9-
import java.util.concurrent.ExecutionException;
10-
import java.util.concurrent.Executor;
11-
import java.util.concurrent.Executors;
127
import java.util.function.BiFunction;
138
import java.util.function.Function;
149

@@ -150,25 +145,8 @@ public static <I, T, E extends Exception> ExceptionIterator<T, E> buildOfTree(
150145
buildOfTree(itFunction, comp, array, mid, end), comp);
151146
}
152147

153-
private final ExceptionIterator<T, E> in1;
154-
private final ExceptionIterator<T, E> in2;
148+
private final ExceptionIterator<T, E> in1, in2;
155149
private final Comparator<T> comp;
156-
private final int chunkSize = 1024 * 4;
157-
private final Executor executor = Executors.newVirtualThreadPerTaskExecutor(); // Could
158-
// be
159-
// a
160-
// ForkJoinPool.commonPool(),
161-
// or
162-
// a
163-
// custom
164-
// pool
165-
166-
private final Deque<T> chunk1 = new ArrayDeque<>();
167-
private final Deque<T> chunk2 = new ArrayDeque<>();
168-
169-
// Local buffer to store merged chunks
170-
private final Deque<T> buffer = new ArrayDeque<>();
171-
172150
private T next;
173151
private T prevE1;
174152
private T prevE2;
@@ -177,25 +155,56 @@ public MergeExceptionIterator(ExceptionIterator<T, E> in1, ExceptionIterator<T,
177155
this.in1 = in1;
178156
this.in2 = in2;
179157
this.comp = comp;
158+
// new Throwable().printStackTrace();
180159
}
181160

182161
@Override
183162
public boolean hasNext() throws E {
184-
if (buffer.isEmpty()) {
185-
fillBuffer();
186-
}
187-
if (buffer.isEmpty()) {
188-
return false;
189-
}
190-
return buffer.peek() != null;
191-
}
163+
try {
164+
if (next != null) {
165+
return true;
166+
}
192167

193-
@Override
194-
public T next() throws E {
195-
if (!hasNext()) {
196-
return null; // or throw NoSuchElementException
168+
// read next element 1 if required
169+
if (prevE1 == null && in1.hasNext()) {
170+
prevE1 = in1.next();
171+
}
172+
// read next element 2 if required
173+
if (prevE2 == null && in2.hasNext()) {
174+
prevE2 = in2.next();
175+
}
176+
177+
if (prevE1 != null && prevE2 != null) {
178+
// we have an element from both stream, compare them
179+
if (comp.compare(prevE1, prevE2) < 0) {
180+
// element 1 lower, return it
181+
next = prevE1;
182+
prevE1 = null;
183+
} else {
184+
// element 2 lower, return it
185+
next = prevE2;
186+
prevE2 = null;
187+
}
188+
return true;
189+
}
190+
// we have at most one element
191+
if (prevE1 != null) {
192+
// return element 1
193+
next = prevE1;
194+
prevE1 = null;
195+
return true;
196+
}
197+
if (prevE2 != null) {
198+
// return element 2
199+
next = prevE2;
200+
prevE2 = null;
201+
return true;
202+
}
203+
// nothing else
204+
return false;
205+
} catch (Exception e) {
206+
throw (E) e;
197207
}
198-
return buffer.pollFirst();
199208
}
200209

201210
@Override
@@ -205,139 +214,21 @@ public long getSize() {
205214
if (s1 == -1 || s2 == -1) {
206215
return -1;
207216
}
208-
return s1 + s2;
217+
return s2 + s1;
209218
}
210219

211-
/**
212-
* Fetch a chunk of items from both child iterators in parallel, then merge
213-
* them into our local buffer. Only called when the buffer is empty.
214-
*/
215-
private void fillBuffer() throws E {
216-
217-
if (!buffer.isEmpty()) {
218-
return;
219-
}
220-
221-
// Kick off two parallel fetch tasks
222-
CompletableFuture<Deque<T>> future1 = chunk1.size() < chunkSize && in1.hasNext()
223-
? fetchChunkAsync(in1, chunk1, chunkSize)
224-
: null;
225-
CompletableFuture<Deque<T>> future2 = chunk2.size() < chunkSize && in2.hasNext()
226-
? fetchChunkAsync(in2, chunk2, chunkSize)
227-
: null;
228-
229-
// Wait for both tasks to complete
230-
if (future1 != null && future2 != null) {
231-
CompletableFuture.allOf(future1, future2).join();
232-
}
233-
234-
Deque<T> chunk1;
235-
Deque<T> chunk2;
236-
try {
237-
chunk1 = future1 != null ? future1.get() : this.chunk1;
238-
chunk2 = future2 != null ? future2.get() : this.chunk2;
239-
} catch (InterruptedException ie) {
240-
// Restore interrupt status
241-
Thread.currentThread().interrupt();
242-
throw new RuntimeException("Interrupted while fetching chunks in parallel", ie);
243-
} catch (ExecutionException ee) {
244-
// If our underlying fetch threw a checked exception E, unwrap and
245-
// throw it
246-
Throwable cause = ee.getCause();
247-
if (cause instanceof Exception ex) {
248-
// You may need a different mechanism to re-throw the correct
249-
// type E
250-
// e.g. reflection or a wrapper if E is known
251-
throw asE(ex);
252-
} else {
253-
throw new RuntimeException("Unexpected error in parallel fetch", cause);
254-
}
255-
}
256-
if (chunk1.isEmpty()) {
257-
while (!chunk2.isEmpty()) {
258-
buffer.addLast(chunk2.pollFirst());
259-
}
260-
} else if (chunk2.isEmpty()) {
261-
while (!chunk1.isEmpty()) {
262-
buffer.addLast(chunk1.pollFirst());
263-
}
264-
} else {
265-
// Merge the two fetched chunks in sorted order
266-
mergeChunksIntoBuffer(chunk1, chunk2);
267-
}
268-
}
269-
270-
/**
271-
* Helper to schedule a chunk fetch on the given iterator and return a
272-
* CompletableFuture. Because T can throw a checked exception E, we wrap the
273-
* call and handle exceptions carefully.
274-
*/
275-
private CompletableFuture<Deque<T>> fetchChunkAsync(ExceptionIterator<T, E> iter, Deque<T> chunk, int n) {
276-
CompletableFuture<Deque<T>> future = new CompletableFuture<>();
277-
// executor.execute(() -> {
220+
@Override
221+
public T next() throws E {
278222
try {
279-
Deque<T> result = fetchChunk(iter, chunk, n);
280-
future.complete(result);
281-
} catch (Exception e) {
282-
future.completeExceptionally(e);
283-
}
284-
// });
285-
return future;
286-
}
287-
288-
/**
289-
* Actual synchronous fetch of up to 'n' items from the child iterator.
290-
*/
291-
private Deque<T> fetchChunk(ExceptionIterator<T, E> iter, Deque<T> list, int n) throws E {
292-
while (list.size() < n && iter.hasNext()) {
293-
list.addLast(iter.next());
294-
}
295-
return list;
296-
}
297-
298-
/**
299-
* Merge two sorted lists into our buffer in ascending order. If the child
300-
* iterators are guaranteed sorted, you can do this linear merge. Otherwise,
301-
* you'd need a custom approach (possibly sorting the partial chunks).
302-
*/
303-
private void mergeChunksIntoBuffer(Deque<T> c1, Deque<T> c2) {
304-
305-
if (c1.isEmpty() || c2.isEmpty()) {
306-
return;
307-
}
308-
309-
// this assumes that each of the two chunks is sorted
310-
T c1First = c1.peek();
311-
T c2Last = c2.peekLast();
312-
if (comp.compare(c1First, c2Last) > 0) {
313-
buffer.addAll(c2);
314-
c2.clear();
315-
return;
316-
}
317223

318-
T c2First = c2.peek();
319-
T c1Last = c1.peekLast();
320-
if (comp.compare(c2First, c1Last) > 0) {
321-
buffer.addAll(c1);
322-
c1.clear();
323-
return;
324-
}
325-
326-
while (!(c1.isEmpty() || c2.isEmpty())) {
327-
if (comp.compare(c1.peek(), c2.peek()) < 0) {
328-
buffer.addLast(c1.pollFirst());
329-
} else {
330-
buffer.addLast(c2.pollFirst());
224+
if (!hasNext()) {
225+
return null;
331226
}
227+
T next = this.next;
228+
this.next = null;
229+
return next;
230+
} catch (Exception e) {
231+
throw (E) e;
332232
}
333233
}
334-
335-
/**
336-
* Utility to cast a generic Exception to E if needed, or wrap as
337-
* RuntimeException. Adjust as necessary for your real-world scenario.
338-
*/
339-
@SuppressWarnings("unchecked")
340-
private E asE(Exception ex) {
341-
return (E) ex;
342-
}
343234
}

0 commit comments

Comments
 (0)