Skip to content

Commit

Permalink
Add support for peer.service dependencies (#56)
Browse files Browse the repository at this point in the history
* Add in DAG the services called but not instrumented

When the 'peer.service' tag is present, a 'pseudo-service' is created when the span:
* is a leaf (no children)
* is a client (span.kind = 'client')
* has the tag tag_key (as specified on the command line)

The 'pseudo-service' name is the value of the tag 'peer.service'.

The tag key can be overridden by an environment variable.

Signed-off-by: Etienne Carriere <[email protected]>

* Send the parameter through function parameter and not Spark env

Signed-off-by: Etienne Carriere <[email protected]>
  • Loading branch information
Etienne-Carriere authored and pavolloffay committed Mar 12, 2019
1 parent f2d6cca commit 6022811
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public Builder localDc(String localDc) {
public CassandraDependenciesJob build() {
return new CassandraDependenciesJob(this);
}

}

private final String keyspace;
Expand Down Expand Up @@ -153,7 +154,7 @@ public CassandraDependenciesJob build() {
}
}

public void run() {
public void run(String peerServiceTag) {
long microsLower = day.toInstant().toEpochMilli() * 1000;
long microsUpper = day.plus(Period.ofDays(1)).toInstant().toEpochMilli() * 1000 - 1;

Expand All @@ -167,7 +168,7 @@ public void run() {
.mapValues(span -> (Span) span)
.groupByKey();

List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);
store(sc, dependencyLinks);
log.info("Done, {} dependency objects created", dependencyLinks.size());
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void deriveDependencies() throws Exception {
.day(LocalDate.now())
.keyspace("jaeger_v1_dc1")
.build()
.run();
.run("peer.service");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ private DependenciesSparkHelper() {}
* spans with that traceId.
* @return Aggregated dependency links for all traces.
*/
public static List<Dependency> derive(JavaPairRDD<String, Iterable<Span>> traceIdSpans) {
return traceIdSpans.flatMapValues(new SpansToDependencyLinks())
public static List<Dependency> derive(JavaPairRDD<String, Iterable<Span>> traceIdSpans,String peerServiceTag) {
return traceIdSpans.flatMapValues(new SpansToDependencyLinks(peerServiceTag))
.values()
.mapToPair(dependency -> new Tuple2<>(new Tuple2<>(dependency.getParent(), dependency.getChild()), dependency))
.reduceByKey((v1, v2) -> new Dependency(v1.getParent(), v1.getChild(), v1.getCallCount() + v2.getCallCount()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.function.Function;

/**
Expand All @@ -39,10 +41,28 @@ public class SpansToDependencyLinks implements Function<Iterable<Span>, Iterable
* @return collection of dependency links, note that it contains duplicates
* @throws Exception
*/

/**
 * Tag key whose value names an uninstrumented remote peer service
 * (e.g. "peer.service"); used to create pseudo-service dependency links.
 */
private final String peerServiceTag;

/**
 * @param peerServiceTag tag key used to detect calls to services that are
 *                       not instrumented; must not be {@code null}
 */
public SpansToDependencyLinks(String peerServiceTag) {
    this.peerServiceTag = peerServiceTag;
}

@Override
public Iterable<Dependency> call(Iterable<Span> trace) throws Exception {
Map<Long, Set<Span>> spanMap = new LinkedHashMap<>();
Map<Long, Set<Span>> spanChildrenMap = new LinkedHashMap<>();
for (Span span: trace) {
// Map of children
for (Reference ref: span.getRefs()){
Set <Span> children = spanChildrenMap.get(ref.getSpanId());
if (children == null){
children = new LinkedHashSet<>();
spanChildrenMap.put(ref.getSpanId(), children);
}
children.add(span);
}
// Map of parents
Set<Span> sharedSpans = spanMap.get(span.getSpanId());
if (sharedSpans == null) {
sharedSpans = new LinkedHashSet<>();
Expand Down Expand Up @@ -84,6 +104,13 @@ public Iterable<Dependency> call(Iterable<Span> trace) throws Exception {
}
}
}
// This span is a leaf, so add a dependency link for calls to remote components that are not instrumented
if (spanChildrenMap.get(span.getSpanId()) == null ){
String targetName = span.getTag(peerServiceTag);
if (targetName != null) {
result.add(new Dependency(span.getProcess().getServiceName(), targetName));
}
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* @author Pavol Loffay
*/
public class Span implements Serializable {

private static final long serialVersionUID = 0L;

private String traceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ private static String prefix(String prefix) {
return prefix != null ? String.format("%s-", prefix) : "";
}

public void run() {
run(indexDate("jaeger-span"), indexDate("jaeger-dependencies"));
/**
 * Derives dependency links from the current day's default span index and
 * stores them in the default dependency index.
 *
 * @param peerServiceTag tag key used to create dependency links to
 *                       uninstrumented peer services
 */
public void run(String peerServiceTag) {
  run(indexDate("jaeger-span"), indexDate("jaeger-dependencies"), peerServiceTag);
}

String[] indexDate(String index) {
Expand All @@ -181,7 +181,7 @@ String[] indexDate(String index) {
return new String[]{String.format("%s-%s", index, date)};
}

void run(String[] spanIndices, String[] depIndices) {
void run(String[] spanIndices, String[] depIndices,String peerServiceTag) {
JavaSparkContext sc = new JavaSparkContext(conf);
try {
for (int i = 0; i < spanIndices.length; i++) {
Expand All @@ -191,15 +191,14 @@ void run(String[] spanIndices, String[] depIndices) {
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex)
.map(new ElasticTupleToSpan())
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag);
store(sc, dependencyLinks, depIndex + "/dependencies");
log.info("Done, {} dependency objects created", dependencyLinks.size());
if (dependencyLinks.size() > 0) {
// we do not derive dependencies for old prefix "prefix:" if new prefix "prefix-" contains data
break;
}
}

} finally {
sc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void deriveDependencies() {
.nodes("http://" + jaegerElasticsearchEnvironment.getElasticsearchIPPort())
.day(LocalDate.now())
.build();
dependenciesJob.run();
dependenciesJob.run("peer.service");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.net.URL;
import java.net.URLDecoder;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

public final class DependenciesSparkJob {

Expand All @@ -39,19 +41,23 @@ public static void main(String[] args) throws UnsupportedEncodingException {
}

private static void run(String storage, LocalDate localDate) throws UnsupportedEncodingException {
String peerServiceTag = System.getenv("PEER_SERVICE_TAG");
if (peerServiceTag == null){
peerServiceTag = "peer.service";
}
String jarPath = pathToUberJar();
if ("elasticsearch".equalsIgnoreCase(storage)) {
ElasticsearchDependenciesJob.builder()
.jars(jarPath)
.day(localDate)
.build()
.run();
.run(peerServiceTag);
} else if ("cassandra".equalsIgnoreCase(storage)) {
CassandraDependenciesJob.builder()
.jars(jarPath)
.day(localDate)
.build()
.run();
.run(peerServiceTag);
} else {
throw new IllegalArgumentException("Unsupported storage: " + storage);
}
Expand Down

0 comments on commit 6022811

Please sign in to comment.