Skip to content

Commit

Permalink
Implements deduplication in Elasticsearch (openzipkin#2573)
Browse files Browse the repository at this point in the history
This sets span index ID to `${traceID}-${MD5(json)}` to allow for
server-side deduplication and uses okio based libraries to be more
efficient writing to Elasticsearch.
  • Loading branch information
adriancole authored and abesto committed Sep 10, 2019
1 parent 4e9d66d commit 7b49a1e
Show file tree
Hide file tree
Showing 11 changed files with 516 additions and 266 deletions.
7 changes: 6 additions & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,15 @@
</dependency>

<dependency>
<groupId>org.apache.zipkin</groupId>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}.zipkin2</groupId>
<artifactId>zipkin-storage-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.elasticsearch.internal;

import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import okio.Okio;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import zipkin2.Span;
import zipkin2.codec.CodecBenchmarks;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.elasticsearch.ElasticsearchStorage;

/**
 * JMH benchmarks that build an Elasticsearch bulk index request from one or ten copies of a sample
 * client span and write the resulting request body to a discarding sink.
 */
@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
@Threads(2)
public class BulkRequestBenchmarks {
  /** Sample span, decoded once from the {@code /zipkin2-client.json} classpath resource. */
  static final Span CLIENT_SPAN = SpanBytesDecoder.JSON_V2.decodeOne(read("/zipkin2-client.json"));

  final ElasticsearchStorage es = ElasticsearchStorage.newBuilder().build();
  // NOTE(review): this builder is shared by every invocation on the benchmark thread. Confirm that
  // build() resets its pending entries; otherwise later samples measure ever-larger requests.
  final BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");

  // The sample span's timestamp divided by 1000 is what the index name formatter receives.
  final long indexTimestamp = CLIENT_SPAN.timestampAsLong() / 1000L;
  final String spanIndex =
    es.indexNameFormatter().formatTypeAndTimestampForInsert("span", '-', indexTimestamp);

