From 6bd312b8686f45878c54f71fbcecf28bb7b4be06 Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Tue, 12 Nov 2024 17:42:37 -0500
Subject: [PATCH 1/9] Render streamed proto output in parallel

---
 .../output/StreamedProtoOutputFormatter.java  | 65 ++++++++++++++++++-
 1 file changed, 63 insertions(+), 2 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index 51c1c3eabaa516..3d3110ba80620d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -16,8 +16,12 @@
 import com.google.devtools.build.lib.packages.LabelPrinter;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
+import com.google.devtools.build.lib.query2.proto.proto2api.Build;
+
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.stream.StreamSupport;
 
 /**
  * An output formatter that outputs a protocol buffer representation of a query result and outputs
@@ -34,13 +38,70 @@ public String getName() {
   public OutputFormatterCallback<Target> createPostFactoStreamCallback(
       final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
     return new OutputFormatterCallback<Target>() {
+      private final LabelPrinter ourLabelPrinter = labelPrinter;
+
       @Override
       public void processOutput(Iterable<Target> partialResult)
           throws IOException, InterruptedException {
-        for (Target target : partialResult) {
-          toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
+        try {
+          StreamSupport.stream(partialResult.spliterator(), /* parallel= */true)
+              .map(this::toProto)
+              .map(StreamedProtoOutputFormatter::writeDelimited)
+              .forEach(this::writeToOutputStreamThreadSafe);
+        } catch (WrappedIOException e) {
+          throw e.getCause();
+        } catch (WrappedInterruptedException e) {
+          throw e.getCause();
+        }
+      }
+
+      private Build.Target toProto(Target target) {
+        try {
+          return toTargetProtoBuffer(target, ourLabelPrinter);
+        } catch (InterruptedException e) {
+          throw new WrappedInterruptedException(e);
+        }
+      }
+
+      private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bout) {
+        try {
+          bout.writeTo(out);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
         }
       }
     };
   }
+
+  private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) {
+    try {
+      var bout = new ByteArrayOutputStream(targetProtoBuffer.getSerializedSize() + 10);
+      targetProtoBuffer.writeDelimitedTo(bout);
+      return bout;
+    } catch (IOException e) {
+      throw new WrappedIOException(e);
+    }
+  }
+
+  private static class WrappedIOException extends RuntimeException {
+    private WrappedIOException(IOException cause) {
+      super(cause);
+    }
+
+    @Override
+    public synchronized IOException getCause() {
+      return (IOException) super.getCause();
+    }
+  }
+
+  private static class WrappedInterruptedException extends RuntimeException {
+    private WrappedInterruptedException(InterruptedException cause) {
+      super(cause);
+    }
+
+    @Override
+    public synchronized InterruptedException getCause() {
+      return (InterruptedException) super.getCause();
+    }
+  }
 }

From 41fbacfcf7dcd7149ce73fde6dadf90c32f9406e Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Tue, 12 Nov 2024 18:19:08 -0500
Subject: [PATCH 2/9] fix wrong exception

