Skip to content

Commit 34060eb

Browse files
authored
GH-836: Added support of ExtensionType for ComplexCopier (#837)
## What's Changed Updated ComplexCopier to support ExtensionType - it contains two **copy** methods ``` public static void copy(FieldReader input, FieldWriter output) //for not breaking existing logic public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) ``` Also updated ComplexCopier tests. Closes #836.
1 parent f38e72f commit 34060eb

20 files changed

+476
-9
lines changed

vector/src/main/codegen/includes/vv_imports.ftl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.arrow.vector.complex.*;
3434
import org.apache.arrow.vector.complex.reader.*;
3535
import org.apache.arrow.vector.complex.impl.*;
3636
import org.apache.arrow.vector.complex.writer.*;
37+
import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
3738
import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
3839
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
3940
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;

vector/src/main/codegen/templates/AbstractFieldReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ public void copyAsField(String name, ${name}Writer writer) {
109109

110110
</#list></#list>
111111

112+
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory) {
113+
fail("CopyAsValue StructWriter");
114+
}
115+
112116
public void read(ExtensionHolder holder) {
113117
fail("Extension");
114118
}

vector/src/main/codegen/templates/BaseReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public interface RepeatedStructReader extends StructReader{
4949
boolean next();
5050
int size();
5151
void copyAsValue(StructWriter writer);
52+
void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory);
5253
}
5354

5455
public interface ListReader extends BaseReader{
@@ -59,6 +60,7 @@ public interface RepeatedListReader extends ListReader{
5960
boolean next();
6061
int size();
6162
void copyAsValue(ListWriter writer);
63+
void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory);
6264
}
6365

6466
public interface MapReader extends BaseReader{
@@ -69,6 +71,7 @@ public interface RepeatedMapReader extends MapReader{
6971
boolean next();
7072
int size();
7173
void copyAsValue(MapWriter writer);
74+
void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory);
7275
}
7376

7477
public interface ScalarReader extends