  /** Indexes a single span, then builds the call and writes its request body. */
  @Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
    builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
    builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
  }

  /** Indexes the same span ten times, then builds the call and writes its request body. */
  @Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
    for (int i = 0; i < 10; i++) {
      builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
    }
    builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
  }

  // Convenience main entry-point
  public static void main(String[] args) throws RunnerException {
    Options opt = new OptionsBuilder()
      .addProfiler("gc")
      .include(".*" + BulkRequestBenchmarks.class.getSimpleName() + ".*")
      .build();

    new Runner(opt).run();
  }

  /**
   * Reads a classpath resource, resolved relative to {@link CodecBenchmarks}, fully into memory.
   *
   * @param resource resource path handed to {@link Class#getResourceAsStream(String)}
   * @return all bytes of the resource
   * @throws IllegalStateException if the resource does not exist or cannot be read
   */
  static byte[] read(String resource) {
    // ByteStreams.toByteArray leaves its input open, so the stream is closed here.
    try (InputStream in = CodecBenchmarks.class.getResourceAsStream(resource)) {
      // getResourceAsStream signals a missing resource with null rather than an exception.
      if (in == null) throw new IllegalStateException("resource not found: " + resource);
      return ByteStreams.toByteArray(in);
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,24 @@
*/
package zipkin2.elasticsearch;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import okio.Buffer;
import okio.ByteString;
import zipkin2.Annotation;
import zipkin2.Call;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.elasticsearch.internal.HttpBulkIndexer;
import zipkin2.elasticsearch.internal.BulkCallBuilder;
import zipkin2.elasticsearch.internal.BulkIndexWriter;
import zipkin2.elasticsearch.internal.IndexNameFormatter;
import zipkin2.internal.DelayLimiter;
import zipkin2.internal.Nullable;
import zipkin2.storage.SpanConsumer;

import static zipkin2.elasticsearch.ElasticsearchAutocompleteTags.AUTOCOMPLETE;
import static zipkin2.elasticsearch.ElasticsearchSpanStore.SPAN;
import static zipkin2.internal.JsonEscaper.jsonEscape;
import static zipkin2.internal.JsonEscaper.jsonEscapedSizeInBytes;
import static zipkin2.elasticsearch.internal.BulkCallBuilder.INDEX_CHARS_LIMIT;

class ElasticsearchSpanConsumer implements SpanConsumer { // not final for testing
static final Logger LOG = Logger.getLogger(ElasticsearchSpanConsumer.class.getName());
static final int INDEX_CHARS_LIMIT = 256;
static final ByteString EMPTY_JSON = ByteString.of(new byte[] {'{', '}'});

final ElasticsearchStorage es;
final Set<String> autocompleteKeys;
Expand Down Expand Up @@ -80,19 +67,17 @@ String formatTypeAndTimestampForInsert(String type, long timestampMillis) {

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
for (Span span : spans) {
long spanTimestamp = span.timestampAsLong();
long indexTimestamp = 0L; // which index to store this span into
if (spanTimestamp != 0L) {
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(spanTimestamp);
} else {
final long indexTimestamp; // which index to store this span into
if (span.timestampAsLong() != 0L) {
indexTimestamp = span.timestampAsLong() / 1000L;
} else if (!span.annotations().isEmpty()) {
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
// the index bucket, any annotation is better than using current time.
if (!span.annotations().isEmpty()) {
indexTimestamp = span.annotations().get(0).timestamp() / 1000;
}
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
indexTimestamp = span.annotations().get(0).timestamp() / 1000L;
} else {
indexTimestamp = System.currentTimeMillis();
}
indexer.add(indexTimestamp, span, spanTimestamp);
indexer.add(indexTimestamp, span);
if (searchEnabled && !span.tags().isEmpty()) {
indexer.addAutocompleteValues(indexTimestamp, span);
}
Expand All @@ -101,21 +86,21 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {

/** Mutable type used for each call to store spans */
static final class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final BulkCallBuilder bulkCallBuilder;
final ElasticsearchSpanConsumer consumer;
final List<AutocompleteContext> pendingAutocompleteContexts = new ArrayList<>();
final BulkIndexWriter<Span> spanWriter;

BulkSpanIndexer(ElasticsearchSpanConsumer consumer) {
this.indexer = new HttpBulkIndexer("index-span", consumer.es);
this.bulkCallBuilder = new BulkCallBuilder(consumer.es, consumer.es.version(), "index-span");
this.consumer = consumer;
this.spanWriter =
consumer.searchEnabled ? BulkIndexWriter.SPAN : BulkIndexWriter.SPAN_SEARCH_DISABLED;
}

void add(long indexTimestamp, Span span, long timestampMillis) {
void add(long indexTimestamp, Span span) {
String index = consumer.formatTypeAndTimestampForInsert(SPAN, indexTimestamp);
byte[] document = consumer.searchEnabled
? prefixWithTimestampMillisAndQuery(span, timestampMillis)
: SpanBytesEncoder.JSON_V2.encode(span);
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
bulkCallBuilder.index(index, SPAN, span, spanWriter);
}

void addAutocompleteValues(long indexTimestamp, Span span) {
Expand All @@ -127,29 +112,17 @@ void addAutocompleteValues(long indexTimestamp, Span span) {
// If the autocomplete whitelist doesn't contain the key, skip storing its value
if (!consumer.autocompleteKeys.contains(tag.getKey())) continue;

// Id is used to dedupe server side as necessary. Arbitrarily same format as _q value.
String id = tag.getKey() + "=" + tag.getValue();
AutocompleteContext context = new AutocompleteContext(indexTimestamp, id);
AutocompleteContext context =
new AutocompleteContext(indexTimestamp, tag.getKey(), tag.getValue());
if (!consumer.delayLimiter.shouldInvoke(context)) continue;
pendingAutocompleteContexts.add(context);

// encode using zipkin's internal buffer so we don't have to catch exceptions etc
int sizeInBytes = 27; // {"tagKey":"","tagValue":""}
sizeInBytes += jsonEscapedSizeInBytes(tag.getKey());
sizeInBytes += jsonEscapedSizeInBytes(tag.getValue());
zipkin2.internal.Buffer b = zipkin2.internal.Buffer.allocate(sizeInBytes);
b.writeAscii("{\"tagKey\":\"");
b.writeUtf8(jsonEscape(tag.getKey()));
b.writeAscii("\",\"tagValue\":\"");
b.writeUtf8(jsonEscape(tag.getValue()));
b.writeAscii("\"}");
byte[] document = b.toByteArray();
indexer.add(idx, AUTOCOMPLETE, document, id);
bulkCallBuilder.index(idx, AUTOCOMPLETE, tag, BulkIndexWriter.AUTOCOMPLETE);
}
}

Call<Void> newCall() {
Call<Void> storeCall = indexer.newCall();
Call<Void> storeCall = bulkCallBuilder.build();
if (pendingAutocompleteContexts.isEmpty()) return storeCall;
return storeCall.handleError((error, callback) -> {
for (AutocompleteContext context : pendingAutocompleteContexts) {
Expand All @@ -160,89 +133,31 @@ Call<Void> newCall() {
}
}

/**
 * Encodes the span as json, prepending index-time fields ahead of the normal span fields.
 *
 * <p>A "timestamp_millis" field is added so that systems like Kibana can search by timestamp.
 * Prefixing it to the json is the cheapest way to do so without changing the codec. For example,
 * {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
 *
 * <p>Tags are stored as a dictionary. Tag names may contain an inconsistent number of dots (ex
 * "error" and perhaps "error.message"), so elasticsearch cannot index them naturally. To make them
 * searchable, an index-only (non-source) field named {@code _q} carries valid search queries. For
 * example, the tag {@code error -> 500} results in {@code "_q":["error", "error=500"]}. This
 * matches the input query syntax, and can be checked manually with curl.
 *
 * <p>Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
 *
 * <p>Annotation values, and tags whose key plus value plus one exceed {@code INDEX_CHARS_LIMIT}
 * characters, are left out of {@code _q}.
 */
static byte[] prefixWithTimestampMillisAndQuery(Span span, long timestampMillis) {
  Buffer header = new Buffer();
  JsonWriter json = JsonWriter.of(header);
  try {
    json.beginObject();

    if (timestampMillis != 0L) {
      json.name("timestamp_millis");
      json.value(timestampMillis);
    }

    boolean nothingToQuery = span.tags().isEmpty() && span.annotations().isEmpty();
    if (!nothingToQuery) {
      json.name("_q");
      json.beginArray();
      for (Annotation annotation : span.annotations()) {
        if (annotation.value().length() <= INDEX_CHARS_LIMIT) {
          json.value(annotation.value());
        }
      }
      for (Map.Entry<String, String> entry : span.tags().entrySet()) {
        // the extra character accounts for the '=' separator
        int queryLength = entry.getKey().length() + entry.getValue().length() + 1;
        if (queryLength <= INDEX_CHARS_LIMIT) {
          json.value(entry.getKey()); // search is possible by key alone
          json.value(entry.getKey() + "=" + entry.getValue());
        }
      }
      json.endArray();
    }

    json.endObject();
  } catch (IOException e) {
    // an in-memory write is not expected to raise an IOE; fall back to the plain encoding
    assert false : "Error indexing query for span: " + span;
    if (LOG.isLoggable(Level.FINE)) {
      LOG.log(Level.FINE, "Error indexing query for span: " + span, e);
    }
    return SpanBytesEncoder.JSON_V2.encode(span);
  }

  byte[] encodedSpan = SpanBytesEncoder.JSON_V2.encode(span);
  // Nothing was written besides the braces, so there is no prefix to merge in.
  return header.rangeEquals(0L, EMPTY_JSON)
    ? encodedSpan
    : mergeJson(header.readByteArray(), encodedSpan);
}

/**
 * Joins two json objects by overwriting the last byte of {@code prefix} with a comma and appending
 * {@code suffix} without its first byte. Ex. {"a":1} and {"b":2} become {"a":1,"b":2}
 */
static byte[] mergeJson(byte[] prefix, byte[] suffix) {
  int suffixTail = suffix.length - 1; // everything after the suffix's leading '{'
  byte[] merged = new byte[prefix.length + suffixTail];
  System.arraycopy(prefix, 0, merged, 0, prefix.length);
  merged[prefix.length - 1] = ','; // replaces the prefix's trailing '}'
  System.arraycopy(suffix, 1, merged, prefix.length, suffixTail);
  return merged;
}

static final class AutocompleteContext {
final long indexTimestamp;
final String autocompleteId;
final long timestamp;
final String key, value;

AutocompleteContext(long indexTimestamp, String autocompleteId) {
this.indexTimestamp = indexTimestamp;
this.autocompleteId = autocompleteId;
AutocompleteContext(long timestamp, String key, String value) {
this.timestamp = timestamp;
this.key = key;
this.value = value;
}

@Override public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof AutocompleteContext)) return false;
AutocompleteContext that = (AutocompleteContext) o;
return indexTimestamp == that.indexTimestamp && autocompleteId.equals(that.autocompleteId);
return timestamp == that.timestamp && key.equals(that.key) && value.equals(that.value);
}

@Override public int hashCode() {
int h$ = 1;
h$ *= 1000003;
h$ ^= (int) (h$ ^ ((indexTimestamp >>> 32) ^ indexTimestamp));
h$ ^= (int) (h$ ^ ((timestamp >>> 32) ^ timestamp));
h$ *= 1000003;
h$ ^= key.hashCode();
h$ *= 1000003;
h$ ^= autocompleteId.hashCode();
h$ ^= value.hashCode();
return h$;
}
}
Expand Down
Loading

0 comments on commit 7b49a1e

Please sign in to comment.