Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3805: parse multiple file in one time #9

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
445 changes: 348 additions & 97 deletions lang/java/avro/src/main/java/org/apache/avro/Schema.java

Large diffs are not rendered by default.

112 changes: 112 additions & 0 deletions lang/java/avro/src/test/java/org/apache/avro/TestSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestSchema {
Expand Down Expand Up @@ -413,6 +421,84 @@ void qualifiedName() {
assertEquals("Int", nameInt.getQualified("space"));
}

@Test
void enumLateDefine() {
// Record schema whose field f1 references the enum "Sub" BEFORE the enum is
// defined inline in field f2: the parser must resolve the forward reference.
String schemaString = "{\n" + " \"type\":\"record\",\n" + " \"name\": \"Main\",\n" + " \"fields\":[\n"
+ " {\n" + " \"name\":\"f1\",\n" + " \"type\":\"Sub\"\n" + " },\n"
+ " {\n" + " \"name\":\"f2\",\n" + " \"type\":{\n"
+ " \"type\":\"enum\",\n" + " \"name\":\"Sub\",\n"
+ " \"symbols\":[\"OPEN\",\"CLOSE\"]\n" + " }\n" + " }\n" + " ]\n" + "}";

final Schema schema = new Schema.Parser().parse(schemaString);
Schema f1Schema = schema.getField("f1").schema();
Schema f2Schema = schema.getField("f2").schema();
// Both fields must resolve to the single Sub enum instance.
assertSame(f1Schema, f2Schema);
assertEquals(Type.ENUM, f1Schema.getType());

// On re-serialization the full enum definition must precede the by-name usage.
String stringSchema = schema.toString();
int definitionIndex = stringSchema.indexOf("\"symbols\":[\"OPEN\",\"CLOSE\"]");
int usageIndex = stringSchema.indexOf("\"type\":\"Sub\"");
// Guard against a vacuous pass: indexOf returns -1 when the fragment is
// missing, and (-1 < usageIndex) would succeed even with no definition at all.
assertTrue(definitionIndex >= 0, "enum definition not found in serialized schema");
assertTrue(usageIndex >= 0, "enum usage not found in serialized schema");
assertTrue(definitionIndex < usageIndex, "usage is before definition");
}

@Test
void testRecordInArray() {
// Record containing a nested record whose field is an array of a
// single-branch union of records; exercises deeply nested type parsing.
String schemaString = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestRecord\",\n" + " \"fields\": [\n"
+ " {\n" + " \"name\": \"value\",\n" + " \"type\": {\n" + " \"type\": \"record\",\n"
+ " \"name\": \"Container\",\n" + " \"fields\": [\n" + " {\n"
+ " \"name\": \"Optional\",\n" + " \"type\": {\n" + " \"type\": \"array\",\n"
+ " \"items\": [\n" + " {\n" + " \"type\": \"record\",\n"
+ " \"name\": \"optional_field_0\",\n" + " \"namespace\": \"\",\n"
+ " \"doc\": \"\",\n" + " \"fields\": [\n" + " {\n"
+ " \"name\": \"optional_field_1\",\n" + " \"type\": \"long\",\n"
+ " \"doc\": \"\",\n" + " \"default\": 0\n"
+ " }\n" + " ]\n" + " }\n" + " ]\n"
+ " }\n" + " }\n" + " ]\n" + " }\n" + " }\n" + " ]\n" + "}";
final Schema schema = new Schema.Parser().parse(schemaString);
assertNotNull(schema);
// assertNotNull alone is nearly vacuous (parse throws on failure), so also
// verify the parsed structure: record -> record -> array of a union.
Schema container = schema.getField("value").schema();
assertEquals(Type.RECORD, container.getType());
Schema arraySchema = container.getField("Optional").schema();
assertEquals(Type.ARRAY, arraySchema.getType());
// JSON array in the "items" position is parsed as a union type.
assertEquals(Type.UNION, arraySchema.getElementType().getType());
}

/*
 * NOTE(review): disabled test with no recorded reason — confirm whether this
 * recursive HashMap-backed union case is still failing, then re-enable it or
 * delete the dead code.
 *
 * @Test public void testRec() { String schemaString =
* "[{\"name\":\"employees\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Pair1081149ea1d6eb80\",\"fields\":[{\"name\":\"key\",\"type\":\"int\"},{\"name\":\"value\",\"type\":{\"type\":\"record\",\"name\":\"EmployeeInfo2\",\"fields\":[{\"name\":\"companyMap\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"PairIntegerString\",\"fields\":[{\"name\":\"key\",\"type\":\"int\"},{\"name\":\"value\",\"type\":\"string\"}]},\"java-class\":\"java.util.HashMap\"}],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}}]},\"java-class\":\"java.util.HashMap\"}],\"default\":null}]";
* final Schema schema = new Schema.Parser().parse(schemaString);
* Assert.assertNotNull(schema);
*
* }
*/

@Test
void testUnionFieldType() {
// Recursive record "Lisp" whose "value" field is a union containing a record
// branch ("Cons") that refers back to the enclosing type.
String schemaString = "{\"type\": \"record\", \"name\": \"Lisp\", \"fields\": [{\"name\":\"value\", \"type\":[\"null\", \"string\",{\"type\": \"record\", \"name\": \"Cons\", \"fields\": [{\"name\":\"car\", \"type\":\"Lisp\"},{\"name\":\"cdr\", \"type\":\"Lisp\"}]}]}]}";
final Schema schema = new Schema.Parser().parse(schemaString);
Field value = schema.getField("value");
Schema fieldSchema = value.schema();
// orElseThrow instead of bare Optional.get(): fail with a clear message if
// the union unexpectedly has no record branch instead of NoSuchElementException.
Schema subSchema = fieldSchema.getTypes().stream().filter((Schema s) -> s.getType() == Type.RECORD).findFirst()
.orElseThrow(() -> new AssertionError("union contains no record branch"));
assertTrue(subSchema.hasFields());
}

@Test
public void parseAliases() throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();

// A well-formed aliases array yields the set of its string entries.
JsonNode arrayNode = mapper.readTree("{ \"aliases\" : [\"a1\", \"b1\"]}");
Set<String> aliases = Schema.parseAliases(arrayNode);
assertEquals(2, aliases.size());
assertTrue(aliases.contains("a1"));
assertTrue(aliases.contains("b1"));

// A JSON object in place of the array must be rejected.
JsonNode objectNode = mapper.readTree("{ \"aliases\" : {\"a1\": \"b1\"}}");
SchemaParseException notAnArray = assertThrows(SchemaParseException.class, () -> Schema.parseAliases(objectNode));
assertTrue(notAnArray.getMessage().contains("aliases not an array"));

// Non-string entries inside the array must be rejected as well.
JsonNode mixedNode = mapper.readTree("{ \"aliases\" : [11, \"b1\"]}");
SchemaParseException notAString = assertThrows(SchemaParseException.class, () -> Schema.parseAliases(mixedNode));
assertTrue(notAString.getMessage().contains("alias not a string"));
}

