diff --git a/src/test/java/de/azapps/kafkabackup/common/record/RecordSerdeTest.java b/src/test/java/de/azapps/kafkabackup/common/record/RecordSerdeTest.java index b15e14a..bb8d602 100644 --- a/src/test/java/de/azapps/kafkabackup/common/record/RecordSerdeTest.java +++ b/src/test/java/de/azapps/kafkabackup/common/record/RecordSerdeTest.java @@ -26,6 +26,8 @@ public class RecordSerdeTest { // Example records private static final Record SIMPLE_RECORD, NULL_RECORD, EMPTY_RECORD, HEADER_RECORD; + private static final byte[] NON_UTF8_BYTES = {0x012, 0x00, 0xf}; + static { SIMPLE_RECORD = new Record(TOPIC, PARTITION, KEY_BYTES, VALUE_BYTES, OFFSET); NULL_RECORD = new Record(TOPIC, PARTITION, null, null, OFFSET); @@ -42,6 +44,14 @@ public class RecordSerdeTest { public void roundtripTest() throws Exception { Record simpleRoundtrip = writeAndReadRecord(SIMPLE_RECORD); assertEquals(SIMPLE_RECORD, simpleRoundtrip); + + // non-utf8 + RecordHeaders headers = new RecordHeaders(); + headers.add("nonutf8", NON_UTF8_BYTES); + Record nonUtf8Headers = new Record(TOPIC, PARTITION, KEY_BYTES, VALUE_BYTES, OFFSET, null, TimestampType.NO_TIMESTAMP_TYPE, headers); + Record nonUtf8Roundtrip = writeAndReadRecord(nonUtf8Headers); + assertEquals(nonUtf8Headers, nonUtf8Roundtrip); + } @Test @@ -101,7 +111,7 @@ private static Record readFromFile(File file) throws IOException { * Call it manually once when the format changes */ private static void writeTestRecordsToFile() throws IOException { - File directory = new File("src/test/assets/v1/records"); // CHANGEME WHEN CHANGING DATA FORMAT! + File directory = new File("src/test/assets/v1_nonutf8/records"); // CHANGEME WHEN CHANGING DATA FORMAT! writeCurrentVersionRecordToFile(SIMPLE_RECORD, new File(directory, SIMPLE_RECORD_FILE)); writeCurrentVersionRecordToFile(NULL_RECORD, new File(directory, NULL_RECORD_FILE)); writeCurrentVersionRecordToFile(EMPTY_RECORD, new File(directory, EMPTY_RECORD_FILE)); diff --git a/src/test/java/de/azapps/kafkabackup/common/segment/SegmentSerdeTest.java b/src/test/java/de/azapps/kafkabackup/common/segment/SegmentSerdeTest.java index 7ee4229..27f2735 100644 --- a/src/test/java/de/azapps/kafkabackup/common/segment/SegmentSerdeTest.java +++ b/src/test/java/de/azapps/kafkabackup/common/segment/SegmentSerdeTest.java @@ -22,6 +22,7 @@ public class SegmentSerdeTest { private static final byte[] VALUE_BYTES = "test-value".getBytes(StandardCharsets.UTF_8); private static final RecordHeaders HEADERS = new RecordHeaders(); private static final Path TEMP_DIR = TestUtils.getTestDir("SegmentSerdeTest"); + private static final byte[] NON_UTF8_BYTES = {0x012, 0x00, 0xf}; static { HEADERS.add("", new byte[0]); @@ -39,11 +40,16 @@ public void simpleRoundtripTest() throws Exception { records.add(new Record(TOPIC, partition, new byte[0], new byte[0], 2)); records.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 3, null, TimestampType.NO_TIMESTAMP_TYPE, HEADERS)); + RecordHeaders headers = new RecordHeaders(); + headers.add("nonutf8", NON_UTF8_BYTES); + records.add(new Record(TOPIC, partition, KEY_BYTES, VALUE_BYTES, 4, null, TimestampType.NO_TIMESTAMP_TYPE, headers)); + + SegmentWriter segmentWriter = new SegmentWriter(TOPIC, partition, 0, TEMP_DIR); for (Record record : records) { segmentWriter.append(record); } - assertEquals(segmentWriter.lastWrittenOffset(), 3); + assertEquals(segmentWriter.lastWrittenOffset(), 4); // Read with default reader diff --git a/system_test/utils/utils.py b/system_test/utils/utils.py index c0497bc..92ba0d8 100755 --- a/system_test/utils/utils.py +++ b/system_test/utils/utils.py @@ -84,6 +84,16 @@ def produce_messages(topic, partition, start_num, count, bootstrap_servers, size "empty": b'', "blubb": b'', } + }, + { + 'value': b"nonUTFHeaders", + 'key': b"headers", + 'headers': { + "nonutf8": bytes.fromhex("deadbeef"), + "nonutf82": bytes.fromhex("0000"), + "nonutf83": bytes.fromhex("ffff"), + bytes.fromhex("deadbeef"): bytes.fromhex("deadbeef"), + } } ]