diff --git a/src/main/java/com/starrocks/connector/kafka/transforms/AddOpFieldForDebeziumRecord.java b/src/main/java/com/starrocks/connector/kafka/transforms/AddOpFieldForDebeziumRecord.java index d8819eb..37a3ae7 100644 --- a/src/main/java/com/starrocks/connector/kafka/transforms/AddOpFieldForDebeziumRecord.java +++ b/src/main/java/com/starrocks/connector/kafka/transforms/AddOpFieldForDebeziumRecord.java @@ -56,6 +56,7 @@ public class AddOpFieldForDebeziumRecord> implements private static final String OP_C = "c"; private static final String OP_U = "u"; private static final String OP_D = "d"; + private static final String OP_R = "r"; private Cache schemaUpdateCache; @Override @@ -82,15 +83,17 @@ public R apply(R record) { try { op = (String) value.get(OP); } catch (Exception e) { + LOG.debug("Expected operation key: `{}` in record but none was found", OP); return record; } - if (op.equals(OP_C) || op.equals(OP_U)) { + if (op.equals(OP_C) || op.equals(OP_U) || op.equals(OP_R)) { Struct newValue = updateValue(value, AFTER); return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp()); } else if (op.equals(OP_D)) { Struct newValue = updateValue(value, BEFORE); return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), newValue.schema(), newValue, record.timestamp()); } + LOG.info("Operation type `{}` is not supported by this transform Class. Returning original record", op); } catch (Exception e) { return record; }