Dead letter table #233

Open

tabmatfournier wants to merge 36 commits into main from dead-letter-table

Changes from 32 commits

36 commits
03c3e36
dead-letter-table
tabmatfournier Apr 4, 2024
e87d820
next steps
tabmatfournier Apr 4, 2024
b062c51
moved transform in
tabmatfournier Apr 5, 2024
9b7f6b0
more refactoring
tabmatfournier Apr 8, 2024
c1b4de2
even more refactoring
tabmatfournier Apr 8, 2024
4e706e1
cruft
tabmatfournier Apr 8, 2024
f9136d8
even more cruft
tabmatfournier Apr 8, 2024
d3905a5
tests
tabmatfournier Apr 8, 2024
18c79ba
table create exceptions
tabmatfournier Apr 9, 2024
77aad4b
introduces identifier column, more error handle wrapping
tabmatfournier Apr 9, 2024
edc75a5
make converter/smt error converters configurable/user extensible
tabmatfournier Apr 9, 2024
8ee6840
introduce catalogApi to make IcebergWriterFactory testing easier
tabmatfournier Apr 9, 2024
de479d3
catalogApi and test stubs
tabmatfournier Apr 9, 2024
b78a68d
finished tests
tabmatfournier Apr 10, 2024
ac5d6a5
negate
tabmatfournier Apr 10, 2024
50b300b
dead-letter-table
tabmatfournier Apr 26, 2024
01f8cbb
put null record dropping back into iceberg writer
tabmatfournier Apr 26, 2024
d89f15a
fix dead letter utils private constructor
tabmatfournier Apr 26, 2024
c5c2186
fix cruft in readme
tabmatfournier Apr 26, 2024
831f205
Merge branch 'main' into dead-letter-table
tabmatfournier Apr 29, 2024
451e20f
Merge branch 'main' into dead-letter-table
tabmatfournier Apr 30, 2024
41c4372
post-merge fixes
tabmatfournier Apr 30, 2024
13220e9
more comments removing cruft
tabmatfournier Apr 30, 2024
797b861
regexrecordrouter
tabmatfournier Apr 30, 2024
7bd7d5d
start of fallback mode
tabmatfournier Apr 30, 2024
bff7233
third mode
tabmatfournier May 13, 2024
25208da
another test case
tabmatfournier May 13, 2024
205c2d7
better regex detection to avoid an extra config
tabmatfournier May 13, 2024
4aeedcd
cruft cleanup and starting docs
tabmatfournier May 13, 2024
0cf18d1
fix error transform tests
tabmatfournier May 14, 2024
05ea87f
more docs
tabmatfournier May 14, 2024
457a2f3
Merge branch 'main' into dead-letter-table
tabmatfournier May 21, 2024
1518cc7
Merge branch 'main' into dead-letter-table
tabmatfournier Jun 5, 2024
e4977c4
dead-letter-table
tabmatfournier Jun 5, 2024
4036557
rename module to not include deadletter
tabmatfournier Jun 5, 2024
0ff7972
moved writeExceptions into exception module
tabmatfournier Jun 5, 2024
111 changes: 111 additions & 0 deletions README.md
@@ -50,6 +50,9 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions
| iceberg.hadoop-conf-dir | If specified, Hadoop config files in this directory will be loaded |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |
| iceberg.deadletter.handler | See Dead Letter Table Mode |
| iceberg.deadletter.failed_record_factory | See Dead Letter Table Mode |
| iceberg.deadletter.failed_record_factory.* | See Dead Letter Table Mode |

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field` which will
@@ -322,6 +325,114 @@ See above for creating the table
}
```

## Dead Letter Table

Example config with Dead Letter will be very useful

Contributor Author

Docs can always be improved. I'll try to take another stab at this.

This is a big feature with a somewhat clunky config API due to config visibility rules in Kafka Connect, so more docs/examples certainly help.

The connector can be configured to write to one or more Dead Letter Iceberg tables, with capability beyond
what is offered by Kafka Connect's Dead Letter Queue implementation. This is an optional setting you can
ignore if you want failures to cause the connector to fail. Alternatively, Dead Letter Table mode can
be used to tolerate failures beyond what `errors.tolerance = all` covers.

| Location of Failure | Kafka Connect DLQ | Dead Letter Table Mode |
|----------------------------------------------------------|-------------------|------------------------|
| Deserialization/Converter | Yes | Yes* |
| SMT | Yes | Yes* |
| Table creation / schema issues | No | Yes |
| Iceberg record conversion | No | Yes |
| Malformed records (e.g. missing table route information) | No | Yes |
| Schema evolution issues | No | Yes |

If the `ErrorTransform` SMT is not used, it may be difficult to write anything to the Dead Letter Table beyond
the record metadata (topic, partition, offset); the message payload itself is dropped.

If the `ErrorTransform` SMT is used, failed records can include the original bytes of the message in the Iceberg table,
which can be extracted and inspected using a downstream query engine.

There are several classes that can be implemented and passed to the config for user customization:

* Error Transform SMT Key, Value, and Header deserialization failure handlers (each can be a different class)
* SMT transformation failure handlers
* Connector WriteException handler to handle issues with records themselves (e.g. missing route columns, invalid schema evolutions, etc.)
* Dead Letter Table schema / Record conversion

Some default implementations are provided.

NOTE: **Avro/Schema Registry** should not be used in conjunction with the Dead Letter Table using the provided handlers. Avro deserialization
failures mix transient and non-transient errors in the same exception. A failure handler for Avro deserialization is planned but not
yet implemented.

Users can extend these handlers to suit the needs of their particular deserialization method, catalog, etc., all of which can raise
different errors. It is advised to develop these carefully and start conservatively: you do not want to send messages to the Dead Letter
Table for transient network errors, which may be catalog specific.

In order to use the ErrorTransform SMT:

You **MUST** set the following due to how Kafka Connect hands records and configuration to SMTs:
- `errors.tolerance` to `none`
- `key.converter`, `value.converter`, and `header.converter` to `org.apache.kafka.connect.converters.ByteArrayConverter`
- a `transforms` key. For example, if you set `transforms` to `error`, all further configuration goes under `transforms.error.*`

| Property | Description |
|------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
| transforms.error.type | **Must** be io.tabular.iceberg.connect.transforms.ErrorTransform; any additional SMTs are configured under transforms.error.smts |
| transforms.error.value.converter | New location for the value converter |
| transforms.error.value.converter.* | properties to be passed to the value converter |
| transforms.error.key.converter | New location for the key converter |
| transforms.error.key.converter.* | properties to be passed to the key converter |
| transforms.error.header.converter | New location for the header converter |
| transforms.error.header.converter.* | properties to be passed to the header converter |
| transforms.error.smts | (Optional String): comma-separated list of classes for additional SMTs to apply to the record after deserialization |
| transforms.error.smts.* | (Optional) properties to pass to the additional SMTs |
| transforms.error.converter | Class of io.tabular.iceberg.connect.transforms.TransformExceptionHandler to handle exceptions thrown by the key/value/header converter |
| transforms.error.smt | Class of io.tabular.iceberg.connect.transforms.TransformExceptionHandler to handle exceptions thrown by the SMT chain |
| transforms.error.failed_record_factory | (Optional, property of the default exception handler): Class of io.tabular.iceberg.connect.deadletter.FailedRecordFactory |
| transforms.error.failed_record_factory.* | (Optional, property of the default exception handler): properties for the default exception handler |

`io.tabular.iceberg.connect.transforms.DefaultExceptionHandler` is provided for simple use cases/reference. The handle method must return a `SinkRecord`.

To maintain schema and record-transformation parity between SMT and connector failures, it is recommended to dynamically load a subclass of `io.tabular.iceberg.connect.deadletter.FailedRecordFactory`.
The reference implementation of `io.tabular.iceberg.connect.transforms.DefaultExceptionHandler` loads this by default. It can be configured on the `transforms.error.failed_record_factory` key.
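
A minimal sketch of the SMT portion of a connector configuration, assuming the transform is named `error`, a JSON value payload, and illustrative factory settings (`default.dlt_table` and `_dlt_route` are placeholders, not defaults):

```
{
  "errors.tolerance": "none",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "transforms": "error",
  "transforms.error.type": "io.tabular.iceberg.connect.transforms.ErrorTransform",
  "transforms.error.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms.error.value.converter.schemas.enable": "false",
  "transforms.error.key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "transforms.error.failed_record_factory": "io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory",
  "transforms.error.failed_record_factory.table_name": "default.dlt_table",
  "transforms.error.failed_record_factory.route_field": "_dlt_route"
}
```

The converter and SMT exception handlers default to `io.tabular.iceberg.connect.transforms.DefaultExceptionHandler` and are omitted above.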

In order to turn on Dead Letter Table mode in the connector:

| Property | Description |
|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------|
| iceberg.deadletter.handler | Subclass of io.tabular.iceberg.connect.data.WriteExceptionHandler; if non-null, Dead Letter Table mode is turned on in the connector |
| iceberg.deadletter.failed_record_factory | Subclass of io.tabular.iceberg.connect.deadletter.FailedRecordFactory for converting records. The connector cannot see the SMT version |
| iceberg.deadletter.failed_record_factory.* | Properties to be passed to the failed record factory |

You do not need to use the Error SMT to turn on dead letter mode; however, the provided `io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory` assumes the Error SMT has been used
and will throw exceptions if not. You can implement your own WriteExceptionHandler/FailedRecordFactory to skip messages, transform messages, strip fields from messages and only write the
Kafka metadata etc.
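
A minimal sketch of the connector-side properties to pair with the Error SMT example above; `com.example.DeadLetterWriteExceptionHandler` is a hypothetical placeholder for your WriteExceptionHandler subclass, and the factory properties mirror the `table_name`/`route_field` settings of the provided default factory:

```
{
  "iceberg.deadletter.handler": "com.example.DeadLetterWriteExceptionHandler",
  "iceberg.deadletter.failed_record_factory": "io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory",
  "iceberg.deadletter.failed_record_factory.table_name": "default.dlt_table",
  "iceberg.deadletter.failed_record_factory.route_field": "_dlt_route"
}
```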

### Routing

Dead Letter Table routing is a variation on dynamic routing: a route field can be added by the
FailedRecordFactory and used to dispatch records to one or more Dead Letter tables.

| iceberg.tables | dynamic-enabled | route-field | routing behavior |
|----------------|-----------------|--------------|--------------------------------------------------------------|
| empty | true | populated | DynamicRecordRouter |
| empty | false | populated | RegexRouter |
| populated | false | populated | RegexRouter if iceberg.table.\<table name\>.route-regex set |
| populated | false | null | ConfigRouter |
| populated | false | populated | DynamicRecordRouter then ConfigRouter |

Regardless of the above, if a WriteExceptionHandler (`io.tabular.iceberg.connect.data.WriteExceptionHandler`) is configured, Dead Letter Table mode
wraps the underlying record routing mode. All exceptions are passed to the WriteExceptionHandler, where
they can be ignored (message dropped), converted into a record and dispatched to the Dead Letter Table(s), or rethrown
to fail the connector.
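
As an illustrative sketch of the last row of the table above (populated `iceberg.tables`, dynamic routing disabled, route field set): regular records go to the configured table, while records produced by the FailedRecordFactory carry the injected route field and are dispatched to the Dead Letter table. All table and field names are placeholders, and the exact behavior depends on the WriteExceptionHandler you supply:

```
{
  "iceberg.tables": "default.events",
  "iceberg.tables.dynamic-enabled": "false",
  "iceberg.tables.route-field": "_dlt_route",
  "iceberg.deadletter.handler": "com.example.DeadLetterWriteExceptionHandler",
  "iceberg.deadletter.failed_record_factory": "io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory",
  "iceberg.deadletter.failed_record_factory.table_name": "default.dlt_table",
  "iceberg.deadletter.failed_record_factory.route_field": "_dlt_route"
}
```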

### Partitioning

The following properties still apply to the Dead Letter Table. The `partition-by` property can be used to customize the
partitioning of the Dead Letter table(s).

| Property | Description |
|--------------------------------------------|------------------------------------------------------------------------------------------------|
| iceberg.table.\<table name\>.commit-branch | Table-specific branch for commits; uses `iceberg.tables.default-commit-branch` if not specified |
| iceberg.table.\<table name\>.id-columns | Comma-separated list of columns that identify a row in the table (primary key) |
| iceberg.table.\<table name\>.partition-by | Comma-separated list of partition fields to use when creating the table |
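
For example (a sketch with placeholder names), a Dead Letter table created by the default factory could be partitioned on the `topic` field of its failed-record schema and committed to its own branch:

```
{
  "iceberg.table.default.dlt_table.partition-by": "topic",
  "iceberg.table.default.dlt_table.commit-branch": "dlt-staging"
}
```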

## Resources

* [Running IcebergSinkConnector locally](https://github.com/wuerike/kafka-iceberg-streaming)
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
@@ -52,6 +52,7 @@ http-client = { module = "org.apache.httpcomponents.client5:httpclient5", versi
junit-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-ver" }
junit-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-ver" }
junit-params = { module = "org.junit.jupiter:junit-jupiter-params", version.ref = "junit-ver" }
kafka-connect-runtime = {module = "org.apache.kafka:connect-runtime", version.ref = "kafka-ver"}
mockito = "org.mockito:mockito-core:4.8.1"
testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers-ver" }
testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers-ver" }
18 changes: 18 additions & 0 deletions kafka-connect-deadletter/build.gradle
@@ -0,0 +1,18 @@
plugins {
id "java-test-fixtures"
}

dependencies {
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava
Contributor

You don't need any of the actual iceberg functionality in this module?

Suggested change
implementation libs.iceberg.core
implementation libs.iceberg.common
implementation libs.iceberg.guava

The only thing you do need is this import :D

import org.apache.iceberg.relocated.com.google.common.collect.Lists;

List<Struct> headers = Lists.newArrayList();

Which IMO you can just replace with this safely.

import java.util.ArrayList;

@SuppressWarnings("RegexpSingleline")
List<Struct> headers = new ArrayList<>();

compileOnly libs.bundles.kafka.connect
}

publishing {
publications {
mavenJava(MavenPublication) {
from components.java
}
}
}
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.deadletter;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

public class DeadLetterUtils {

public static class DeadLetterException extends RuntimeException {
private final String location;
private final Throwable error;

public DeadLetterException(String location, Throwable error) {
super(error);
this.location = location;
this.error = error;
}

public String getLocation() {
return location;
}

public Throwable getError() {
return error;
}
}

private DeadLetterUtils() {}

public static final String KEY_HEADER = "t_original_key";
public static final String VALUE_HEADER = "t_original_value";
Contributor

I believe you're de-risking the chance of a collision with an existing header by prefixing a t_
More out of curiosity, what does the t_ stand for?
And wondering if we can do a little better.

Contributor

Also, we could de-risk collisions more by just adding a single header t_original_record which is a Struct with this kind of structure (pseudo-code) instead of adding 3 separate headers:

Struct {
   OPTIONAL_BYTES_SCHEMA key, 
   OPTIONAL_BYTES_SCHEMA value, 
   OPTIONAL_ARRAY_HEADER_SCHEMA headers, 
}

nit: I would also name the header something specific to iceberg-kafka-connect IDK something along the lines of kafka.connect.iceberg.error.transform.original.record or something (obviously this is too long but you get the idea).

Contributor Author

You are correct in derisking collisions. I chose t for tabular.

Contributor

> I chose t for tabular.

🤦 should have figured that one out ....


public static final String HEADERS_HEADER = "t_original_headers";
public static final Schema HEADER_ELEMENT_SCHEMA =
SchemaBuilder.struct()
.field("key", Schema.STRING_SCHEMA)
.field("value", Schema.OPTIONAL_BYTES_SCHEMA)
.optional()
.build();

public static final Schema HEADER_SCHEMA =
SchemaBuilder.array(HEADER_ELEMENT_SCHEMA).optional().build();

public static String stackTrace(Throwable error) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
error.printStackTrace(pw);
return sw.toString();
}

/**
* No way to get back the original Kafka header bytes. We instead have an array with elements of
* {"key": String, "value": bytes} for each header. This can be converted back into a Kafka
* Connect header by the user later, and further converted into Kafka RecordHeaders to be put back
* into a ProducerRecord to create the original headers on the Kafka record.
*
* @param original record where headers are still byte array values
* @return Struct for an Array that can be put into Iceberg
*/
public static List<Struct> serializedHeaders(SinkRecord original) {
List<Struct> headers = Lists.newArrayList();
for (Header header : original.headers()) {
Struct headerStruct = new Struct(HEADER_ELEMENT_SCHEMA);
headerStruct.put("key", header.key());
headerStruct.put("value", header.value());
headers.add(headerStruct);
}
return headers;
}

public static Object loadClass(String name, ClassLoader loader) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("cannot initialize empty class");
}
Object obj;
try {
Class<?> clazz = Class.forName(name, true, loader);
obj = clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(String.format("could not initialize class %s", name), e);
}
return obj;
}
}
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.deadletter;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class DefaultFailedRecordFactory implements FailedRecordFactory {
private static final String DEAD_LETTER_TABLE_NAME_PROP = "table_name";

private static final String DEAD_LETTER_ROUTE_FIELD_PROP = "route_field";
private static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
DEAD_LETTER_TABLE_NAME_PROP,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.MEDIUM,
"dead letter table name namespace.table")
.define(DEAD_LETTER_ROUTE_FIELD_PROP,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.MEDIUM,
"route field to inject table name on");

private static final String HEADERS = "headers";
private Schema schema;

private String deadLetterTableName;
private String deadLetterRouteField;

@Override
public Schema schema(String context) {
return schema;
}

@Override
public SinkRecord recordFromSmt(SinkRecord original, Throwable error, String context) {
Struct struct = new Struct(schema);
addCommon(struct, original, error, context);

if (original.key() != null) {
struct.put("key_bytes", original.key());
}
if (original.value() != null) {
struct.put("value_bytes", original.value());
}
if (!original.headers().isEmpty()) {
struct.put(HEADERS, DeadLetterUtils.serializedHeaders(original));
}

return original.newRecord(
original.topic(),
original.kafkaPartition(),
null,
null,
schema,
struct,
original.timestamp());
}

@Override
public SinkRecord recordFromConnector(SinkRecord record, Throwable error, String context) {

Struct struct = new Struct(schema);
addCommon(struct, record, error, context);

Headers headers = record.headers();
Header keyHeader = headers.lastWithName(DeadLetterUtils.KEY_HEADER);
Header valueHeader = headers.lastWithName(DeadLetterUtils.VALUE_HEADER);
Header serializedHeader = headers.lastWithName(DeadLetterUtils.HEADERS_HEADER);

if (keyHeader != null) {
struct.put("key_bytes", keyHeader.value());
}
if (valueHeader != null) {
struct.put("value_bytes", valueHeader.value());
}
if (serializedHeader != null) {
struct.put(HEADERS, serializedHeader.value());
}

return record.newRecord(
record.topic(), record.kafkaPartition(), null, null, schema, struct, record.timestamp());
}

@Override
public void configure(Map<String, ?> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
deadLetterTableName = config.getString(DEAD_LETTER_TABLE_NAME_PROP);
deadLetterRouteField = config.getString(DEAD_LETTER_ROUTE_FIELD_PROP);
if (deadLetterTableName == null) {
throw new IllegalArgumentException("Dead letter table name cannot be null");
}
schema =
SchemaBuilder.struct()
.name("failed_message")
.parameter("transform_failed", "true")
.field("topic", Schema.STRING_SCHEMA)
.field("partition", Schema.INT32_SCHEMA)
.field("offset", Schema.INT64_SCHEMA)
.field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
.field("exception", Schema.OPTIONAL_STRING_SCHEMA)
.field("stack_trace", Schema.OPTIONAL_STRING_SCHEMA)
.field("key_bytes", Schema.OPTIONAL_BYTES_SCHEMA)
.field("value_bytes", Schema.OPTIONAL_BYTES_SCHEMA)
.field(HEADERS, DeadLetterUtils.HEADER_SCHEMA)
.field("context", Schema.OPTIONAL_STRING_SCHEMA)
.field("target_table", Schema.OPTIONAL_STRING_SCHEMA)
.field(deadLetterRouteField, Schema.STRING_SCHEMA)
.build();
}

private void addCommon(Struct struct, SinkRecord record, Throwable error, String context) {
struct.put("topic", record.topic());
struct.put("partition", record.kafkaPartition());
struct.put("offset", record.kafkaOffset());
struct.put("timestamp", record.timestamp());
struct.put("exception", error.toString());
String stack = DeadLetterUtils.stackTrace(error);
if (!stack.isEmpty()) {
struct.put("stack_trace", stack);
}
if (context != null) {
struct.put("context", context);
}
struct.put(deadLetterRouteField, deadLetterTableName);
}
}
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.deadletter;

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public interface FailedRecordFactory {
Schema schema(String context);

// Converts an SMT-stage record (key/value/headers are still raw byte arrays) into a failed record.
SinkRecord recordFromSmt(SinkRecord original, Throwable error, String context);

// Converts a connector-stage record into a failed record; implementations must know where the
// original byte arrays live (e.g. the headers set by the ErrorTransform).
SinkRecord recordFromConnector(SinkRecord record, Throwable error, String context);

void configure(Map<String, ?> props);
}
2 changes: 2 additions & 0 deletions kafka-connect-transforms/build.gradle
@@ -1,4 +1,5 @@
dependencies {
implementation project(":iceberg-kafka-connect-deadletter")
implementation libs.iceberg.guava
implementation libs.bson
implementation libs.slf4j
@@ -9,6 +10,7 @@ dependencies {

testImplementation libs.mockito
testImplementation libs.assertj
testRuntimeOnly libs.kafka.connect.runtime
}

configurations {
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.deadletter.DeadLetterUtils;
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class DefaultExceptionHandler implements TransformExceptionHandler {

private static final String FAILED_RECORD_FACTORY_PROP = "failed_record_factory";

private static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
FAILED_RECORD_FACTORY_PROP,
ConfigDef.Type.STRING,
"io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory",
ConfigDef.Importance.MEDIUM,
"class name for failed record conversion");

private FailedRecordFactory recordFactory;

@Override
public SinkRecord handle(SinkRecord original, Throwable error, String location) {
return recordFactory.recordFromSmt(original, error, location);
}

@Override
public void configure(Map<String, String> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
ClassLoader loader = this.getClass().getClassLoader();
this.recordFactory =
(FailedRecordFactory)
DeadLetterUtils.loadClass(config.getString(FAILED_RECORD_FACTORY_PROP), loader);
recordFactory.configure(props);
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

private Map<String, String> failedRecordProperties(Map<String, String> originalProps) {
return propertiesWithPrefix(originalProps, FAILED_RECORD_FACTORY_PROP + ".");
}

private static Map<String, String> propertiesWithPrefix(
Map<String, String> properties, String prefix) {
if (properties == null || properties.isEmpty()) {
return Collections.emptyMap();
}

Preconditions.checkArgument(prefix != null, "Invalid prefix: null");

return properties.entrySet().stream()
.filter(e -> e.getKey().startsWith(prefix))
.collect(Collectors.toMap(e -> e.getKey().replaceFirst(prefix, ""), Map.Entry::getValue));
}
}
@@ -0,0 +1,399 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.deadletter.DeadLetterUtils;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
* Wraps key, value, header converters and SMTs in order to catch exceptions. Failed records are
* converted into a standard struct and sent to the connector to be put into Iceberg
*
* <p>MUST ONLY BE USED with `value.converter`, `key.converter`, and `header.converter` set to
* "org.apache.kafka.connect.converters.ByteArrayConverter" which can not be validated from within
* this SMT
*
* <p>Actual value converter, key converter, and header converter are configured on the
* `transforms.xxx` key where xxx is the name of this transform. See example for how properties are
* passed and namespaced
*
* <p>"transforms": "tab", "transforms.tab.type":
* <ul>
* <li>"io.tabular.iceberg.connect.transforms.ErrorTransform" </li>
* <li>"transforms.tab.value.converter": "org.apache.kafka.connect.storage.StringConverter"</li>
* <li>"transforms.tab.value.converter.some_property: "...", "transforms.tab.key.converter":</li>
* <li>"org.apache.kafka.connect.storage.StringConverter", "transforms.tab.key.converter.some_property": "..."</li>
* <li>"transforms.tab.smts" : "some.java.class,some.other.java.class"</li>
* <li>"transforms.tab.smts.prop1" : "some_property_for_the_smts"</li>
* </ul>
* <p>This should not be used with any other SMT. All SMTs should be added to "transforms.tab.smts".
*
* <p> In the success path, the original key/value/header bytes are put on the headers of the transformed record.
* Note that the original Kafka headers are lost due to Kafka Connect; however, the Kafka Connect headers are
* translated and can be used to recover the original Kafka values in order to construct a new Producer Record if required
* (user responsibility).
*
* <p>The original payload can be used in the Iceberg Connector if the record cannot be transformed
* to an Iceberg record, or some other issue arises, so that the original kafka message can be stored in Iceberg at that point.
*
* <p>If any of the key, value, header deserializers or SMTs throw an exception a failed record is
* constructed that contains kafka metadata, exception/location information, and the original
* key/value/header bytes.
*/
public class ErrorTransform implements Transformation<SinkRecord> {

public static class TransformInitializationException extends RuntimeException {
TransformInitializationException(String errorMessage) {
super(errorMessage);
}

TransformInitializationException(String errorMessage, Throwable err) {
super(errorMessage, err);
}
}

public static class PropsParser {
static Map<String, ?> apply(Map<String, ?> props, String target) {
return props.entrySet().stream()
.filter(
entry ->
(!Objects.equals(entry.getKey(), target)) && (entry.getKey().startsWith(target)))
.collect(
Collectors.toMap(
entry -> entry.getKey().replaceFirst("^" + target + ".", ""),
Map.Entry::getValue));
}
}

private static class DeserializedRecord {
private final SinkRecord record;
private final boolean failed;

DeserializedRecord(SinkRecord record, boolean failed) {
this.record = record;
this.failed = failed;
}

public SinkRecord getRecord() {
return record;
}

public boolean isFailed() {
return failed;
}
}

private static final String KEY_CONVERTER = "key.converter";
private static final String VALUE_CONVERTER = "value.converter";
private static final String HEADER_CONVERTER = "header.converter";
private static final String CONVERTER_ERROR_HANDLER = "error.converter";
private static final String SMT_ERROR_HANDLER = "error.smt";
private static final String TRANSFORMATIONS = "smts";
private static final String KEY_FAILURE = "KEY_CONVERTER";
private static final String VALUE_FAILURE = "VALUE_CONVERTER";
private static final String HEADER_FAILURE = "HEADER_CONVERTER";
private static final String SMT_FAILURE = "SMT_FAILURE";
private static final Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.OPTIONAL_BYTES_SCHEMA;

private TransformExceptionHandler converterErrorHandler;
private TransformExceptionHandler smtErrorHandler;
private List<Transformation<SinkRecord>> smts;
private Function<SinkRecord, SchemaAndValue> keyConverter;
private Function<SinkRecord, SchemaAndValue> valueConverter;
private Function<SinkRecord, Headers> headerConverterFn;

public static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
KEY_CONVERTER,
ConfigDef.Type.STRING,
"org.apache.kafka.connect.converters.ByteArrayConverter",
ConfigDef.Importance.MEDIUM,
"key.converter")
.define(
VALUE_CONVERTER,
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.MEDIUM,
"value.converter")
.define(
HEADER_CONVERTER,
ConfigDef.Type.STRING,
"org.apache.kafka.connect.converters.ByteArrayConverter",
ConfigDef.Importance.MEDIUM,
"header.converter")
.define(TRANSFORMATIONS, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "smts")
.define(
CONVERTER_ERROR_HANDLER,
ConfigDef.Type.STRING,
"io.tabular.iceberg.connect.transforms.DefaultExceptionHandler",
ConfigDef.Importance.MEDIUM,
"Error handling class for converter errors")
.define(
SMT_ERROR_HANDLER,
ConfigDef.Type.STRING,
"io.tabular.iceberg.connect.transforms.DefaultExceptionHandler",
ConfigDef.Importance.MEDIUM,
"Error handling class for SMT errors");

@Override
public SinkRecord apply(SinkRecord record) {
// tombstones returned as-is
if (record == null || record.value() == null) {
return record;
}

DeserializedRecord deserialized = deserialize(record);
if (deserialized.isFailed()) {
return deserialized.getRecord();
}

SinkRecord transformedRecord = deserialized.getRecord();

for (Transformation<SinkRecord> smt : smts) {
try {
transformedRecord = smt.apply(transformedRecord);
if (transformedRecord == null) {
break;
}
} catch (Exception e) {
return smtErrorHandler.handle(record, e, SMT_FAILURE);
}
}
// SMT could filter out messages
if (transformedRecord == null) {
return null;
}

return newRecord(record, transformedRecord);
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

@Override
public void close() {}

/*
Kafka Connect filters the properties it passes to the SMT to
only the keys under the `transforms.xxx` name.
*/
@SuppressWarnings("unchecked")
@Override
public void configure(Map<String, ?> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
ClassLoader loader = this.getClass().getClassLoader();

if (Objects.equals(
config.getString(KEY_CONVERTER),
"org.apache.kafka.connect.converters.ByteArrayConverter")) {
keyConverter = record -> new SchemaAndValue(record.keySchema(), record.value());
} else {
Converter converter = (Converter) loadClass(config.getString(KEY_CONVERTER), loader);
converter.configure(PropsParser.apply(props, KEY_CONVERTER), true);
keyConverter = record -> converter.toConnectData(record.topic(), (byte[]) record.key());
}

if (config.getString(VALUE_CONVERTER) == null) {
throw new TransformInitializationException(
"ManagedTransformWrapper cannot be used without a defined value converter");
} else {
Converter converter = (Converter) loadClass(config.getString(VALUE_CONVERTER), loader);
converter.configure(PropsParser.apply(props, VALUE_CONVERTER), false);
valueConverter = record -> converter.toConnectData(record.topic(), (byte[]) record.value());
}

HeaderConverter headerConverter;

if (Objects.equals(
config.getString(HEADER_CONVERTER),
"org.apache.kafka.connect.converters.ByteArrayConverter")) {
try (HeaderConverter converter =
(HeaderConverter)
loadClass("org.apache.kafka.connect.converters.ByteArrayConverter", loader)) {
converter.configure(PropsParser.apply(props, HEADER_CONVERTER));
} catch (Exception e) {
throw new TransformInitializationException(
String.format(
"Error loading header converter class %s", config.getString(HEADER_CONVERTER)),
e);
}
headerConverterFn = ConnectRecord::headers;
} else {
try (HeaderConverter converter =
(HeaderConverter) loadClass(config.getString(HEADER_CONVERTER), loader)) {
converter.configure(PropsParser.apply(props, HEADER_CONVERTER));
headerConverter = converter;
} catch (Exception e) {
throw new TransformInitializationException(
String.format(
"Error loading header converter class %s", config.getString(HEADER_CONVERTER)),
e);
}

headerConverterFn =
record -> {
Headers newHeaders = new ConnectHeaders();
Headers recordHeaders = record.headers();
if (recordHeaders != null) {
String topic = record.topic();
for (Header recordHeader : recordHeaders) {
SchemaAndValue schemaAndValue =
headerConverter.toConnectHeader(
topic, recordHeader.key(), (byte[]) recordHeader.value());
newHeaders.add(recordHeader.key(), schemaAndValue);
}
}
return newHeaders;
};
}

if (config.getString(TRANSFORMATIONS) == null) {
smts = Lists.newArrayList();
} else {

smts =
Arrays.stream(config.getString(TRANSFORMATIONS).split(","))
.map(className -> loadClass(className, loader))
.map(obj -> (Transformation<SinkRecord>) obj)
.peek(smt -> smt.configure(PropsParser.apply(props, TRANSFORMATIONS)))
.collect(Collectors.toList());
}


Map<String, String> stringProps = propsAsStrings(props);

converterErrorHandler =
(TransformExceptionHandler) loadClass(config.getString(CONVERTER_ERROR_HANDLER), loader);
converterErrorHandler.configure(stringProps);
smtErrorHandler =
(TransformExceptionHandler) loadClass(config.getString(SMT_ERROR_HANDLER), loader);
smtErrorHandler.configure(stringProps);
}

private Object loadClass(String name, ClassLoader loader) {
if (name == null || name.isEmpty()) {
throw new TransformInitializationException("cannot initialize empty class");
}
Object obj;
try {
Class<?> clazz = Class.forName(name, true, loader);
obj = clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new TransformInitializationException(
String.format("could not initialize class %s", name), e);
}
return obj;
}

private DeserializedRecord deserialize(SinkRecord record) {
SchemaAndValue keyData;
SchemaAndValue valueData;
Headers newHeaders;

try {
keyData = keyConverter.apply(record);
} catch (Exception e) {
return new DeserializedRecord(converterErrorHandler.handle(record, e, KEY_FAILURE), true);
}

try {
valueData = valueConverter.apply(record);
} catch (Exception e) {
return new DeserializedRecord(converterErrorHandler.handle(record, e, VALUE_FAILURE), true);
}
try {
newHeaders = headerConverterFn.apply(record);
} catch (Exception e) {
return new DeserializedRecord(converterErrorHandler.handle(record, e, HEADER_FAILURE), true);
}

return new DeserializedRecord(
record.newRecord(
record.topic(),
record.kafkaPartition(),
keyData.schema(),
keyData.value(),
valueData.schema(),
valueData.value(),
record.timestamp(),
newHeaders),
false);
}

private SinkRecord newRecord(SinkRecord original, SinkRecord transformed) {
if (!original.headers().isEmpty()) {
List<Struct> serializedHeaders = DeadLetterUtils.serializedHeaders(original);
transformed
.headers()
.add(
DeadLetterUtils.HEADERS_HEADER,
new SchemaAndValue(DeadLetterUtils.HEADER_SCHEMA, serializedHeaders));
}
if (original.key() != null) {
transformed
.headers()
.add(
DeadLetterUtils.KEY_HEADER,
new SchemaAndValue(OPTIONAL_BYTES_SCHEMA, original.key()));
}
if (original.value() != null) {
transformed
.headers()
.add(
DeadLetterUtils.VALUE_HEADER,
new SchemaAndValue(OPTIONAL_BYTES_SCHEMA, original.value()));
}

return transformed;
}

private Map<String, String> propsAsStrings(Map<String, ?> props) {
Map<String, String> newProps = Maps.newHashMap();
props.forEach((key, value) -> {
if (value instanceof String) {
newProps.put(key, (String) value);
}
});
return newProps;
}
}
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;

public interface TransformExceptionHandler {
SinkRecord handle(SinkRecord original, Throwable error, String location);

void configure(Map<String, String> props);

ConfigDef config();
}

Large diffs are not rendered by default.

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;

/* Appends a configured string to String record values; useful for testing SMT control flow */
public class TestStringTransform implements Transformation<SinkRecord> {

private String text;
private boolean returnNull;

private boolean shouldThrow;

@Override
public synchronized SinkRecord apply(SinkRecord record) {
if (shouldThrow) {
throw new RuntimeException("smt failure");
}
if (record.value() == null) {
return record;
}
if (returnNull) {
return null;
}
String newValue;
if (record.value() instanceof String) {
newValue = (record.value()) + text;
} else {
throw new IllegalArgumentException("record.value is not a string");
}
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
newValue,
record.timestamp());
}

@Override
public ConfigDef config() {
return new ConfigDef();
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> map) {
if (map.get("transform_text") != null) {
text = (String) map.get("transform_text");
} else {
text = "default";
}

if (map.get("null") != null) {
returnNull = Boolean.parseBoolean((String) map.get("null"));
} else {
returnNull = false;
}

if (map.get("throw") != null) {
shouldThrow = Boolean.parseBoolean((String) map.get("throw"));
} else {
shouldThrow = false;
}
}
}
1 change: 1 addition & 0 deletions kafka-connect/build.gradle
@@ -1,5 +1,6 @@
dependencies {
implementation project(":iceberg-kafka-connect-events")
implementation project(":iceberg-kafka-connect-deadletter")
implementation libs.bundles.iceberg
implementation libs.bundles.jackson
implementation libs.avro
@@ -83,6 +83,11 @@ public class IcebergSinkConfig extends AbstractConfig {
"iceberg.tables.schema-force-optional";
private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
"iceberg.tables.schema-case-insensitive";
private static final String WRITE_EXCEPTION_HANDLER_PROP = "iceberg.deadletter.handler";
private static final String FAILED_RECORD_FACTORY_PROP =
"iceberg.deadletter.failed_record_factory";
private static final String FAILED_RECORD_FACTORY_PREFIX =
"iceberg.deadletter.failed_record_factory";
private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
private static final String CONTROL_GROUP_ID_PROP = "iceberg.control.group-id";
private static final String COMMIT_INTERVAL_MS_PROP = "iceberg.control.commit.interval-ms";
@@ -92,14 +97,11 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String COMMIT_THREADS_PROP = "iceberg.control.commit.threads";
private static final String CONNECT_GROUP_ID_PROP = "iceberg.connect.group-id";
private static final String HADDOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir";

private static final String NAME_PROP = "name";
private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";

private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";

public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts

@@ -237,6 +239,18 @@ private static ConfigDef newConfigDef() {
null,
Importance.MEDIUM,
"Coordinator threads to use for table commits, default is (cores * 2)");
configDef.define(
WRITE_EXCEPTION_HANDLER_PROP,
Type.STRING,
null,
Importance.MEDIUM,
"If writing to Dead Letter Table, write exception handler class to use");
configDef.define(
FAILED_RECORD_FACTORY_PROP,
Type.STRING,
null,
Importance.MEDIUM,
"If writing to Dead Letter Table, failed record factory class to use");
return configDef;
}

@@ -333,6 +347,18 @@ public boolean dynamicTablesEnabled() {
return getBoolean(TABLES_DYNAMIC_PROP);
}

public boolean deadLetterTableEnabled() {
return getWriteExceptionHandler() != null;
}

public String getWriteExceptionHandler() {
return getString(WRITE_EXCEPTION_HANDLER_PROP);
}

public String getFailedRecordHandler() {
return getString(FAILED_RECORD_FACTORY_PROP);
}

public String tablesRouteField() {
return getString(TABLES_ROUTE_FIELD_PROP);
}
@@ -349,6 +375,10 @@ public String tablesDefaultPartitionBy() {
return getString(TABLES_DEFAULT_PARTITION_BY);
}

public Map<String, String> failedRecordHandlerProperties() {
return PropertyUtil.propertiesWithPrefix(originalProps, FAILED_RECORD_FACTORY_PREFIX + ".");
}

public TableSinkConfig tableConfig(String tableName) {
return tableConfigMap.computeIfAbsent(
tableName,
@@ -33,7 +33,7 @@ public class TaskImpl implements Task, AutoCloseable {

public TaskImpl(SinkTaskContext context, IcebergSinkConfig config) {
this.catalog = Utilities.loadCatalog(config);
this.writer = new Worker(config, catalog);
this.writer = new Worker(context, config, catalog);
this.committer = new CommitterImpl(context, config, catalog);
}

@@ -18,52 +18,49 @@
*/
package io.tabular.iceberg.connect.channel;

import static java.util.stream.Collectors.toList;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.Offset;
import io.tabular.iceberg.connect.data.RecordWriter;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.data.WriterResult;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.Offset;
import io.tabular.iceberg.connect.data.RecordRouter;
import io.tabular.iceberg.connect.data.WriterManager;
import io.tabular.iceberg.connect.data.WriterResult;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.connect.sink.SinkTaskContext;

// TODO: rename to WriterImpl later, minimize changes for clearer commit history for now
class Worker implements Writer, AutoCloseable {
private final WriterManager writers;

private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
private final IcebergSinkConfig config;
private final IcebergWriterFactory writerFactory;
private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;
private final Map<TopicPartition, Offset> sourceOffsets;
private final RecordRouter recordRouter;

Worker(IcebergSinkConfig config, Catalog catalog) {
this(config, new IcebergWriterFactory(catalog, config));
Worker(SinkTaskContext context, IcebergSinkConfig config, Catalog catalog) {
this(context, config, new IcebergWriterFactory(catalog, config));
}

@VisibleForTesting
Worker(IcebergSinkConfig config, IcebergWriterFactory writerFactory) {
this.config = config;
this.writerFactory = writerFactory;
this.writers = Maps.newHashMap();
Worker(SinkTaskContext context, IcebergSinkConfig config, IcebergWriterFactory writerFactory) {
this.writers = new WriterManager(writerFactory);
this.sourceOffsets = Maps.newHashMap();
this.recordRouter =
RecordRouter.from(writers, config, this.getClass().getClassLoader(), context);

}

@Override
public Committable committable() {
List<WriterResult> writeResults =
writers.values().stream().flatMap(writer -> writer.complete().stream()).collect(toList());
List<WriterResult> writeResults = writers.writeResults();
Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);

writers.clear();
@@ -74,7 +71,7 @@ public Committable committable() {

@Override
public void close() throws IOException {
writers.values().forEach(RecordWriter::close);
writers.stop();
writers.clear();
sourceOffsets.clear();
}
@@ -93,67 +90,6 @@ private void save(SinkRecord record) {
new TopicPartition(record.topic(), record.kafkaPartition()),
new Offset(record.kafkaOffset() + 1, record.timestamp()));

if (config.dynamicTablesEnabled()) {
routeRecordDynamically(record);
} else {
routeRecordStatically(record);
}
}

private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

if (routeField == null) {
// route to all tables
config
.tables()
.forEach(
tableName -> {
writerForTable(tableName, record, false).write(record);
});

} else {
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
config
.tables()
.forEach(
tableName ->
config
.tableConfig(tableName)
.routeRegex()
.ifPresent(
regex -> {
if (regex.matcher(routeValue).matches()) {
writerForTable(tableName, record, false).write(record);
}
}));
}
}
}

private void routeRecordDynamically(SinkRecord record) {
String routeField = config.tablesRouteField();
Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");

String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
String tableName = routeValue.toLowerCase();
writerForTable(tableName, record, true).write(record);
}
}

private String extractRouteValue(Object recordValue, String routeField) {
if (recordValue == null) {
return null;
}
Object routeValue = Utilities.extractFromRecordValue(recordValue, routeField);
return routeValue == null ? null : routeValue.toString();
}

private RecordWriter writerForTable(
String tableName, SinkRecord sample, boolean ignoreMissingTable) {
return writers.computeIfAbsent(
tableName, notUsed -> writerFactory.createWriter(tableName, sample, ignoreMissingTable));
recordRouter.write(record);
}
}
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseCatalogApi implements CatalogApi {
private static final Logger LOG = LoggerFactory.getLogger(BaseCatalogApi.class);
private final Catalog catalog;
private final BiFunction<TableIdentifier, SinkRecord, Schema> schemaFactory;
private final IcebergSinkConfig config;

BaseCatalogApi(Catalog catalog, IcebergSinkConfig config) {
this.config = config;
this.catalog = catalog;
this.schemaFactory =
(tableIdentifier, sample) -> {
Types.StructType structType;
if (sample.valueSchema() == null) {
structType =
SchemaUtils.inferIcebergType(sample.value(), config)
.orElseThrow(
() -> new DataException("Unable to create table from empty object"))
.asStructType();
} else {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}

return new org.apache.iceberg.Schema(structType.fields());
};
}

@VisibleForTesting
BaseCatalogApi(
Catalog catalog,
IcebergSinkConfig config,
BiFunction<TableIdentifier, SinkRecord, Schema> schemaFactory) {
this.config = config;
this.catalog = catalog;
this.schemaFactory = schemaFactory;
}

@Override
public TableIdentifier tableId(String name) {
TableIdentifier tableId;
try {
Preconditions.checkArgument(!name.isEmpty());
tableId = TableIdentifier.parse(name);
} catch (Exception error) {
throw new WriteException.TableIdentifierException(name, error);
}
return tableId;
}

@Override
public final Table loadTable(TableIdentifier identifier) {
try {
return catalog.loadTable(identifier);
} catch (NoSuchTableException error) {
throw error;
} catch (Exception error) {
throw new WriteException.LoadTableException(identifier, error);
}
}

@Override
public final PartitionSpec partitionSpec(String tableName, Schema schema) {
List<String> partitionBy = config.tableConfig(tableName).partitionBy();

PartitionSpec spec;
try {
spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
} catch (Exception e) {
LOG.error(
"Unable to create partition spec {}, table {} will be unpartitioned",
partitionBy,
tableName,
e);
spec = PartitionSpec.unpartitioned();
}
return spec;
}

@Override
public Table createTable(
TableIdentifier identifier,
Schema schema,
PartitionSpec spec,
Map<String, String> properties) {

try {
return catalog.createTable(identifier, schema, spec, properties);
} catch (Exception error) {
throw new WriteException.CreateTableException(identifier, error);
}
}

@Override
public Schema schema(TableIdentifier identifier, SinkRecord sample) {
try {
return schemaFactory.apply(identifier, sample);
} catch (Exception error) {
throw new WriteException.CreateSchemaException(identifier, error);
}
}
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.kafka.connect.sink.SinkRecord;

public interface CatalogApi {

TableIdentifier tableId(String name);

Table loadTable(TableIdentifier identifier);

PartitionSpec partitionSpec(String tableName, Schema schema);

Table createTable(
TableIdentifier identifier,
Schema schema,
PartitionSpec spec,
Map<String, String> properties);

Schema schema(TableIdentifier identifier, SinkRecord sample);
}
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class DefaultWriteExceptionHandler implements WriteExceptionHandler {
Review thread on this class:

Contributor: Shouldn't live in this module; it should live in the dead-letter module.

Contributor (author): WriteExceptionHandler is only used by the connector, not by both the connector and the error transform, so it doesn't need to be shared. Since it all gets packaged into a single jar anyway, I could move it or leave it separate. I'm OK with it as it is unless this really bugs you for some reason. Note: we have other open comments about modules, so let's chat offline about all of that.

Contributor: The WriteExceptionHandler interface should live in the connector; that's fine. I wanted the implementation(s) of that interface (currently just DefaultWriteExceptionHandler) to live outside of the connector because it's expected to be pluggable and because I'm not convinced we can get the implementation correct the first time (it's extremely hard to figure out what is transient and what is not). Once we're confident we've got it correct, we can move it in and make it the default if we want, but that's a later step IMO.

Contributor (author): If I move it, I also have to move all the custom exceptions into iceberg-kafka-connect-deadletter as well; otherwise I can't reference them from the exception handler.

Contributor: That's concerning, because it means other users defining their own WriteExceptionHandler implementations can't reference those exceptions either?

Contributor (author): Let me think about this. If iceberg-kafka-connect-runtime is provided, they should be able to see them, no?

private FailedRecordFactory factory;

@Override
public void initialize(
SinkTaskContext context, IcebergSinkConfig config, FailedRecordFactory recordFactory) {
this.factory = recordFactory;
}

@Override
public SinkRecord handle(SinkRecord record, Exception exception) {
if (exception instanceof WriteException) {
return handleWriteException(record, (WriteException) exception);
}
Throwable cause = exception.getCause();
if (cause instanceof WriteException) {
return handleWriteException(record, (WriteException) cause);
}
throw new RuntimeException(exception);
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private SinkRecord handleWriteException(SinkRecord record, WriteException exception) {
if (exception instanceof WriteException.CreateTableException) {
Throwable cause = exception.getCause();
if (cause instanceof IllegalArgumentException || cause instanceof ValidationException) {
return failedRecord(record, exception);
}
} else if (exception instanceof WriteException.CreateSchemaException) {
return failedRecord(record, exception);
} else if (exception instanceof WriteException.LoadTableException) {
Throwable cause = exception.getCause();
if (cause instanceof IllegalArgumentException || cause instanceof ValidationException) {
return failedRecord(record, exception);
}
} else if (exception instanceof WriteException.RecordConversionException) {
return failedRecord(record, exception);

} else if (exception instanceof WriteException.RouteException) {
return failedRecord(record, exception);

} else if (exception instanceof WriteException.RouteRegexException) {
return failedRecord(record, exception);

} else if (exception instanceof WriteException.SchemaEvolutionException) {
Throwable cause = exception.getCause();
if (cause instanceof IllegalArgumentException
|| cause instanceof ValidationException
|| cause instanceof UnsupportedOperationException) {
return failedRecord(record, exception);
}
} else if (exception instanceof WriteException.TableIdentifierException) {
return failedRecord(record, exception);
}
throw exception;
}

private SinkRecord failedRecord(SinkRecord record, WriteException exception) {
return factory.recordFromConnector(record, exception, null);
}
}
@@ -94,15 +94,20 @@ private Record convertToRow(SinkRecord record) {
// initialize a new writer with the new schema
initNewWriter();
// convert the row again, this time using the new table schema
// if the conversion fails again here, let the exception propagate to the caller
row = recordConverter.convert(record.value(), null);
}

return row;
}

private Operation extractCdcOperation(Object recordValue, String cdcField) {
Object opValue = Utilities.extractFromRecordValue(recordValue, cdcField);

Object opValue;
try {
opValue = Utilities.extractFromRecordValue(recordValue, cdcField);
} catch (Exception e) {
throw new WriteException.CdcException(e);
}
if (opValue == null) {
return Operation.INSERT;
}
@@ -19,46 +19,45 @@
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.Tasks;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergWriterFactory {

private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);

private final Catalog catalog;
private final IcebergSinkConfig config;

private final CatalogApi catalogApi;

public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
this.catalog = catalog;
this(config, getCatalogApi(catalog, config));
}

public IcebergWriterFactory(IcebergSinkConfig config, CatalogApi api) {
this.config = config;
this.catalogApi = api;
}

public RecordWriter createWriter(
String tableName, SinkRecord sample, boolean ignoreMissingTable) {
TableIdentifier identifier = TableIdentifier.parse(tableName);
Table table;
try {
table = catalog.loadTable(identifier);
table = catalogApi.loadTable(identifier);
} catch (NoSuchTableException nst) {
if (config.autoCreateEnabled()) {
table = autoCreateTable(tableName, sample);
} else if (ignoreMissingTable) {
return new RecordWriter() {};
} else {
throw nst;
throw new WriteException.LoadTableException(identifier, nst);
}
}

@@ -67,46 +66,29 @@ public RecordWriter createWriter(

@VisibleForTesting
Table autoCreateTable(String tableName, SinkRecord sample) {
StructType structType;
if (sample.valueSchema() == null) {
structType =
SchemaUtils.inferIcebergType(sample.value(), config)
.orElseThrow(() -> new DataException("Unable to create table from empty object"))
.asStructType();
} else {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
TableIdentifier identifier = catalogApi.tableId(tableName);

org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
TableIdentifier identifier = TableIdentifier.parse(tableName);
Schema schema = catalogApi.schema(identifier, sample);

List<String> partitionBy = config.tableConfig(tableName).partitionBy();
PartitionSpec spec;
try {
spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
} catch (Exception e) {
LOG.error(
"Unable to create partition spec {}, table {} will be unpartitioned",
partitionBy,
identifier,
e);
spec = PartitionSpec.unpartitioned();
}
PartitionSpec partitionSpec = catalogApi.partitionSpec(tableName, schema);

PartitionSpec partitionSpec = spec;
AtomicReference<Table> result = new AtomicReference<>();
Tasks.range(1)
.retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
.run(
notUsed -> {
try {
result.set(catalog.loadTable(identifier));
result.set(catalogApi.loadTable(identifier));
} catch (NoSuchTableException e) {
result.set(
catalog.createTable(
catalogApi.createTable(
identifier, schema, partitionSpec, config.autoCreateProps()));
}
});
return result.get();
}

private static CatalogApi getCatalogApi(Catalog catalog, IcebergSinkConfig config) {
return new BaseCatalogApi(catalog, config) {};
}
}
@@ -91,10 +91,15 @@ public Record convert(Object data) {
}

public Record convert(Object data, SchemaUpdate.Consumer schemaUpdateConsumer) {
if (data instanceof Struct || data instanceof Map) {
return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
try {
if (data instanceof Struct || data instanceof Map) {
return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
}
throw new WriteException.RecordConversionException(
new UnsupportedOperationException("Cannot convert type: " + data.getClass().getName()));
} catch (Exception error) {
throw new WriteException.RecordConversionException(error);
}
throw new UnsupportedOperationException("Cannot convert type: " + data.getClass().getName());
}

private NameMapping createNameMapping(Table table) {
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.deadletter.DeadLetterUtils;
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;

import java.util.List;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public abstract class RecordRouter {

public void write(SinkRecord record) {}

protected final String extractRouteValue(Object recordValue, String routeField) {
Object routeValue;
if (recordValue == null) {
return null;
}
try {
routeValue = Utilities.extractFromRecordValue(recordValue, routeField);
} catch (Exception error) {
throw new WriteException.RouteException(error);
}

return routeValue == null ? null : routeValue.toString();
}

public static RecordRouter from(
WriterManager writers,
IcebergSinkConfig config,
ClassLoader loader,
SinkTaskContext context) {
RecordRouter baseRecordRouter;

if (config.dynamicTablesEnabled()) {
Preconditions.checkNotNull(
config.tablesRouteField(), "Route field cannot be null with dynamic routing");
baseRecordRouter = new DynamicRecordRouter(writers, config.tablesRouteField());
} else {
if (config.tables() != null && !config.tables().isEmpty()) {
config.tables().forEach(TableIdentifier::of);
if (config.tablesRouteField() != null) {
if (hasRegexMode(config)) {
baseRecordRouter = new RegexRecordRouter(writers, config);
} else {
baseRecordRouter = new FallbackRecordRouter(new DynamicRecordRouter(writers, config.tablesRouteField()), new ConfigRecordRouter(writers, config.tables()));
}
} else {
baseRecordRouter = new ConfigRecordRouter(writers, config.tables());
}
} else {
baseRecordRouter = new RegexRecordRouter(writers, config);
}
}

if (config.deadLetterTableEnabled()) {
String failedRecordFactoryClass = config.getFailedRecordHandler();
String handlerClass = config.getWriteExceptionHandler();
FailedRecordFactory factory =
(FailedRecordFactory) DeadLetterUtils.loadClass(failedRecordFactoryClass, loader);
factory.configure(config.failedRecordHandlerProperties());
WriteExceptionHandler handler =
(WriteExceptionHandler) DeadLetterUtils.loadClass(handlerClass, loader);
handler.initialize(context, config, factory);
baseRecordRouter =
new RecordRouter.ErrorHandlingRecordRouter(baseRecordRouter, handler);
}

return baseRecordRouter;
}

private static boolean hasRegexMode(IcebergSinkConfig config) {
long definedRegexes = config
.tables()
.stream()
.map(
tableName -> {
try {
return config
.tableConfig(tableName)
.routeRegex().isPresent();
} catch (Exception unused) {
return false;
}
}).filter(present -> present).count();
return definedRegexes > 0;
}

public static class ConfigRecordRouter extends RecordRouter {
private final List<String> tables;
private final WriterManager writers;

ConfigRecordRouter(WriterManager writers, List<String> tables) {
this.tables = tables;
this.writers = writers;
}

@Override
public void write(SinkRecord record) {
// route to all tables
tables.forEach(
tableName -> {
writers.write(tableName, record, false);
});
}
}

public static class RegexRecordRouter extends RecordRouter {
private final String routeField;
private final WriterManager writers;
private final IcebergSinkConfig config;

RegexRecordRouter(WriterManager writers, IcebergSinkConfig config) {
this.routeField = config.tablesRouteField();
this.writers = writers;
this.config = config;
}

@Override
public void write(SinkRecord record) {
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
config
.tables()
.forEach(
tableName ->
config
.tableConfig(tableName)
.routeRegex()
.ifPresent(
regex -> {
boolean matches;
try {
matches = regex.matcher(routeValue).matches();
} catch (Exception error) {
throw new WriteException.RouteRegexException(error);
}
if (matches) {
writers.write(tableName, record, false);
}
}));
}
}
}

public static class DynamicRecordRouter extends RecordRouter {
private final String routeField;
private final WriterManager writers;

DynamicRecordRouter(WriterManager writers, String routeField) {
this.routeField = routeField;
this.writers = writers;
}

@Override
public void write(SinkRecord record) {
String routeValue = extractRouteValue(record.value(), routeField);
if (routeValue != null) {
String tableName = routeValue.toLowerCase();
writers.write(tableName, record, true);
}
}
}

public static class FallbackRecordRouter extends RecordRouter {
private final RecordRouter primary;
private final RecordRouter fallback;

FallbackRecordRouter(RecordRouter primary, RecordRouter fallback) {
this.primary = primary;
this.fallback = fallback;
}

public void write(SinkRecord record) {
try {
primary.write(record);
} catch (Exception error) {
fallback.write(record);
}
}
}

public static class ErrorHandlingRecordRouter extends RecordRouter {
private final WriteExceptionHandler handler;
private final RecordRouter router;

ErrorHandlingRecordRouter(
RecordRouter baseRouter,
WriteExceptionHandler handler) {
this.router = baseRouter;
this.handler = handler;
}

@Override
public void write(SinkRecord record) {
try {
router.write(record);
} catch (Exception error) {
SinkRecord result = handler.handle(record, error);
// a handler may return null to signal that the record should be dropped
if (result != null) {
router.write(result);
}
}
}
}
}
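To make the control flow concrete: WriterManager owns the per-table RecordWriter cache, and RecordRouter.from picks the routing strategy (config, regex, dynamic, or fallback) and optionally wraps it in ErrorHandlingRecordRouter when the dead letter table is enabled. The sketch below shows how a task-level class could wire these pieces together, based only on the classes in this diff; the real Worker also tracks offsets and commit state, so treat this as illustrative rather than the actual implementation.

package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Illustrative sketch only; the real Worker adds offset tracking and commit handling.
class RoutingSketch {
  private final WriterManager writers;
  private final RecordRouter recordRouter;

  RoutingSketch(SinkTaskContext context, IcebergSinkConfig config, IcebergWriterFactory factory) {
    this.writers = new WriterManager(factory);
    // Selects Config/Regex/Dynamic/Fallback routing from the config and, when the dead
    // letter table is enabled, wraps the result in an ErrorHandlingRecordRouter.
    this.recordRouter = RecordRouter.from(writers, config, getClass().getClassLoader(), context);
  }

  void save(Collection<SinkRecord> records) {
    records.forEach(recordRouter::write);
  }

  List<WriterResult> complete() {
    List<WriterResult> results = writers.writeResults();
    writers.stop();
    writers.clear();
    return results;
  }
}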
@@ -88,9 +88,13 @@ public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates
return;
}

Tasks.range(1)
.retry(IcebergSinkConfig.SCHEMA_UPDATE_RETRIES)
.run(notUsed -> commitSchemaUpdates(table, updates));
try {
Tasks.range(1)
.retry(IcebergSinkConfig.SCHEMA_UPDATE_RETRIES)
.run(notUsed -> commitSchemaUpdates(table, updates));
} catch (Exception error) {
throw new WriteException.SchemaEvolutionException(table.name(), error);
}
}

private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import org.apache.iceberg.catalog.TableIdentifier;

public class WriteException extends RuntimeException {

private final String tableIdentifier;

WriteException(Throwable cause) {
super(cause);
tableIdentifier = null;
}

WriteException(String msg) {
super(msg);
tableIdentifier = null;
}

WriteException(TableIdentifier tableId, Throwable cause) {
super(cause);
this.tableIdentifier = tableId.toString();
}

WriteException(String tableId, Throwable cause) {
super(cause);
this.tableIdentifier = tableId;
}

public String tableId() {
return tableIdentifier;
}

public static class CdcException extends WriteException {
public CdcException(Throwable cause) {
super(cause);
}
}

public static class CreateTableException extends WriteException {

public CreateTableException(TableIdentifier identifier, Throwable cause) {
super(identifier, cause);
}
}

public static class CreateSchemaException extends WriteException {
public CreateSchemaException(TableIdentifier identifier, Throwable cause) {
super(identifier, cause);
}
}

public static class LoadTableException extends WriteException {

public LoadTableException(TableIdentifier identifier, Throwable cause) {
super(identifier, cause);
}
}

public static class RecordConversionException extends WriteException {

RecordConversionException(Throwable cause) {
super(cause);
}
}

public static class RouteException extends WriteException {
RouteException(Throwable cause) {
super(cause);
}

RouteException(String msg) {
super(msg);
}
}

public static class RouteRegexException extends WriteException {
RouteRegexException(Throwable cause) {
super(cause);
}
}

public static class SchemaEvolutionException extends WriteException {

SchemaEvolutionException(String name, Throwable cause) {
super(name, cause);
}
}

public static class TableIdentifierException extends WriteException {
TableIdentifierException(String name, Throwable cause) {
super(name, cause);
}
}
}
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface WriteExceptionHandler {
void initialize(SinkTaskContext context, IcebergSinkConfig config, FailedRecordFactory factory);

/**
* This method will be invoked whenever the connector runs into an exception while trying to write
* SinkRecords to a table. Implementations of this method have 3 general options:
*
* <ol>
* <li>Return a SinkRecord to be written (for example, a failed record bound for the dead letter table)
* <li>Return null to drop the SinkRecord
* <li>Throw an exception to fail the connector task
* </ol>
*
* @param record The SinkRecord that couldn't be written
* @param exception The exception encountered while trying to write the record
* @return the SinkRecord to write instead, or null to drop the record
*/
SinkRecord handle(SinkRecord record, Exception exception);
}
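As a concrete illustration of this contract, a user-supplied handler might look like the sketch below. It is hypothetical (the class name, package, and the specific exception choices are not part of this PR, and it assumes the runtime jar, and therefore the WriteException types, is visible to plugin code as discussed in the review thread earlier in this diff): it dead-letters conversion and routing failures via the FailedRecordFactory, drops records whose table identifier cannot be parsed, and rethrows everything else so the task still fails on unexpected errors.

package com.example.connect; // hypothetical user package, outside the connector

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.WriteException;
import io.tabular.iceberg.connect.data.WriteExceptionHandler;
import io.tabular.iceberg.connect.deadletter.FailedRecordFactory;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Hypothetical example of a pluggable handler; not part of this PR.
public class ExampleWriteExceptionHandler implements WriteExceptionHandler {

  private FailedRecordFactory factory;

  @Override
  public void initialize(SinkTaskContext context, IcebergSinkConfig config, FailedRecordFactory factory) {
    this.factory = factory;
  }

  @Override
  public SinkRecord handle(SinkRecord record, Exception exception) {
    // Option 1: return a failed record to be routed to the dead letter table.
    if (exception instanceof WriteException.RecordConversionException
        || exception instanceof WriteException.RouteException) {
      return factory.recordFromConnector(record, exception, null);
    }
    // Option 2: return null to drop the record entirely.
    if (exception instanceof WriteException.TableIdentifierException) {
      return null;
    }
    // Option 3: rethrow to fail the task on anything unexpected.
    throw new RuntimeException(exception);
  }
}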
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.connect.sink.SinkRecord;

public class WriterManager {
private final IcebergWriterFactory writerFactory;

private final Map<String, RecordWriter> writers;

public WriterManager(IcebergWriterFactory writerFactory) {
this.writerFactory = writerFactory;
this.writers = Maps.newHashMap();
}

public void write(String tableName, SinkRecord record, boolean ignoreMissingTable) {
writers
.computeIfAbsent(
tableName, notUsed -> writerFactory.createWriter(tableName, record, ignoreMissingTable))
.write(record);
}

public List<WriterResult> writeResults() {
return writers.values().stream()
.flatMap(writer -> writer.complete().stream())
.collect(toList());
}

public void clear() {
this.writers.clear();
}

public void stop() {
writers.values().forEach(RecordWriter::close);
}
}
@@ -35,6 +35,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types.StructType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.jupiter.api.Test;

public class WorkerTest {
@@ -63,6 +64,9 @@ public void testDynamicRoute() {
}

private void workerTest(IcebergSinkConfig config, Map<String, Object> value) {
SinkTaskContext context = mock(SinkTaskContext.class);


WriterResult writeResult =
new WriterResult(
TableIdentifier.parse(TABLE_NAME),
@@ -75,7 +79,7 @@ private void workerTest(IcebergSinkConfig config, Map<String, Object> value) {
IcebergWriterFactory writerFactory = mock(IcebergWriterFactory.class);
when(writerFactory.createWriter(any(), any(), anyBoolean())).thenReturn(writer);

Writer worker = new Worker(config, writerFactory);
Writer worker = new Worker(context, config, writerFactory);

// save a record
SinkRecord rec = new SinkRecord(SRC_TOPIC_NAME, 0, null, "key", null, value, 0L);
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.TableSinkConfig;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;

public class BaseCatalogApiTest {

private static final String DEAD_LETTER_TABLE = "dlt.table";
private static final TableIdentifier DEAD_LETTER_TABLE_ID =
TableIdentifier.parse(DEAD_LETTER_TABLE);

private static final org.apache.kafka.connect.data.Schema SCHEMA =
SchemaBuilder.struct()
.field("a", Schema.STRING_SCHEMA)
.field("b", Schema.STRING_SCHEMA)
.build();

private SinkRecord sinkRecord() {
Struct struct = new Struct(SCHEMA);
struct.put("a", "a");
struct.put("b", "b");
return new SinkRecord("some-topic", 0, null, null, SCHEMA, struct, 100L);
}

@Test
@DisplayName("tableId should throw a TableIdentifierException for invalid table names")
public void tableIdThrows() {
Catalog catalog = mock(Catalog.class);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
CatalogApi catalogApi = new BaseCatalogApi(catalog, config) {};
assertThrows(WriteException.TableIdentifierException.class, () -> catalogApi.tableId(""));
}

@Test
@DisplayName("loadTable should wrap underlying exceptions in LoadTableException")
public void loadTableThrows() {
Catalog catalog = mock(Catalog.class);
when(catalog.loadTable(DEAD_LETTER_TABLE_ID)).thenThrow(new RuntimeException("test"));
CatalogApi catalogApi = new BaseCatalogApi(catalog, mock(IcebergSinkConfig.class)) {};
assertThrows(WriteException.LoadTableException.class, () -> catalogApi.loadTable(DEAD_LETTER_TABLE_ID));
}

@Test
@DisplayName("schema should wrap exceptions")
public void catalogApiSchemaShouldWrap() {
Catalog catalog = mock(Catalog.class);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);

BiFunction<TableIdentifier, SinkRecord, org.apache.iceberg.Schema> illegalArgFn =
(a, b) -> {
throw new IllegalArgumentException("test");
};

BiFunction<TableIdentifier, SinkRecord, org.apache.iceberg.Schema> validationExceptionFn =
(a, b) -> {
throw new ValidationException("test");
};

CatalogApi catalogApiIllegalArg = new BaseCatalogApi(catalog, config, illegalArgFn);
CatalogApi catalogApiValidationExp = new BaseCatalogApi(catalog, config, validationExceptionFn);

assertThrows(
WriteException.CreateSchemaException.class,
() -> catalogApiIllegalArg.schema(DEAD_LETTER_TABLE_ID, sinkRecord()));
assertThrows(
WriteException.CreateSchemaException.class,
() -> catalogApiValidationExp.schema(DEAD_LETTER_TABLE_ID, sinkRecord()));
}

@Test
@DisplayName("partitionSpec should apply the configured PartitionSpec")
public void catalogApiAppliesPartitionConfig() {
Catalog catalog = mock(Catalog.class);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
TableSinkConfig tableConfig =
new TableSinkConfig(
Pattern.compile(".*123", Pattern.DOTALL),
Lists.newArrayList(),
Lists.newArrayList("a"),
null);

when(config.tableConfig(ArgumentMatchers.any())).thenReturn(tableConfig);
CatalogApi catalogApi = new BaseCatalogApi(catalog, config) {};

org.apache.iceberg.Schema schema = catalogApi.schema(DEAD_LETTER_TABLE_ID, sinkRecord());
assertThat(catalogApi.partitionSpec(DEAD_LETTER_TABLE, schema).isPartitioned()).isTrue();
}

@Test
@DisplayName("partitionSpec should fall back to an unpartitioned spec if an error occurs")
public void catalogApiPartitionSpecUnpartitioned() {
Catalog catalog = mock(Catalog.class);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
// partition on a field that does not exist
TableSinkConfig tableConfig =
new TableSinkConfig(
Pattern.compile(".*123", Pattern.DOTALL),
Lists.newArrayList(),
Lists.newArrayList("does_not_exist"),
null);

when(config.tableConfig(ArgumentMatchers.any())).thenReturn(tableConfig);
CatalogApi catalogApi = new BaseCatalogApi(catalog, config) {};

org.apache.iceberg.Schema schema = catalogApi.schema(DEAD_LETTER_TABLE_ID, sinkRecord());
assertThat(catalogApi.partitionSpec(DEAD_LETTER_TABLE, schema).isUnpartitioned()).isTrue();
}

@Test
@DisplayName("createTable should wrap underlying exceptions in CreateTableException")
public void catalogCreateTableShouldThrow() {
Catalog catalogValidationException = mock(Catalog.class);
when(catalogValidationException.createTable(
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()))
.thenThrow(new ValidationException("test"));

Catalog catalogIllegalArgException = mock(Catalog.class);
when(catalogIllegalArgException.createTable(
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()))
.thenThrow(new IllegalArgumentException("test"));

IcebergSinkConfig config = mock(IcebergSinkConfig.class);
TableSinkConfig tableConfig =
new TableSinkConfig(
Pattern.compile(".*123", Pattern.DOTALL),
Lists.newArrayList(),
Lists.newArrayList("a"),
null);

when(config.tableConfig(ArgumentMatchers.any())).thenReturn(tableConfig);
CatalogApi catalogApiValidation = new BaseCatalogApi(catalogValidationException, config) {};
CatalogApi catalogApiIllegal = new BaseCatalogApi(catalogIllegalArgException, config) {};

org.apache.iceberg.Schema schema =
catalogApiValidation.schema(DEAD_LETTER_TABLE_ID, sinkRecord());

assertThrows(
WriteException.CreateTableException.class,
() ->
catalogApiValidation.createTable(
DEAD_LETTER_TABLE_ID,
schema,
catalogApiValidation.partitionSpec(DEAD_LETTER_TABLE, schema),
Maps.newHashMap()));
assertThrows(
WriteException.CreateTableException.class,
() ->
catalogApiIllegal.createTable(
DEAD_LETTER_TABLE_ID,
schema,
catalogApiValidation.partitionSpec(DEAD_LETTER_TABLE, schema),
Maps.newHashMap()));
}
}
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.deadletter.DeadLetterUtils;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class RecordRouterTest {

static class RecordingWriterManager extends WriterManager {

private final List<Pair<String, SinkRecord>> log;
RecordingWriterManager(IcebergWriterFactory factory) {
super(factory);
this.log = Lists.newArrayList();
}

@Override
public void write(String tableName, SinkRecord record, boolean ignoreMissingTable) {
log.add(Pair.of(tableName, record));
}

}

private static final IcebergWriterFactory factory = mock(IcebergWriterFactory.class);
private static final SinkTaskContext context = mock(SinkTaskContext.class);


@Test
@DisplayName("ConfigRouter should dispatch based on configured tables")
public void configRouterTest() {
RecordingWriterManager manager = new RecordingWriterManager(factory);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
SinkRecord record = mock(SinkRecord.class);

when(config.tables()).thenReturn(Lists.newArrayList("tbl1", "tbl2"));
when(config.deadLetterTableEnabled()).thenReturn(false);
when(config.tablesRouteField()).thenReturn(null);
when(config.dynamicTablesEnabled()).thenReturn(false);

RecordRouter router = RecordRouter.from(manager, config, this.getClass().getClassLoader(), context);
// do some assertions here.
assertThat(router).isInstanceOf(RecordRouter.ConfigRecordRouter.class);
// test some dispatching here.
router.write(record);
List<Pair<String, SinkRecord>> result = manager.log;
assertThat(result).isEqualTo(Lists.newArrayList(Pair.of("tbl1", record), Pair.of("tbl2", record)));
}

@Test
@DisplayName("Fallback writer should dispatch based on record value and fall back to configured tables otherwise")
public void fallBackWriterTest() {
RecordingWriterManager manager = new RecordingWriterManager(factory);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);

Schema schemaWithRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA).field("route_field", Schema.STRING_SCHEMA).build();
Schema schemaWithoutRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA);

Struct structWithRoute = new Struct(schemaWithRoute).put("a", "a").put("route_field", "route_field_table");
Struct structWithoutRoute = new Struct(schemaWithoutRoute).put("a", "a");

SinkRecord recordWithRoute = new SinkRecord("topic", 1, null, null, schemaWithRoute, structWithRoute, 100L);
SinkRecord recordWithoutRoute = new SinkRecord("topic", 1, null, null, schemaWithoutRoute, structWithoutRoute, 101L);

when(config.tables()).thenReturn(Lists.newArrayList("tbl1", "tbl2"));
when(config.deadLetterTableEnabled()).thenReturn(false);
when(config.tablesRouteField()).thenReturn("route_field");
when(config.dynamicTablesEnabled()).thenReturn(false);

RecordRouter router = RecordRouter.from(manager, config, this.getClass().getClassLoader(), context);
assertThat(router).isInstanceOf(RecordRouter.FallbackRecordRouter.class);
router.write(recordWithRoute);
router.write(recordWithoutRoute);
List<Pair<String, SinkRecord>> result = manager.log;
assertThat(result).isEqualTo(Lists.newArrayList(Pair.of("route_field_table", recordWithRoute), Pair.of("tbl1", recordWithoutRoute), Pair.of("tbl2", recordWithoutRoute)));
}


@Test
@DisplayName("DynamicRecordRouter should dispatch based on the record field")
public void dynamicRecordRouterTest() {
RecordingWriterManager manager = new RecordingWriterManager(factory);
IcebergSinkConfig config = mock(IcebergSinkConfig.class);

Schema schemaWithRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA).field("route_field", Schema.STRING_SCHEMA).build();
Schema schemaWithoutRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA);

Struct structWithRoute = new Struct(schemaWithRoute).put("a", "a").put("route_field", "route_field_table");
Struct structWithoutRoute = new Struct(schemaWithoutRoute).put("a", "a");

SinkRecord recordWithRoute = new SinkRecord("topic", 1, null, null, schemaWithRoute, structWithRoute, 100L);
SinkRecord recordWithoutRoute = new SinkRecord("topic", 1, null, null, schemaWithoutRoute, structWithoutRoute, 101L);

when(config.tables()).thenReturn(Lists.newArrayList());
when(config.deadLetterTableEnabled()).thenReturn(false);
when(config.tablesRouteField()).thenReturn("route_field");
when(config.dynamicTablesEnabled()).thenReturn(true);

RecordRouter router = RecordRouter.from(manager, config, this.getClass().getClassLoader(), context);
assertThat(router).isInstanceOf(RecordRouter.DynamicRecordRouter.class);

router.write(recordWithRoute);
List<Pair<String, SinkRecord>> result = manager.log;

assertThat(result).isEqualTo(Lists.newArrayList(Pair.of("route_field_table", recordWithRoute)));
assertThrows(WriteException.RouteException.class, () -> router.write(recordWithoutRoute));
}

@Test
@DisplayName("RegexRouter should be configured when dynamicTablesEnabled is false and iceberg.tables is null or empty")
public void regexRouterTest() {
RecordingWriterManager manager = new RecordingWriterManager(factory);

IcebergSinkConfig configTablesNull = mock(IcebergSinkConfig.class);
when(configTablesNull.tables()).thenReturn(null);
when(configTablesNull.deadLetterTableEnabled()).thenReturn(false);
when(configTablesNull.tablesRouteField()).thenReturn("route_val");
when(configTablesNull.dynamicTablesEnabled()).thenReturn(false);

IcebergSinkConfig configTablesEmpty = mock(IcebergSinkConfig.class);
when(configTablesEmpty.tables()).thenReturn(Lists.newArrayList());
when(configTablesEmpty.deadLetterTableEnabled()).thenReturn(false);
when(configTablesEmpty.tablesRouteField()).thenReturn("route_val");
when(configTablesEmpty.dynamicTablesEnabled()).thenReturn(false);

RecordRouter routerNull = RecordRouter.from(manager, configTablesNull, this.getClass().getClassLoader(), context);
RecordRouter routerEmpty = RecordRouter.from(manager, configTablesEmpty, this.getClass().getClassLoader(), context);

assertThat(routerNull).isInstanceOf(RecordRouter.RegexRecordRouter.class);
assertThat(routerEmpty).isInstanceOf(RecordRouter.RegexRecordRouter.class);
}

@Test
@DisplayName("ErrorHandlingRouter should be configured when deadLetterTableEnabled is true")
public void errorHandlingRouterGetsConfiguredProperly() {
RecordingWriterManager manager = new RecordingWriterManager(factory);

Schema schemaWithRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA).field("route_field", Schema.STRING_SCHEMA).build();
Schema schemaWithoutRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA);

Struct structWithRoute = new Struct(schemaWithRoute).put("a", "a").put("route_field", "route_field_table");
Struct structWithoutRoute = new Struct(schemaWithoutRoute).put("a", "bad_record_fail");

SinkRecord recordWithRoute = new SinkRecord("topic", 1, null, null, schemaWithRoute, structWithRoute, 100L);
SinkRecord recordWithoutRoute = new SinkRecord("topic", 1, null, null, schemaWithoutRoute, structWithoutRoute, 101L);

// defaultRecordFactory assumes ErrorTransform has been used to put original bytes on the records in the headers
// since this record will fail, it will go through the configured DefaultFailedRecordFactory and expects these values to be present
recordWithoutRoute.headers().add(DeadLetterUtils.VALUE_HEADER, new SchemaAndValue(SchemaBuilder.OPTIONAL_BYTES_SCHEMA, "test".getBytes(StandardCharsets.UTF_8)));

IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.tables()).thenReturn(Lists.newArrayList("tbl1", "tbl2"));
when(config.deadLetterTableEnabled()).thenReturn(true);
when(config.tablesRouteField()).thenReturn("route_field");
when(config.dynamicTablesEnabled()).thenReturn(true);
when(config.getFailedRecordHandler()).thenReturn("io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory");
when(config.getWriteExceptionHandler()).thenReturn("io.tabular.iceberg.connect.data.DefaultWriteExceptionHandler");
when(config.failedRecordHandlerProperties()).thenReturn(ImmutableMap.of("table_name", "dlt.table", "route_field", "route_field"));

RecordRouter router = RecordRouter.from(manager, config, this.getClass().getClassLoader(), context);
assertThat(router).isInstanceOf(RecordRouter.ErrorHandlingRecordRouter.class);

router.write(recordWithRoute);
router.write(recordWithoutRoute);

List<Pair<String, SinkRecord>> result = manager.log;
assertThat(result.stream().map(Pair::first).collect(Collectors.toList())).isEqualTo(Lists.newArrayList("route_field_table", "dlt.table"));
}

@Test
@DisplayName("ErrorHandlingRouter should throw if there is an issue with the failed record conversion")
public void errorHandlingRouterDoesNotInfiniteLoop() {
RecordingWriterManager manager = new RecordingWriterManager(factory);

Schema schemaWithoutRoute = SchemaBuilder.struct().field("a", Schema.STRING_SCHEMA);

Struct structWithoutRoute = new Struct(schemaWithoutRoute).put("a", "bad_record_fail");

SinkRecord recordWithoutRoute = new SinkRecord("topic", 1, null, null, schemaWithoutRoute, structWithoutRoute, 101L);

// defaultRecordFactory assumes ErrorTransform has been used to put original bytes on the records in the headers
// since this record will fail, it will go through the configured DefaultFailedRecordFactory and expects these values to be present
recordWithoutRoute.headers().add(DeadLetterUtils.VALUE_HEADER, new SchemaAndValue(SchemaBuilder.OPTIONAL_BYTES_SCHEMA, "test".getBytes(StandardCharsets.UTF_8)));

IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.tables()).thenReturn(Lists.newArrayList("tbl1", "tbl2"));
when(config.deadLetterTableEnabled()).thenReturn(true);
when(config.tablesRouteField()).thenReturn("route_field");
when(config.dynamicTablesEnabled()).thenReturn(true);
when(config.getFailedRecordHandler()).thenReturn("io.tabular.iceberg.connect.deadletter.DefaultFailedRecordFactory");
when(config.getWriteExceptionHandler()).thenReturn("io.tabular.iceberg.connect.data.DefaultWriteExceptionHandler");
// the underlying router is looking for `route_field` but the failed record handler is configured to have
// the route field on `route_field_bad`
// this should cause the ErrorHandler to throw an exception
// since this is a configuration issue, it should kill the connector w/ unhandled exception
when(config.failedRecordHandlerProperties()).thenReturn(ImmutableMap.of("table_name", "dlt.table", "route_field", "route_field_bad"));

RecordRouter router = RecordRouter.from(manager, config, this.getClass().getClassLoader(), context);
assertThat(router).isInstanceOf(RecordRouter.ErrorHandlingRecordRouter.class);

assertThrows(WriteException.RouteException.class, () -> router.write(recordWithoutRoute));
}
}
3 changes: 3 additions & 0 deletions settings.gradle
@@ -9,3 +9,6 @@ project(":iceberg-kafka-connect-transforms").projectDir = file("kafka-connect-tr

include "iceberg-kafka-connect-runtime"
project(":iceberg-kafka-connect-runtime").projectDir = file("kafka-connect-runtime")

include "iceberg-kafka-connect-deadletter"
project(":iceberg-kafka-connect-deadletter").projectDir = file("kafka-connect-deadletter")