From 6c3f42a09e2a37219a6d4d41a7d65b1599718b51 Mon Sep 17 00:00:00 2001 From: zzhhhzz <9368368+zzhhhzz@users.noreply.github.com> Date: Wed, 8 Jan 2020 20:07:59 -0800 Subject: [PATCH] Merge master to logging-audit branch and bump up version 0.8.0-rc.6 (#35) (1) merge master branch to logging-audit branch (2) init HOST_NAME to be "n/a" in LogbackThriftLogger (3) bump up version 0.8.0-rc.6 --- .github/workflows/maven.yml | 17 ++++ CONTRIBUTING.md | 4 +- pom.xml | 10 +-- singer-commons/pom.xml | 2 +- singer/deb.version | 2 +- singer/pom.xml | 38 ++------- .../pinterest/singer/common/SingerLog.java | 10 +++ .../singer/common/SingerMetrics.java | 1 + .../singer/config/ConfigFileServerSet.java | 60 +++---------- .../singer/config/ServersetMonitor.java | 11 +++ .../singer/kubernetes/KubeService.java | 85 ++++++++++--------- .../singer/monitor/LogStreamManager.java | 2 +- .../singer/reader/TextLogFileReader.java | 9 +- .../reader/TextLogFileReaderFactory.java | 4 +- .../singer/tools/KubeServiceChecker.java | 2 +- .../singer/utils/BrokerSetChangeListener.java | 16 ++-- .../pinterest/singer/utils/SingerUtils.java | 22 +++-- .../singer/writer/KafkaProducerManager.java | 6 ++ .../singer/kubernetes/TestKubeService.java | 33 ++++--- .../singer/reader/TestTextLogFileReader.java | 22 ++++- .../singer/utils/TestLogConfigs.java | 20 ++--- .../singer/utils/TestSingerUtils.java | 56 ++++++++++++ thrift-logger/pom.xml | 2 +- .../singer/client/LogbackThriftLogger.java | 2 +- 24 files changed, 256 insertions(+), 180 deletions(-) create mode 100644 .github/workflows/maven.yml create mode 100644 singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java create mode 100644 singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 00000000..d6df713f --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,17 @@ +name: Singer Build + +on: [pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Build with Maven + run: mvn -B package --file pom.xml diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 44930c2a..3373c440 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -23,7 +23,9 @@ To increase the chances that your pull request will be accepted: - Write tests for your changes - Write a good commit message +NOTE: all PRs must pass build before they can be accepted. + ## License By contributing to this project, you agree that your contributions will be -licensed under its [Apache 2 license](LICENSE). \ No newline at end of file +licensed under its [Apache 2 license](LICENSE). diff --git a/pom.xml b/pom.xml index f8d731cc..15cd62fd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.pinterest.singer singer-package - 0.8.0-rc.5 + 0.8.0-rc.6 pom Singer Logging Agent modules 2013 @@ -37,6 +37,10 @@ ambud Ambud Sharma + + zzhhhzz + Heng Zhang + https://github.com/pinterest/singer.git @@ -51,10 +55,6 @@ - - Twitter public Maven repo - https://maven.twttr.com - central https://repo.maven.apache.org/maven2 diff --git a/singer-commons/pom.xml b/singer-commons/pom.xml index cbc39d0a..a6595b16 100644 --- a/singer-commons/pom.xml +++ b/singer-commons/pom.xml @@ -20,7 +20,7 @@ com.pinterest.singer singer-package - 0.8.0-rc.5 + 0.8.0-rc.6 ../pom.xml diff --git a/singer/deb.version b/singer/deb.version index ef0952c9..62b1079a 100644 --- a/singer/deb.version +++ b/singer/deb.version @@ -1 +1 @@ -0.8.0-rc.5 \ No newline at end of file +0.8.0-rc.6 diff --git a/singer/pom.xml b/singer/pom.xml index cf5e82d1..aa01199d 100644 --- a/singer/pom.xml +++ b/singer/pom.xml @@ -7,7 +7,7 @@ com.pinterest.singer singer-package - 0.8.0-rc.5 + 0.8.0-rc.6 ../pom.xml @@ -27,6 +27,10 @@ ambud Ambud Sharma + + zzhhhzz + Heng Zhang + https://github.com/pinterest/singer.git @@ -63,38 +67,6 @@ 0.0.116 - - com.twitter.common - dynamic-host-set - 0.0.56 - - - - com.twitter.common - service-thrift - 1.0.55 - - - org.apache.thrift - libthrift - - - - - com.twitter - finagle-core_2.11 - 6.25.0 - - - com.twitter - finagle-serversets_2.11 - 6.25.0 - - - com.twitter - util-core_2.11 - 6.25.0 - org.apache.kafka kafka_2.11 diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerLog.java b/singer/src/main/java/com/pinterest/singer/common/SingerLog.java index 8b3ddf32..518266c2 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerLog.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerLog.java @@ -27,11 +27,17 @@ public class SingerLog { // The config for the SingerLog. private final SingerLogConfig singerLogConfig; + private String podUid; public SingerLog(SingerLogConfig singerLogConfig) { this.singerLogConfig = Preconditions.checkNotNull(singerLogConfig); } + public SingerLog(SingerLogConfig singerLogConfig, String podUid) { + this.podUid = podUid; + this.singerLogConfig = Preconditions.checkNotNull(singerLogConfig); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -62,4 +68,8 @@ public String getLogName() { public SingerLogConfig getSingerLogConfig() { return singerLogConfig; } + + public String getPodUid() { + return podUid; + } } diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index 9df3b460..d34092d0 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -75,6 +75,7 @@ public class SingerMetrics { public static final String KAFKA_LATENCY = SINGER_WRITER + "max_kafka_batch_write_latency"; public static final String NUM_COMMITED_TRANSACTIONS = SINGER_WRITER + "num_committed_transactions"; public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions"; + public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers"; public static final String KUBE_PREFIX = SINGER_PREIX + "kube."; public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error"; diff --git a/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java b/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java index 5aebf657..5ae3e162 100644 --- a/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java +++ b/singer/src/main/java/com/pinterest/singer/config/ConfigFileServerSet.java @@ -18,15 +18,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import com.twitter.common.base.Command; import com.twitter.common.base.MorePreconditions; -import com.twitter.common.zookeeper.Group; -import com.twitter.common.zookeeper.ServerSet; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; import com.twitter.util.ExceptionalFunction; -import org.apache.commons.lang.StringUtils; import java.io.BufferedReader; import java.io.ByteArrayInputStream; @@ -35,8 +28,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.Map; /** * Implementation of the ServerSet interface that uses a file on local disk instead of talking to ZooKeeper directly. @@ -47,7 +38,7 @@ * Note that this implementation only supports monitor() and not join(). Use the standard ZooKeeper implementation for * join(). */ -public class ConfigFileServerSet implements ServerSet { +public class ConfigFileServerSet { // made public for unit testing only public static String SERVERSET_DIR = "/var/serverset"; @@ -83,35 +74,7 @@ public ConfigFileServerSet(String serverSetFilePath) { } } - @Override - public EndpointStatus join( - InetSocketAddress endpoint, Map additionalEndpoints, Status status) - throws Group.JoinException, InterruptedException { - throw new UnsupportedOperationException("ConfigFileServerSet does not support join()"); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, Map additionalEndpoints) - throws Group.JoinException, InterruptedException { - throw new UnsupportedOperationException("ConfigFileServerSet does not support join()"); - } - - @Override - public EndpointStatus join( - InetSocketAddress endpoint, Map additionalEndpoints, int shardId) - throws Group.JoinException, InterruptedException { - throw new UnsupportedOperationException("ConfigFileServerSet does not support join()"); - } - - @Override - public Command watch(final HostChangeMonitor monitor) throws MonitorException { - monitor(monitor); - return null; - } - - @Override - public void monitor(final HostChangeMonitor monitor) throws MonitorException { + public void monitor(final ServersetMonitor monitor) throws Exception { Preconditions.checkNotNull(monitor); try { // Each call to monitor registers a new file watch. This is a bit inefficient if there @@ -122,18 +85,18 @@ public void monitor(final HostChangeMonitor monitor) throws Mon new ExceptionalFunction() { @Override public Void applyE(byte[] newContents) throws Exception { - ImmutableSet newServerSet = readServerSet(newContents); + ImmutableSet newServerSet = readServerSet(newContents); monitor.onChange(newServerSet); return null; } }); } catch (IOException e) { - throw new MonitorException( + throw new Exception( "Error setting up watch on dynamic server set file:" + serverSetFilePath, e); } } - protected Endpoint getEndPointFromServerSetLine(String line) { + protected InetSocketAddress getEndPointFromServerSetLine(String line) { // We expect each line to be of the form "hostname:port". Note that host names can // contain ':' themselves (e.g. ipv6 addresses). int index = line.lastIndexOf(':'); @@ -141,11 +104,11 @@ protected Endpoint getEndPointFromServerSetLine(String line) { String host = line.substring(0, index); int port = Integer.parseInt(line.substring(index + 1)); - return new Endpoint(host, port); + return new InetSocketAddress(host, port); } - public ImmutableSet readServerSet(byte[] fileContent) throws IOException { - ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); + public ImmutableSet readServerSet(byte[] fileContent) throws IOException { + ImmutableSet.Builder builder = new ImmutableSet.Builder<>(); InputStream stream = new ByteArrayInputStream(fileContent); BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); while (true) { @@ -158,12 +121,9 @@ public ImmutableSet readServerSet(byte[] fileContent) throws IO continue; } - Endpoint endpoint = getEndPointFromServerSetLine(line); + InetSocketAddress endpoint = getEndPointFromServerSetLine(line); if (endpoint != null) { - builder.add(new ServiceInstance( - endpoint, // endpoint - Collections.emptyMap(), // additional endpoints - Status.ALIVE)); // status + builder.add(endpoint); } } return builder.build(); diff --git a/singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java b/singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java new file mode 100644 index 00000000..38d15487 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/config/ServersetMonitor.java @@ -0,0 +1,11 @@ +package com.pinterest.singer.config; + +import java.net.InetSocketAddress; + +import com.google.common.collect.ImmutableSet; + +public interface ServersetMonitor { + + public void onChange(ImmutableSet serviceInstances); + +} diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java index e9d20d99..c910b3b1 100644 --- a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java +++ b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java @@ -115,7 +115,7 @@ public synchronized static KubeService getInstance() { @Override public void run() { // fetch existing pod directories - updatePodUidsFromFileSystem(); + updatePodNamesFromFileSystem(); // we should wait for some time try { @@ -128,7 +128,7 @@ public void run() { try { // method refactored so that class level method locking is done with the // synchronized keyword - updatePodUids(); + updatePodNames(); LOG.debug("Pod IDs refreshed at:" + new Date(System.currentTimeMillis())); } catch (Exception e) { // TODO what should be done if we have errors in reaching MD service @@ -150,7 +150,7 @@ public void run() { * * This method should have an effect on data if Singer was restarted */ - private void updatePodUidsFromFileSystem() { + private void updatePodNamesFromFileSystem() { LOG.info("Kubernetes POD log directory configured as:" + podLogDirectory); File[] directories = new File(podLogDirectory).listFiles(new FileFilter() { @@ -177,18 +177,18 @@ public boolean accept(File pathname) { if (directories != null) { for (File directory : directories) { - String podUid = directory.getName(); - if (temp.contains("." + podUid)) { - LOG.info("Ignoring POD directory " + podUid + " since there is a tombstone file present"); + String podName = directory.getName(); + if (temp.contains("." + podName)) { + LOG.info("Ignoring POD directory " + podName + " since there is a tombstone file present"); // Skip adding this pod to the active podset continue; } - activePodSet.add(podUid); + activePodSet.add(podName); for (PodWatcher podWatcher : registeredWatchers) { - podWatcher.podCreated(podUid); + podWatcher.podCreated(podName); } - LOG.info("Active POD found (via directory):" + podUid); + LOG.info("Active POD found (via directory):" + podName); } } // update number of active pods running @@ -196,7 +196,7 @@ public boolean accept(File pathname) { } /** - * Clear the set of Pod UIDs and update it with the latest fetch from kubelet + * Clear the set of Pod Names and update it with the latest fetch from kubelet * * Following a listener design, currently we only have 1 listener but in future * if we want to do something else as well when these events happen then this @@ -204,15 +204,15 @@ public boolean accept(File pathname) { * * @throws IOException */ - public void updatePodUids() throws IOException { + public void updatePodNames() throws IOException { LOG.debug("Active podset:" + activePodSet); - Set updatedPodUids = fetchPodUidsFromMetadata(); - SetView deletedPods = Sets.difference(activePodSet, updatedPodUids); + Set updatedPodNames = fetchPodNamesFromMetadata(); + SetView deletedNames = Sets.difference(activePodSet, updatedPodNames); // ignore new pods, pod discovery is done by watching directories - for (String podUid : deletedPods) { - updatePodWatchers(podUid, true); + for (String podName : deletedNames) { + updatePodWatchers(podName, true); // update metrics since pods have been deleted Stats.incr(SingerMetrics.PODS_DELETED); Stats.incr(SingerMetrics.NUMBER_OF_PODS, -1); @@ -220,24 +220,24 @@ public void updatePodUids() throws IOException { LOG.debug("Fired events for registered watchers"); activePodSet.clear(); - activePodSet.addAll(updatedPodUids); - LOG.debug("Cleared and updated pod UIDs:" + activePodSet); + activePodSet.addAll(updatedPodNames); + LOG.debug("Cleared and updated pod names:" + activePodSet); } /** * Update all {@link PodWatcher} about the pod that changed (created or deleted) * - * @param podUid + * @param podName * @param isDelete */ - public void updatePodWatchers(String podUid, boolean isDelete) { - LOG.debug("Pod change:" + podUid + " deleted:" + isDelete); + public void updatePodWatchers(String podName, boolean isDelete) { + LOG.debug("Pod change:" + podName + " deleted:" + isDelete); for (PodWatcher watcher : registeredWatchers) { try { if (isDelete) { - watcher.podDeleted(podUid); + watcher.podDeleted(podName); } else { - watcher.podCreated(podUid); + watcher.podCreated(podName); } } catch (Exception e) { LOG.error("Watcher had an exception", e); @@ -248,33 +248,35 @@ public void updatePodWatchers(String podUid, boolean isDelete) { /** * Fetch Pod IDs from metadata. * - * Note: inside singer we refer to pod's identifier as a the PodUid - * - * In our internal case 'Pod Uid' = 'kubernetes pod name' + * Note: inside singer we refer to pod's identifier as a the PodName * * e.g. see src/test/resources/pods-goodresponse.json * - * @return set of pod uids + * @return set of pod names * @throws IOException */ - public Set fetchPodUidsFromMetadata() throws IOException { + public Set fetchPodNamesFromMetadata() throws IOException { LOG.debug("Attempting to make pod md request"); String response = SingerUtils.makeGetRequest(PODS_MD_URL); LOG.debug("Received pod md response:" + response); Gson gson = new Gson(); JsonObject obj = gson.fromJson(response, JsonObject.class); - LOG.debug("Finished parsing kubelet response for PODs; now extracting POD UIDs"); - Set podUids = new HashSet<>(); + LOG.debug("Finished parsing kubelet response for PODs; now extracting POD names"); + Set podNames = new HashSet<>(); if (obj != null && obj.has("items")) { JsonArray ary = obj.get("items").getAsJsonArray(); for (int i = 0; i < ary.size(); i++) { - String uid = ary.get(i).getAsJsonObject().get("metadata").getAsJsonObject().get("name").getAsString(); - podUids.add(uid); - LOG.debug("Found active POD UID in JSON:" + uid); + JsonObject metadata = ary.get(i).getAsJsonObject().get("metadata").getAsJsonObject(); + String name = metadata.get("name").getAsString(); + podNames.add(name); + // to support namespace based POD directories + String namespace = metadata.get("namespace").getAsString(); + podNames.add(namespace + "_" + name); + LOG.debug("Found active POD name in JSON:" + name); } } - LOG.debug("Pod uids from kubelet:" + podUids); - return podUids; + LOG.debug("Pod names from kubelet:" + podNames); + return podNames; } /** @@ -385,22 +387,23 @@ public void checkAndProcessFsEvents() throws InterruptedException { // ignore delete events if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) { if (!file.toFile().isFile()) { - String podUid = file.toFile().getName(); - if (podUid.startsWith(".")) { + String podName = file.toFile().getName(); + if (podName.startsWith(".")) { // ignore tombstone files return; } - LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory+" poduid:" + podUid); + LOG.info("New pod directory discovered by FSM:" + event.logDir() + " " + podLogDirectory + + " podname:" + podName); Stats.incr(SingerMetrics.PODS_CREATED); Stats.incr(SingerMetrics.NUMBER_OF_PODS); - activePodSet.add(podUid); - updatePodWatchers(podUid, false); + activePodSet.add(podName); + updatePodWatchers(podName, false); } // ignore all events that are not directory create events } else if (kind.equals(StandardWatchEventKinds.OVERFLOW)) { LOG.warn("Received overflow watch event from filesystem: Events may have been lost"); - // perform a full sync on pod IDs from file system - updatePodUidsFromFileSystem(); + // perform a full sync on pod names from file system + updatePodNamesFromFileSystem(); } else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) { // ignore the . files if (!file.toFile().getName().startsWith(".")) { diff --git a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java index f8541da0..ae96ff15 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java @@ -636,7 +636,7 @@ public void initializeLogStreamForPod(String podUid, Collection // since the DefaultLogMonitor de-duplicates this using hashcode // which is dependent on LogStream name clone.setName(podUid + ":" + clone.getName()); - singerLog = new SingerLog(clone); + singerLog = new SingerLog(clone, podUid); if (!singerLogPaths.containsKey(logPathKey)) { singerLogPaths.put(logPathKey, new HashSet<>()); diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java index a3f8d15e..4cbaabc7 100644 --- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java +++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReader.java @@ -39,7 +39,6 @@ public class TextLogFileReader implements LogFileReader { private static final Logger LOG = LoggerFactory.getLogger(TextLogFileReader.class); - private static final String HOSTNAME = SingerUtils.getHostname(); protected boolean closed; private final LogFile logFile; @@ -55,6 +54,8 @@ public class TextLogFileReader implements LogFileReader { // The text log message format, can be TextMessage, or String; private final TextLogMessageType textLogMessageType; + private String hostname; + public TextLogFileReader( LogFile logFile, String path, @@ -66,10 +67,12 @@ public TextLogFileReader( TextLogMessageType messageType, boolean prependTimestamp, boolean prependHostName, + String hostname, String prependFieldDelimiter) throws Exception { Preconditions.checkArgument(!Strings.isNullOrEmpty(path)); Preconditions.checkArgument(byteOffset >= 0); + this.hostname = hostname; this.logFile = Preconditions.checkNotNull(logFile); this.path = path; this.numMessagesPerLogMessage = numMessagesPerLogMessage; @@ -118,7 +121,7 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc prependStr += System.currentTimeMillis() + prependFieldDelimiter; } if (prependHostname) { - prependStr += HOSTNAME + prependFieldDelimiter; + prependStr += hostname + prependFieldDelimiter; } if (prependStr.length() > 0) { maxBuffer.put(prependStr.getBytes()); @@ -142,7 +145,7 @@ public LogMessageAndPosition readLogMessageAndPosition() throws LogFileReaderExc case THRIFT_TEXT_MESSAGE: TextMessage textMessage = new TextMessage(); textMessage.setFilename(path); - textMessage.setHost(HOSTNAME); + textMessage.setHost(hostname); textMessage.addToMessages(TextMessageReader.bufToString(out)); logMessage = new LogMessage(ByteBuffer.wrap(serializer.serialize(textMessage))); break; diff --git a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java index fbf9cef0..7adba66d 100644 --- a/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java +++ b/singer/src/main/java/com/pinterest/singer/reader/TextLogFileReaderFactory.java @@ -16,6 +16,7 @@ package com.pinterest.singer.reader; import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.thrift.LogFile; import com.pinterest.singer.thrift.configuration.TextReaderConfig; import com.pinterest.singer.utils.LogFileUtils; @@ -40,9 +41,9 @@ public TextLogFileReaderFactory(TextReaderConfig readerConfig) { this.readerConfig = Preconditions.checkNotNull(readerConfig); } + @SuppressWarnings("resource") public LogFileReader getLogFileReader( LogStream logStream, LogFile logFile, String path, long byteOffset) throws Exception { - LogFileReader reader; try { long inode = SingerUtils.getFileInode(FileSystems.getDefault().getPath(path)); @@ -60,6 +61,7 @@ public LogFileReader getLogFileReader( readerConfig.getTextLogMessageType(), readerConfig.isPrependTimestamp(), readerConfig.isPrependHostname(), + SingerUtils.getHostNameBasedOnConfig(logStream, SingerSettings.getSingerConfig()), readerConfig.getPrependFieldDelimiter()); } catch (LogFileReaderException e) { LOG.warn("Exception in getLogFileReader", e); diff --git a/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java b/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java index 137d93bd..f228d8a7 100644 --- a/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java +++ b/singer/src/main/java/com/pinterest/singer/tools/KubeServiceChecker.java @@ -35,7 +35,7 @@ public static void main(String[] args) { KubeService service = KubeService.getInstance(); try { - Set podUids = service.fetchPodUidsFromMetadata(); + Set podUids = service.fetchPodNamesFromMetadata(); for (String puid : podUids) { System.out.println(puid); } diff --git a/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java b/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java index a8a23dde..2fccc5e3 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java +++ b/singer/src/main/java/com/pinterest/singer/utils/BrokerSetChangeListener.java @@ -15,6 +15,7 @@ */ package com.pinterest.singer.utils; +import java.net.InetSocketAddress; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; @@ -25,12 +26,10 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import com.twitter.common.net.pool.DynamicHostSet; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; +import com.pinterest.singer.config.ServersetMonitor; -public final class BrokerSetChangeListener - implements DynamicHostSet.HostChangeMonitor { +public final class BrokerSetChangeListener implements ServersetMonitor { + private static final int RESTART_WAIT_MAX_DELAY = 1800_000; private static final int BROKER_RESTART_PRECENTAGE_THRESHOLD = 50; private static final int BROKER_RESTART_VALUE_THRESHOLD = 1; @@ -53,11 +52,10 @@ public BrokerSetChangeListener(String monitoredServersetFilePath, } @Override - public void onChange(ImmutableSet serviceInstances) { + public void onChange(ImmutableSet serviceInstances) { Set newBrokerSet = Sets.newHashSet(); - for (ServiceInstance instance : serviceInstances) { - Endpoint endPoint = instance.getServiceEndpoint(); - newBrokerSet.add(Joiner.on(":").join(endPoint.getHost(), endPoint.getPort())); + for (InetSocketAddress instance : serviceInstances) { + newBrokerSet.add(Joiner.on(":").join(instance.getHostName(), instance.getPort())); } // 'serverSetPath' variable will still long live. But it should be rare as we // only diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index e33cb8bf..9609c9bc 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -15,6 +15,7 @@ */ package com.pinterest.singer.utils; +import com.pinterest.singer.common.LogStream; import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.config.DirectorySingerConfigurator; import com.pinterest.singer.config.PropertyFileSingerConfigurator; @@ -22,12 +23,13 @@ import com.pinterest.singer.thrift.configuration.SingerConfig; import com.pinterest.singer.config.SingerDirectoryWatcher; - +import com.pinterest.singer.monitor.LogStreamManager; import com.google.common.base.Preconditions; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.Arrays; import java.util.Comparator; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.comparator.LastModifiedFileComparator; @@ -108,11 +110,7 @@ public static Date convertToDate(String timeString) throws ConfigurationExceptio } public static void printStackTrace() { - StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); - for(StackTraceElement e:stackTraceElements) { - System.out.println(e.getClassName()+"."+e.getMethodName()+":"+e.getLineNumber()); - } - System.out.println(); + LOG.warn(Arrays.toString(Thread.currentThread().getStackTrace())); } /** @@ -300,4 +298,16 @@ public static String makeGetRequest(String uri) throws IOException { throw new IOException(e); } } + + public static String getHostNameBasedOnConfig(LogStream logStream, + SingerConfig singerConfig) { + if (singerConfig.isKubernetesEnabled()) { + if (logStream.getSingerLog().getPodUid() !=null + && logStream.getSingerLog().getPodUid() != LogStreamManager.NON_KUBERNETES_POD_ID) { + return logStream.getSingerLog().getPodUid(); + } + } + return SingerUtils.getHostname(); + } + } diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java index 54aabf9b..bb7c738b 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java +++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java @@ -15,6 +15,8 @@ */ package com.pinterest.singer.writer; +import com.pinterest.singer.common.SingerMetrics; +import com.pinterest.singer.metrics.OpenTsdbMetricConverter; import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; import com.pinterest.singer.utils.KafkaUtils; @@ -81,6 +83,8 @@ private KafkaProducer getProducerInternal(KafkaProducerConfig co if (result != null && result != producer) { producer.close(); } + // log metrics for no.of kafka producers currently in the cache + OpenTsdbMetricConverter.addMetric(SingerMetrics.NUM_KAFKA_PRODUCERS, producers.size()); } result = producers.get(config); return result; @@ -106,6 +110,8 @@ private boolean resetProducerInternal(KafkaProducerConfig config) { if (!retval) { newProducer.close(); } + // log metrics for no.of kafka producers currently in the cache + OpenTsdbMetricConverter.addMetric(SingerMetrics.NUM_KAFKA_PRODUCERS, producers.size()); } return retval; } diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java index 25c7997e..830e2b53 100644 --- a/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java +++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestKubeService.java @@ -28,6 +28,9 @@ import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -102,17 +105,23 @@ public void testGoodPodFetch() throws KeyManagementException, ClientProtocolExce KubeConfig kubeConfig = new KubeConfig(); KubeService poll = new KubeService(kubeConfig); - Set fetchPodIdsFromMetadata = poll.fetchPodUidsFromMetadata(); + Set fetchPodNamesFromMetadata = poll.fetchPodNamesFromMetadata(); // check uid count is correct - assertEquals(6, fetchPodIdsFromMetadata.size()); - String[] podUids = new String[] { "enimanager-ppl56", - "zk-update-monitor-r1vlt", "tcollector-q4qx8", - "metrics-agent-8vm9g", "kubernetes-dashboard-1835568627-hhfhj", - "test-ci-0" }; - // check UIDs are correct, sequence doesn't really matter in this case - for (int i = 0; i < podUids.length; i++) { - assertTrue(fetchPodIdsFromMetadata.contains(podUids[i])); + assertEquals(12, fetchPodNamesFromMetadata.size()); + + Map podToNamespaceMap = new HashMap<>(); + podToNamespaceMap.put("enimanager-ppl56", "default"); + podToNamespaceMap.put("zk-update-monitor-r1vlt", "default"); + podToNamespaceMap.put("tcollector-q4qx8", "default"); + podToNamespaceMap.put("metrics-agent-8vm9g", "default"); + podToNamespaceMap.put("kubernetes-dashboard-1835568627-hhfhj", "kube-system"); + podToNamespaceMap.put("test-ci-0", "kubernetes-plugin"); + for (Entry entry : podToNamespaceMap.entrySet()) { + String name = entry.getKey(); + String namespace = entry.getValue(); + assertTrue(fetchPodNamesFromMetadata.contains(name)); + assertTrue(fetchPodNamesFromMetadata.contains(namespace + "_" + name)); } } @@ -123,7 +132,7 @@ public void testBadPodFetch() throws KeyManagementException, ClientProtocolExcep KubeConfig kubeConfig = new KubeConfig(); KubeService kubeService = new KubeService(kubeConfig); - Set fetchPodIdsFromMetadata = kubeService.fetchPodUidsFromMetadata(); + Set fetchPodIdsFromMetadata = kubeService.fetchPodNamesFromMetadata(); assertEquals(0, fetchPodIdsFromMetadata.size()); } @@ -169,7 +178,7 @@ public void podCreated(String podUid) { set.add(podUid); } }); - poll.updatePodUids(); + poll.updatePodNames(); assertEquals(6, set.size()); set.clear(); @@ -188,7 +197,7 @@ public void podCreated(String podUid) { } }); - poll.updatePodUids(); + poll.updatePodNames(); assertEquals(6, set.size()); } diff --git a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java index 3c26be85..9e4b21cf 100644 --- a/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java +++ b/singer/src/test/java/com/pinterest/singer/reader/TestTextLogFileReader.java @@ -43,14 +43,32 @@ public void testReadLogMessageAndPosition() throws Exception { long inode = SingerUtils.getFileInode(SingerUtils.getPath(path)); LogFile logFile = new LogFile(inode); LogFileReader reader = new TextLogFileReader(logFile, path, 0, 8192, 102400, 1, - Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null); + Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null, null); for (int i = 0; i < 100; i++) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); assertEquals(dataWritten.get(i), new String(log.getLogMessage().getMessage())); } reader.close(); } + + @Test + public void testReadLogMessageAndPositionWithHostname() throws Exception { + String path = FilenameUtils.concat(getTempPath(), "test2.log"); + List dataWritten = generateDummyMessagesToFile(path); + String delimiter = " "; + String hostname = "test"; + long inode = SingerUtils.getFileInode(SingerUtils.getPath(path)); + LogFile logFile = new LogFile(inode); + LogFileReader reader = new TextLogFileReader(logFile, path, 0, 8192, 102400, 1, + Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, true, hostname, delimiter); + for (int i = 0; i < 100; i++) { + LogMessageAndPosition log = reader.readLogMessageAndPosition(); + assertEquals(hostname + delimiter + dataWritten.get(i), new String(log.getLogMessage().getMessage())); + } + reader.close(); + } + @Test public void testReadLogMessageAndPositionMultiRead() throws Exception { String path = FilenameUtils.concat(getTempPath(), "test2.log"); @@ -59,7 +77,7 @@ public void testReadLogMessageAndPositionMultiRead() throws Exception { long inode = SingerUtils.getFileInode(SingerUtils.getPath(path)); LogFile logFile = new LogFile(inode); LogFileReader reader = new TextLogFileReader(logFile, path, 0, 8192, 102400, 2, - Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null); + Pattern.compile("^.*$"), TextLogMessageType.PLAIN_TEXT, false, false, null, null); for (int i = 0; i < 100; i = i + 2) { LogMessageAndPosition log = reader.readLogMessageAndPosition(); assertEquals(dataWritten.get(i) + dataWritten.get(i + 1), diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java index 4e369224..d770c000 100644 --- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java +++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigs.java @@ -17,20 +17,18 @@ import static org.junit.Assert.assertTrue; +import java.net.InetSocketAddress; import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.jboss.netty.util.internal.ConcurrentHashMap; import org.junit.Rule; import org.junit.Test; import org.junit.contrib.java.lang.system.ExpectedSystemExit; import com.google.common.collect.ImmutableSet; -import com.twitter.thrift.Endpoint; -import com.twitter.thrift.ServiceInstance; -import com.twitter.thrift.Status; public class TestLogConfigs { @@ -43,18 +41,18 @@ public void testBrokerChangeSystemExit() { kafkaServerSets.put("/xyz", new HashSet<>(Arrays.asList("one:9092", "two:9092", "three:9092"))); BrokerSetChangeListener listener = new BrokerSetChangeListener("/xyz", kafkaServerSets, 100); listener.onChange( - ImmutableSet.of(new ServiceInstance(new Endpoint("one", 9092), null, Status.ALIVE), - new ServiceInstance(new Endpoint("two", 9092), null, Status.ALIVE), - new ServiceInstance(new Endpoint("three", 9092), null, Status.ALIVE))); + ImmutableSet.of(new InetSocketAddress("one", 9092), + new InetSocketAddress("two", 9092), + new InetSocketAddress("three", 9092))); assertTrue(true); listener.onChange( - ImmutableSet.of(new ServiceInstance(new Endpoint("one", 9092), null, Status.ALIVE), - new ServiceInstance(new Endpoint("two", 9092), null, Status.ALIVE))); + ImmutableSet.of(new InetSocketAddress("one", 9092), + new InetSocketAddress("two", 9092))); assertTrue(true); exit.expectSystemExitWithStatus(0); listener.onChange( - ImmutableSet.of(new ServiceInstance(new Endpoint("one", 9092), null, Status.ALIVE), - new ServiceInstance(new Endpoint("four", 9092), null, Status.ALIVE))); + ImmutableSet.of(new InetSocketAddress("one", 9092), + new InetSocketAddress("four", 9092))); } } diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java new file mode 100644 index 00000000..d4bc7464 --- /dev/null +++ b/singer/src/test/java/com/pinterest/singer/utils/TestSingerUtils.java @@ -0,0 +1,56 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.utils; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerLog; +import com.pinterest.singer.monitor.LogStreamManager; +import com.pinterest.singer.thrift.configuration.SingerConfig; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; + +public class TestSingerUtils { + + @Test + public void testGetHostNameBasedOnConfig() { + LogStream logStream = new LogStream(new SingerLog(new SingerLogConfig(), "pod-11"), "test"); + SingerConfig config = new SingerConfig(); + config.setKubernetesEnabled(true); + // case 1 kubernetes enabled; pod id specified + String hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config); + assertEquals("pod-11", hostNameBasedOnConfig); + + // case 2 kubernetes enabled; pod id null + logStream = new LogStream(new SingerLog(new SingerLogConfig(), null), "test"); + hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config); + assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig); + + // case 3 kubernetes enabled; pod id NON_KUBERNETES_POD_ID + logStream = new LogStream(new SingerLog(new SingerLogConfig(), LogStreamManager.NON_KUBERNETES_POD_ID), "test"); + hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config); + assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig); + + // case 4 kubernetes disabled + config.setKubernetesEnabled(false); + logStream = new LogStream(new SingerLog(new SingerLogConfig(), "pod-11"), "test"); + hostNameBasedOnConfig = SingerUtils.getHostNameBasedOnConfig(logStream, config); + assertEquals(SingerUtils.getHostname(), hostNameBasedOnConfig); + } + +} diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml index 0dc85ba4..62d6fb6a 100644 --- a/thrift-logger/pom.xml +++ b/thrift-logger/pom.xml @@ -4,7 +4,7 @@ com.pinterest.singer singer-package - 0.8.0-rc.5 + 0.8.0-rc.6 ../pom.xml thrift-logger diff --git a/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java b/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java index c7002482..90723cc9 100644 --- a/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java +++ b/thrift-logger/src/main/java/com/pinterest/singer/client/LogbackThriftLogger.java @@ -39,7 +39,7 @@ public class LogbackThriftLogger extends BaseThriftLogger { protected static final String THRIFT_LOGGER_ERROR_LOGBACKEXCEPTION = "thrift_logger.error.logbackexception"; - protected static String HOST_NAME; + protected static String HOST_NAME = "n/a"; static { try {