@Test
void testContentAfterAvsc() throws Exception {
Schema.Parser parser = new Schema.Parser();
Expand Down Expand Up @@ -446,6 +532,32 @@ void testContentAfterAvscInFile() throws Exception {
assertThrows(SchemaParseException.class, () -> parser.parse(avscFile));
}

@Test
void testParseMultipleFile() throws IOException {
// Three interdependent schema files: ApplicationEvent references
// model.DocumentInfo, which is defined in a separate file.
URL directory = Thread.currentThread().getContextClassLoader().getResource("multipleFile");
// Fail fast with a clear message instead of a bare NPE when the resource
// folder is missing from the test classpath.
Assertions.assertNotNull(directory, "test resource directory 'multipleFile' not found on classpath");
// NOTE(review): URL.getPath() keeps percent-encoding, so this breaks if the
// build path contains spaces — consider new File(directory.toURI()) instead.
File f1 = new File(directory.getPath(), "ApplicationEvent.avsc");
File f2 = new File(directory.getPath(), "DocumentInfo.avsc");
File f3 = new File(directory.getPath(), "MyResponse.avsc");
Assertions.assertTrue(f1.exists(), "File not exist for test " + f1.getPath());
Assertions.assertTrue(f2.exists(), "File not exist for test " + f2.getPath());
Assertions.assertTrue(f3.exists(), "File not exist for test " + f3.getPath());

// Parsing order matters here: the list is supplied in dependency order.
final List<Schema> schemas = new Schema.Parser().parse(Arrays.asList(f1, f2, f3));
Assertions.assertEquals(3, schemas.size());
Schema schemaAppEvent = schemas.get(0);
Schema schemaDocInfo = schemas.get(1);
Schema schemaResponse = schemas.get(2);

Assertions.assertNotNull(schemaAppEvent);
Assertions.assertEquals(3, schemaAppEvent.getFields().size());
// "documents" is ["null", array<DocumentInfo>]; branch 1 is the array whose
// element type must be the very schema parsed from DocumentInfo.avsc.
Field documents = schemaAppEvent.getField("documents");
Schema docSchema = documents.schema().getTypes().get(1).getElementType();
Assertions.assertEquals(docSchema, schemaDocInfo);

Assertions.assertNotNull(schemaDocInfo);
Assertions.assertNotNull(schemaResponse);
}

@Test
void add_types() {
String schemaRecord2 = "{\"type\":\"record\", \"name\":\"record2\", \"fields\": ["
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"namespace": "model",
"type": "record",
"doc": "",
"name": "ApplicationEvent",
"fields": [
{
"name": "applicationId",
"type": "string",
"doc": "Application ID"
},
{
"name": "status",
"type": "string",
"doc": "Application Status"
},
{
"name": "documents",
"type": ["null", {
"type": "array",
"items": "model.DocumentInfo"
}],
"doc": "",
"default": null
}
]

}
19 changes: 19 additions & 0 deletions lang/java/avro/src/test/resources/multipleFile/DocumentInfo.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"namespace": "model",
"type": "record",
"doc": "",
"name": "DocumentInfo",
"fields": [
{
"name": "documentId",
"type": "string",
"doc": "Document ID"
},
{
"name": "filePath",
"type": "string",
"doc": "Document Path"
}
]

}
14 changes: 14 additions & 0 deletions lang/java/avro/src/test/resources/multipleFile/MyResponse.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"namespace": "model",
"type": "record",
"doc": "",
"name": "MyResponse",
"fields": [
{
"name": "isSuccessful",
"type": "boolean",
"doc": "Indicator for successful or unsuccessful call"
}
]

}
8 changes: 8 additions & 0 deletions lang/java/avro/src/test/resources/multipleFile/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
## Test for parsing multiple files
This folder contains resources for testing the `public List<Schema> Schema.parse(Iterable<File> sources) throws IOException` method.

