diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
new file mode 100644
index 00000000..d6df713f
--- /dev/null
+++ b/.github/workflows/maven.yml
@@ -0,0 +1,17 @@
+name: Singer Build
+
+on: [pull_request]
+
+jobs:
+ build:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v1
+ - name: Set up JDK 1.8
+ uses: actions/setup-java@v1
+ with:
+ java-version: 1.8
+ - name: Build with Maven
+ run: mvn -B package --file pom.xml
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 44930c2a..3373c440 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -23,7 +23,9 @@ To increase the chances that your pull request will be accepted:
- Write tests for your changes
- Write a good commit message
+NOTE: all PRs must pass build before they can be accepted.
+
## License
By contributing to this project, you agree that your contributions will be
-licensed under its [Apache 2 license](LICENSE).
\ No newline at end of file
+licensed under its [Apache 2 license](LICENSE).
diff --git a/pom.xml b/pom.xml
index f8d731cc..15cd62fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.pinterest.singer
singer-package
- 0.8.0-rc.5
+ 0.8.0-rc.6
pom
Singer Logging Agent modules
2013
@@ -37,6 +37,10 @@
ambud
Ambud Sharma
+
+ zzhhhzz
+ Heng Zhang
+
https://github.com/pinterest/singer.git
@@ -51,10 +55,6 @@
-
- Twitter public Maven repo
- https://maven.twttr.com
-
central
https://repo.maven.apache.org/maven2
diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml
index cbc39d0a..a6595b16 100644
--- a/singer-commons/pom.xml
+++ b/singer-commons/pom.xml
@@ -20,7 +20,7 @@
com.pinterest.singer
singer-package
- 0.8.0-rc.5
+ 0.8.0-rc.6
../pom.xml
diff --git a/singer/deb.version b/singer/deb.version
index ef0952c9..62b1079a 100644
--- a/singer/deb.version
+++ b/singer/deb.version
@@ -1 +1 @@
-0.8.0-rc.5
\ No newline at end of file
+0.8.0-rc.6
diff --git a/singer/pom.xml b/singer/pom.xml
index cf5e82d1..aa01199d 100644
--- a/singer/pom.xml
+++ b/singer/pom.xml
@@ -7,7 +7,7 @@
com.pinterest.singer
singer-package
- 0.8.0-rc.5
+ 0.8.0-rc.6
../pom.xml
@@ -27,6 +27,10 @@
ambud
Ambud Sharma
+
+ zzhhhzz
+ Heng Zhang
+
https://github.com/pinterest/singer.git
@@ -63,38 +67,6 @@
0.0.116
-
- com.twitter.common
- dynamic-host-set
- 0.0.56
-
-
-
- com.twitter.common
- service-thrift
- 1.0.55
-
-
- org.apache.thrift
- libthrift
-
-
-
-
- com.twitter
- finagle-core_2.11
- 6.25.0
-
-
- com.twitter
- finagle-serversets_2.11
- 6.25.0
-
-
- com.twitter
- util-core_2.11
- 6.25.0
-
org.apache.kafka
kafka_2.11
diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerLog.java b/singer/src/main/java/com/pinterest/singer/common/SingerLog.java
index 8b3ddf32..518266c2 100644
--- a/singer/src/main/java/com/pinterest/singer/common/SingerLog.java
+++ b/singer/src/main/java/com/pinterest/singer/common/SingerLog.java
@@ -27,11 +27,17 @@ public class SingerLog {
// The config for the SingerLog.
private final SingerLogConfig singerLogConfig;
+ private String podUid;
public SingerLog(SingerLogConfig singerLogConfig) {
this.singerLogConfig = Preconditions.checkNotNull(singerLogConfig);
}
+ public SingerLog(SingerLogConfig singerLogConfig, String podUid) {
+ this.podUid = podUid;
+ this.singerLogConfig = Preconditions.checkNotNull(singerLogConfig);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -62,4 +68,8 @@ public String getLogName() {
public SingerLogConfig getSingerLogConfig() {
return singerLogConfig;
}
+
+ public String getPodUid() {
+ return podUid;
+ }
}
diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
index 9df3b460..d34092d0 100644
--- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
+++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
@@ -75,6 +75,7 @@ public class SingerMetrics {
public static final String KAFKA_LATENCY = SINGER_WRITER + "max_kafka_batch_write_latency";
public static final String NUM_COMMITED_TRANSACTIONS = SINGER_WRITER + "num_committed_transactions";
public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions";
+ public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers";
public static final String KUBE_PREFIX = SINGER_PREIX + "kube.";
public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error";
diff --git a/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java b/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java
index 5aebf657..5ae3e162 100644
--- a/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java
+++ b/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java
@@ -18,15 +18,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.Command;
import com.twitter.common.base.MorePreconditions;
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
import com.twitter.util.ExceptionalFunction;
-import org.apache.commons.lang.StringUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@@ -35,8 +28,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.Map;
/**
* Implementation of the ServerSet interface that uses a file on local disk instead of talking to ZooKeeper directly.
@@ -47,7 +38,7 @@
* Note that this implementation only supports monitor() and not join(). Use the standard ZooKeeper implementation for
* join().
*/
-public class ConfigFileServerSet implements ServerSet {
+public class ConfigFileServerSet {
// made public for unit testing only
public static String SERVERSET_DIR = "/var/serverset";
@@ -83,35 +74,7 @@ public ConfigFileServerSet(String serverSetFilePath) {
}
}
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint, Map additionalEndpoints, Status status)
- throws Group.JoinException, InterruptedException {
- throw new UnsupportedOperationException("ConfigFileServerSet does not support join()");
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint, Map additionalEndpoints)
- throws Group.JoinException, InterruptedException {
- throw new UnsupportedOperationException("ConfigFileServerSet does not support join()");
- }
-
- @Override
- public EndpointStatus join(
- InetSocketAddress endpoint, Map additionalEndpoints, int shardId)
- throws Group.JoinException, InterruptedException {
- throw new UnsupportedOperationException("ConfigFileServerSet does not support join()");
- }
-
- @Override
- public Command watch(final HostChangeMonitor monitor) throws MonitorException {
- monitor(monitor);
- return null;
- }
-
- @Override
- public void monitor(final HostChangeMonitor monitor) throws MonitorException {
+ public void monitor(final ServersetMonitor monitor) throws Exception {
Preconditions.checkNotNull(monitor);
try {
// Each call to monitor registers a new file watch. This is a bit inefficient if there
@@ -122,18 +85,18 @@ public void monitor(final HostChangeMonitor monitor) throws Mon
new ExceptionalFunction() {
@Override
public Void applyE(byte[] newContents) throws Exception {
- ImmutableSet newServerSet = readServerSet(newContents);
+ ImmutableSet newServerSet = readServerSet(newContents);
monitor.onChange(newServerSet);
return null;
}
});
} catch (IOException e) {
- throw new MonitorException(
+ throw new Exception(
"Error setting up watch on dynamic server set file:" + serverSetFilePath, e);
}
}
- protected Endpoint getEndPointFromServerSetLine(String line) {
+ protected InetSocketAddress getEndPointFromServerSetLine(String line) {
// We expect each line to be of the form "hostname:port". Note that host names can
// contain ':' themselves (e.g. ipv6 addresses).
int index = line.lastIndexOf(':');
@@ -141,11 +104,11 @@ protected Endpoint getEndPointFromServerSetLine(String line) {
String host = line.substring(0, index);
int port = Integer.parseInt(line.substring(index + 1));
- return new Endpoint(host, port);
+ return new InetSocketAddress(host, port);
}
- public ImmutableSet readServerSet(byte[] fileContent) throws IOException {
- ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
+ public ImmutableSet readServerSet(byte[] fileContent) throws IOException {
+ ImmutableSet.Builder builder = new ImmutableSet.Builder<>();
InputStream stream = new ByteArrayInputStream(fileContent);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
while (true) {
@@ -158,12 +121,9 @@ public ImmutableSet readServerSet(byte[] fileContent) throws IO
continue;
}
- Endpoint endpoint = getEndPointFromServerSetLine(line);
+ InetSocketAddress endpoint = getEndPointFromServerSetLine(line);
if (endpoint != null) {
- builder.add(new ServiceInstance(
- endpoint, // endpoint
- Collections.emptyMap(), // additional endpoints
- Status.ALIVE)); // status
+ builder.add(endpoint);
}
}
return builder.build();
diff --git a/singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java b/singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java
new file mode 100644
index 00000000..38d15487
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java
@@ -0,0 +1,11 @@
+package com.pinterest.singer.config;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.collect.ImmutableSet;
+
+public interface ServersetMonitor {
+
+ public void onChange(ImmutableSet serviceInstances);
+
+}
diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java
index e9d20d99..c910b3b1 100644
--- a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java
+++ b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java
@@ -115,7 +115,7 @@ public synchronized static KubeService getInstance() {
@Override
public void run() {
// fetch existing pod directories
- updatePodUidsFromFileSystem();
+ updatePodNamesFromFileSystem();
// we should wait for some time
try {
@@ -128,7 +128,7 @@ public void run() {
try {
// method refactored so that class level method locking is done with the
// synchronized keyword
- updatePodUids();
+ updatePodNames();
LOG.debug("Pod IDs refreshed at:" + new Date(System.currentTimeMillis()));
} catch (Exception e) {
// TODO what should be done if we have errors in reaching MD service
@@ -150,7 +150,7 @@ public void run() {
*
* This method should have an effect on data if Singer was restarted
*/
- private void updatePodUidsFromFileSystem() {
+ private void updatePodNamesFromFileSystem() {
LOG.info("Kubernetes POD log directory configured as:" + podLogDirectory);
File[] directories = new File(podLogDirectory).listFiles(new FileFilter() {
@@ -177,18 +177,18 @@ public boolean accept(File pathname) {
if (directories != null) {
for (File directory : directories) {
- String podUid = directory.getName();
- if (temp.contains("." + podUid)) {
- LOG.info("Ignoring POD directory " + podUid + " since there is a tombstone file present");
+ String podName = directory.getName();
+ if (temp.contains("." + podName)) {
+ LOG.info("Ignoring POD directory " + podName + " since there is a tombstone file present");
// Skip adding this pod to the active podset
continue;
}
- activePodSet.add(podUid);
+ activePodSet.add(podName);
for (PodWatcher podWatcher : registeredWatchers) {
- podWatcher.podCreated(podUid);
+ podWatcher.podCreated(podName);
}
- LOG.info("Active POD found (via directory):" + podUid);
+ LOG.info("Active POD found (via directory):" + podName);
}
}
// update number of active pods running
@@ -196,7 +196,7 @@ public boolean accept(File pathname) {
}
/**
- * Clear the set of Pod UIDs and update it with the latest fetch from kubelet
+ * Clear the set of Pod Names and update it with the latest fetch from kubelet
*
* Following a listener design, currently we only have 1 listener but in future
* if we want to do something else as well when these events happen then this
@@ -204,15 +204,15 @@ public boolean accept(File pathname) {
*
* @throws IOException
*/
- public void updatePodUids() throws IOException {
+ public void updatePodNames() throws IOException {
LOG.debug("Active podset:" + activePodSet);
- Set updatedPodUids = fetchPodUidsFromMetadata();
- SetView deletedPods = Sets.difference(activePodSet, updatedPodUids);
+ Set updatedPodNames = fetchPodNamesFromMetadata();
+ SetView deletedNames = Sets.difference(activePodSet, updatedPodNames);
// ignore new pods, pod discovery is done by watching directories
- for (String podUid : deletedPods) {
- updatePodWatchers(podUid, true);
+ for (String podName : deletedNames) {
+ updatePodWatchers(podName, true);
// update metrics since pods have been deleted
Stats.incr(SingerMetrics.PODS_DELETED);
Stats.incr(SingerMetrics.NUMBER_OF_PODS, -1);
@@ -220,24 +220,24 @@ public void updatePodUids() throws IOException {
LOG.debug("Fired events for registered watchers");
activePodSet.clear();
- activePodSet.addAll(updatedPodUids);
- LOG.debug("Cleared and updated pod UIDs:" + activePodSet);
+ activePodSet.addAll(updatedPodNames);
+ LOG.debug("Cleared and updated pod names:" + activePodSet);
}
/**
* Update all {@link PodWatcher} about the pod that changed (created or deleted)
*
- * @param podUid
+ * @param podName
* @param isDelete
*/
- public void updatePodWatchers(String podUid, boolean isDelete) {
- LOG.debug("Pod change:" + podUid + " deleted:" + isDelete);
+ public void updatePodWatchers(String podName, boolean isDelete) {
+ LOG.debug("Pod change:" + podName + " deleted:" + isDelete);
for (PodWatcher watcher : registeredWatchers) {
try {
if (isDelete) {
- watcher.podDeleted(podUid);
+ watcher.podDeleted(podName);
} else {
- watcher.podCreated(podUid);
+ watcher.podCreated(podName);
}
} catch (Exception e) {
LOG.error("Watcher had an exception", e);
@@ -248,33 +248,35 @@ public void updatePodWatchers(String podUid, boolean isDelete) {
/**
* Fetch Pod IDs from metadata.
*
- * Note: inside singer we refer to pod's identifier as a the PodUid
- *
- * In our internal case 'Pod Uid' = 'kubernetes pod name'
+ * Note: inside singer we refer to pod's identifier as a the PodName
*
* e.g. see src/test/resources/pods-goodresponse.json
*
- * @return set of pod uids
+ * @return set of pod names
* @throws IOException
*/
- public Set fetchPodUidsFromMetadata() throws IOException {
+ public Set fetchPodNamesFromMetadata() throws IOException {
LOG.debug("Attempting to make pod md request");
String response = SingerUtils.makeGetRequest(PODS_MD_URL);
LOG.debug("Received pod md response:" + response);
Gson gson = new Gson();
JsonObject obj = gson.fromJson(response, JsonObject.class);
- LOG.debug("Finished parsing kubelet response for PODs; now extracting POD UIDs");
- Set podUids = new HashSet<>();
+ LOG.debug("Finished parsing kubelet response for PODs; now extracting POD names");
+ Set podNames = new HashSet<>();
if (obj != null && obj.has("items")) {
JsonArray ary = obj.get("items").getAsJsonArray();
for (int i = 0; i < ary.size(); i++) {
- String uid = ary.get(i).getAsJsonObject().get("metadata").getAsJsonObject().get("name").getAsString();
- podUids.add(uid);
- LOG.debug("Found active POD UID in JSON:" + uid);
+ JsonObject metadata = ary.get(i).getAsJsonObject().get("metadata").getAsJsonObject();
+ String name = metadata.get("name").getAsString();
+ podNames.add(name);
+ // to support namespace based POD directories
+ String namespace = metadata.get("namespace").getAsString();
+ podNames.add(namespace + "_" + name);
+ LOG.debug("Found active POD name in JSON:" + name);
}
}
- LOG.debug("Pod uids from kubelet:" + podUids);
- return podUids;
+ LOG.debug("Pod names from kubelet:" + podNames);
+ return podNames;
}
/**
@@ -385,22 +387,23 @@ public void checkAndProcessFsEvents() throws InterruptedException {
// ignore delete events
if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
if (!file.toFile().isFile()) {
- String podUid = file.toFile().getName();
- if (podUid.startsWith(".")) {
+ String podName = file.toFile().getName();
+ if (podName.startsWith(".")) {
// ignore tombstone files
return;
}
- LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory+" poduid:" + podUid);
+ LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory
+ + " podname:" + podName);
Stats.incr(SingerMetrics.PODS_CREATED);
Stats.incr(SingerMetrics.NUMBER_OF_PODS);
- activePodSet.add(podUid);
- updatePodWatchers(podUid, false);
+ activePodSet.add(podName);
+ updatePodWatchers(podName, false);
}
// ignore all events that are not directory create events
} else if (kind.equals(StandardWatchEventKinds.OVERFLOW)) {
LOG.warn("Received overflow watch event from filesystem: Events may have been lost");
- // perform a full sync on pod IDs from file system
- updatePodUidsFromFileSystem();
+ // perform a full sync on pod names from file system
+ updatePodNamesFromFileSystem();
} else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) {
// ignore the . files
if (!file.toFile().getName().startsWith(".")) {
diff --git a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java
index f8541da0..ae96ff15 100644
--- a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java
+++ b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java
@@ -636,7 +636,7 @@ public void initializeLogStreamForPod(String podUid, Collection
// since the DefaultLogMonitor de-duplicates this using hashcode
// which is dependent on LogStream name
clone.setName(podUid + ":" + clone.getName());
- singerLog = new SingerLog(clone);
+ singerLog = new SingerLog(clone, podUid);
if (!singerLogPaths.containsKey(logPathKey)) {
singerLogPaths.put(logPathKey, new HashSet<>());
diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java
index a3f8d15e..4cbaabc7 100644
--- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java
+++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java
@@ -39,7 +39,6 @@
public class TextLogFileReader implements LogFileReader {
private static final Logger LOG = LoggerFactory.getLogger(TextLogFileReader.class);
- private static final String HOSTNAME = SingerUtils.getHostname();
protected boolean closed;
private final LogFile logFile;
@@ -55,6 +54,8 @@ public class TextLogFileReader implements LogFileReader {
// The text log message format, can be TextMessage, or String;
private final TextLogMessageType textLogMessageType;
+ private String hostname;
+
public TextLogFileReader(
LogFile logFile,
String path,
@@ -66,10 +67,12 @@ public TextLogFileReader(
TextLogMessageType messageType,
boolean prependTimestamp,
boolean prependHostName,
+ String hostname,
String prependFieldDelimiter) throws Exception {
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
Preconditions.checkArgument(byteOffset >= 0);
+ this.hostname = hostname;
this.logFile = Preconditions.checkNotNull(logFile);
this.path = path;
this.numMessagesPerLogMessage = numMessagesPerLogMessage;
@@ -118,7 +121,7 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
prependStr += System.currentTimeMillis() + prependFieldDelimiter;
}
if (prependHostname) {
- prependStr += HOSTNAME + prependFieldDelimiter;
+ prependStr += hostname + prependFieldDelimiter;
}
if (prependStr.length() > 0) {
maxBuffer.put(prependStr.getBytes());
@@ -142,7 +145,7 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc
case THRIFT_TEXT_MESSAGE:
TextMessage textMessage = new TextMessage();
textMessage.setFilename(path);
- textMessage.setHost(HOSTNAME);
+ textMessage.setHost(hostname);
textMessage.addToMessages(TextMessageReader.bufToString(out));
logMessage = new LogMessage(ByteBuffer.wrap(serializer.serialize(textMessage)));
break;
diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java
index fbf9cef0..7adba66d 100644
--- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java
+++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java
@@ -16,6 +16,7 @@
package com.pinterest.singer.reader;
import com.pinterest.singer.common.LogStream;
+import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.thrift.LogFile;
import com.pinterest.singer.thrift.configuration.TextReaderConfig;
import com.pinterest.singer.utils.LogFileUtils;
@@ -40,9 +41,9 @@ public TextLogFileReaderFactory(TextReaderConfig readerConfig) {
this.readerConfig = Preconditions.checkNotNull(readerConfig);
}
+ @SuppressWarnings("resource")
public LogFileReader getLogFileReader(
LogStream logStream, LogFile logFile, String path, long byteOffset) throws Exception {
-
LogFileReader reader;
try {
long inode = SingerUtils.getFileInode(FileSystems.getDefault().getPath(path));
@@ -60,6 +61,7 @@ public LogFileReader getLogFileReader(
readerConfig.getTextLogMessageType(),
readerConfig.isPrependTimestamp(),
readerConfig.isPrependHostname(),
+ SingerUtils.getHostNameBasedOnConfig(logStream, SingerSettings.getSingerConfig()),
readerConfig.getPrependFieldDelimiter());
} catch (LogFileReaderException e) {
LOG.warn("Exception in getLogFileReader", e);
diff --git a/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java b/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java
index 137d93bd..f228d8a7 100644
--- a/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java
+++ b/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java
@@ -35,7 +35,7 @@ public static void main(String[] args) {
KubeService service = KubeService.getInstance();
try {
- Set podUids = service.fetchPodUidsFromMetadata();
+ Set podUids = service.fetchPodNamesFromMetadata();
for (String puid : podUids) {
System.out.println(puid);
}
diff --git a/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java b/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java
index a8a23dde..2fccc5e3 100644
--- a/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java
+++ b/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java
@@ -15,6 +15,7 @@
*/
package com.pinterest.singer.utils;
+import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
@@ -25,12 +26,10 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
+import com.pinterest.singer.config.ServersetMonitor;
-public final class BrokerSetChangeListener
- implements DynamicHostSet.HostChangeMonitor {
+public final class BrokerSetChangeListener implements ServersetMonitor {
+
private static final int RESTART_WAIT_MAX_DELAY = 1800_000;
private static final int BROKER_RESTART_PRECENTAGE_THRESHOLD = 50;
private static final int BROKER_RESTART_VALUE_THRESHOLD = 1;
@@ -53,11 +52,10 @@ public BrokerSetChangeListener(String monitoredServersetFilePath,
}
@Override
- public void onChange(ImmutableSet serviceInstances) {
+ public void onChange(ImmutableSet serviceInstances) {
Set newBrokerSet = Sets.newHashSet();
- for (ServiceInstance instance : serviceInstances) {
- Endpoint endPoint = instance.getServiceEndpoint();
- newBrokerSet.add(Joiner.on(":").join(endPoint.getHost(), endPoint.getPort()));
+ for (InetSocketAddress instance : serviceInstances) {
+ newBrokerSet.add(Joiner.on(":").join(instance.getHostName(), instance.getPort()));
}
// 'serverSetPath' variable will still long live. But it should be rare as we
// only
diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
index e33cb8bf..9609c9bc 100644
--- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
+++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
@@ -15,6 +15,7 @@
*/
package com.pinterest.singer.utils;
+import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.config.DirectorySingerConfigurator;
import com.pinterest.singer.config.PropertyFileSingerConfigurator;
@@ -22,12 +23,13 @@
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.config.SingerDirectoryWatcher;
-
+import com.pinterest.singer.monitor.LogStreamManager;
import com.google.common.base.Preconditions;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.Arrays;
import java.util.Comparator;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
@@ -108,11 +110,7 @@ public static Date convertToDate(String timeString) throws ConfigurationExceptio
}
public static void printStackTrace() {
- StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
- for(StackTraceElement e:stackTraceElements) {
- System.out.println(e.getClassName()+"."+e.getMethodName()+":"+e.getLineNumber());
- }
- System.out.println();
+ LOG.warn(Arrays.toString(Thread.currentThread().getStackTrace()));
}
/**
@@ -300,4 +298,16 @@ public static String makeGetRequest(String uri) throws IOException {
throw new IOException(e);
}
}
+
+ public static String getHostNameBasedOnConfig(LogStream logStream,
+ SingerConfig singerConfig) {
+ if (singerConfig.isKubernetesEnabled()) {
+ if (logStream.getSingerLog().getPodUid() !=null
+ && logStream.getSingerLog().getPodUid() != LogStreamManager.NON_KUBERNETES_POD_ID) {
+ return logStream.getSingerLog().getPodUid();
+ }
+ }
+ return SingerUtils.getHostname();
+ }
+
}
diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java
index 54aabf9b..bb7c738b 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java
@@ -15,6 +15,8 @@
*/
package com.pinterest.singer.writer;
+import com.pinterest.singer.common.SingerMetrics;
+import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
import com.pinterest.singer.utils.KafkaUtils;
@@ -81,6 +83,8 @@ private KafkaProducer getProducerInternal(KafkaProducerConfig co
if (result != null && result != producer) {
producer.close();
}
+ // log metrics for no.of kafka producers currently in the cache
+ OpenTsdbMetricConverter.addMetric(SingerMetrics.NUM_KAFKA_PRODUCERS, producers.size());
}
result = producers.get(config);
return result;
@@ -106,6 +110,8 @@ private boolean resetProducerInternal(KafkaProducerConfig config) {
if (!retval) {
newProducer.close();
}
+ // log metrics for no.of kafka producers currently in the cache
+ OpenTsdbMetricConverter.addMetric(SingerMetrics.NUM_KAFKA_PRODUCERS, producers.size());
}
return retval;
}
diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java
index 25c7997e..830e2b53 100644
--- a/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java
+++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java
@@ -28,6 +28,9 @@
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -102,17 +105,23 @@ public void testGoodPodFetch() throws KeyManagementException, ClientProtocolExce
KubeConfig kubeConfig = new KubeConfig();
KubeService poll = new KubeService(kubeConfig);
- Set fetchPodIdsFromMetadata = poll.fetchPodUidsFromMetadata();
+ Set fetchPodNamesFromMetadata = poll.fetchPodNamesFromMetadata();
// check uid count is correct
- assertEquals(6, fetchPodIdsFromMetadata.size());
- String[] podUids = new String[] { "enimanager-ppl56",
- "zk-update-monitor-r1vlt", "tcollector-q4qx8",
- "metrics-agent-8vm9g", "kubernetes-dashboard-1835568627-hhfhj",
- "test-ci-0" };
- // check UIDs are correct, sequence doesn't really matter in this case
- for (int i = 0; i < podUids.length; i++) {
- assertTrue(fetchPodIdsFromMetadata.contains(podUids[i]));
+ assertEquals(12, fetchPodNamesFromMetadata.size());
+
+ Map podToNamespaceMap = new HashMap<>();
+ podToNamespaceMap.put("enimanager-ppl56", "default");
+ podToNamespaceMap.put("zk-update-monitor-r1vlt", "default");
+ podToNamespaceMap.put("tcollector-q4qx8", "default");
+ podToNamespaceMap.put("metrics-agent-8vm9g", "default");
+ podToNamespaceMap.put("kubernetes-dashboard-1835568627-hhfhj", "kube-system");
+ podToNamespaceMap.put("test-ci-0", "kubernetes-plugin");
+ for (Entry entry : podToNamespaceMap.entrySet()) {
+ String name = entry.getKey();
+ String namespace = entry.getValue();
+ assertTrue(fetchPodNamesFromMetadata.contains(name));
+ assertTrue(fetchPodNamesFromMetadata.contains(namespace + "_" + name));
}
}
@@ -123,7 +132,7 @@ public void testBadPodFetch() throws KeyManagementException, ClientProtocolExcep
KubeConfig kubeConfig = new KubeConfig();
KubeService kubeService = new KubeService(kubeConfig);
- Set fetchPodIdsFromMetadata = kubeService.fetchPodUidsFromMetadata();
+ Set fetchPodIdsFromMetadata = kubeService.fetchPodNamesFromMetadata();
assertEquals(0, fetchPodIdsFromMetadata.size());
}
@@ -169,7 +178,7 @@ public void podCreated(String podUid) {
set.add(podUid);
}
});
- poll.updatePodUids();
+ poll.updatePodNames();
assertEquals(6, set.size());
set.clear();
@@ -188,7 +197,7 @@ public void podCreated(String podUid) {
}
});
- poll.updatePodUids();
+ poll.updatePodNames();
assertEquals(6, set.size());
}
diff --git a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java
index 3c26be85..9e4b21cf 100644
--- a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java
+++ b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java
@@ -43,14 +43,32 @@ public void testReadLogMessageAndPosition() throws Exception {
long inode = SingerUtils.getFileInode(SingerUtils.getPath(path));
LogFile logFile = new LogFile(inode);
LogFileReader reader = new TextLogFileReader(logFile, path, 0, 8192, 102400, 1,
- Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null);
+ Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null, null);
for (int i = 0; i < 100; i++) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(dataWritten.get(i), new String(log.getLogMessage().getMessage()));
}
reader.close();
}
+
+ @Test
+ public void testReadLogMessageAndPositionWithHostname() throws Exception {
+ String path = FilenameUtils.concat(getTempPath(), "test2.log");
+ List dataWritten = generateDummyMessagesToFile(path);
+ String delimiter = " ";
+ String hostname = "test";
+ long inode = SingerUtils.getFileInode(SingerUtils.getPath(path));
+ LogFile logFile = new LogFile(inode);
+ LogFileReader reader = new TextLogFileReader(logFile, path, 0, 8192, 102400, 1,
+ Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, hostname, delimiter);
+ for (int i = 0; i < 100; i++) {
+ LogMessageAndPosition log = reader.readLogMessageAndPosition();
+ assertEquals(hostname + delimiter + dataWritten.get(i), new String(log.getLogMessage().getMessage()));
+ }
+ reader.close();
+ }
+
@Test
public void testReadLogMessageAndPositionMultiRead() throws Exception {
String path = FilenameUtils.concat(getTempPath(), "test2.log");
@@ -59,7 +77,7 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception {
long inode = SingerUtils.getFileInode(SingerUtils.getPath(path));
LogFile logFile = new LogFile(inode);
LogFileReader reader = new TextLogFileReader(logFile, path, 0, 8192, 102400, 2,
- Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null);
+ Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null, null);
for (int i = 0; i < 100; i = i + 2) {
LogMessageAndPosition log = reader.readLogMessageAndPosition();
assertEquals(dataWritten.get(i) + dataWritten.get(i + 1),
diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java
index 4e369224..d770c000 100644
--- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java
+++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java
@@ -17,20 +17,18 @@
import static org.junit.Assert.assertTrue;
+import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
import com.google.common.collect.ImmutableSet;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
public class TestLogConfigs {
@@ -43,18 +41,18 @@ public void testBrokerChangeSystemExit() {
kafkaServerSets.put("/xyz", new HashSet<>(Arrays.asList("one:9092", "two:9092", "three:9092")));
BrokerSetChangeListener listener = new BrokerSetChangeListener("/xyz", kafkaServerSets, 100);
listener.onChange(
- ImmutableSet.of(new ServiceInstance(new Endpoint("one", 9092), null, Status.ALIVE),
- new ServiceInstance(new Endpoint("two", 9092), null, Status.ALIVE),
- new ServiceInstance(new Endpoint("three", 9092), null, Status.ALIVE)));
+ ImmutableSet.of(new InetSocketAddress("one", 9092),
+ new InetSocketAddress("two", 9092),
+ new InetSocketAddress("three", 9092)));
assertTrue(true);
listener.onChange(
- ImmutableSet.of(new ServiceInstance(new Endpoint("one", 9092), null, Status.ALIVE),
- new ServiceInstance(new Endpoint("two", 9092), null, Status.ALIVE)));
+ ImmutableSet.of(new InetSocketAddress("one", 9092),
+ new InetSocketAddress("two", 9092)));
assertTrue(true);
exit.expectSystemExitWithStatus(0);
listener.onChange(
- ImmutableSet.of(new ServiceInstance(new Endpoint("one", 9092), null, Status.ALIVE),
- new ServiceInstance(new Endpoint("four", 9092), null, Status.ALIVE)));
+ ImmutableSet.of(new InetSocketAddress("one", 9092),
+ new InetSocketAddress("four", 9092)));
}
}
diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java
new file mode 100644
index 00000000..d4bc7464
--- /dev/null
+++ b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import com.pinterest.singer.common.LogStream;
+import com.pinterest.singer.common.SingerLog;
+import com.pinterest.singer.monitor.LogStreamManager;
+import com.pinterest.singer.thrift.configuration.SingerConfig;
+import com.pinterest.singer.thrift.configuration.SingerLogConfig;
+
+public class TestSingerUtils {
+
+ @Test
+ public void testGetHostNameBasedOnConfig() {
+ LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig(), "pod-11"), "test");
+ SingerConfig config = new SingerConfig();
+ config.setKubernetesEnabled(true);
+ // case 1 kubernetes enabled; pod id specified
+ String hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config);
+ assertEquals("pod-11", hostNameBasedOnConfig);
+
+ // case 2 kubernetes enabled; pod id null
+ logStream = new LogStream(new SingerLog(new SingerLogConfig(), null), "test");
+ hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config);
+ assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig);
+
+ // case 3 kubernetes enabled; pod id NON_KUBERNETES_POD_ID
+ logStream = new LogStream(new SingerLog(new SingerLogConfig(), LogStreamManager.NON_KUBERNETES_POD_ID), "test");
+ hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config);
+ assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig);
+
+ // case 4 kubernetes disabled
+ config.setKubernetesEnabled(false);
+ logStream = new LogStream(new SingerLog(new SingerLogConfig(), "pod-11"), "test");
+ hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config);
+ assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig);
+ }
+
+}
diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml
index 0dc85ba4..62d6fb6a 100644
--- a/thrift-logger/pom.xml
+++ b/thrift-logger/pom.xml
@@ -4,7 +4,7 @@
com.pinterest.singer
singer-package
- 0.8.0-rc.5
+ 0.8.0-rc.6
../pom.xml
thrift-logger
diff --git a/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java b/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java
index c7002482..90723cc9 100644
--- a/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java
+++ b/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java
@@ -39,7 +39,7 @@ public class LogbackThriftLogger extends BaseThriftLogger {
protected static final String THRIFT_LOGGER_ERROR_LOGBACKEXCEPTION
= "thrift_logger.error.logbackexception";
- protected static String HOST_NAME;
+ protected static String HOST_NAME = "n/a";
static {
try {