diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java index 22cc46c2c..64747b990 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/HDTDiskImporter.java @@ -14,6 +14,7 @@ import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult; import com.the_qa_company.qendpoint.core.header.HeaderPrivate; import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher; +import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered; import com.the_qa_company.qendpoint.core.listener.MultiThreadListener; import com.the_qa_company.qendpoint.core.listener.ProgressListener; import com.the_qa_company.qendpoint.core.options.HDTOptions; @@ -187,7 +188,7 @@ public CompressTripleMapper compressDictionary(Iterator iterator) "Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with " + ways + "ways and " + workers + " worker(s)"); - AsyncIteratorFetcher source = new AsyncIteratorFetcher<>(iterator); + AsyncIteratorFetcherUnordered source = new AsyncIteratorFetcherUnordered<>(iterator); profiler.pushSection("section compression"); CompressionResult compressionResult; diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java index 9991d1d2a..dc6545c5d 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/hdt/impl/TempHDTImporterOnePass.java @@ -53,7 +53,7 @@ public TripleAppender(TempDictionary dict, TempTriples triples, ProgressListener } @Override - public void processTriple(TripleString triple, long pos) { + synchronized public void processTriple(TripleString triple, long pos) { long s = dict.insert(triple.getSubject(), TripleComponentRole.SUBJECT); long p = dict.insert(triple.getPredicate(), TripleComponentRole.PREDICATE); long o = dict.insert(triple.getObject(), TripleComponentRole.OBJECT); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java index 0c37539bd..f70221549 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/header/PlainHeader.java @@ -137,7 +137,7 @@ public void load(InputStream input, ControlInfo ci, ProgressListener listener) t try { RDFParserSimple parser = new RDFParserSimple(); parser.doParse(new ByteArrayInputStream(headerData), "http://www.rdfhdt.org", RDFNotation.NTRIPLES, true, - this); + this, false); } catch (ParserException e) { log.error("Unexpected exception.", e); throw new IllegalFormatException("Error parsing header"); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java new file mode 100644 index 000000000..ec0111705 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/AsyncIteratorFetcherUnordered.java @@ -0,0 +1,127 @@ +package com.the_qa_company.qendpoint.core.iterator.utils; + +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Queue; + +/** + * Synchronise an iterator + * + * @param iterator type + * @author Håvard M. Ottestad + * @author Antoine Willerval + */ +public class AsyncIteratorFetcherUnordered extends AsyncIteratorFetcher { + + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + public static final int BUFFER = 1024 * 4; + private final Iterator iterator; + private boolean end; + volatile Queue[] queue = new Queue[CORES * 2]; + + { + for (int i = 0; i < queue.length; i++) { + queue[i] = new ArrayDeque<>(BUFFER); + } + } + + public AsyncIteratorFetcherUnordered(Iterator iterator) { + super(iterator); + this.iterator = iterator; + } + + /** + * @return an element from the iterator, this method is thread safe + */ + @Override + public E get() { + + int index = (int) (Thread.currentThread().getId() % queue.length); + + Queue es = queue[index]; + if (es == null) { + for (Queue eQueue : queue) { + if (eQueue != null) { + synchronized (eQueue) { + E poll = eQueue.poll(); + + if (poll != null) { + return poll; + } + } + } + } + } + + if (es != null) { + // With this approach there is some risk that a queue is filled but + // never emptied. Maybe we should look for another queue to read + // from + // before filling our own queue? + synchronized (es) { + E poll = es.poll(); + + if (poll != null) { + return poll; + } + + synchronized (this) { + es = queue[index]; + if (es != null) { + + poll = es.poll(); + if (poll == null) { + if (iterator.hasNext()) { + poll = iterator.next(); + for (int i = 0; i < BUFFER && iterator.hasNext(); i++) { + es.add(iterator.next()); + } + } + + } + + if (poll == null) { + queue[index] = null; + } else { + return poll; + } + } + } + } + } + + for (Queue eQueue : queue) { + if (eQueue != null) { + + synchronized (eQueue) { + synchronized (this) { + E poll = eQueue.poll(); + + if (poll != null) { + return poll; + } + } + } + } + } + + synchronized (this) { + if (iterator.hasNext()) { + E poll = iterator.next(); + return poll; + } + } + + end = true; + return null; + + } + + /** + * @return is the end + */ + public boolean isEnd() { + return end; + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java new file mode 100644 index 000000000..b00173b14 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/iterator/utils/PipedCopyIteratorUnordered.java @@ -0,0 +1,384 @@ +package com.the_qa_company.qendpoint.core.iterator.utils; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +/** + * a utility class to create an iterator from the value returned by another + * Thread + * + * @param the iterator type + * @author Antoine Willerval + * @author Håvard M. Ottestad + */ + +public class PipedCopyIteratorUnordered extends PipedCopyIterator { + + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + /** + * RuntimeException generated by the PipedCopyIterator + * + * @author Antoine Willerval + */ + public static class PipedIteratorException extends RuntimeException { + public PipedIteratorException(String message, Throwable t) { + super(message, t); + } + } + + /** + * Callback for the + * {@link #createOfCallback(PipedCopyIteratorUnordered.PipeCallBack)} method + * + * @param the iterator type + * @author Antoine Willerval + */ + @FunctionalInterface + public interface PipeCallBack { + /** + * method called from the new thread to generate the new data, at the + * end of the callback, the pipe is closed with or without exception + * + * @param pipe the pipe to fill + * @throws Exception any exception returned by the generator + */ + void createPipe(PipedCopyIteratorUnordered pipe) throws Exception; + } + + /** + * create a piped iterator from a callback runner, the call to the callback + * should be made in the callbackRunner + * + * @param callbackRunner the callback runner + * @param type of the iterator + * @return the iterator + */ + public static PipedCopyIteratorUnordered createOfCallback(PipeCallBack callbackRunner) { + PipedCopyIteratorUnordered pipe = new PipedCopyIteratorUnordered<>(); + + Thread thread = new Thread(() -> { + try { + callbackRunner.createPipe(pipe); + pipe.closePipe(); + } catch (Throwable e) { + pipe.closePipe(e); + } + }, "PipeIterator"); + thread.start(); + + // close the thread at end + pipe.attachThread(thread); + + return pipe; + } + + private interface QueueObject { + boolean end(); + + T get(); + } + + private class ElementQueueObject implements QueueObject { + private final T obj; + + private ElementQueueObject(T obj) { + this.obj = obj; + } + + @Override + public boolean end() { + return false; + } + + @Override + public T get() { + return obj; + } + } + + private class EndQueueObject implements QueueObject { + @Override + public boolean end() { + return true; + } + + @Override + public T get() { + throw new IllegalArgumentException(); + } + } + + private final ArrayBlockingQueue>[] queue = new ArrayBlockingQueue[CORES * 2]; + + { + for (int i = 0; i < queue.length; i++) { + queue[i] = new ArrayBlockingQueue<>(16 * 1024); + } + } + + private final AtomicBoolean[] queueEnd = new AtomicBoolean[queue.length]; + + { + for (int i = 0; i < queueEnd.length; i++) { + queueEnd[i] = new AtomicBoolean(false); + } + } + + private T next; + private boolean end; + private PipedIteratorException exception; + + private Thread thread; + + volatile ArrayBlockingQueue> focusQueue; + + @Override + public boolean hasNext() { + if (end) { + return false; + } + if (next != null) { + return true; + } + + QueueObject obj; + try { + obj = useFocusQueue(); + + if (obj == null) { + obj = useThreadBasedQueue(); + } + + } catch (InterruptedException e) { + throw new PipedIteratorException("Can't read pipe", e); + } + + if (obj == null || obj.end()) { + obj = checkAllQueues(obj); + } + + if (obj.end()) { + end = true; + if (exception != null) { + throw exception; + } + return false; + } + next = obj.get(); + return true; + } + + private QueueObject useThreadBasedQueue() throws InterruptedException { + QueueObject obj; + int i = Thread.currentThread().hashCode(); + obj = queue[i % queue.length].poll(); + if (obj == null) { + obj = iterateThroughAllQueues(obj); + } else if (obj.end()) { + setQueueEnd(queue[i % queue.length]); + } else if (focusQueue == null) { + focusQueue = queue[i % queue.length]; + } + return obj; + } + + private QueueObject checkAllQueues(QueueObject originalObj) { + QueueObject obj = null; + boolean done; + + do { + done = true; + for (int i = 0; i < queue.length; i++) { + if (queueEnd[i].get()) { + continue; + } + done = false; + ArrayBlockingQueue> queueObjects = queue[i]; + obj = queueObjects.poll(); + if (obj == null) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else if (!obj.end()) { + return obj; + } else { + queueEnd[i].set(true); + } + } + } while (!done); + + if (obj == null) { + obj = originalObj; + } + + return obj; + } + + private QueueObject iterateThroughAllQueues(QueueObject obj) throws InterruptedException { + while (obj == null) { + for (int i = 0; i < queue.length; i++) { + if (queueEnd[i].get()) { + continue; + } + ArrayBlockingQueue> queueObjects = queue[i]; + obj = queueObjects.poll(); + if (obj != null) { + if (obj.end()) { + queueEnd[i].set(true); + } else if (focusQueue == null) { + focusQueue = queueObjects; + } + return obj; + } + } + Thread.sleep(10); + } + return obj; + } + + private QueueObject useFocusQueue() throws InterruptedException { + QueueObject obj; + var focusQueue = this.focusQueue; + if (focusQueue != null) { + QueueObject poll = focusQueue.poll(); + if (poll != null) { + obj = poll; + if (obj.end()) { + setQueueEnd(focusQueue); + } + } else { + obj = null; + this.focusQueue = null; + } + } else { + obj = null; + } + return obj; + } + + private void setQueueEnd(ArrayBlockingQueue> focusQueue) { + for (int i = 0; i < queue.length; i++) { + if (queue[i] == focusQueue) { + queueEnd[i].set(true); + break; + } + } + } + + @Override + public T next() { + if (!hasNext()) { + return null; + } + T next = this.next; + this.next = null; + return next; + } + + public void closePipe() { + closePipe(null); + } + + public void closePipe(Throwable e) { + if (e != null) { + // clear the queue to force the exception + for (ArrayBlockingQueue> queueObjects : queue) { + queueObjects.clear(); + } + if (e instanceof PipedIteratorException) { + this.exception = (PipedIteratorException) e; + } else { + this.exception = new PipedIteratorException("closing exception", e); + } + } + try { + for (ArrayBlockingQueue> queueObjects : queue) { + queueObjects.put(new EndQueueObject()); + } + } catch (InterruptedException ee) { + throw new PipedIteratorException("Can't close pipe", ee); + } + } + + /** + * map this iterator to another type + * + * @param mappingFunction the mapping function + * @param the future type + * @return mapped iterator + */ + public Iterator map(Function mappingFunction) { + return new MapIterator<>(this, mappingFunction); + } + + /** + * map this iterator to another type + * + * @param mappingFunction the mapping function + * @param the future type + * @return mapped iterator + */ + public Iterator mapWithId(MapIterator.MapWithIdFunction mappingFunction) { + return new MapIterator<>(this, mappingFunction); + } + + AtomicInteger index = new AtomicInteger(0); + + public void addElement(T node) { + int i = Thread.currentThread().hashCode(); + int l = i % queue.length; + try { + boolean success = queue[l].offer(new ElementQueueObject(node)); + if (!success) { + focusQueue = queue[l]; + while (!success) { + for (ArrayBlockingQueue> queueObjects : queue) { + success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS); + if (success) { + break; + } + } + } + } + + } catch (InterruptedException ee) { + throw new PipedIteratorException("Can't add element to pipe", ee); + } + } + + /** + * attach a thread to interrupt with this iterator + * + * @param thread the thread + */ + public void attachThread(Thread thread) { + Objects.requireNonNull(thread, "thread can't be null!"); + if (this.thread != null && this.thread != thread) { + throw new IllegalArgumentException("Thread already attached"); + } + this.thread = thread; + } + + /** + * Allow receiving again elements after an end node + */ + public void reset() { + this.end = false; + } + + @Override + public void close() throws IOException { + if (thread != null) { + thread.interrupt(); + } + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java index 1cc6caf04..e8ffa7ab4 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserCallback.java @@ -56,6 +56,11 @@ default void doParse(Path file, String baseUri, RDFNotation notation, boolean ke doParse(file.toAbsolutePath().toString(), baseUri, notation, keepBNode, callback); } - void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException; + default void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) + throws ParserException { + doParse(in, baseUri, notation, keepBNode, callback, false); + }; + + void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException; } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java index f03ca8f6c..27db60d1e 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/RDFParserFactory.java @@ -21,6 +21,7 @@ import com.the_qa_company.qendpoint.core.enums.RDFNotation; import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; +import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIteratorUnordered; import com.the_qa_company.qendpoint.core.options.HDTOptions; import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys; import com.the_qa_company.qendpoint.core.rdf.parsers.RDFDeltaFileParser; @@ -34,7 +35,6 @@ import com.the_qa_company.qendpoint.core.rdf.parsers.RDFParserZip; import com.the_qa_company.qendpoint.core.triples.TripleString; import com.the_qa_company.qendpoint.core.iterator.utils.PipedCopyIterator; -import com.the_qa_company.qendpoint.core.util.string.PrefixesStorage; import java.io.InputStream; @@ -97,8 +97,9 @@ public static RDFParserCallback getParserCallback(RDFNotation notation, HDTOptio */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream, String baseUri, boolean keepBNode, RDFNotation notation) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered.createOfCallback( + (PipedCopyIteratorUnordered.PipeCallBack) pipe -> parser.doParse(stream, baseUri, + notation, keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()), false)); } /** @@ -112,8 +113,9 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri, boolean keepBNode, RDFNotation notation) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered + .createOfCallback((PipedCopyIteratorUnordered.PipeCallBack) pipe -> parser.doParse(file, + baseUri, notation, keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); } /** @@ -127,8 +129,9 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, InputStream stream, String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(stream, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered + .createOfCallback((PipedCopyIteratorUnordered.PipeCallBack) pipe -> parser.doParse(stream, + baseUri, notation, keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()), true)); } /** @@ -143,8 +146,9 @@ public static PipedCopyIterator readAsIterator(RDFParserCallback p */ public static PipedCopyIterator readAsIterator(RDFParserCallback parser, String file, String baseUri, boolean keepBNode, RDFNotation notation, HDTOptions spec) { - return PipedCopyIterator.createOfCallback(pipe -> parser.doParse(file, baseUri, notation, keepBNode, - (triple, pos) -> pipe.addElement(triple.tripleToString()))); + return PipedCopyIteratorUnordered + .createOfCallback((PipedCopyIteratorUnordered.PipeCallBack) pipe -> parser.doParse(file, + baseUri, notation, keepBNode, (triple, pos) -> pipe.addElement(triple.tripleToString()))); } } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java new file mode 100644 index 000000000..39e4c96b7 --- /dev/null +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/ConcurrentInputStream.java @@ -0,0 +1,126 @@ +package com.the_qa_company.qendpoint.core.rdf.parsers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.nio.charset.StandardCharsets; + +public class ConcurrentInputStream { + + private static final Logger log = LoggerFactory.getLogger(ConcurrentInputStream.class); + private final InputStream source; + private final int numberOfStreams; + + private PipedInputStream[] pipedInputStreams; + private PipedOutputStream[] pipedOutputStreams; + + private PipedInputStream bnodeInputStream; + private PipedOutputStream bnodeOutputStream; + + private Thread readerThread; + + public ConcurrentInputStream(InputStream stream, int numberOfStreams) { + this.source = stream; + this.numberOfStreams = numberOfStreams; + setupPipes(); + startReadingThread(); + } + + private void setupPipes() { + pipedInputStreams = new PipedInputStream[numberOfStreams]; + pipedOutputStreams = new PipedOutputStream[numberOfStreams]; + + // The size of the pipes needs to be larger than the buffer of the + // buffered reader that Jena uses inside the parser, which is 131072 + // bytes. If our pipeSize is too small it limits the ability for the + // parsers to work concurrently. + int pipeSize = 131072 * 1024; + + try { + // Set up main fan-out pipes + for (int i = 0; i < numberOfStreams; i++) { + pipedOutputStreams[i] = new PipedOutputStream(); + pipedInputStreams[i] = new PipedInputStream(pipedOutputStreams[i], pipeSize); + } + + // Set up bnode pipe + bnodeOutputStream = new PipedOutputStream(); + bnodeInputStream = new PipedInputStream(bnodeOutputStream, pipeSize); + + } catch (IOException e) { + throw new RuntimeException("Error creating pipes", e); + } + } + + private void startReadingThread() { + readerThread = new Thread(new ReaderThread()); + + readerThread.setName("ConcurrentInputStream reader"); + readerThread.setDaemon(true); + readerThread.start(); + } + + /** + * Returns the stream for blank-node lines only. + */ + public InputStream getBnodeStream() { + return bnodeInputStream; + } + + /** + * Returns the array of InputStreams that share all concurrently read data. + */ + public InputStream[] getStreams() { + return pipedInputStreams; + } + + private class ReaderThread implements Runnable { + @Override + public void run() { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(source, StandardCharsets.UTF_8))) { + String line; + int currentStreamIndex = 0; + while ((line = reader.readLine()) != null) { + if (line.isEmpty()) { + continue; // Skip empty lines + } + + byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8); + + if (line.contains("_:")) { + // Write to bnodeOutputStream only + bnodeOutputStream.write(data); + } else { + // Write to a single stream from pipedOutputStreams in a + // round-robin manner + pipedOutputStreams[currentStreamIndex].write(data); + currentStreamIndex = (currentStreamIndex + 1) % pipedOutputStreams.length; + } + } + } catch (IOException e) { + log.error("Error reading input stream", e); + // If there's a read error, close everything. + } finally { + // Close all output streams to signal EOF + for (PipedOutputStream out : pipedOutputStreams) { + try { + out.close(); + } catch (IOException ignored) { + } + } + + try { + bnodeOutputStream.close(); + } catch (IOException e) { + log.error("Error closing bnodeOutputStream", e); + } + } + } + } +} diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java index 5fe00558b..3fe0e62dd 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFDeltaFileParser.java @@ -147,15 +147,15 @@ public RDFDeltaFileParser(HDTOptions spec) { public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) throws ParserException { try (InputStream is = IOUtil.getFileInputStream(fileName)) { - doParse(is, baseUri, notation, keepBNode, callback); + doParse(is, baseUri, notation, keepBNode, callback, false); } catch (IOException e) { throw new ParserException(e); } } @Override - public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { // read df file DeltaFileReader reader = new DeltaFileReader(in, spec); @@ -169,7 +169,7 @@ public void doParse(InputStream in, String baseUri, RDFNotation notation, boolea try { // read the next byte information parser.doParse(new GZIPInputStream(new ByteArrayInputStream(next.data)), baseUri, not, keepBNode, - callback); + callback, false); } catch (IOException e) { throw new ParserException("Error when reading " + next.fileName + " size: " + next.data.length, e); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java index 0e2043ae5..a0a631249 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserDir.java @@ -121,8 +121,8 @@ public void doParse(Path path, String baseUri, RDFNotation notation, boolean kee } @Override - public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { throw new NotImplementedException("Can't parse a stream of directory!"); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java index 88d785575..e9ef94ba5 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserHDT.java @@ -32,8 +32,8 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole } @Override - public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) - throws ParserException { + public void doParse(InputStream in, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback, + boolean parallel) throws ParserException { try { // create a temp Path tempFile = Files.createTempFile("hdtjava-reader", ".hdt"); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java index 3aa8f8669..f11ecf796 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserList.java @@ -71,7 +71,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { BufferedReader reader = new BufferedReader(new InputStreamReader(input)); try { doParse(reader, baseUri, notation, keepBNode, callback); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java index d8231b0ff..19d9f6857 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRAR.java @@ -99,7 +99,7 @@ public void doParse(String rarFile, String baseUri, RDFNotation notation, boolea Process processExtract = extractProcessBuilder.start(); InputStream in = processExtract.getInputStream(); - parser.doParse(in, baseUri, guessnot, keepBNode, callback); + parser.doParse(in, baseUri, guessnot, keepBNode, callback, false); in.close(); processExtract.waitFor(); @@ -119,7 +119,7 @@ public void doParse(String rarFile, String baseUri, RDFNotation notation, boolea @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { throw new NotImplementedException(); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java index a89ec1e12..ee7455637 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOT.java @@ -18,35 +18,92 @@ package com.the_qa_company.qendpoint.core.rdf.parsers; -import java.io.FileNotFoundException; -import java.io.InputStream; - +import com.the_qa_company.qendpoint.core.enums.RDFNotation; +import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; +import com.the_qa_company.qendpoint.core.exceptions.ParserException; import com.the_qa_company.qendpoint.core.quad.QuadString; +import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; +import com.the_qa_company.qendpoint.core.triples.TripleString; +import com.the_qa_company.qendpoint.core.util.io.IOUtil; import org.apache.jena.graph.Triple; +import org.apache.jena.iri.impl.LexerFixer; import org.apache.jena.riot.Lang; import org.apache.jena.riot.RDFParser; import org.apache.jena.riot.lang.LabelToNode; import org.apache.jena.riot.system.StreamRDF; import org.apache.jena.sparql.core.Quad; -import com.the_qa_company.qendpoint.core.enums.RDFNotation; -import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException; -import com.the_qa_company.qendpoint.core.exceptions.ParserException; -import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; -import com.the_qa_company.qendpoint.core.triples.TripleString; -import com.the_qa_company.qendpoint.core.util.io.IOUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.util.ArrayList; + /** * @author mario.arias */ public class RDFParserRIOT implements RDFParserCallback { private static final Logger log = LoggerFactory.getLogger(RDFParserRIOT.class); - private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBNode, ElemStringBuffer buffer) { + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBNode, ElemStringBuffer buffer, + boolean parallel) { + + if (!parallel || (lang != Lang.NQUADS && lang != Lang.NTRIPLES)) { + if (keepBNode) { + RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + } else { + RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); + } + return; + } + if (keepBNode) { - RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) - .parse(buffer); + LexerFixer.fixLexers(); + + ConcurrentInputStream cs = new ConcurrentInputStream(stream, CORES - 1); + + InputStream bnodes = cs.getBnodeStream(); + + var threads = new ArrayList(); + + Thread e1 = new Thread(() -> { + RDFParser.source(bnodes).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + e1.setName("BNode parser"); + threads.add(e1); + + InputStream[] streams = cs.getStreams(); + int i = 0; + for (InputStream s : streams) { + int temp = i + 1; + Thread e = new Thread(() -> { + RDFParser.source(s).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) + .parse(buffer); + }); + i++; + e.setName("Stream parser " + i); + threads.add(e); + + } + + threads.forEach(Thread::start); + for (Thread thread : threads) { + try { + while (thread.isAlive()) { + thread.join(1000); + } + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + +// RDFParser.source(stream).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven()) +// .parse(buffer); } else { RDFParser.source(stream).base(baseUri).lang(lang).parse(buffer); } @@ -62,7 +119,7 @@ private void parse(InputStream stream, String baseUri, Lang lang, boolean keepBN public void doParse(String fileName, String baseUri, RDFNotation notation, boolean keepBNode, RDFCallback callback) throws ParserException { try (InputStream input = IOUtil.getFileInputStream(fileName)) { - doParse(input, baseUri, notation, keepBNode, callback); + doParse(input, baseUri, notation, keepBNode, callback, false); } catch (FileNotFoundException e) { throw new ParserException(e); } catch (Exception e) { @@ -73,16 +130,15 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try { - ElemStringBuffer buffer = new ElemStringBuffer(callback); switch (notation) { - case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, buffer); - case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, buffer); - case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, buffer); - case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, buffer); - case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, buffer); - case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, buffer); + case NTRIPLES -> parse(input, baseUri, Lang.NTRIPLES, keepBNode, new ElemStringBuffer(callback), parallel); + case NQUAD -> parse(input, baseUri, Lang.NQUADS, keepBNode, new ElemStringBuffer(callback), parallel); + case RDFXML -> parse(input, baseUri, Lang.RDFXML, keepBNode, new ElemStringBuffer(callback), parallel); + case N3, TURTLE -> parse(input, baseUri, Lang.TURTLE, keepBNode, new ElemStringBuffer(callback), parallel); + case TRIG -> parse(input, baseUri, Lang.TRIG, keepBNode, new ElemStringBuffer(callback), parallel); + case TRIX -> parse(input, baseUri, Lang.TRIX, keepBNode, new ElemStringBuffer(callback), parallel); default -> throw new NotImplementedException("Parser not found for format " + notation); } } catch (Exception e) { @@ -91,17 +147,16 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo } } - private static class ElemStringBuffer implements StreamRDF { - private final TripleString triple = new TripleString(); - private final QuadString quad = new QuadString(); + public static class ElemStringBuffer implements StreamRDF { private final RDFCallback callback; - private ElemStringBuffer(RDFCallback callback) { + public ElemStringBuffer(RDFCallback callback) { this.callback = callback; } @Override public void triple(Triple parsedTriple) { + TripleString triple = new TripleString(); triple.setAll(JenaNodeFormatter.format(parsedTriple.getSubject()), JenaNodeFormatter.format(parsedTriple.getPredicate()), JenaNodeFormatter.format(parsedTriple.getObject())); @@ -110,6 +165,7 @@ public void triple(Triple parsedTriple) { @Override public void quad(Quad parsedQuad) { + QuadString quad = new QuadString(); quad.setAll(JenaNodeFormatter.format(parsedQuad.getSubject()), JenaNodeFormatter.format(parsedQuad.getPredicate()), JenaNodeFormatter.format(parsedQuad.getObject()), JenaNodeFormatter.format(parsedQuad.getGraph())); diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java index e6bf12033..4b86b5596 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimple.java @@ -63,7 +63,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try (BufferedReader reader = new BufferedReader(new InputStreamReader(input))) { doParse(reader, baseUri, notation, keepBNode, callback); } catch (IOException e) { diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java index 71bfdad8e..90fc7c6b0 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserTar.java @@ -45,7 +45,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole throws ParserException { try { InputStream input = IOUtil.getFileInputStream(fileName); - this.doParse(input, baseUri, notation, keepBNode, callback); + this.doParse(input, baseUri, notation, keepBNode, callback, false); input.close(); } catch (Exception e) { log.error("Unexpected exception parsing file: {}", fileName, e); @@ -55,7 +55,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try { final TarArchiveInputStream debInputStream = (TarArchiveInputStream) new ArchiveStreamFactory() @@ -74,7 +74,7 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo log.info("Parse from tar: {} as {}", entry.getName(), guessnot); RDFParserCallback parser = RDFParserFactory.getParserCallback(guessnot, spec); - parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback); + parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback, false); } catch (IllegalArgumentException | ParserException e1) { log.error("Unexpected exception.", e1); } diff --git a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java index 515e97915..d6d3147ab 100644 --- a/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java +++ b/qendpoint-core/src/main/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserZip.java @@ -42,7 +42,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole throws ParserException { try { InputStream input = IOUtil.getFileInputStream(fileName); - this.doParse(input, baseUri, notation, keepBNode, callback); + this.doParse(input, baseUri, notation, keepBNode, callback, false); input.close(); } catch (Exception e) { e.printStackTrace(); @@ -52,7 +52,7 @@ public void doParse(String fileName, String baseUri, RDFNotation notation, boole @Override public void doParse(InputStream input, String baseUri, RDFNotation notation, boolean keepBNode, - RDFCallback callback) throws ParserException { + RDFCallback callback, boolean parallel) throws ParserException { try { ZipInputStream zin = new ZipInputStream(input); @@ -67,7 +67,7 @@ public void doParse(InputStream input, String baseUri, RDFNotation notation, boo System.out.println("Parse from zip: " + zipEntry.getName() + " as " + guessnot); RDFParserCallback parser = RDFParserFactory.getParserCallback(guessnot, spec); - parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback); + parser.doParse(nonCloseIn, baseUri, guessnot, keepBNode, callback, false); } catch (IllegalArgumentException | ParserException e1) { e1.printStackTrace(); } diff --git a/qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java b/qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java new file mode 100644 index 000000000..779313acb --- /dev/null +++ b/qendpoint-core/src/main/java/org/apache/jena/iri/impl/LexerFixer.java @@ -0,0 +1,22 @@ +package org.apache.jena.iri.impl; + +import java.io.Reader; + +public class LexerFixer { + + private static final int CORES = Runtime.getRuntime().availableProcessors(); + + public static void fixLexers() { + Parser.lexers = new Lexer[CORES * 4][]; + for (int i = 0; i < Parser.lexers.length; i++) { + Parser.lexers[i] = new Lexer[] { new LexerScheme((Reader) null), new LexerUserinfo((Reader) null), + new LexerHost((Reader) null), new LexerPort((Reader) null), new LexerPath((Reader) null), + new LexerQuery((Reader) null), new LexerFragment((Reader) null), new LexerXHost((Reader) null), }; + } + } + + public static void printLexerSize() { + int length = Parser.lexers.length; + System.out.println("Lexer size: " + length); + } +} diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java index 374b6f41e..d84cdc0c8 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/AbstractNTriplesParserTest.java @@ -4,6 +4,7 @@ import com.the_qa_company.qendpoint.core.exceptions.ParserException; import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; import com.the_qa_company.qendpoint.core.triples.TripleString; +import com.the_qa_company.qendpoint.core.util.LargeFakeDataSetStreamSupplier; import org.apache.jena.atlas.io.StringWriterI; import org.apache.jena.atlas.lib.CharSpace; import org.apache.jena.graph.Node; @@ -11,17 +12,28 @@ import org.apache.jena.graph.Triple; import org.apache.jena.riot.system.StreamRDF; import org.apache.jena.riot.system.StreamRDFLib; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.Writer; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; public abstract class AbstractNTriplesParserTest { + @Rule + public TemporaryFolder tempDir = TemporaryFolder.builder().assureDeletion().build(); private interface Producer { void writeTo(StreamRDF out); @@ -73,6 +85,42 @@ public void writeTo(StreamRDF out) { assertEquals(JenaNodeFormatter.format(o3), triples.get(2).getObject()); } + protected Path createDataset(boolean fail) throws IOException { + Path path = tempDir.newFile("temp.nt").toPath(); + + final int numTriples = 1000; + try (Writer w = Files.newBufferedWriter(path)) { + LargeFakeDataSetStreamSupplier.createSupplierWithMaxTriples(numTriples, 23).withMaxElementSplit(10) + .withMaxLiteralSize(20).createNTFile(w); + if (fail) { + w.append("\n <<< ; ."); + } + w.flush(); + } + return path; + } + + @Test + public void singleParallelTest() throws IOException, ParserException { + RDFParserCallback parser = createParser(); + Path testFile = createDataset(false); + Set paraValues = new HashSet<>(); + Set singleValues = new HashSet<>(); + try (InputStream is = new BufferedInputStream(Files.newInputStream(testFile))) { + parser.doParse(is, LargeFakeDataSetStreamSupplier.BASE_URI, RDFNotation.NTRIPLES, true, + ((triple, pos) -> singleValues.add(triple.tripleToString())), false); + } + try (InputStream is = new BufferedInputStream(Files.newInputStream(testFile))) { + parser.doParse(is, LargeFakeDataSetStreamSupplier.BASE_URI, RDFNotation.NTRIPLES, true, ((triple, pos) -> { + synchronized (this) { + paraValues.add(triple.tripleToString()); + } + }), true); + } + + assertEquals("values aren't the same between normal and parallel parsing", singleValues, paraValues); + } + private String format(CharSpace charSpace, Producer producer) { StringWriterI buf = new StringWriterI(); StreamRDF out = StreamRDFLib.writer(buf, charSpace); @@ -87,7 +135,7 @@ private List parse(String ntriples, int expectedCount) throws Pars final List triples = new ArrayList<>(); createParser().doParse(in, "http://example.com#", RDFNotation.NTRIPLES, false, - (triple, pos) -> triples.add(new TripleString(triple))); + (triple, pos) -> triples.add(new TripleString(triple)), false); assertEquals(expectedCount, triples.size()); return triples; diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOTTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOTTest.java index cea7df4e4..bd500fe26 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOTTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserRIOTTest.java @@ -1,6 +1,15 @@ package com.the_qa_company.qendpoint.core.rdf.parsers; +import com.the_qa_company.qendpoint.core.enums.RDFNotation; +import com.the_qa_company.qendpoint.core.exceptions.ParserException; import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback; +import com.the_qa_company.qendpoint.core.util.LargeFakeDataSetStreamSupplier; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; public class RDFParserRIOTTest extends AbstractNTriplesParserTest { @@ -8,4 +17,24 @@ public class RDFParserRIOTTest extends AbstractNTriplesParserTest { protected RDFParserCallback createParser() { return new RDFParserRIOT(); } + + @Test(expected = ParserException.class) + public void parallelFailTest() throws IOException, ParserException { + RDFParserCallback parser = createParser(); + try (InputStream is = new BufferedInputStream(Files.newInputStream(createDataset(true)))) { + parser.doParse(is, LargeFakeDataSetStreamSupplier.BASE_URI, RDFNotation.NTRIPLES, true, ((triple, pos) -> { + // nothing + }), true); + } + } + + @Test(expected = ParserException.class) + public void singleFailTest() throws IOException, ParserException { + RDFParserCallback parser = createParser(); + try (InputStream is = new BufferedInputStream(Files.newInputStream(createDataset(true)))) { + parser.doParse(is, LargeFakeDataSetStreamSupplier.BASE_URI, RDFNotation.NTRIPLES, true, ((triple, pos) -> { + // nothing + }), false); + } + } } diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java index 4fe7cb41c..92a688ce3 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/rdf/parsers/RDFParserSimpleTest.java @@ -77,7 +77,7 @@ public void ingestTest() throws IOException, InterruptedException, ParserExcepti System.out.println(count[0] + " triples " + watch.stopAndShow()); } count[0]++; - }); + }, false); t.join(); if (re[0] != null) { diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java index c69a99efe..2637e7b1b 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/LargeFakeDataSetStreamSupplierTest.java @@ -25,8 +25,10 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -51,10 +53,16 @@ public void streamTest() throws IOException { try (PipedCopyIterator it = RDFParserFactory.readAsIterator( RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES), is, HDTTestUtils.BASE_URI, true, RDFNotation.NTRIPLES)) { - it.forEachRemaining(s -> { - assertTrue(it2.hasNext()); - assertEquals(it2.next(), s); - }); + + Set set1 = new HashSet<>(); + Set set2 = new HashSet<>(); + + it.forEachRemaining(set1::add); + it2.forEachRemaining(set2::add); + + assertEquals(set1.size(), set2.size()); + assertEquals(set1, set2); + assertFalse(it.hasNext()); } } diff --git a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java index ba0fdad17..7f69fe921 100644 --- a/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java +++ b/qendpoint-core/src/test/java/com/the_qa_company/qendpoint/core/util/UnicodeEscapeTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.assertEquals; public class UnicodeEscapeTest { + @Test public void encodeTest() throws ParserException { String file = Objects.requireNonNull(UnicodeEscapeTest.class.getClassLoader().getResource("unicodeTest.nt"), @@ -26,20 +27,20 @@ public void encodeTest() throws ParserException { RDFParserCallback factory2 = RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES, HDTOptions.of(Map.of(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY, "false"))); - Set ts1 = new TreeSet<>(Comparator.comparing(t -> { + Set ts1 = Collections.synchronizedSet(new TreeSet<>(Comparator.comparing(t -> { try { return t.asNtriple().toString(); } catch (IOException e) { throw new RuntimeException(e); } - })); - Set ts2 = new TreeSet<>(Comparator.comparing(t -> { + }))); + Set ts2 = Collections.synchronizedSet(new TreeSet<>(Comparator.comparing(t -> { try { return t.asNtriple().toString(); } catch (IOException e) { throw new RuntimeException(e); } - })); + }))); factory.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts1.add(t.tripleToString())); factory2.doParse(file, HDTTestUtils.BASE_URI, RDFNotation.NTRIPLES, true, (t, i) -> ts2.add(t.tripleToString()));