Skip to content

Commit

Permalink
Change index prefix to - (#52)
Browse files Browse the repository at this point in the history
* Change index prefix to -

Signed-off-by: Pavol Loffay <[email protected]>

* Deprecate : in index prefix

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Feb 27, 2019
1 parent 8279519 commit f2d6cca
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,36 +154,52 @@ private static String getSystemPropertyAsFileResource(String key) {
for (Map.Entry<String, String> entry : builder.sparkProperties.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
this.indexPrefix = builder.indexPrefix;
}

String indexPrefix = builder.indexPrefix;
if (indexPrefix != null) {
indexPrefix = String.format("%s:", indexPrefix);
} else {
indexPrefix = "";
}
this.indexPrefix = indexPrefix;
/**
* https://github.com/jaegertracing/jaeger/blob/master/CHANGELOG.md#190-2019-01-21
*/
private static String prefixBefore19(String prefix) {
return prefix != null ? String.format("%s:", prefix) : "";
}

private static String prefix(String prefix) {
return prefix != null ? String.format("%s-", prefix) : "";
}

public void run() {
run(indexDate("jaeger-span"), indexDate("jaeger-dependencies") + "/dependencies");
run(indexDate("jaeger-span"), indexDate("jaeger-dependencies"));
}

String indexDate(String index) {
String[] indexDate(String index) {
String date = day.toLocalDate().format(DateTimeFormatter.ISO_LOCAL_DATE);
return String.format("%s%s-%s", indexPrefix, index, date);
if (indexPrefix != null && indexPrefix.length() > 0) {
return new String[]{String.format("%s%s-%s", prefix(indexPrefix), index, date), String.format("%s%s-%s", prefixBefore19(indexPrefix), index, date)};
}
// if there is no prefix we read and write only to one index
return new String[]{String.format("%s-%s", index, date)};
}

void run(String spanResource, String depResource) {
log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanResource ,depResource);
void run(String[] spanIndices, String[] depIndices) {
JavaSparkContext sc = new JavaSparkContext(conf);
try {
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanResource)
.map(new ElasticTupleToSpan())
.groupBy(Span::getTraceId);
for (int i = 0; i < spanIndices.length; i++) {
String spanIndex = spanIndices[i];
String depIndex = depIndices[i];
log.info("Running Dependencies job for {}, reading from {} index, result storing to {}", day, spanIndex, depIndex);
JavaPairRDD<String, Iterable<Span>> traces = JavaEsSpark.esJsonRDD(sc, spanIndex)
.map(new ElasticTupleToSpan())
.groupBy(Span::getTraceId);
List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces);
store(sc, dependencyLinks, depIndex + "/dependencies");
log.info("Done, {} dependency objects created", dependencyLinks.size());
if (dependencyLinks.size() > 0) {
// we do not derive dependencies for old prefix "prefix:" if new prefix "prefix-" contains data
break;
}
}

List<Dependency> dependencyLinks = DependenciesSparkHelper.derive(traces);
store(sc, dependencyLinks, depResource);
log.info("Done, {} dependency objects created and stored to {}", dependencyLinks.size(), depResource);
} finally {
sc.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ public void start(Map<String, String> jaegerEnvs, String jaegerVersion) {
queryUrl = String.format("http://%s:%d", jaegerQuery.getContainerIpAddress(), jaegerQuery.getMappedPort(16686));
}

public void cleanUp(String spanIndex, String dependenciesIndex) throws IOException {
public void cleanUp(String[] spanIndex, String[] dependenciesIndex) throws IOException {
String matchAllQuery = "{\"query\": {\"match_all\":{} }}";
Request request = new Request.Builder()
.url(String.format("http://%s:%d/%s,%s/_delete_by_query?conflicts=proceed",
elasticsearch.getContainerIpAddress(),
elasticsearch.getMappedPort(9200),
spanIndex,
dependenciesIndex))
// we don't use index prefix
spanIndex[0],
dependenciesIndex[0]))
.post(
RequestBody.create(MediaType.parse("application/json; charset=utf-8"), matchAllQuery))
.build();
Expand Down

0 comments on commit f2d6cca

Please sign in to comment.