GH-600 parallel parsing of NQUADS and N-Triples #601

Open: wants to merge 6 commits into base: dev

hmottestad
Contributor

Issue resolved (if any): #600

Description of this pull request:


Please check all the lines before posting the pull request:

  • I've created tests for all my changes
  • My pull request isn't fixing or changing multiple unlinked elements (please create one pull request for each element)
  • I've applied the code formatter (mvn formatter:format on the backend, npm run format on the frontend) before posting my pull request, mvn formatter:validate to validate the formatting on the backend, npm run validate on the frontend
  • All my commits have relevant names
  • I've squashed my commits (if necessary)

…d approach to parsing NQUADS and N-Triples files. Also implement more concurrent intermediary structures instead of refactoring the code to support multiple ElemStringBuffer buffers when parsing.
@hmottestad
Contributor Author

hmottestad commented Mar 20, 2025

I timed the conversion of latest-lexemes.nt.gz from https://dumps.wikimedia.org/wikidatawiki/entities/ on an M3 Max with 16 cores. It originally took 11 minutes and now takes 7 minutes.

Before

Screenshot 2025-03-20 at 14 38 55

After

Screenshot 2025-03-20 at 14 55 11

@hmottestad
Contributor Author

A few of the tests assumed that the RDF parser would return statements in a fixed and predictable order.

I fixed up a couple of them, but then found out that it's probably best to have a way to enable/disable parallel parsing.
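For tests that only care about which statements were parsed, one option is to compare the results without regard to order. This is a minimal sketch under that assumption; `OrderInsensitiveSketch` and `sameStatements` are hypothetical names, and statements are represented as plain strings rather than the project's triple type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OrderInsensitiveSketch {
    /**
     * Returns true when both lists hold the same statements with the same
     * multiplicities, regardless of the order in which they were produced.
     */
    static boolean sameStatements(List<String> expected, List<String> actual) {
        // Sort copies so the caller's lists are left untouched.
        List<String> a = new ArrayList<>(expected);
        List<String> b = new ArrayList<>(actual);
        Collections.sort(a);
        Collections.sort(b);
        return a.equals(b);
    }
}
```

Sorting copies keeps duplicates significant, which a plain set comparison would hide.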

All the tests are passing now, but I still need to double-check that the performance is as good as expected.

Can you start testing it @ate47 ?

ate47 self-requested a review May 6, 2025 12:49

Thread e1 = new Thread(() -> {
RDFParser.source(bnodes).base(baseUri).lang(lang).labelToNode(LabelToNode.createUseLabelAsGiven())
.parse(buffer);
Collaborator

What is the point of having a custom stream for the bnodes, and of disabling parallel parsing when they are not kept?

import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

public class ConcurrentInputStream {
Collaborator

The error handling could be improved. If the parsing fails, it seems to throw an exception and close the streams. OK, it will fall back on the other threads, but they only get a dead-stream IOException, and nothing seems to reach the user side?
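One way to surface the original failure to the caller, sketched with plain `java.lang.Thread` and an `AtomicReference`. This is not the PR's code: `WorkerFailureSketch`, `ParseTask`, and `runAll` are hypothetical names, and the real parser call would replace the task body.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

final class WorkerFailureSketch {
    /** Hypothetical parsing task; the real parser invocation would go here. */
    interface ParseTask {
        void run() throws Exception;
    }

    /** Runs each task on its own thread and rethrows the first recorded failure. */
    static void runAll(ParseTask... tasks) throws IOException {
        AtomicReference<Throwable> firstFailure = new AtomicReference<>();
        Thread[] threads = new Thread[tasks.length];
        for (int i = 0; i < tasks.length; i++) {
            ParseTask task = tasks[i];
            threads[i] = new Thread(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    // Record only the first failure that is reported.
                    firstFailure.compareAndSet(null, t);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag before reporting the problem.
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting for parser threads", e);
            }
        }
        Throwable failure = firstFailure.get();
        if (failure != null) {
            // The caller sees the recorded cause instead of a silently closed stream.
            throw new IOException("parallel parsing failed", failure);
        }
    }
}
```

The caller can then inspect `getCause()` to find out what went wrong in the worker thread.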

@@ -26,20 +27,20 @@ public void encodeTest() throws ParserException {
RDFParserCallback factory2 = RDFParserFactory.getParserCallback(RDFNotation.NTRIPLES,
HDTOptions.of(Map.of(HDTOptionsKeys.NT_SIMPLE_PARSER_KEY, "false")));

- Set<TripleString> ts1 = new TreeSet<>(Comparator.comparing(t -> {
+ Set<TripleString> ts1 = Collections.synchronizedSet(new TreeSet<>(Comparator.comparing(t -> {
Collaborator

Given that parallel parsing is only used for streamed files, is this useful?

@@ -53,7 +53,7 @@ public TripleAppender(TempDictionary dict, TempTriples triples, ProgressListener
}

@Override
- public void processTriple(TripleString triple, long pos) {
+ synchronized public void processTriple(TripleString triple, long pos) {
Collaborator

I think it would be better to offer a synchronized and an unsynchronized version (or a wrapper) to avoid synchronization in single-threaded use.
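A rough sketch of the wrapper idea, assuming a simplified callback interface. `TripleCallback`, `CollectingCallback`, `SynchronizedCallback`, and `forParser` are hypothetical names for illustration, not the project's API, and triples are plain strings here.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncCallbackSketch {
    /** Hypothetical stand-in for the parser callback interface. */
    interface TripleCallback {
        void processTriple(String triple, long pos);
    }

    /** Unsynchronized implementation, safe only when a single thread calls it. */
    static final class CollectingCallback implements TripleCallback {
        final List<String> seen = new ArrayList<>();

        @Override
        public void processTriple(String triple, long pos) {
            seen.add(triple);
        }
    }

    /** Wrapper that serializes all calls to the delegate. */
    static final class SynchronizedCallback implements TripleCallback {
        private final TripleCallback delegate;

        SynchronizedCallback(TripleCallback delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void processTriple(String triple, long pos) {
            delegate.processTriple(triple, pos);
        }
    }

    /** Adds the wrapper only when several threads will feed the callback. */
    static TripleCallback forParser(TripleCallback base, boolean parallel) {
        return parallel ? new SynchronizedCallback(base) : base;
    }

    /** Feeds triples from several threads and returns how many were collected. */
    static int runParallel(int threads, int perThread) {
        CollectingCallback base = new CollectingCallback();
        TripleCallback cb = forParser(base, true);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    cb.processTriple("t" + id + "-" + i, i);
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join(); // join makes the workers' writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return base.seen.size();
    }

    public static void main(String[] args) {
        System.out.println(runParallel(4, 1000));
    }
}
```

With this shape the single-threaded path uses the base callback directly and pays no locking cost, while the parallel path opts in through `forParser(base, true)`.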

@ate47
Collaborator

ate47 commented May 15, 2025

I think you could also take a look at the ExceptionThread class; I wrote it to bind threads together while keeping track of their exceptions.