---
 .../lib/query2/query/output/StreamedProtoOutputFormatter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index 3d3110ba80620d..7ae8e66801f67d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -44,7 +44,7 @@ public OutputFormatterCallback<Target> createPostFactoStreamCallback(
       public void processOutput(Iterable<Target> partialResult)
           throws IOException, InterruptedException {
         try {
-          StreamSupport.stream(partialResult.spliterator(), /* parallel= */true)
+          StreamSupport.stream(partialResult.spliterator(), /* parallel= */ true)
               .map(this::toProto)
               .map(StreamedProtoOutputFormatter::writeDelimited)
               .forEach(this::writeToOutputStreamThreadSafe);
@@ -67,7 +67,7 @@ private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bo
         try {
           bout.writeTo(out);
         } catch (IOException e) {
-          throw new RuntimeException(e);
+          throw new WrappedIOException(e);
         }
       }
     };

From 0c9a1d257e6ffaf2d60b70db9776e7b6ba8f4ae9 Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Tue, 12 Nov 2024 18:21:06 -0500
Subject: [PATCH 3/9] remove unnecessary synchronized

---
 .../lib/query2/query/output/StreamedProtoOutputFormatter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index 7ae8e66801f67d..e80ef4f6bc4f6c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -89,7 +89,7 @@ private WrappedIOException(IOException cause) {
     }
 
     @Override
-    public synchronized IOException getCause() {
+    public IOException getCause() {
       return (IOException) super.getCause();
     }
   }
@@ -100,7 +100,7 @@ private WrappedInterruptedException(InterruptedException cause) {
     }
 
     @Override
-    public synchronized InterruptedException getCause() {
+    public InterruptedException getCause() {
       return (InterruptedException) super.getCause();
     }
   }

From 913d4a397b38b8f8ea364257927d0d4ea0a7ee78 Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Wed, 13 Nov 2024 10:02:32 -0500
Subject: [PATCH 4/9] address jin feedback

---
 .../output/StreamedProtoOutputFormatter.java   | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index e80ef4f6bc4f6c..fdf0a9dbcb2d57 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -17,7 +17,6 @@
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.proto.proto2api.Build;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -29,6 +28,19 @@
  * on a {@code Build.QueryResult} object the full result can be reconstructed.
  */
 public class StreamedProtoOutputFormatter extends ProtoOutputFormatter {
+
+  /**
+   * The most bytes that protobuf delimited proto format will prepend to a proto message. See <a
+   * href="https://github.com/protocolbuffers/protobuf/blob/c11033dc27c3e9c1913e45b62fb5d4c5b5644b3e/java/core/src/main/java/com/google/protobuf/AbstractMessageLite.java#L72">
+   * <code>writeDelimitedTo</code></a> and <a
+   * href="https://github.com/protocolbuffers/protobuf/blob/c11033dc27c3e9c1913e45b62fb5d4c5b5644b3e/java/core/src/main/java/com/google/protobuf/WireFormat.java#L28">
+   * <code>MAX_VARINT32_SIZE</code></a>.
+   *
+   * <p>The value for int32 (used by {@code writeDelimitedTo} is actually 5, but we pick 10 just to
+   * be safe.
+   */
+  private static final int MAX_BYTES_FOR_VARINT32_ENCODING = 10;
+
   @Override
   public String getName() {
     return "streamed_proto";
@@ -75,7 +87,9 @@ private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bo
 
   private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) {
     try {
-      var bout = new ByteArrayOutputStream(targetProtoBuffer.getSerializedSize() + 10);
+      var bout =
+          new ByteArrayOutputStream(
+              targetProtoBuffer.getSerializedSize() + MAX_BYTES_FOR_VARINT32_ENCODING);
       targetProtoBuffer.writeDelimitedTo(bout);
       return bout;
     } catch (IOException e) {

From 9843a5ea1f01e586b849534ef7f3a6b891a3c67e Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Wed, 13 Nov 2024 11:05:41 -0500
Subject: [PATCH 5/9] always order

---
 .../query2/query/output/StreamedProtoOutputFormatter.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index fdf0a9dbcb2d57..d615510e51d47d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -59,7 +59,11 @@ public void processOutput(Iterable<Target> partialResult)
           StreamSupport.stream(partialResult.spliterator(), /* parallel= */ true)
               .map(this::toProto)
               .map(StreamedProtoOutputFormatter::writeDelimited)
-              .forEach(this::writeToOutputStreamThreadSafe);
+              // I imagine forEachOrdered hurts performance somewhat in some cases. While we may
+              // not need to actually produce output in order, this code does not know whether
+              // ordering was requested. So we just always write it in order, and hope performance
+              // is OK.
+              .forEachOrdered(this::writeToOutputStreamThreadSafe);
         } catch (WrappedIOException e) {
           throw e.getCause();
         } catch (WrappedInterruptedException e) {

From 9a0efa028702133ebc8c6bd5d692c3b0be69e52d Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Thu, 14 Nov 2024 12:13:06 -0500
Subject: [PATCH 6/9] address michajlo feedback

---
 .../output/StreamedProtoOutputFormatter.java  | 31 ++++++-------------
 1 file changed, 10 insertions(+), 21 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index d615510e51d47d..05294d5af8b2c7 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -17,7 +17,8 @@
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.proto.proto2api.Build;
-import java.io.ByteArrayOutputStream;
+import com.google.protobuf.CodedOutputStream;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.stream.StreamSupport;
@@ -29,18 +30,6 @@
  */
 public class StreamedProtoOutputFormatter extends ProtoOutputFormatter {
 
-  /**
-   * The most bytes that protobuf delimited proto format will prepend to a proto message. See <a
-   * href="https://github.com/protocolbuffers/protobuf/blob/c11033dc27c3e9c1913e45b62fb5d4c5b5644b3e/java/core/src/main/java/com/google/protobuf/AbstractMessageLite.java#L72">
-   * <code>writeDelimitedTo</code></a> and <a
-   * href="https://github.com/protocolbuffers/protobuf/blob/c11033dc27c3e9c1913e45b62fb5d4c5b5644b3e/java/core/src/main/java/com/google/protobuf/WireFormat.java#L28">
-   * <code>MAX_VARINT32_SIZE</code></a>.
-   *
-   * <p>The value for int32 (used by {@code writeDelimitedTo} is actually 5, but we pick 10 just to
-   * be safe.
-   */
-  private static final int MAX_BYTES_FOR_VARINT32_ENCODING = 10;
-
   @Override
   public String getName() {
     return "streamed_proto";
@@ -79,9 +68,9 @@ private Build.Target toProto(Target target) {
         }
       }
 
-      private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bout) {
+      private synchronized void writeToOutputStreamThreadSafe(byte[] data) {
         try {
-          bout.writeTo(out);
+          out.write(data);
         } catch (IOException e) {
           throw new WrappedIOException(e);
         }
@@ -89,13 +78,13 @@ private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bo
     };
   }
 
-  private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) {
+  private static byte[] writeDelimited(Build.Target targetProtoBuffer) {
     try {
-      var bout =
-          new ByteArrayOutputStream(
-              targetProtoBuffer.getSerializedSize() + MAX_BYTES_FOR_VARINT32_ENCODING);
-      targetProtoBuffer.writeDelimitedTo(bout);
-      return bout;
+      var serializedSize = targetProtoBuffer.getSerializedSize();
+      var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
+      var output = new byte[headerSize + serializedSize];
+      targetProtoBuffer.writeTo(CodedOutputStream.newInstance(output, headerSize, output.length - headerSize));
+      return output;
     } catch (IOException e) {
       throw new WrappedIOException(e);
     }

From 1852be0f332748f7f1413b56bee22acad315deee Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Thu, 14 Nov 2024 12:15:36 -0500
Subject: [PATCH 7/9] flush

---
 .../lib/query2/query/output/StreamedProtoOutputFormatter.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index 05294d5af8b2c7..dc9bd19813c455 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -83,7 +83,9 @@ private static byte[] writeDelimited(Build.Target targetProtoBuffer) {
       var serializedSize = targetProtoBuffer.getSerializedSize();
       var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
       var output = new byte[headerSize + serializedSize];
-      targetProtoBuffer.writeTo(CodedOutputStream.newInstance(output, headerSize, output.length - headerSize));
+      var codedOut = CodedOutputStream.newInstance(output, headerSize, output.length - headerSize);
+      targetProtoBuffer.writeTo(codedOut);
+      codedOut.flush();
       return output;
     } catch (IOException e) {
       throw new WrappedIOException(e);

From 89e8b3beb86258163b4fe9b11bf3275a7fb7c1bb Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Thu, 14 Nov 2024 12:19:31 -0500
Subject: [PATCH 8/9] run google-java-format

---
 .../lib/query2/query/output/StreamedProtoOutputFormatter.java    | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index dc9bd19813c455..1ac4b7dc34003f 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -18,7 +18,6 @@
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.proto.proto2api.Build;
 import com.google.protobuf.CodedOutputStream;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.stream.StreamSupport;

From 5fc8b1342d243fcdb8749826920da9a8623b8839 Mon Sep 17 00:00:00 2001
From: Keith Lea <keithl@stripe.com>
Date: Fri, 15 Nov 2024 07:14:50 -0500
Subject: [PATCH 9/9] [wip] address michaeljo feedback and improve performance
 and memory overhead

---
 .../output/StreamedProtoOutputFormatter.java  | 75 +++++++++++++------
 1 file changed, 54 insertions(+), 21 deletions(-)

diff --git a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
index 1ac4b7dc34003f..c4e5bae6fce183 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/query/output/StreamedProtoOutputFormatter.java
@@ -13,14 +13,18 @@
 // limitations under the License.
 package com.google.devtools.build.lib.query2.query.output;
 
+import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.packages.LabelPrinter;
 import com.google.devtools.build.lib.packages.Target;
 import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
 import com.google.devtools.build.lib.query2.proto.proto2api.Build;
 import com.google.protobuf.CodedOutputStream;
+
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.stream.StreamSupport;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * An output formatter that outputs a protocol buffer representation of a query result and outputs
@@ -38,27 +42,64 @@ public String getName() {
   public OutputFormatterCallback<Target> createPostFactoStreamCallback(
       final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
     return new OutputFormatterCallback<Target>() {
+      private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
+      private static final int TARGETS_PER_CHUNK = 500;
+
       private final LabelPrinter ourLabelPrinter = labelPrinter;
 
       @Override
       public void processOutput(Iterable<Target> partialResult)
           throws IOException, InterruptedException {
+        ForkJoinTask<?> writeAllTargetsFuture;
+        try (ForkJoinPool executor =
+            new ForkJoinPool(
+                Runtime.getRuntime().availableProcessors(),
+                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
+                null,
+                // we use asyncMode to ensure the queue is processed FIFO, which maximizes
+                // throughput
+                true)) {
+          var targetQueue = new LinkedBlockingQueue<Future<List<byte[]>>>(MAX_CHUNKS_IN_QUEUE);
+          var stillAddingTargetsToQueue = new AtomicBoolean(true);
+          writeAllTargetsFuture =
+              executor.submit(
+                  () -> {
+                    try {
+                      while (stillAddingTargetsToQueue.get() || !targetQueue.isEmpty()) {
+                        Future<List<byte[]>> targets = targetQueue.take();
+                        for (byte[] target : targets.get()) {
+                          out.write(target);
+                        }
+                      }
+                    } catch (InterruptedException e) {
+                      throw new WrappedInterruptedException(e);
+                    } catch (IOException e) {
+                      throw new WrappedIOException(e);
+                    } catch (ExecutionException e) {
+                      // TODO: figure out what might be in here and propagate
+                      throw new RuntimeException(e);
+                    }
+                  });
+          try {
+            for (List<Target> targets : Iterables.partition(partialResult, TARGETS_PER_CHUNK)) {
+              targetQueue.put(executor.submit(() -> writeTargetsDelimitedToByteArrays(targets)));
+            }
+          } finally {
+            stillAddingTargetsToQueue.set(false);
+          }
+        }
         try {
-          StreamSupport.stream(partialResult.spliterator(), /* parallel= */ true)
-              .map(this::toProto)
-              .map(StreamedProtoOutputFormatter::writeDelimited)
-              // I imagine forEachOrdered hurts performance somewhat in some cases. While we may
-              // not need to actually produce output in order, this code does not know whether
-              // ordering was requested. So we just always write it in order, and hope performance
-              // is OK.
-              .forEachOrdered(this::writeToOutputStreamThreadSafe);
-        } catch (WrappedIOException e) {
-          throw e.getCause();
-        } catch (WrappedInterruptedException e) {
-          throw e.getCause();
+          writeAllTargetsFuture.get();
+        } catch (ExecutionException e) {
+          // TODO: propagate
+          throw new RuntimeException(e);
         }
       }
 
+      private List<byte[]> writeTargetsDelimitedToByteArrays(List<Target> targets) {
+        return targets.stream().map(target -> writeDelimited(toProto(target))).toList();
+      }
+
       private Build.Target toProto(Target target) {
         try {
           return toTargetProtoBuffer(target, ourLabelPrinter);
@@ -66,14 +107,6 @@ private Build.Target toProto(Target target) {
           throw new WrappedInterruptedException(e);
         }
       }
-
-      private synchronized void writeToOutputStreamThreadSafe(byte[] data) {
-        try {
-          out.write(data);
-        } catch (IOException e) {
-          throw new WrappedIOException(e);
-        }
-      }
     };
   }