Commit 79ef386

fix(adapter): update list and map conversion methods to include StructConverter
1 parent 0a3f410 commit 79ef386

4 files changed: +76 −21 lines
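The fix threads the StructConverter already used for top-level STRUCT values through convertList and convertMap, so structs nested inside lists and maps reach the same converter instead of bypassing it. A minimal sketch of the assumed StructConverter contract, inferred from the call sites in the diff below (the interface itself is not part of this commit and may differ):

    // Assumed shape, matching structConverter.convert(sourceValue, sourceSchema, targetType)
    // as called in AbstractTypeAdapter; hypothetical reconstruction for illustration.
    @FunctionalInterface
    public interface StructConverter<S> {
        Object convert(Object sourceValue, S sourceSchema, org.apache.iceberg.types.Type targetType);
    }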

core/src/main/java/kafka/automq/table/binder/AbstractTypeAdapter.java

Lines changed: 4 additions & 4 deletions
@@ -84,9 +84,9 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType, Struc
             case TIMESTAMP:
                 return convertTimestamp(sourceValue, sourceSchema, (Types.TimestampType) targetType);
             case LIST:
-                return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
+                return convertList(sourceValue, sourceSchema, (Types.ListType) targetType, structConverter);
             case MAP:
-                return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
+                return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType, structConverter);
             case STRUCT:
                 return structConverter.convert(sourceValue, sourceSchema, targetType);
             default:
@@ -212,7 +212,7 @@ protected Object convertTimestamp(Object sourceValue, S sourceSchema, Types.Time
         throw new IllegalArgumentException("Cannot convert " + sourceValue.getClass().getSimpleName() + " to " + targetType.typeId());
     }

-    protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType);
+    protected abstract List<?> convertList(Object sourceValue, S sourceSchema, Types.ListType targetType, StructConverter<S> structConverter);

-    protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType);
+    protected abstract Map<?, ?> convertMap(Object sourceValue, S sourceSchema, Types.MapType targetType, StructConverter<S> structConverter);
 }

core/src/main/java/kafka/automq/table/binder/AvroValueAdapter.java

Lines changed: 7 additions & 7 deletions
@@ -116,7 +116,7 @@ protected Object convertTimestamp(Object sourceValue, Schema sourceSchema, Types
     }

     @Override
-    protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType) {
+    protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.ListType targetType, StructConverter<Schema> structConverter) {
         Schema listSchema = sourceSchema;
         Schema elementSchema = listSchema.getElementType();

@@ -131,14 +131,14 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis

         List<Object> list = new ArrayList<>(sourceList.size());
         for (Object element : sourceList) {
-            Object convert = convert(element, elementSchema, targetType.elementType());
+            Object convert = convert(element, elementSchema, targetType.elementType(), structConverter);
             list.add(convert);
         }
         return list;
     }

     @Override
-    protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType) {
+    protected Map<?, ?> convertMap(Object sourceValue, Schema sourceSchema, Types.MapType targetType, StructConverter<Schema> structConverter) {
         if (sourceValue instanceof GenericData.Array) {
             GenericData.Array<?> arrayValue = (GenericData.Array<?>) sourceValue;
             Map<Object, Object> recordMap = new HashMap<>(arrayValue.size());
@@ -161,8 +161,8 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
                     continue;
                 }
                 GenericRecord record = (GenericRecord) element;
-                Object key = convert(record.get(keyField.pos()), keySchema, keyType);
-                Object value = convert(record.get(valueField.pos()), valueSchema, valueType);
+                Object key = convert(record.get(keyField.pos()), keySchema, keyType, structConverter);
+                Object value = convert(record.get(valueField.pos()), valueSchema, valueType, structConverter);
                 recordMap.put(key, value);
             }
             return recordMap;
@@ -179,8 +179,8 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis

         for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
             Object rawKey = entry.getKey();
-            Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType);
-            Object value = convert(entry.getValue(), valueSchema, valueType);
+            Object key = convert(rawKey, STRING_SCHEMA_INSTANCE, keyType, structConverter);
+            Object value = convert(entry.getValue(), valueSchema, valueType, structConverter);
             adaptedMap.put(key, value);
         }
         return adaptedMap;
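A hedged usage sketch of the new code path: with the converter threaded through, an Avro array of records now recurses into structConverter for each element. Names below are illustrative; AvroValueAdapter is assumed to have a no-arg constructor, StructConverter to be a functional interface, and the pass-through lambda stands in for the binder's real struct conversion.

    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.iceberg.types.Types;

    // Inside a test method: an Avro list<Nested>, mirroring the Nested record used in this commit's tests.
    Schema element = SchemaBuilder.record("Nested").fields()
            .requiredString("name")
            .requiredInt("count")
            .endRecord();
    Schema listSchema = Schema.createArray(element);
    GenericData.Record rec = new GenericRecordBuilder(element)
            .set("name", "a").set("count", 1).build();
    GenericData.Array<GenericData.Record> array = new GenericData.Array<>(listSchema, List.of(rec));

    // Matching Iceberg list<struct> target type.
    Types.ListType target = Types.ListType.ofRequired(3, Types.StructType.of(
            Types.NestedField.required(1, "name", Types.StringType.get()),
            Types.NestedField.required(2, "count", Types.IntegerType.get())));

    // Pass-through stand-in; convertList now hands each struct element to this converter.
    StructConverter<Schema> structConverter = (value, schema, type) -> value;
    Object converted = new AvroValueAdapter().convert(array, listSchema, target, structConverter);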

core/src/test/java/kafka/automq/table/binder/AvroRecordBinderTest.java

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@ private void testSendRecord(org.apache.iceberg.Schema schema, Record record) {
         }
     }

-    private TaskWriter<Record> createTableWriter(Table table) {
+    public static TaskWriter<Record> createTableWriter(Table table) {
         FileAppenderFactory<Record> appenderFactory = new GenericAppenderFactory(
             table.schema(),
             table.spec(),
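Widening createTableWriter to public static is what lets the protobuf test below reuse it through a static import, as its diff shows:

    import static kafka.automq.table.binder.AvroRecordBinderTest.createTableWriter;

    TaskWriter<Record> writer = createTableWriter(table);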

core/src/test/java/kafka/automq/table/process/convert/ProtobufRegistryConverterTest.java

Lines changed: 64 additions & 9 deletions
@@ -1,5 +1,6 @@
 package kafka.automq.table.process.convert;

+import kafka.automq.table.binder.RecordBinder;
 import kafka.automq.table.deserializer.proto.CustomProtobufSchema;
 import kafka.automq.table.deserializer.proto.ProtobufSchemaProvider;
 import kafka.automq.table.deserializer.proto.parse.ProtobufSchemaParser;
@@ -9,6 +10,7 @@

 import org.apache.kafka.common.utils.ByteUtils;

+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
@@ -17,9 +19,17 @@
 import com.squareup.wire.schema.internal.parser.ProtoParser;

 import org.apache.avro.generic.GenericRecord;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.TaskWriter;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;

+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
@@ -29,6 +39,7 @@

 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;

+import static kafka.automq.table.binder.AvroRecordBinderTest.createTableWriter;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;

@@ -37,21 +48,29 @@ public class ProtobufRegistryConverterTest {

     private static final String ALL_TYPES_PROTO = """
             syntax = \"proto3\";
-
+
             package kafka.automq.table.process.proto;
-
+
             import \"google/protobuf/timestamp.proto\";
-
+
             message Nested {
                 string name = 1;
                 int32 count = 2;
             }
-
+
             enum SampleEnum {
                 SAMPLE_ENUM_UNSPECIFIED = 0;
                 SAMPLE_ENUM_SECOND = 1;
             }
-
+
+            message FloatArray {
+                repeated double values = 1;
+            }
+
+            message StringArray {
+                repeated string values = 1;
+            }
+
             message AllTypes {
                 // Scalar primitives in order defined by Avro ProtobufData mapping
                 bool f_bool = 1;
@@ -79,12 +98,28 @@ enum SampleEnum {
                 oneof choice {
                     string choice_str = 22;
                     int32 choice_int = 23;
+                    FloatArray choice_float_array = 26;
+                    StringArray choice_string_array = 27;
                 }
                 repeated Nested f_nested_list = 24;
                 map<string, Nested> f_string_nested_map = 25;
             }
             """;

+    private void testSendRecord(org.apache.iceberg.Schema schema, org.apache.iceberg.data.Record record) {
+        InMemoryCatalog catalog = new InMemoryCatalog();
+        catalog.initialize("test", ImmutableMap.of());
+        catalog.createNamespace(Namespace.of("default"));
+        String tableName = "test";
+        Table table = catalog.createTable(TableIdentifier.of(Namespace.of("default"), tableName), schema);
+        TaskWriter<org.apache.iceberg.data.Record> writer = createTableWriter(table);
+        try {
+            writer.write(record);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Test
     void testConvertAllPrimitiveAndCollectionTypes() throws Exception {
         String topic = "pb-all-types";
@@ -108,7 +143,7 @@ void testConvertAllPrimitiveAndCollectionTypes() throws Exception {

         DynamicMessage message = buildAllTypesMessage(descriptor);
         // magic byte + schema id + single message index + serialized protobuf payload
-        ByteBuffer payload = buildConfluentPayload(schemaId, message.toByteArray(), 1);
+        ByteBuffer payload = buildConfluentPayload(schemaId, message.toByteArray(), 3);

         ProtobufRegistryConverter converter = new ProtobufRegistryConverter(registryClient, "http://mock:8081", false);

@@ -121,6 +156,11 @@ void testConvertAllPrimitiveAndCollectionTypes() throws Exception {
         assertPrimitiveFields(record);
         assertRepeatedAndMapFields(record);
         assertNestedAndTimestamp(record);
+
+        org.apache.iceberg.Schema iceberg = AvroSchemaUtil.toIceberg(record.getSchema());
+        RecordBinder recordBinder = new RecordBinder(iceberg, record.getSchema());
+        Record bind = recordBinder.bind(record);
+        testSendRecord(iceberg, bind);
     }

     private static DynamicMessage buildAllTypesMessage(Descriptors.Descriptor descriptor) {
@@ -145,7 +185,16 @@ private static DynamicMessage buildAllTypesMessage(Descriptors.Descriptor descri
             descriptor.findFieldByName("f_enum"),
             descriptor.getFile().findEnumTypeByName("SampleEnum").findValueByName("SAMPLE_ENUM_SECOND")
         );
-        builder.setField(descriptor.findFieldByName("choice_str"), "choice-string");
+
+        // Build FloatArray for oneof choice
+        Descriptors.FieldDescriptor floatArrayField = descriptor.findFieldByName("choice_float_array");
+        Descriptors.Descriptor floatArrayDescriptor = floatArrayField.getMessageType();
+        DynamicMessage.Builder floatArrayBuilder = DynamicMessage.newBuilder(floatArrayDescriptor);
+        Descriptors.FieldDescriptor floatValuesField = floatArrayDescriptor.findFieldByName("values");
+        floatArrayBuilder.addRepeatedField(floatValuesField, 1.1);
+        floatArrayBuilder.addRepeatedField(floatValuesField, 2.2);
+        floatArrayBuilder.addRepeatedField(floatValuesField, 3.3);
+        builder.setField(floatArrayField, floatArrayBuilder.build());

         Descriptors.FieldDescriptor nestedField = descriptor.findFieldByName("f_message");
         Descriptors.Descriptor nestedDescriptor = nestedField.getMessageType();
@@ -286,8 +335,14 @@ private static void assertNestedAndTimestamp(GenericRecord record) {
         // Optional field should fall back to proto3 default (empty string)
         assertEquals("", getField(record, "f_optional_string", "fOptionalString").toString());

-        Object oneofValue = getField(record, "choice_str", "choiceStr");
-        assertEquals("choice-string", oneofValue.toString());
+        // Verify oneof with complex FloatArray type
+        GenericRecord floatArrayValue = (GenericRecord) getField(record, "choice_float_array", "floatArray");
+        List<?> floatValues = (List<?>) floatArrayValue.get("values");
+        List<Double> expectedFloats = List.of(1.1, 2.2, 3.3);
+        assertEquals(expectedFloats.size(), floatValues.size());
+        for (int i = 0; i < expectedFloats.size(); i++) {
+            assertEquals(expectedFloats.get(i), (Double) floatValues.get(i), 1e-6);
+        }
     }

     private static Object getField(GenericRecord record, String... candidateNames) {
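One easy-to-miss change above: the message index passed to buildConfluentPayload moves from 1 to 3. Confluent's protobuf wire format prefixes the serialized message with a varint-encoded index path that locates the target message among the file's top-level messages; with FloatArray and StringArray declared before AllTypes, the order becomes Nested = 0, FloatArray = 1, StringArray = 2, AllTypes = 3. A minimal sketch of the assumed layout (buildConfluentPayload is a pre-existing test helper not shown in this diff; buffer sizing here is illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.ByteUtils;

    static ByteBuffer confluentProtobufPayload(int schemaId, byte[] message, int messageIndex) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 10 + message.length);
        buf.put((byte) 0x0);                      // magic byte
        buf.putInt(schemaId);                     // 4-byte schema registry id
        ByteUtils.writeVarint(1, buf);            // index path length (one level deep)
        ByteUtils.writeVarint(messageIndex, buf); // index of AllTypes among top-level messages
        buf.put(message);                         // serialized protobuf bytes
        buf.flip();
        return buf;
    }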
