diff --git a/.gitignore b/.gitignore index 7b9f03d27..4f77236f6 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ pom.xml.* # Scala Plugin for VSCode .metals +.bloop/ +.metals/ diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala index dad054a2f..709115797 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala @@ -539,6 +539,36 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging this.dataQuanta().writeKafkaTopic(topicName, formatterUdf, udfLoadProfileEstimator) } + /** + * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ParquetSink]]. This triggers + * execution of the constructed [[WayangPlan]]. + * + * @param url URL of the Parquet file to be written + * @param overwrite whether to overwrite existing files + * @param preferDataset whether to prefer Spark Dataset over RDD + */ + def writeParquet(url: String, + overwrite: Boolean, + preferDataset: Boolean): Unit = + this.writeParquet(url, overwrite, preferDataset, null) + + /** + * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ParquetSink]]. This triggers + * execution of the constructed [[WayangPlan]]. 
+   *
+   * @param url URL of the Parquet file to be written
+   * @param overwrite whether to overwrite existing files
+   * @param preferDataset whether to prefer Spark Dataset over RDD
+   * @param jobName optional name for the [[WayangPlan]]
+   */
+  def writeParquet(url: String,
+                   overwrite: Boolean,
+                   preferDataset: Boolean,
+                   jobName: String): Unit = {
+    if (jobName != null) this.javaPlanBuilder.withJobName(jobName)
+    this.dataQuanta().asInstanceOf[DataQuanta[Record]].writeParquet(url, overwrite, preferDataset)
+  }
+
   /**
    * Enriches the set of operations to [[Record]]-based ones. This instances must deal with data quanta of
    * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
index a4418c18a..decc102f9 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
@@ -64,7 +64,8 @@ public class Mappings {
         new GoogleCloudStorageSourceMapping(),
         new AzureBlobStorageSourceMapping(),
         new ApacheIcebergSourceMapping(),
-        new ApacheIcebergSinkMapping()
+        new ApacheIcebergSinkMapping(),
+        new ParquetSinkMapping()
     );
 
     public static Collection<Mapping> GRAPH_MAPPINGS = Arrays.asList(
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ParquetSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ParquetSinkMapping.java
new file mode 100644
index 000000000..dbd76b5b9
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ParquetSinkMapping.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.mapping;
+
+import org.apache.wayang.basic.operators.ParquetSink;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.java.operators.JavaParquetSink;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class ParquetSinkMapping implements Mapping {
+
+    @Override
+    public Collection<PlanTransformation> getTransformations() {
+        return Collections.singleton(new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance()
+        ));
+    }
+
+    private SubplanPattern createSubplanPattern() {
+        OperatorPattern<ParquetSink> operatorPattern = new OperatorPattern<>(
+                "sink", new ParquetSink("", true, true), false
+        );
+        return SubplanPattern.createSingleton(operatorPattern);
+    }
+
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators<ParquetSink>(
+                (matchedOperator, epoch) -> new JavaParquetSink(matchedOperator).at(epoch)
+        );
+    }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaParquetSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaParquetSink.java
new file mode 100644
index 000000000..de971d096
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaParquetSink.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.ParquetSink;
+import org.apache.wayang.basic.types.RecordType;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Writes {@link Record}s to a Parquet file using the Java platform.
+ */
+public class JavaParquetSink extends ParquetSink implements JavaExecutionOperator {
+
+    private static final int SCHEMA_SAMPLE_SIZE = 50;
+
+    public JavaParquetSink(ParquetSink that) {
+        super(that.getOutputUrl(), that.isOverwrite(), that.prefersDataset(), that.getType());
+    }
+
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+
+        assert inputs.length == 1;
+        assert outputs.length == 0;
+
+        // Get the input stream and collect all records into a list
+        final List<Record> records = this.getRecords(inputs[0]);
+
+        if (records.isEmpty()) {
+            return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
+        }
+
+        try {
+            // Handle overwrite — delete existing file if needed
+            Path outputPath = new Path(this.getOutputUrl());
+            Configuration conf = new Configuration();
+            if (this.isOverwrite()) {
+                FileSystem fs = outputPath.getFileSystem(conf);
+                fs.delete(outputPath, true);
+            }
+
+            // Infer schema from RecordType + sampled records
+            Schema schema = this.inferSchema(records);
+
+            // Write records as Parquet
+            try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputPath)
+                    .withSchema(schema)
+                    .withConf(conf)
+                    .withCompressionCodec(CompressionCodecName.SNAPPY)
+                    .build()) {
+
+                for (Record record : records) {
+                    writer.write(this.convertToGenericRecord(record, schema));
+                }
+            }
+
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write Parquet file: " + this.getOutputUrl(), e);
+        }
+
+        return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
+    }
+
+    /**
+     * Extracts records from the input channel, handling both Stream and Collection channels.
+     */
+    private List<Record> getRecords(ChannelInstance input) {
+        if (input instanceof CollectionChannel.Instance) {
+            return ((CollectionChannel.Instance) input).<Record>provideCollection()
+                    .stream().collect(Collectors.toList());
+        }
+        return ((StreamChannel.Instance) input).<Record>provideStream()
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Infers an Avro schema from the RecordType (if available) and sampled record values.
+     */
+    private Schema inferSchema(List<Record> records) {
+        String[] fieldNames = this.resolveFieldNames(records);
+        List<Record> samples = records.subList(0, Math.min(SCHEMA_SAMPLE_SIZE, records.size()));
+
+        SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder
+                .record("WayangRecord")
+                .namespace("org.apache.wayang")
+                .fields();
+
+        for (int i = 0; i < fieldNames.length; i++) {
+            Schema.Type avroType = this.inferColumnType(samples, i);
+            // Make fields nullable — union of [null, type]
+            Schema fieldSchema = Schema.createUnion(
+                    Schema.create(Schema.Type.NULL),
+                    Schema.create(avroType)
+            );
+            fields.name(fieldNames[i]).type(fieldSchema).noDefault();
+        }
+
+        return fields.endRecord();
+    }
+
+    /**
+     * Resolves field names from RecordType if available, otherwise generates field0, field1, etc.
+     */
+    private String[] resolveFieldNames(List<Record> records) {
+        DataSetType<Record> dataSetType = this.getType();
+        if (dataSetType != null && dataSetType.getDataUnitType() instanceof RecordType) {
+            RecordType recordType = (RecordType) dataSetType.getDataUnitType();
+            if (recordType.getFieldNames() != null && recordType.getFieldNames().length > 0) {
+                return recordType.getFieldNames();
+            }
+        }
+
+        // Fallback: generate generic field names
+        int numFields = records.get(0).size();
+        String[] names = new String[numFields];
+        for (int i = 0; i < numFields; i++) {
+            names[i] = "field" + i;
+        }
+        return names;
+    }
+
+    /**
+     * Infers the Avro type for a column by sampling record values.
+     */
+    private Schema.Type inferColumnType(List<Record> samples, int columnIndex) {
+        for (Record sample : samples) {
+            if (sample == null || columnIndex >= sample.size()) {
+                continue;
+            }
+            Object value = sample.getField(columnIndex);
+            if (value == null) {
+                continue;
+            }
+            return this.toAvroType(value);
+        }
+        // Default to string if all values are null
+        return Schema.Type.STRING;
+    }
+
+    /**
+     * Maps a Java value to an Avro schema type.
+     */
+    private Schema.Type toAvroType(Object value) {
+        if (value instanceof String || value instanceof Character) {
+            return Schema.Type.STRING;
+        } else if (value instanceof Integer) {
+            return Schema.Type.INT;
+        } else if (value instanceof Long || value instanceof Timestamp) {
+            return Schema.Type.LONG;
+        } else if (value instanceof Float) {
+            return Schema.Type.FLOAT;
+        } else if (value instanceof Double || value instanceof BigDecimal) {
+            return Schema.Type.DOUBLE;
+        } else if (value instanceof Boolean) {
+            return Schema.Type.BOOLEAN;
+        } else if (value instanceof byte[]) {
+            return Schema.Type.BYTES;
+        }
+        return Schema.Type.STRING;
+    }
+
+    /**
+     * Converts a Wayang Record to an Avro GenericRecord using the given schema.
+     */
+    private GenericRecord convertToGenericRecord(Record record, Schema schema) {
+        GenericRecord genericRecord = new GenericData.Record(schema);
+        List<Schema.Field> fields = schema.getFields();
+        for (int i = 0; i < fields.size(); i++) {
+            Object value = i < record.size() ? record.getField(i) : null;
+            // Convert value to match the Avro type if needed
+            if (value != null) {
+                value = this.convertValue(value, fields.get(i).schema());
+            }
+            genericRecord.put(fields.get(i).name(), value);
+        }
+        return genericRecord;
+    }
+
+    /**
+     * Converts a value to match the expected Avro schema type.
+     */
+    private Object convertValue(Object value, Schema fieldSchema) {
+        // Handle nullable union types — extract the actual type
+        Schema actualSchema = fieldSchema;
+        if (fieldSchema.getType() == Schema.Type.UNION) {
+            for (Schema s : fieldSchema.getTypes()) {
+                if (s.getType() != Schema.Type.NULL) {
+                    actualSchema = s;
+                    break;
+                }
+            }
+        }
+
+        switch (actualSchema.getType()) {
+            case STRING:
+                return value.toString();
+            case INT:
+                return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString());
+            case LONG:
+                if (value instanceof Timestamp) return ((Timestamp) value).getTime();
+                return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString());
+            case FLOAT:
+                return value instanceof Number ? ((Number) value).floatValue() : Float.parseFloat(value.toString());
+            case DOUBLE:
+                return value instanceof Number ? ((Number) value).doubleValue() : Double.parseDouble(value.toString());
+            case BOOLEAN:
+                return value instanceof Boolean ? value : Boolean.parseBoolean(value.toString());
+            default:
+                return value;
+        }
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
+        return Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR);
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
+        throw new UnsupportedOperationException("This operator has no outputs.");
+    }
+
+    @Override
+    public JavaPlatform getPlatform() {
+        return JavaPlatform.getInstance();
+    }
+}
\ No newline at end of file
diff --git a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaParquetSinkTest.java b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaParquetSinkTest.java
new file mode 100644
index 000000000..1103de0d8
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaParquetSinkTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.operators.ParquetSink;
+import org.apache.wayang.basic.types.RecordType;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test suite for {@link JavaParquetSink}.
+ */
+class JavaParquetSinkTest extends JavaExecutionOperatorTestBase {
+
+    @Test
+    void testWriteStringRecords() throws IOException {
+        List<Record> records = Arrays.asList(
+                new Record("a", "hello"),
+                new Record("b", "world"),
+                new Record("c", "test")
+        );
+
+        java.nio.file.Path tempDir = Files.createTempDirectory("wayang-java-parquet-sink");
+        java.nio.file.Path outputFile = tempDir.resolve("test-strings.parquet");
+
+        try {
+            JavaParquetSink sink = new JavaParquetSink(
+                    new ParquetSink(outputFile.toUri().toString(), true, false)
+            );
+
+            ChannelInstance[] inputs = new ChannelInstance[]{
+                    createCollectionChannelInstance(records)
+            };
+            evaluate(sink, inputs, new ChannelInstance[0]);
+
+            List<GenericRecord> readBack = readParquetFile(outputFile.toString());
+            assertEquals(3, readBack.size());
+            assertEquals("a", readBack.get(0).get("field0").toString());
+            assertEquals("hello", readBack.get(0).get("field1").toString());
+            assertEquals("b", readBack.get(1).get("field0").toString());
+            assertEquals("world", readBack.get(1).get("field1").toString());
+            assertEquals("c", readBack.get(2).get("field0").toString());
+            assertEquals("test", readBack.get(2).get("field1").toString());
+        } finally {
+            deleteRecursively(tempDir);
+        }
+    }
+
+    @Test
+    void testWriteMixedTypeRecords() throws IOException {
+        List<Record> records = Arrays.asList(
+                new Record(1, "alpha", 10.5),
+                new Record(2, "beta", 20.0),
+                new Record(3, "gamma", 30.5)
+        );
+
+        java.nio.file.Path tempDir = Files.createTempDirectory("wayang-java-parquet-sink");
+        java.nio.file.Path outputFile = tempDir.resolve("test-mixed.parquet");
+
+        try {
+            JavaParquetSink sink = new JavaParquetSink(
+                    new ParquetSink(outputFile.toUri().toString(), true, false)
+            );
+
+            ChannelInstance[] inputs = new ChannelInstance[]{
+                    createCollectionChannelInstance(records)
+            };
+            evaluate(sink, inputs, new ChannelInstance[0]);
+
+            List<GenericRecord> readBack = readParquetFile(outputFile.toString());
+            assertEquals(3, readBack.size());
+            assertEquals(1, readBack.get(0).get("field0"));
+            assertEquals("alpha", readBack.get(0).get("field1").toString());
+            assertEquals(10.5, readBack.get(0).get("field2"));
+        } finally {
+            deleteRecursively(tempDir);
+        }
+    }
+
+    @Test
+    void testWriteWithRecordType() throws IOException {
+        List<Record> records = Arrays.asList(
+                new Record("x1", 100),
+                new Record("x2", 200),
+                new Record("x3", 300)
+        );
+
+        java.nio.file.Path tempDir = Files.createTempDirectory("wayang-java-parquet-sink");
+        java.nio.file.Path outputFile = tempDir.resolve("test-recordtype.parquet");
+
+        try {
+            DataSetType<Record> typedDataSet = DataSetType.createDefault(
+                    new RecordType("name", "value")
+            );
+            ParquetSink logicalSink = new ParquetSink(
+                    outputFile.toUri().toString(), true, false, typedDataSet
+            );
+            JavaParquetSink sink = new JavaParquetSink(logicalSink);
+
+            ChannelInstance[] inputs = new ChannelInstance[]{
+                    createCollectionChannelInstance(records)
+            };
+            evaluate(sink, inputs, new ChannelInstance[0]);
+
+            List<GenericRecord> readBack = readParquetFile(outputFile.toString());
+            assertEquals(3, readBack.size());
+            assertEquals("x1", readBack.get(0).get("name").toString());
+            assertEquals(100, readBack.get(0).get("value"));
+            assertEquals("x2", readBack.get(1).get("name").toString());
+            assertEquals(200, readBack.get(1).get("value"));
+        } finally {
+            deleteRecursively(tempDir);
+        }
+    }
+
+    @Test
+    void testOverwriteExistingFile() throws IOException {
+        java.nio.file.Path tempDir = Files.createTempDirectory("wayang-java-parquet-sink");
+        java.nio.file.Path outputFile = tempDir.resolve("test-overwrite.parquet");
+
+        try {
+            // First write
+            List<Record> firstRecords = Arrays.asList(
+                    new Record("old", "data")
+            );
+            JavaParquetSink sink1 = new JavaParquetSink(
+                    new ParquetSink(outputFile.toUri().toString(), true, false)
+            );
+            evaluate(sink1,
+                    new ChannelInstance[]{createCollectionChannelInstance(firstRecords)},
+                    new ChannelInstance[0]);
+
+            // Second write with overwrite
+            List<Record> secondRecords = Arrays.asList(
+                    new Record("new", "data"),
+                    new Record("more", "records")
+            );
+            JavaParquetSink sink2 = new JavaParquetSink(
+                    new ParquetSink(outputFile.toUri().toString(), true, false)
+            );
+            evaluate(sink2,
                    new ChannelInstance[]{createCollectionChannelInstance(secondRecords)},
+                    new ChannelInstance[0]);
+
+            // We check if only second write's data exists
+            List<GenericRecord> readBack = readParquetFile(outputFile.toString());
+            assertEquals(2, readBack.size());
+            assertEquals("new", readBack.get(0).get("field0").toString());
+            assertEquals("more", readBack.get(1).get("field0").toString());
+        } finally {
+            deleteRecursively(tempDir);
+        }
+    }
+
+    private List<GenericRecord> readParquetFile(String path) throws IOException {
+        List<GenericRecord> records = new ArrayList<>();
+        Configuration conf = new Configuration();
+        Path hadoopPath = new Path(path);
+        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(
+                HadoopInputFile.fromPath(hadoopPath, conf)).build()) {
+            GenericRecord record;
+            while ((record = reader.read()) != null) {
+                records.add(record);
+            }
+        }
+        return records;
+    }
+
+    private void deleteRecursively(java.nio.file.Path dir) throws IOException {
+        if (Files.exists(dir)) {
+            Files.walk(dir)
+                    .sorted(Comparator.reverseOrder())
+                    .forEach(path -> {
+                        try {
+                            Files.deleteIfExists(path);
+                        } catch (IOException e) {
+                        }
+                    });
+        }
+    }
+}
\ No newline at end of file