From 6022811486b56765b2107142286747a468afa203 Mon Sep 17 00:00:00 2001 From: CARRIERE Etienne Date: Tue, 12 Mar 2019 13:55:21 +0100 Subject: [PATCH] Add support for peer.service dependencies (#56) * Add in DAG the services called but not instrumented With the peer.service tag is present, it creates 'pseudo-service' when the span : * is a leaf (no children) * is a client (span.kind = 'client') * has the tag tag_key (as specified in commandline) The 'pseudo-service' name is the value of the tag 'peer.service'. The tag key can be overriden by env variable Signed-off-by: Etienne Carriere * Send the parameter through function parameter and not Spark env Signed-off-by: Etienne Carriere --- .../cassandra/CassandraDependenciesJob.java | 5 ++-- .../CassandraDependenciesJobTest.java | 2 +- .../dependencies/DependenciesSparkHelper.java | 4 +-- .../dependencies/SpansToDependencyLinks.java | 27 +++++++++++++++++++ .../spark/dependencies/model/Span.java | 1 + .../elastic/ElasticsearchDependenciesJob.java | 9 +++---- .../ElasticsearchDependenciesJobTest.java | 2 +- .../dependencies/DependenciesSparkJob.java | 10 +++++-- 8 files changed, 47 insertions(+), 13 deletions(-) diff --git a/jaeger-spark-dependencies-cassandra/src/main/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJob.java b/jaeger-spark-dependencies-cassandra/src/main/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJob.java index 539abe1..d3691b6 100644 --- a/jaeger-spark-dependencies-cassandra/src/main/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJob.java +++ b/jaeger-spark-dependencies-cassandra/src/main/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJob.java @@ -126,6 +126,7 @@ public Builder localDc(String localDc) { public CassandraDependenciesJob build() { return new CassandraDependenciesJob(this); } + } private final String keyspace; @@ -153,7 +154,7 @@ public CassandraDependenciesJob build() { } } - public void run() { + public void run(String peerServiceTag) { long microsLower = day.toInstant().toEpochMilli() * 1000; long microsUpper = day.plus(Period.ofDays(1)).toInstant().toEpochMilli() * 1000 - 1; @@ -167,7 +168,7 @@ public void run() { .mapValues(span -> (Span) span) .groupByKey(); - List dependencyLinks = DependenciesSparkHelper.derive(traces); + List dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag); store(sc, dependencyLinks); log.info("Done, {} dependency objects created", dependencyLinks.size()); } finally { diff --git a/jaeger-spark-dependencies-cassandra/src/test/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJobTest.java b/jaeger-spark-dependencies-cassandra/src/test/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJobTest.java index 5bbd827..02b8a80 100644 --- a/jaeger-spark-dependencies-cassandra/src/test/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJobTest.java +++ b/jaeger-spark-dependencies-cassandra/src/test/java/io/jaegertracing/spark/dependencies/cassandra/CassandraDependenciesJobTest.java @@ -120,7 +120,7 @@ protected void deriveDependencies() throws Exception { .day(LocalDate.now()) .keyspace("jaeger_v1_dc1") .build() - .run(); + .run("peer.service"); } @Override diff --git a/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkHelper.java b/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkHelper.java index 3c00855..9df09ab 100644 --- a/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkHelper.java +++ b/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkHelper.java @@ -35,8 +35,8 @@ private DependenciesSparkHelper() {} * spans with that traceId. * @return Aggregated dependency links for all traces. */ - public static List derive(JavaPairRDD> traceIdSpans) { - return traceIdSpans.flatMapValues(new SpansToDependencyLinks()) + public static List derive(JavaPairRDD> traceIdSpans,String peerServiceTag) { + return traceIdSpans.flatMapValues(new SpansToDependencyLinks(peerServiceTag)) .values() .mapToPair(dependency -> new Tuple2<>(new Tuple2<>(dependency.getParent(), dependency.getChild()), dependency)) .reduceByKey((v1, v2) -> new Dependency(v1.getParent(), v1.getChild(), v1.getCallCount() + v2.getCallCount())) diff --git a/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/SpansToDependencyLinks.java b/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/SpansToDependencyLinks.java index 12a7cc3..ae79433 100644 --- a/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/SpansToDependencyLinks.java +++ b/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/SpansToDependencyLinks.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; + +import org.apache.spark.SparkEnv; import org.apache.spark.api.java.function.Function; /** @@ -39,10 +41,28 @@ public class SpansToDependencyLinks implements Function, Iterable * @return collection of dependency links, note that it contains duplicates * @throws Exception */ + + public String peerServiceTag = ""; + + public SpansToDependencyLinks(String peerServiceTag){ + this.peerServiceTag = peerServiceTag; + } + @Override public Iterable call(Iterable trace) throws Exception { Map> spanMap = new LinkedHashMap<>(); + Map> spanChildrenMap = new LinkedHashMap<>(); for (Span span: trace) { + // Map of children + for (Reference ref: span.getRefs()){ + Set children = spanChildrenMap.get(ref.getSpanId()); + if (children == null){ + children = new LinkedHashSet<>(); + spanChildrenMap.put(ref.getSpanId(), children); + } + children.add(span); + } + // Map of parents Set sharedSpans = spanMap.get(span.getSpanId()); if (sharedSpans == null) { sharedSpans = new LinkedHashSet<>(); @@ -84,6 +104,13 @@ public Iterable call(Iterable trace) throws Exception { } } } + // We are on a leaf so we try to add a dependency for calls to components that calls remote components not instrumented + if (spanChildrenMap.get(span.getSpanId()) == null ){ + String targetName = span.getTag(peerServiceTag); + if (targetName != null) { + result.add(new Dependency(span.getProcess().getServiceName(), targetName)); + } + } } return result; } diff --git a/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/model/Span.java b/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/model/Span.java index e44c8eb..6067ceb 100644 --- a/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/model/Span.java +++ b/jaeger-spark-dependencies-common/src/main/java/io/jaegertracing/spark/dependencies/model/Span.java @@ -20,6 +20,7 @@ * @author Pavol Loffay */ public class Span implements Serializable { + private static final long serialVersionUID = 0L; private String traceId; diff --git a/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java b/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java index fbf6ebd..34e0866 100644 --- a/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java +++ b/jaeger-spark-dependencies-elasticsearch/src/main/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJob.java @@ -168,8 +168,8 @@ private static String prefix(String prefix) { return prefix != null ? String.format("%s-", prefix) : ""; } - public void run() { - run(indexDate("jaeger-span"), indexDate("jaeger-dependencies")); + public void run(String peerServiceTag) { + run(indexDate("jaeger-span"), indexDate("jaeger-dependencies") ,peerServiceTag); } String[] indexDate(String index) { @@ -181,7 +181,7 @@ String[] indexDate(String index) { return new String[]{String.format("%s-%s", index, date)}; } - void run(String[] spanIndices, String[] depIndices) { + void run(String[] spanIndices, String[] depIndices,String peerServiceTag) { JavaSparkContext sc = new JavaSparkContext(conf); try { for (int i = 0; i < spanIndices.length; i++) { @@ -191,7 +191,7 @@ void run(String[] spanIndices, String[] depIndices) { JavaPairRDD> traces = JavaEsSpark.esJsonRDD(sc, spanIndex) .map(new ElasticTupleToSpan()) .groupBy(Span::getTraceId); - List dependencyLinks = DependenciesSparkHelper.derive(traces); + List dependencyLinks = DependenciesSparkHelper.derive(traces,peerServiceTag); store(sc, dependencyLinks, depIndex + "/dependencies"); log.info("Done, {} dependency objects created", dependencyLinks.size()); if (dependencyLinks.size() > 0) { @@ -199,7 +199,6 @@ void run(String[] spanIndices, String[] depIndices) { break; } } - } finally { sc.stop(); } diff --git a/jaeger-spark-dependencies-elasticsearch/src/test/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJobTest.java b/jaeger-spark-dependencies-elasticsearch/src/test/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJobTest.java index 566cec3..08733fb 100644 --- a/jaeger-spark-dependencies-elasticsearch/src/test/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJobTest.java +++ b/jaeger-spark-dependencies-elasticsearch/src/test/java/io/jaegertracing/spark/dependencies/elastic/ElasticsearchDependenciesJobTest.java @@ -71,7 +71,7 @@ protected void deriveDependencies() { .nodes("http://" + jaegerElasticsearchEnvironment.getElasticsearchIPPort()) .day(LocalDate.now()) .build(); - dependenciesJob.run(); + dependenciesJob.run("peer.service"); } @Override diff --git a/jaeger-spark-dependencies/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkJob.java b/jaeger-spark-dependencies/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkJob.java index 5142795..e0c1f92 100644 --- a/jaeger-spark-dependencies/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkJob.java +++ b/jaeger-spark-dependencies/src/main/java/io/jaegertracing/spark/dependencies/DependenciesSparkJob.java @@ -19,6 +19,8 @@ import java.net.URL; import java.net.URLDecoder; import java.time.LocalDate; +import java.util.HashMap; +import java.util.Map; public final class DependenciesSparkJob { @@ -39,19 +41,23 @@ public static void main(String[] args) throws UnsupportedEncodingException { } private static void run(String storage, LocalDate localDate) throws UnsupportedEncodingException { + String peerServiceTag = System.getenv("PEER_SERVICE_TAG"); + if (peerServiceTag == null){ + peerServiceTag = "peer.service"; + } String jarPath = pathToUberJar(); if ("elasticsearch".equalsIgnoreCase(storage)) { ElasticsearchDependenciesJob.builder() .jars(jarPath) .day(localDate) .build() - .run(); + .run(peerServiceTag); } else if ("cassandra".equalsIgnoreCase(storage)) { CassandraDependenciesJob.builder() .jars(jarPath) .day(localDate) .build() - .run(); + .run(peerServiceTag); } else { throw new IllegalArgumentException("Unsupported storage: " + storage); }