vector/src/main/codegen/templates/ComplexCopier.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,14 @@ public class ComplexCopier {
4242
* @param output field to write to
4343
*/
4444
public static void copy(FieldReader input, FieldWriter output) {
45-
writeValue(input, output);
45+
writeValue(input, output, null);
4646
}
4747

48-
private static void writeValue(FieldReader reader, FieldWriter writer) {
48+
public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
49+
writeValue(input, output, extensionTypeWriterFactory);
50+
}
51+
52+
private static void writeValue(FieldReader reader, FieldWriter writer, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
4953
final MinorType mt = reader.getMinorType();
5054

5155
switch (mt) {
@@ -61,7 +65,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
6165
FieldReader childReader = reader.reader();
6266
FieldWriter childWriter = getListWriterForReader(childReader, writer);
6367
if (childReader.isSet()) {
64-
writeValue(childReader, childWriter);
68+
writeValue(childReader, childWriter, extensionTypeWriterFactory);
6569
} else {
6670
childWriter.writeNull();
6771
}
@@ -79,8 +83,8 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
7983
FieldReader structReader = reader.reader();
8084
if (structReader.isSet()) {
8185
writer.startEntry();
82-
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()));
83-
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()));
86+
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()), extensionTypeWriterFactory);
87+
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()), extensionTypeWriterFactory);
8488
writer.endEntry();
8589
} else {
8690
writer.writeNull();
@@ -99,7 +103,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
99103
if (childReader.getMinorType() != Types.MinorType.NULL) {
100104
FieldWriter childWriter = getStructWriterForReader(childReader, writer, name);
101105
if (childReader.isSet()) {
102-
writeValue(childReader, childWriter);
106+
writeValue(childReader, childWriter, extensionTypeWriterFactory);
103107
} else {
104108
childWriter.writeNull();
105109
}
@@ -110,6 +114,20 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
110114
writer.writeNull();
111115
}
112116
break;
117+
case EXTENSIONTYPE:
118+
if (extensionTypeWriterFactory == null) {
119+
throw new IllegalArgumentException("Must provide ExtensionTypeWriterFactory");
120+
}
121+
if (reader.isSet()) {
122+
Object value = reader.readObject();
123+
if (value != null) {
124+
writer.addExtensionTypeWriterFactory(extensionTypeWriterFactory);
125+
writer.writeExtension(value);
126+
}
127+
} else {
128+
writer.writeNull();
129+
}
130+
break;
113131
<#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
114132
<#assign fields = minor.fields!type.fields />
115133
<#assign uncappedName = name?uncap_first/>
@@ -162,6 +180,9 @@ private static FieldWriter getStructWriterForReader(FieldReader reader, StructWr
162180
return (FieldWriter) writer.map(name);
163181
case LISTVIEW:
164182
return (FieldWriter) writer.listView(name);
183+
case EXTENSIONTYPE:
184+
ExtensionWriter extensionWriter = writer.extension(name, reader.getField().getType());
185+
return (FieldWriter) extensionWriter;
165186
default:
166187
throw new UnsupportedOperationException(reader.getMinorType().toString());
167188
}
@@ -186,6 +207,9 @@ private static FieldWriter getListWriterForReader(FieldReader reader, ListWriter
186207
return (FieldWriter) writer.list();
187208
case LISTVIEW:
188209
return (FieldWriter) writer.listView();
210+
case EXTENSIONTYPE:
211+
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
212+
return (FieldWriter) extensionWriter;
189213
default:
190214
throw new UnsupportedOperationException(reader.getMinorType().toString());
191215
}
@@ -211,6 +235,9 @@ private static FieldWriter getMapWriterForReader(FieldReader reader, MapWriter w
211235
return (FieldWriter) writer.listView();
212236
case MAP:
213237
return (FieldWriter) writer.map(false);
238+
case EXTENSIONTYPE:
239+
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
240+
return (FieldWriter) extensionWriter;
214241
default:
215242
throw new UnsupportedOperationException(reader.getMinorType().toString());
216243
}

vector/src/main/codegen/templates/NullReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ public void read(int arrayIndex, Nullable${name}Holder holder){
8686
}
8787
</#list></#list>
8888

89+
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory){}
8990
public void read(ExtensionHolder holder) {
9091
holder.isSet = 0;
9192
}

vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.arrow.memory.BufferAllocator;
2323
import org.apache.arrow.memory.ReferenceManager;
2424
import org.apache.arrow.util.Preconditions;
25+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2526
import org.apache.arrow.vector.complex.reader.FieldReader;
2627
import org.apache.arrow.vector.util.DataSizeRoundingUtil;
2728
import org.apache.arrow.vector.util.TransferPair;
@@ -260,6 +261,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
260261
throw new UnsupportedOperationException();
261262
}
262263

264+
@Override
265+
public void copyFrom(
266+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
267+
throw new UnsupportedOperationException();
268+
}
269+
270+
@Override
271+
public void copyFromSafe(
272+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
273+
throw new UnsupportedOperationException();
274+
}
275+
263276
/**
264277
* Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
265278
* Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte

vector/src/main/java/org/apache/arrow/vector/NullVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
2828
import org.apache.arrow.util.Preconditions;
2929
import org.apache.arrow.vector.compare.VectorVisitor;
30+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
3031
import org.apache.arrow.vector.complex.impl.NullReader;
3132
import org.apache.arrow.vector.complex.reader.FieldReader;
3233
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
@@ -329,6 +330,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
329330
throw new UnsupportedOperationException();
330331
}
331332

333+
@Override
334+
public void copyFrom(
335+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
336+
throw new UnsupportedOperationException();
337+
}
338+
339+
@Override
340+
public void copyFromSafe(
341+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
342+
throw new UnsupportedOperationException();
343+
}
344+
332345
@Override
333346
public String getName() {
334347
return this.getField().getName();

vector/src/main/java/org/apache/arrow/vector/ValueVector.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.arrow.memory.OutOfMemoryException;
2323
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
2424
import org.apache.arrow.vector.compare.VectorVisitor;
25+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2526
import org.apache.arrow.vector.complex.reader.FieldReader;
2627
import org.apache.arrow.vector.types.Types.MinorType;
2728
import org.apache.arrow.vector.types.pojo.Field;
@@ -309,6 +310,30 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
309310
*/
310311
void copyFromSafe(int fromIndex, int thisIndex, ValueVector from);
311312

