From 097ff9db2d3a35f4b4e011bf70650b70f9a9347d Mon Sep 17 00:00:00 2001 From: HengZhang Date: Thu, 6 Aug 2020 10:16:08 -0700 Subject: [PATCH] inject ProducerRecord's headers only when LogMessage's LoggingAuditHeaders field or checkSum field is set. --- pom.xml | 2 +- singer-commons/pom.xml | 2 +- singer/pom.xml | 2 +- .../com/pinterest/singer/common/SingerMetrics.java | 1 + .../com/pinterest/singer/writer/KafkaWriter.java | 13 +++++++++---- thrift-logger/pom.xml | 2 +- 6 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 3b07e714..980b1857 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.pinterest.singer singer-package - 0.8.0.23 + 0.8.0.24 pom Singer Logging Agent modules 2013 diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml index b71d2c1d..ee4883b5 100644 --- a/singer-commons/pom.xml +++ b/singer-commons/pom.xml @@ -20,7 +20,7 @@ com.pinterest.singer singer-package - 0.8.0.23 + 0.8.0.24 ../pom.xml diff --git a/singer/pom.xml b/singer/pom.xml index f2628132..8149d1d1 100644 --- a/singer/pom.xml +++ b/singer/pom.xml @@ -7,7 +7,7 @@ com.pinterest.singer singer-package - 0.8.0.23 + 0.8.0.24 ../pom.xml diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index 79c3207f..7347449b 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -104,6 +104,7 @@ public class SingerMetrics { public static final String NUMBER_OF_MISSING_DIRS = "singer.missing_dir_checker.num_of_missing_dirs"; public static final String NUMBER_OF_SERIALIZING_HEADERS_ERRORS = "singer.headers_injector.num_of_serializing_headers_errors"; public static final String AUDIT_HEADERS_INJECTED = "singer.audit.num_of_headers_injected"; + public static final String CHECKSUM_INJECTED = "singer.audit.num_of_checksum_injected"; public static final String AUDIT_HEADERS_METADATA_COUNT_MISMATCH = "singer.audit.headers_metadata_count_mismatch"; public static final String AUDIT_HEADERS_METADATA_COUNT_MATCH = "singer.audit.headers_metadata_count_match"; public static final String NUM_CORRUPTED_MESSAGES = "singer.audit.num_corrupted_messages"; diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java index eb79b28d..6406edb0 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java +++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java @@ -381,10 +381,15 @@ public boolean checkMessageValidAndInjectHeaders( protected void injectHeadersForProducerRecord(LogMessage msg, Headers headers) { try { - byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders()); - this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders); - this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum())); - OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "topic=" + topic, "host=" + HOSTNAME, "logName=" + msg.getLoggingAuditHeaders().getLogName(), "logStreamName=" + logName); + if (msg.isSetLoggingAuditHeaders()) { + byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders()); + this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders); + OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName); + } + if (msg.isSetChecksum()) { + this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum())); + OpenTsdbMetricConverter.incr(SingerMetrics.CHECKSUM_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName); + } } catch (TException e) { OpenTsdbMetricConverter.incr(SingerMetrics.NUMBER_OF_SERIALIZING_HEADERS_ERRORS); LOG.warn("Exception thrown while serializing headers", e); diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml index ef3718f3..36edede5 100644 --- a/thrift-logger/pom.xml +++ b/thrift-logger/pom.xml @@ -4,7 +4,7 @@ com.pinterest.singer singer-package - 0.8.0.23 + 0.8.0.24 ../pom.xml thrift-logger