Skip to content

Commit

Permalink
Add support for Debezium read operations
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosescura committed Sep 10, 2024
1 parent 2971c85 commit 2409fd7
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class AddOpFieldForDebeziumRecord<R extends ConnectRecord<R>> implements
private static final String OP_C = "c";
private static final String OP_U = "u";
private static final String OP_D = "d";
private static final String OP_R = "r";
private Cache<Schema, Schema> schemaUpdateCache;

@Override
Expand All @@ -82,15 +83,17 @@ public R apply(R record) {
try {
op = (String) value.get(OP);
} catch (Exception e) {
LOG.debug("Expected operation key: `{}` in record but none was found", OP);
return record;
}
if (op.equals(OP_C) || op.equals(OP_U)) {
if (op.equals(OP_C) || op.equals(OP_U) || op.equals(OP_R)) {
Struct newValue = updateValue(value, AFTER);
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp());
} else if (op.equals(OP_D)) {
Struct newValue = updateValue(value, BEFORE);
return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp());
}
LOG.info("Operation type `{}` is not supported by this transform Class. Returning original record", op);
} catch (Exception e) {
return record;
}
Expand Down

0 comments on commit 2409fd7

Please sign in to comment.