The objective is to check that a record schema defined in one file can be used in another record schema as a field type.
Here, the ApplicationEvent.avsc file contains a field of type DocumentInfo, which is defined in the file DocumentInfo.avsc.

The test is implemented in TestSchema.testParseMultipleFile.

Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public SpecificCompiler(Schema schema) {
this.protocol = null;
}

/**
 * Creates a specific compiler for a batch of schemas (e.g. parsed from
 * multiple files) that may reference one another; each schema is enqueued
 * for code generation in iteration order.
 *
 * @param schemas the schemas to compile; iterated exactly once
 */
public SpecificCompiler(Iterable<Schema> schemas) {
this();
schemas.forEach(this::enqueue);
this.protocol = null; // schema-only compilation: no protocol involved
}

/**
* Creates a specific compiler with the given type to use for date/time related
* logical types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ void nullPointer() throws Exception {
private static void checkParseError(String json) {
try {
new Schema.Parser().parse(json);
} catch (SchemaParseException e) {
} catch (AvroRuntimeException e) {
return;
}
fail("Should not have parsed: " + json);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,13 @@ private String[] getIncludedFiles(String absPath, String[] excludes, String[] in
}

/**
 * Compiles the given schema/protocol files from {@code sourceDir} into
 * {@code outDir}. Custom logical type factories are registered once, up
 * front, because they must be in place before any schema compilation starts;
 * the whole file batch is then handed to the array-based doCompile overload
 * so schemas in different files can reference each other.
 *
 * @throws MojoExecutionException if the factories cannot be loaded or any
 *                                file fails to compile
 */
private void compileFiles(String[] files, File sourceDir, File outDir) throws MojoExecutionException {
// Need to register custom logical type factories before schema compilation.
try {
loadLogicalTypesFactories();
} catch (IOException e) {
throw new MojoExecutionException("Error while loading logical types factories ", e);
}
this.doCompile(files, sourceDir, outDir);
}