313+
/**
314+
* Copy a cell value from a particular index in source vector to a particular position in this
315+
* vector.
316+
*
317+
* @param fromIndex position to copy from in source vector
318+
* @param thisIndex position to copy to in this vector
319+
* @param from source vector
320+
* @param writerFactory the extension type writer factory to use for copying extension type values
321+
*/
322+
void copyFrom(
323+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
324+
325+
/**
326+
* Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
327+
* capacity of the vector needs to be expanded before copy.
328+
*
329+
* @param fromIndex position to copy from in source vector
330+
* @param thisIndex position to copy to in this vector
331+
* @param from source vector
332+
* @param writerFactory the extension type writer factory to use for copying extension type values
333+
*/
334+
void copyFromSafe(
335+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);
336+
312337
/**
313338
* Accept a generic {@link VectorVisitor} and return the result.
314339
*

vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.arrow.vector.DensityAwareVector;
2222
import org.apache.arrow.vector.FieldVector;
2323
import org.apache.arrow.vector.ValueVector;
24+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
2425
import org.apache.arrow.vector.types.Types.MinorType;
2526
import org.apache.arrow.vector.types.pojo.ArrowType;
2627
import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList;
@@ -151,6 +152,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
151152
throw new UnsupportedOperationException();
152153
}
153154

155+
@Override
156+
public void copyFrom(
157+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
158+
throw new UnsupportedOperationException();
159+
}
160+
161+
@Override
162+
public void copyFromSafe(
163+
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
164+
throw new UnsupportedOperationException();
165+
}
166+
154167
@Override
155168
public String getName() {
156169
return name;

vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.arrow.vector.ZeroVector;
5050
import org.apache.arrow.vector.compare.VectorVisitor;
5151
import org.apache.arrow.vector.complex.impl.ComplexCopier;
52+
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
5253
import org.apache.arrow.vector.complex.impl.UnionLargeListReader;
5354
import org.apache.arrow.vector.complex.impl.UnionLargeListWriter;
5455
import org.apache.arrow.vector.complex.reader.FieldReader;
@@ -482,12 +483,42 @@ public void copyFromSafe(int inIndex, int outIndex, ValueVector from) {
482483
*/
483484
@Override
484485
public void copyFrom(int inIndex, int outIndex, ValueVector from) {
486+
copyFrom(inIndex, outIndex, from, null);
487+
}
488+
489+
/**
490+
* Copy a cell value from a particular index in source vector to a particular position in this
491+
* vector.
492+
*
493+
* @param inIndex position to copy from in source vector
494+
* @param outIndex position to copy to in this vector
495+
* @param from source vector
496+
* @param writerFactory the extension type writer factory to use for copying extension type values
497+
*/
498+
@Override
499+
public void copyFrom(
500+
int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
485501
Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
486502
FieldReader in = from.getReader();
487503
in.setPosition(inIndex);
488504
UnionLargeListWriter out = getWriter();
489505
out.setPosition(outIndex);
490-
ComplexCopier.copy(in, out);
506+
ComplexCopier.copy(in, out, writerFactory);
507+
}
508+
509+
/**
510+
* Same as {@link #copyFrom(int, int, ValueVector)} except that it handles the case when the
511+
* capacity of the vector needs to be expanded before copy.
512+
*
513+
* @param inIndex position to copy from in source vector
514+
* @param outIndex position to copy to in this vector
515+
* @param from source vector
516+
* @param writerFactory the extension type writer factory to use for copying extension type values
517+
*/
518+
@Override
519+
public void copyFromSafe(
520+
int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
521+
copyFrom(inIndex, outIndex, from, writerFactory);
491522
}
492523

493524
/**

0 commit comments

Comments
 (0)