Skip to content

GH-600 parallel parsing of NQUADS and N-Triples #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult;
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered;
import com.the_qa_company.qendpoint.core.listener.MultiThreadListener;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
Expand Down Expand Up @@ -187,7 +188,7 @@ public CompressTripleMapper compressDictionary(Iterator<TripleString> iterator)
"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with "
+ ways + "ways and " + workers + " worker(s)");

AsyncIteratorFetcher<TripleString> source = new AsyncIteratorFetcher<>(iterator);
AsyncIteratorFetcherUnordered<TripleString> source = new AsyncIteratorFetcherUnordered<>(iterator);

profiler.pushSection("section compression");
CompressionResult compressionResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public TripleAppender(TempDictionary dict, TempTriples triples, ProgressListener
}

@Override
public void processTriple(TripleString triple, long pos) {
synchronized public void processTriple(TripleString triple, long pos) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to provide both a synchronized and an unsynchronized version (or a synchronizing wrapper), so that single-threaded usage does not pay the cost of synchronization.

long s = dict.insert(triple.getSubject(), TripleComponentRole.SUBJECT);
long p = dict.insert(triple.getPredicate(), TripleComponentRole.PREDICATE);
long o = dict.insert(triple.getObject(), TripleComponentRole.OBJECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void load(InputStream input, ControlInfo ci, ProgressListener listener) t
try {
RDFParserSimple parser = new RDFParserSimple();
parser.doParse(new ByteArrayInputStream(headerData), "http://www.rdfhdt.org", RDFNotation.NTRIPLES, true,
this);
this, false);
} catch (ParserException e) {
log.error("Unexpected exception.", e);
throw new IllegalFormatException("Error parsing header");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.the_qa_company.qendpoint.core.iterator.utils;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

/**
 * Thread-safe fetcher that distributes the elements of an iterator to
 * several consumer threads without preserving the iteration order.
 * <p>
 * Each calling thread is mapped (by thread id) to one of a fixed number of
 * queues. When a thread finds its queue empty it refills it with a batch of
 * up to {@link #BUFFER} elements from the wrapped iterator, so that the
 * global lock guarding the iterator is taken once per batch. Once the
 * iterator is exhausted, threads take the elements left in the other
 * queues.
 * <p>
 * Lock order used throughout this class: the monitor of a queue first, then
 * the monitor of {@code this}. Every access to a queue is done while
 * holding that queue's own monitor; every access to the iterator is done
 * while holding the monitor of {@code this}.
 * <p>
 * The wrapped iterator must not yield {@code null}: a {@code null} element
 * is indistinguishable from the end of the stream and cannot be stored in
 * the internal {@link ArrayDeque}s.
 *
 * @param <E> iterator type
 * @author Håvard M. Ottestad
 * @author Antoine Willerval
 */
public class AsyncIteratorFetcherUnordered<E> extends AsyncIteratorFetcher<E> {

	private static final int CORES = Runtime.getRuntime().availableProcessors();

	/**
	 * Initial capacity of each queue and maximum number of elements
	 * pre-fetched into a queue during one refill (in addition to the element
	 * returned to the caller).
	 */
	public static final int BUFFER = 1024 * 4;
	private final Iterator<E> iterator;
	// written by any consumer thread without holding a lock, read by
	// isEnd(): must be volatile to be visible across threads
	private volatile boolean end;
	// a slot is set to null once its queue is empty and the iterator is
	// exhausted; slots are never re-populated afterwards
	volatile Queue<E>[] queue = newQueues(CORES * 2);

	public AsyncIteratorFetcherUnordered(Iterator<E> iterator) {
		super(iterator);
		this.iterator = iterator;
	}

	/**
	 * Create the array of pre-sized queues.
	 *
	 * @param count number of queues
	 * @param <T>   element type
	 * @return array filled with empty queues
	 */
	// generic array creation is unavoidable here; the array only ever holds
	// ArrayDeque<T> instances created below, so the cast is safe
	@SuppressWarnings("unchecked")
	private static <T> Queue<T>[] newQueues(int count) {
		Queue<T>[] queues = new Queue[count];
		for (int i = 0; i < queues.length; i++) {
			queues[i] = new ArrayDeque<>(BUFFER);
		}
		return queues;
	}

	/**
	 * Fetch one element. Elements are returned in no particular order with
	 * respect to the wrapped iterator.
	 *
	 * @return an element from the iterator, or {@code null} if the calling
	 *         thread found no element left in its queue, in the other queues
	 *         or in the iterator; this method is thread safe
	 */
	@Override
	public E get() {
		// thread ids are positive, so the modulo is a valid index
		int index = (int) (Thread.currentThread().getId() % queue.length);

		Queue<E> own = queue[index];
		if (own != null) {
			// With this approach there is some risk that a queue is filled
			// but never emptied by its owner; the leftovers are picked up by
			// the scan over all queues below.
			synchronized (own) {
				E element = own.poll();
				if (element != null) {
					return element;
				}

				synchronized (this) {
					// re-read the slot: a thread sharing this index may have
					// retired the queue while we were waiting for the lock
					Queue<E> current = queue[index];
					if (current != null) {
						element = current.poll();
						if (element == null && iterator.hasNext()) {
							// hand the first element to the caller and
							// pre-fetch a batch for the next calls
							element = iterator.next();
							for (int i = 0; i < BUFFER && iterator.hasNext(); i++) {
								current.add(iterator.next());
							}
						}

						if (element != null) {
							return element;
						}
						// nothing buffered and nothing left to read: retire
						// this queue
						queue[index] = null;
					}
				}
			}
		}

		// own queue retired: take whatever is left in any queue
		E stolen = pollAnyQueue();
		if (stolen != null) {
			return stolen;
		}

		synchronized (this) {
			if (iterator.hasNext()) {
				return iterator.next();
			}
		}

		end = true;
		return null;
	}

	/**
	 * Scan all the queues still in use and remove one element from the first
	 * non-empty one. Only the monitor of the inspected queue is held, which
	 * is sufficient because every queue access in this class is done under
	 * that monitor.
	 *
	 * @return an element, or {@code null} if every queue was empty or retired
	 */
	private E pollAnyQueue() {
		for (Queue<E> candidate : queue) {
			if (candidate != null) {
				synchronized (candidate) {
					E element = candidate.poll();
					if (element != null) {
						return element;
					}
				}
			}
		}
		return null;
	}

	/**
	 * @return {@code true} once a call to {@link #get()} has returned
	 *         {@code null} because no element was left
	 */
	public boolean isEnd() {
		return end;
	}
}
Loading
Loading