Skip to content

Commit

Permalink
Merge master to logging-audit branch and bump up version 0.8.0-rc.6 (#35
Browse files Browse the repository at this point in the history
)

(1) merge master branch to logging-audit branch
(2) init HOST_NAME to be "n/a" in LogbackThriftLogger
(3) bump up version 0.8.0-rc.6
  • Loading branch information
zzhhhzz authored Jan 9, 2020
1 parent ed4c023 commit 6c3f42a
Show file tree
Hide file tree
Showing 24 changed files with 256 additions and 180 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Singer Build

on: [pull_request]

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Build with Maven
run: mvn -B package --file pom.xml
4 changes: 3 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ To increase the chances that your pull request will be accepted:
- Write tests for your changes
- Write a good commit message

NOTE: all PRs must pass build before they can be accepted.

## License

By contributing to this project, you agree that your contributions will be
licensed under its [Apache 2 license](LICENSE).
licensed under its [Apache 2 license](LICENSE).
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0-rc.5</version>
<version>0.8.0-rc.6</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down Expand Up @@ -37,6 +37,10 @@
<id>ambud</id>
<name>Ambud Sharma</name>
</developer>
<developer>
<id>zzhhhzz</id>
<name>Heng Zhang</name>
</developer>
</developers>
<scm>
<connection>https://github.com/pinterest/singer.git</connection>
Expand All @@ -51,10 +55,6 @@
</modules>

