diff --git a/pom.xml b/pom.xml
index 3b07e714..980b1857 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.pinterest.singer
singer-package
- 0.8.0.23
+ 0.8.0.24
pom
Singer Logging Agent modules
2013
diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml
index b71d2c1d..ee4883b5 100644
--- a/singer-commons/pom.xml
+++ b/singer-commons/pom.xml
@@ -20,7 +20,7 @@
com.pinterest.singer
singer-package
- 0.8.0.23
+ 0.8.0.24
../pom.xml
diff --git a/singer/pom.xml b/singer/pom.xml
index f2628132..8149d1d1 100644
--- a/singer/pom.xml
+++ b/singer/pom.xml
@@ -7,7 +7,7 @@
com.pinterest.singer
singer-package
- 0.8.0.23
+ 0.8.0.24
../pom.xml
diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
index 79c3207f..7347449b 100644
--- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
+++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
@@ -104,6 +104,7 @@ public class SingerMetrics {
public static final String NUMBER_OF_MISSING_DIRS = "singer.missing_dir_checker.num_of_missing_dirs";
public static final String NUMBER_OF_SERIALIZING_HEADERS_ERRORS = "singer.headers_injector.num_of_serializing_headers_errors";
public static final String AUDIT_HEADERS_INJECTED = "singer.audit.num_of_headers_injected";
+ public static final String CHECKSUM_INJECTED = "singer.audit.num_of_checksum_injected";
public static final String AUDIT_HEADERS_METADATA_COUNT_MISMATCH = "singer.audit.headers_metadata_count_mismatch";
public static final String AUDIT_HEADERS_METADATA_COUNT_MATCH = "singer.audit.headers_metadata_count_match";
public static final String NUM_CORRUPTED_MESSAGES = "singer.audit.num_corrupted_messages";
diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java
index eb79b28d..6406edb0 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java
@@ -381,10 +381,15 @@ public boolean checkMessageValidAndInjectHeaders(
protected void injectHeadersForProducerRecord(LogMessage msg, Headers headers) {
try {
- byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders());
- this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders);
- this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum()));
- OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "topic=" + topic, "host=" + HOSTNAME, "logName=" + msg.getLoggingAuditHeaders().getLogName(), "logStreamName=" + logName);
+ if (msg.isSetLoggingAuditHeaders()) {
+ byte[] serializedAuditHeaders = SERIALIZER.get().serialize(msg.getLoggingAuditHeaders());
+ this.headersInjector.addHeaders(headers, LOGGING_AUDIT_HEADER_KEY, serializedAuditHeaders);
+ OpenTsdbMetricConverter.incr(SingerMetrics.AUDIT_HEADERS_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName);
+ }
+ if (msg.isSetChecksum()) {
+ this.headersInjector.addHeaders(headers, CRC_HEADER_KEY, Longs.toByteArray(msg.getChecksum()));
+ OpenTsdbMetricConverter.incr(SingerMetrics.CHECKSUM_INJECTED, "host=" + HOSTNAME, "logStreamName=" + logName);
+ }
} catch (TException e) {
OpenTsdbMetricConverter.incr(SingerMetrics.NUMBER_OF_SERIALIZING_HEADERS_ERRORS);
LOG.warn("Exception thrown while serializing headers", e);
diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml
index ef3718f3..36edede5 100644
--- a/thrift-logger/pom.xml
+++ b/thrift-logger/pom.xml
@@ -4,7 +4,7 @@
com.pinterest.singer
singer-package
- 0.8.0.23
+ 0.8.0.24
../pom.xml
thrift-logger