private void loadLogicalTypesFactories() throws IOException, MojoExecutionException {
Expand Down Expand Up @@ -332,6 +330,16 @@ protected List<Object> instantiateAdditionalVelocityTools() {
return velocityTools;
}

/**
 * Default batch compilation: compiles each file independently, wrapping any
 * I/O failure in a MojoExecutionException that names the offending file.
 * Subclasses may override to compile the whole batch at once.
 */
protected void doCompile(String[] files, File sourceDirectory, File outputDirectory) throws MojoExecutionException {
for (final String schemaFile : files) {
try {
doCompile(schemaFile, sourceDirectory, outputDirectory);
} catch (IOException ioe) {
final String message = "Error compiling protocol file " + schemaFile + " to " + outputDirectory;
throw new MojoExecutionException(message, ioe);
}
}
}

protected abstract void doCompile(String filename, File sourceDirectory, File outputDirectory) throws IOException;

protected URLClassLoader createClassLoader() throws DependencyResolutionRequiredException, MalformedURLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

package org.apache.avro.mojo;

import org.apache.avro.SchemaParseException;
import org.apache.avro.generic.GenericData.StringType;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.compiler.specific.SpecificCompiler;
import org.apache.maven.artifact.DependencyResolutionRequiredException;
import org.apache.maven.plugin.MojoExecutionException;

/**
* Generate Java classes from Avro schema files (.avsc)
Expand Down Expand Up @@ -72,21 +78,27 @@ public class SchemaMojo extends AbstractAvroMojo {
private String errorSpecificClass = "org.apache.avro.specific.SpecificExceptionBase";

@Override
protected void doCompile(String filename, File sourceDirectory, File outputDirectory) throws IOException {
File src = new File(sourceDirectory, filename);
final Schema schema;
protected void doCompile(String[] filesName, File sourceDirectory, File outputDirectory)
throws MojoExecutionException {
final List<File> sourceFiles = Arrays.stream(filesName)
.map((String filename) -> new File(sourceDirectory, filename)).collect(Collectors.toList());
final List<Schema> schemas;

// This is necessary to maintain backward-compatibility. If there are
// no imported files then isolate the schemas from each other, otherwise
// allow them to share a single schema so reuse and sharing of schema
// is possible.
if (imports == null) {
schema = new Schema.Parser().parse(src);
} else {
schema = schemaParser.parse(src);
try {
if (imports == null) {
schemas = new Schema.Parser().parse(sourceFiles);
} else {
schemas = schemaParser.parse(sourceFiles);
}
} catch (IOException | SchemaParseException ex) {
throw new MojoExecutionException("Error compiling one file of " + sourceDirectory + " to " + outputDirectory, ex);
}

final SpecificCompiler compiler = new SpecificCompiler(schema);
final SpecificCompiler compiler = new SpecificCompiler(schemas);
compiler.setTemplateDir(templateDirectory);
compiler.setStringType(StringType.valueOf(stringType));
compiler.setFieldVisibility(getFieldVisibility());
Expand All @@ -100,14 +112,26 @@ protected void doCompile(String filename, File sourceDirectory, File outputDirec
for (String customConversion : customConversions) {
compiler.addCustomConversion(classLoader.loadClass(customConversion));
}
} catch (ClassNotFoundException | DependencyResolutionRequiredException e) {
throw new IOException(e);
} catch (ClassNotFoundException | DependencyResolutionRequiredException | MalformedURLException e) {
throw new MojoExecutionException("Compilation error: Can't add custom conversion", e);
}
compiler.setOutputCharacterEncoding(project.getProperties().getProperty("project.build.sourceEncoding"));
compiler.setAdditionalVelocityTools(instantiateAdditionalVelocityTools());
compiler.setRecordSpecificClass(this.recordSpecificClass);
compiler.setErrorSpecificClass(this.errorSpecificClass);
compiler.compileToDestination(src, outputDirectory);
for (File src : sourceFiles) {
try {
compiler.compileToDestination(src, outputDirectory);
} catch (IOException ex) {
throw new MojoExecutionException("Compilation error with file " + src + " to " + outputDirectory, ex);
}
}
}

@Override
protected void doCompile(final String filename, final File sourceDirectory, final File outputDirectory)
throws IOException {
// Intentionally empty: this mojo overrides the batch variant
// doCompile(String[], File, File), so this per-file hook is never invoked.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"namespace": "model",
"type": "record",
"doc": "",
"name": "ApplicationEvent",
"fields": [
{
"name": "applicationId",
"type": "string",
"doc": "Application ID"
},
{
"name": "status",
"type": "string",
"doc": "Application Status"
},
{
"name": "documents",
"type": ["null", {
"type": "array",
"items": "model.DocumentInfo"
}],
"doc": "",
"default": null
}
]

}
Loading