<repositories>
<repository>
<id>Twitter public Maven repo</id>
<url>https://maven.twttr.com</url>
</repository>
<repository>
<id>central</id>
<url>https://repo.maven.apache.org/maven2</url>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0-rc.5</version>
<version>0.8.0-rc.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
2 changes: 1 addition & 1 deletion singer/deb.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.0-rc.5
0.8.0-rc.6
38 changes: 5 additions & 33 deletions singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0-rc.5</version>
<version>0.8.0-rc.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand All @@ -27,6 +27,10 @@
<id>ambud</id>
<name>Ambud Sharma</name>
</developer>
<developer>
<id>zzhhhzz</id>
<name>Heng Zhang</name>
</developer>
</developers>
<scm>
<connection>https://github.com/pinterest/singer.git</connection>
Expand Down Expand Up @@ -63,38 +67,6 @@
<version>0.0.116</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.twitter.common/dynamic-host-set -->
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>dynamic-host-set</artifactId>
<version>0.0.56</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.twitter.common/service-thrift -->
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>service-thrift</artifactId>
<version>1.0.55</version>
<exclusions>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-core_2.11</artifactId>
<version>6.25.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-serversets_2.11</artifactId>
<version>6.25.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>util-core_2.11</artifactId>
<version>6.25.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
Expand Down
10 changes: 10 additions & 0 deletions singer/src/main/java/com/pinterest/singer/common/SingerLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@ public class SingerLog {

// The config for the SingerLog.
private final SingerLogConfig singerLogConfig;
private String podUid;

public SingerLog(SingerLogConfig singerLogConfig) {
this.singerLogConfig = Preconditions.checkNotNull(singerLogConfig);
}

public SingerLog(SingerLogConfig singerLogConfig, String podUid) {
this.podUid = podUid;
this.singerLogConfig = Preconditions.checkNotNull(singerLogConfig);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -62,4 +68,8 @@ public String getLogName() {
public SingerLogConfig getSingerLogConfig() {
return singerLogConfig;
}

public String getPodUid() {
return podUid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class SingerMetrics {
public static final String KAFKA_LATENCY = SINGER_WRITER + "max_kafka_batch_write_latency";
public static final String NUM_COMMITED_TRANSACTIONS = SINGER_WRITER + "num_committed_transactions";
public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions";
public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers";

public static final String KUBE_PREFIX = SINGER_PREIX + "kube.";
public static final String KUBE_API_ERROR = KUBE_PREFIX + "api_error";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.base.Command;
import com.twitter.common.base.MorePreconditions;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.thrift.Endpoint;
import com.twitter.thrift.ServiceInstance;
import com.twitter.thrift.Status;
import com.twitter.util.ExceptionalFunction;
import org.apache.commons.lang.StringUtils;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
Expand All @@ -35,8 +28,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;

/**
* Implementation of the ServerSet interface that uses a file on local disk instead of talking to ZooKeeper directly.
Expand All @@ -47,7 +38,7 @@
* Note that this implementation only supports monitor() and not join(). Use the standard ZooKeeper implementation for
* join().
*/
public class ConfigFileServerSet implements ServerSet {
public class ConfigFileServerSet {

// made public for unit testing only
public static String SERVERSET_DIR = "/var/serverset";
Expand Down Expand Up @@ -83,35 +74,7 @@ public ConfigFileServerSet(String serverSetFilePath) {
}
}

@Override
public EndpointStatus join(
InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints, Status status)
throws Group.JoinException, InterruptedException {
throw new UnsupportedOperationException("ConfigFileServerSet does not support join()");
}

@Override
public EndpointStatus join(
InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints)
throws Group.JoinException, InterruptedException {
throw new UnsupportedOperationException("ConfigFileServerSet does not support join()");
}

@Override
public EndpointStatus join(
InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints, int shardId)
throws Group.JoinException, InterruptedException {
throw new UnsupportedOperationException("ConfigFileServerSet does not support join()");
}

@Override
public Command watch(final HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
monitor(monitor);
return null;
}

@Override
public void monitor(final HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
public void monitor(final ServersetMonitor monitor) throws Exception {
Preconditions.checkNotNull(monitor);
try {
// Each call to monitor registers a new file watch. This is a bit inefficient if there
Expand All @@ -122,30 +85,30 @@ public void monitor(final HostChangeMonitor<ServiceInstance> monitor) throws Mon
new ExceptionalFunction<byte[], Void>() {
@Override
public Void applyE(byte[] newContents) throws Exception {
ImmutableSet<ServiceInstance> newServerSet = readServerSet(newContents);
ImmutableSet<InetSocketAddress> newServerSet = readServerSet(newContents);
monitor.onChange(newServerSet);
return null;
}
});
} catch (IOException e) {
throw new MonitorException(
throw new Exception(
"Error setting up watch on dynamic server set file:" + serverSetFilePath, e);
}
}

protected Endpoint getEndPointFromServerSetLine(String line) {
protected InetSocketAddress getEndPointFromServerSetLine(String line) {
// We expect each line to be of the form "hostname:port". Note that host names can
// contain ':' themselves (e.g. ipv6 addresses).
int index = line.lastIndexOf(':');
Preconditions.checkArgument(index > 0 && index < line.length() - 1);

String host = line.substring(0, index);
int port = Integer.parseInt(line.substring(index + 1));
return new Endpoint(host, port);
return new InetSocketAddress(host, port);
}

public ImmutableSet<ServiceInstance> readServerSet(byte[] fileContent) throws IOException {
ImmutableSet.Builder<ServiceInstance> builder = new ImmutableSet.Builder<>();
public ImmutableSet<InetSocketAddress> readServerSet(byte[] fileContent) throws IOException {
ImmutableSet.Builder<InetSocketAddress> builder = new ImmutableSet.Builder<>();
InputStream stream = new ByteArrayInputStream(fileContent);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
while (true) {
Expand All @@ -158,12 +121,9 @@ public ImmutableSet<ServiceInstance> readServerSet(byte[] fileContent) throws IO
continue;
}

Endpoint endpoint = getEndPointFromServerSetLine(line);
InetSocketAddress endpoint = getEndPointFromServerSetLine(line);
if (endpoint != null) {
builder.add(new ServiceInstance(
endpoint, // endpoint
Collections.<String, Endpoint>emptyMap(), // additional endpoints
Status.ALIVE)); // status
builder.add(endpoint);
}
}
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.pinterest.singer.config;

import java.net.InetSocketAddress;

import com.google.common.collect.ImmutableSet;

public interface ServersetMonitor {

public void onChange(ImmutableSet<InetSocketAddress> serviceInstances);

}
Loading

0 comments on commit 6c3f42a

Please sign in to comment.