newCall() {
}
}
- /**
- * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
- * when storing. The cheapest way to do this without changing the codec is prefixing it to the
- * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
- *
- * Tags are stored as a dictionary. Since some tag names will include inconsistent number of
- * dots (ex "error" and perhaps "error.message"), we cannot index them naturally with
- * elasticsearch. Instead, we add an index-only (non-source) field of {@code _q} which includes
- * valid search queries. For example, the tag {@code error -> 500} results in {@code
- * "_q":["error", "error=500"]}. This matches the input query syntax, and can be checked manually
- * with curl.
- *
- *
Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
- */
- static byte[] prefixWithTimestampMillisAndQuery(Span span, long timestampMillis) {
- Buffer prefix = new Buffer();
- JsonWriter writer = JsonWriter.of(prefix);
- try {
- writer.beginObject();
-
- if (timestampMillis != 0L) writer.name("timestamp_millis").value(timestampMillis);
- if (!span.tags().isEmpty() || !span.annotations().isEmpty()) {
- writer.name("_q");
- writer.beginArray();
- for (Annotation a : span.annotations()) {
- if (a.value().length() > INDEX_CHARS_LIMIT) continue;
- writer.value(a.value());
- }
- for (Map.Entry tag : span.tags().entrySet()) {
- int length = tag.getKey().length() + tag.getValue().length() + 1;
- if (length > INDEX_CHARS_LIMIT) continue;
- writer.value(tag.getKey()); // search is possible by key alone
- writer.value(tag.getKey() + "=" + tag.getValue());
- }
- writer.endArray();
- }
- writer.endObject();
- } catch (IOException e) {
- // very unexpected to have an IOE for an in-memory write
- assert false : "Error indexing query for span: " + span;
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "Error indexing query for span: " + span, e);
- }
- return SpanBytesEncoder.JSON_V2.encode(span);
- }
- byte[] document = SpanBytesEncoder.JSON_V2.encode(span);
- if (prefix.rangeEquals(0L, EMPTY_JSON)) return document;
- return mergeJson(prefix.readByteArray(), document);
- }
-
- static byte[] mergeJson(byte[] prefix, byte[] suffix) {
- byte[] newSpanBytes = new byte[prefix.length + suffix.length - 1];
- int pos = 0;
- System.arraycopy(prefix, 0, newSpanBytes, pos, prefix.length);
- pos += prefix.length;
- newSpanBytes[pos - 1] = ',';
- // starting at position 1 discards the old head of '{'
- System.arraycopy(suffix, 1, newSpanBytes, pos, suffix.length - 1);
- return newSpanBytes;
- }
-
static final class AutocompleteContext {
- final long indexTimestamp;
- final String autocompleteId;
+ final long timestamp;
+ final String key, value;
- AutocompleteContext(long indexTimestamp, String autocompleteId) {
- this.indexTimestamp = indexTimestamp;
- this.autocompleteId = autocompleteId;
+ AutocompleteContext(long timestamp, String key, String value) {
+ this.timestamp = timestamp;
+ this.key = key;
+ this.value = value;
}
@Override public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof AutocompleteContext)) return false;
AutocompleteContext that = (AutocompleteContext) o;
- return indexTimestamp == that.indexTimestamp && autocompleteId.equals(that.autocompleteId);
+ return timestamp == that.timestamp && key.equals(that.key) && value.equals(that.value);
}
@Override public int hashCode() {
int h$ = 1;
h$ *= 1000003;
- h$ ^= (int) (h$ ^ ((indexTimestamp >>> 32) ^ indexTimestamp));
+ h$ ^= (int) (h$ ^ ((timestamp >>> 32) ^ timestamp));
+ h$ *= 1000003;
+ h$ ^= key.hashCode();
h$ *= 1000003;
- h$ ^= autocompleteId.hashCode();
+ h$ ^= value.hashCode();
return h$;
}
}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
similarity index 51%
rename from zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
rename to zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
index 2eeffeaf3a8..e5d7c6f56ca 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
@@ -16,6 +16,7 @@
*/
package zipkin2.elasticsearch.internal;
+import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import okhttp3.HttpUrl;
@@ -23,16 +24,15 @@
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
+import okio.BufferedSink;
import okio.BufferedSource;
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.internal.client.HttpCall;
-import zipkin2.internal.Nullable;
-
-import static zipkin2.internal.JsonEscaper.jsonEscape;
// See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
// exposed to re-use for testing writes of dependency links
-public final class HttpBulkIndexer {
+public final class BulkCallBuilder {
+ public static final int INDEX_CHARS_LIMIT = 256;
static final MediaType APPLICATION_JSON = MediaType.parse("application/json");
final String tag;
@@ -41,12 +41,12 @@ public final class HttpBulkIndexer {
final String pipeline;
final boolean waitForRefresh;
- // Mutated for each call to add
- final Buffer body = new Buffer();
+ // Mutated for each call to index
+ final Buffer buffer = new Buffer();
- public HttpBulkIndexer(String tag, ElasticsearchStorage es) {
+ public BulkCallBuilder(ElasticsearchStorage es, float esVersion, String tag) {
this.tag = tag;
- shouldAddType = es.version() < 7.0f;
+ shouldAddType = esVersion < 7.0f;
http = es.http();
pipeline = es.pipeline();
waitForRefresh = es.flushOnWrites();
@@ -55,52 +55,80 @@ public HttpBulkIndexer(String tag, ElasticsearchStorage es) {
enum CheckForErrors implements HttpCall.BodyConverter {
INSTANCE;
- @Override
- public Void convert(BufferedSource b) throws IOException {
+ @Override public Void convert(BufferedSource b) throws IOException {
String content = b.readUtf8();
if (content.contains("\"status\":429")) throw new RejectedExecutionException(content);
if (content.contains("\"errors\":true")) throw new IllegalStateException(content);
return null;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "CheckForErrors";
}
}
- public void add(String index, String typeName, byte[] document, @Nullable String id) {
- writeIndexMetadata(index, typeName, id);
- writeDocument(document);
+ public void index(String index, String typeName, T input, BulkIndexWriter writer) {
+ Buffer document = new Buffer();
+ String id = writer.writeDocument(input, document);
+ writeIndexMetadata(buffer, index, typeName, id);
+ buffer.writeByte('\n');
+ buffer.write(document, document.size());
+ buffer.writeByte('\n');
}
- void writeIndexMetadata(String index, String typeName, @Nullable String id) {
- body.writeUtf8("{\"index\":{\"_index\":\"").writeUtf8(index).writeByte('"');
- // the _type parameter is needed for Elasticsearch < 6.x
- if (shouldAddType) body.writeUtf8(",\"_type\":\"").writeUtf8(typeName).writeByte('"');
- if (id != null) {
- body.writeUtf8(",\"_id\":\"").writeUtf8(jsonEscape(id).toString()).writeByte('"');
+ void writeIndexMetadata(Buffer indexBuffer, String index, String typeName, String id) {
+ JsonWriter jsonWriter = JsonWriter.of(indexBuffer);
+ try {
+ jsonWriter.beginObject();
+ jsonWriter.name("index");
+ jsonWriter.beginObject();
+ jsonWriter.name("_index").value(index);
+ // the _type parameter is needed for Elasticsearch < 6.x
+ if (shouldAddType) jsonWriter.name("_type").value(typeName);
+ jsonWriter.name("_id").value(id);
+ jsonWriter.endObject();
+ jsonWriter.endObject();
+ } catch (IOException e) {
+ throw new AssertionError(e); // No I/O writing to a Buffer.
}
- body.writeUtf8("}}\n");
- }
-
- void writeDocument(byte[] document) {
- body.write(document);
- body.writeByte('\n');
}
/** Creates a bulk request when there is more than one object to store */
- public HttpCall newCall() {
+ public HttpCall build() {
HttpUrl.Builder urlBuilder = http.baseUrl.newBuilder("_bulk");
if (pipeline != null) urlBuilder.addQueryParameter("pipeline", pipeline);
if (waitForRefresh) urlBuilder.addQueryParameter("refresh", "wait_for");
- Request request = new Request.Builder()
- .url(urlBuilder.build())
- .tag(tag)
- .post(RequestBody.create(APPLICATION_JSON, body.readByteString()))
- .build();
+ RequestBody body = new BufferRequestBody(buffer);
+ Request request = new Request.Builder().url(urlBuilder.build()).tag(tag).post(body).build();
return http.newCall(request, CheckForErrors.INSTANCE);
}
+
+ /** This avoids allocating a large byte array (by using a poolable buffer instead). */
+ static final class BufferRequestBody extends RequestBody {
+ final long contentLength;
+ final Buffer buffer;
+
+ BufferRequestBody(Buffer buffer) {
+ this.contentLength = buffer.size();
+ this.buffer = buffer;
+ }
+
+ @Override public MediaType contentType() {
+ return APPLICATION_JSON;
+ }
+
+ @Override public long contentLength() {
+ return contentLength;
+ }
+
+ @Override public boolean isOneShot() {
+ return true;
+ }
+
+ @Override public void writeTo(BufferedSink sink) throws IOException {
+ sink.write(buffer, contentLength);
+ }
+ }
}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
new file mode 100644
index 00000000000..e9fd4095a92
--- /dev/null
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkIndexWriter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import com.squareup.moshi.JsonWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import okio.Buffer;
+import okio.BufferedSink;
+import okio.HashingSink;
+import okio.Okio;
+import zipkin2.Annotation;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+import static zipkin2.elasticsearch.internal.BulkCallBuilder.INDEX_CHARS_LIMIT;
+
+public abstract class BulkIndexWriter {
+
+ /**
+ * Write a complete json document according to index strategy and returns the ID field.
+ */
+ public abstract String writeDocument(T input, BufferedSink writer);
+
+ public static final BulkIndexWriter SPAN = new BulkIndexWriter() {
+ @Override public String writeDocument(Span input, BufferedSink sink) {
+ return write(input, true, sink);
+ }
+ };
+ public static final BulkIndexWriter
+ SPAN_SEARCH_DISABLED = new BulkIndexWriter() {
+ @Override public String writeDocument(Span input, BufferedSink sink) {
+ return write(input, false, sink);
+ }
+ };
+
+ public static final BulkIndexWriter> AUTOCOMPLETE =
+ new BulkIndexWriter>() {
+ @Override public String writeDocument(Map.Entry input, BufferedSink sink) {
+ writeAutocompleteEntry(input.getKey(), input.getValue(), JsonWriter.of(sink));
+ // Id is used to dedupe server side as necessary. Arbitrarily same format as _q value.
+ return input.getKey() + "=" + input.getValue();
+ }
+ };
+
+ static final Endpoint EMPTY_ENDPOINT = Endpoint.newBuilder().build();
+
+ /**
+ * In order to allow systems like Kibana to search by timestamp, we add a field "timestamp_millis"
+ * when storing. The cheapest way to do this without changing the codec is prefixing it to the
+ * json. For example. {"traceId":"... becomes {"timestamp_millis":12345,"traceId":"...
+ *
+ * Tags are stored as a dictionary. Since some tag names will include inconsistent number of
+ * dots (ex "error" and perhaps "error.message"), we cannot index them naturally with
+ * elasticsearch. Instead, we add an index-only (non-source) field of {@code _q} which includes
+ * valid search queries. For example, the tag {@code error -> 500} results in {@code
+ * "_q":["error", "error=500"]}. This matches the input query syntax, and can be checked manually
+ * with curl.
+ *
+ *
Ex {@code curl -s localhost:9200/zipkin:span-2017-08-11/_search?q=_q:error=500}
+ *
+ * @param searchEnabled encodes timestamp_millis and _q when non-empty
+ */
+ static String write(Span span, boolean searchEnabled, BufferedSink sink) {
+ HashingSink hashingSink = HashingSink.md5(sink);
+ JsonWriter writer = JsonWriter.of(Okio.buffer(hashingSink));
+ try {
+ writer.beginObject();
+ if (searchEnabled) addSearchFields(span, writer);
+ writer.name("traceId").value(span.traceId());
+ if (span.parentId() != null) writer.name("parentId").value(span.parentId());
+ writer.name("id").value(span.id());
+ if (span.kind() != null) writer.name("kind").value(span.kind().toString());
+ if (span.name() != null) writer.name("name").value(span.name());
+ if (span.timestampAsLong() != 0L) writer.name("timestamp").value(span.timestampAsLong());
+ if (span.durationAsLong() != 0L) writer.name("duration").value(span.durationAsLong());
+ if (span.localEndpoint() != null && !EMPTY_ENDPOINT.equals(span.localEndpoint())) {
+ writer.name("localEndpoint");
+ write(span.localEndpoint(), writer);
+ }
+ if (span.remoteEndpoint() != null && !EMPTY_ENDPOINT.equals(span.remoteEndpoint())) {
+ writer.name("remoteEndpoint");
+ write(span.remoteEndpoint(), writer);
+ }
+ if (!span.annotations().isEmpty()) {
+ writer.name("annotations");
+ writer.beginArray();
+ for (int i = 0, length = span.annotations().size(); i < length; ) {
+ write(span.annotations().get(i++), writer);
+ }
+ writer.endArray();
+ }
+ if (!span.tags().isEmpty()) {
+ writer.name("tags");
+ writer.beginObject();
+ Iterator> tags = span.tags().entrySet().iterator();
+ while (tags.hasNext()) write(tags.next(), writer);
+ writer.endObject();
+ }
+ if (Boolean.TRUE.equals(span.debug())) writer.name("debug").value(true);
+ if (Boolean.TRUE.equals(span.shared())) writer.name("shared").value(true);
+ writer.endObject();
+ writer.flush();
+ hashingSink.flush();
+ } catch (IOException e) {
+ throw new AssertionError(e); // No I/O writing to a Buffer.
+ }
+ return new Buffer()
+ .writeUtf8(span.traceId()).writeByte('-').writeUtf8(hashingSink.hash().hex())
+ .readUtf8();
+ }
+
+ static void writeAutocompleteEntry(String key, String value, JsonWriter writer) {
+ try {
+ writer.beginObject();
+ writer.name("tagKey").value(key);
+ writer.name("tagValue").value(value);
+ writer.endObject();
+ } catch (IOException e) {
+ throw new AssertionError(e); // No I/O writing to a Buffer.
+ }
+ }
+
+ static void write(Map.Entry tag, JsonWriter writer) throws IOException {
+ writer.name(tag.getKey()).value(tag.getValue());
+ }
+
+ static void write(Annotation annotation, JsonWriter writer) throws IOException {
+ writer.beginObject();
+ writer.name("timestamp").value(annotation.timestamp());
+ writer.name("value").value(annotation.value());
+ writer.endObject();
+ }
+
+ static void write(Endpoint endpoint, JsonWriter writer) throws IOException {
+ writer.beginObject();
+ if (endpoint.serviceName() != null) writer.name("serviceName").value(endpoint.serviceName());
+ if (endpoint.ipv4() != null) writer.name("ipv4").value(endpoint.ipv4());
+ if (endpoint.ipv6() != null) writer.name("ipv6").value(endpoint.ipv6());
+ if (endpoint.portAsInt() != 0) writer.name("port").value(endpoint.portAsInt());
+ writer.endObject();
+ }
+
+ static void addSearchFields(Span span, JsonWriter writer) throws IOException {
+ long timestampMillis = span.timestampAsLong() / 1000L;
+ if (timestampMillis != 0L) writer.name("timestamp_millis").value(timestampMillis);
+ if (!span.tags().isEmpty() || !span.annotations().isEmpty()) {
+ writer.name("_q");
+ writer.beginArray();
+ for (Annotation a : span.annotations()) {
+ if (a.value().length() > INDEX_CHARS_LIMIT) continue;
+ writer.value(a.value());
+ }
+ for (Map.Entry tag : span.tags().entrySet()) {
+ int length = tag.getKey().length() + tag.getValue().length() + 1;
+ if (length > INDEX_CHARS_LIMIT) continue;
+ writer.value(tag.getKey()); // search is possible by key alone
+ writer.value(tag.getKey() + "=" + tag.getValue());
+ }
+ writer.endArray();
+ }
+ }
+}
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
index f74fae896fe..22284ca89b0 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java
@@ -31,7 +31,6 @@
import zipkin2.Span;
import zipkin2.Span.Kind;
import zipkin2.TestObjects;
-import zipkin2.codec.SpanBytesDecoder;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.internal.Nullable;
import zipkin2.storage.SpanConsumer;
@@ -40,7 +39,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static zipkin2.TestObjects.TODAY;
-import static zipkin2.elasticsearch.ElasticsearchSpanConsumer.prefixWithTimestampMillisAndQuery;
public class ElasticsearchSpanConsumerTest {
static final Endpoint WEB_ENDPOINT = Endpoint.newBuilder().serviceName("web").build();
@@ -87,100 +85,7 @@ public void addsTimestamp_millisIntoJson() throws Exception {
accept(span);
assertThat(es.takeRequest().getBody().readUtf8())
- .contains("\n{\"timestamp_millis\":" + Long.toString(TODAY) + ",\"traceId\":");
- }
-
- @Test
- public void prefixWithTimestampMillisAndQuery_skipsWhenNoData() throws Exception {
- Span span =
- Span.newBuilder()
- .traceId("20")
- .id("22")
- .name("")
- .parentId("21")
- .timestamp(0L)
- .localEndpoint(WEB_ENDPOINT)
- .kind(Kind.CLIENT)
- .build();
-
- byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
- assertThat(new String(result, "UTF-8")).startsWith("{\"traceId\":\"");
- }
-
- @Test
- public void prefixWithTimestampMillisAndQuery_addsTimestampMillis() throws Exception {
- Span span =
- Span.newBuilder()
- .traceId("20")
- .id("22")
- .name("")
- .parentId("21")
- .timestamp(1L)
- .localEndpoint(WEB_ENDPOINT)
- .kind(Kind.CLIENT)
- .build();
-
- byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
- assertThat(new String(result, "UTF-8")).startsWith("{\"timestamp_millis\":1,\"traceId\":");
- }
-
- @Test
- public void prefixWithTimestampMillisAndQuery_addsAnnotationQuery() throws Exception {
- Span span =
- Span.newBuilder()
- .traceId("20")
- .id("22")
- .name("")
- .parentId("21")
- .localEndpoint(WEB_ENDPOINT)
- .addAnnotation(1L, "\"foo")
- .build();
-
- byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
- assertThat(new String(result, "UTF-8")).startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
- }
-
- @Test
- public void prefixWithTimestampMillisAndQuery_addsAnnotationQueryTags() throws Exception {
- Span span =
- Span.newBuilder()
- .traceId("20")
- .id("22")
- .name("")
- .parentId("21")
- .localEndpoint(WEB_ENDPOINT)
- .putTag("\"foo", "\"bar")
- .build();
-
- byte[] result = prefixWithTimestampMillisAndQuery(span, span.timestampAsLong());
-
- assertThat(new String(result, "UTF-8"))
- .startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
- }
-
- @Test
- public void prefixWithTimestampMillisAndQuery_readable() {
- Span span =
- Span.newBuilder().traceId("20").id("20").name("get").timestamp(TODAY * 1000).build();
-
- assertThat(
- SpanBytesDecoder.JSON_V2.decodeOne(
- prefixWithTimestampMillisAndQuery(span, span.timestamp())))
- .isEqualTo(span); // ignores timestamp_millis field
- }
-
- @Test
- public void doesntWriteDocumentId() throws Exception {
- es.enqueue(new MockResponse());
-
- accept(Span.newBuilder().traceId("1").id("1").name("foo").build());
-
- RecordedRequest request = es.takeRequest();
- assertThat(request.getBody().readByteString().utf8())
- .doesNotContain("\"_type\":\"span\",\"_id\"");
+ .contains("\n{\"timestamp_millis\":" + TODAY + ",\"traceId\":");
}
@Test
@@ -310,7 +215,7 @@ public void choosesTypeSpecificIndex() throws Exception {
// index timestamp is the server timestamp, not current time!
assertThat(es.takeRequest().getBody().readByteString().utf8())
- .contains("{\"index\":{\"_index\":\"zipkin:span-1971-01-01\",\"_type\":\"span\"}}");
+ .startsWith("{\"index\":{\"_index\":\"zipkin:span-1971-01-01\",\"_type\":\"span\"");
}
/** Much simpler template which doesn't write the timestamp_millis field */
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
index c38b8ebe454..b221fa07fbf 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
@@ -16,12 +16,14 @@
*/
package zipkin2.elasticsearch;
+import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
+import okio.BufferedSink;
import zipkin2.DependencyLink;
-import zipkin2.codec.DependencyLinkBytesEncoder;
-import zipkin2.elasticsearch.internal.HttpBulkIndexer;
+import zipkin2.elasticsearch.internal.BulkIndexWriter;
+import zipkin2.elasticsearch.internal.BulkCallBuilder;
/** Package accessor for integration tests */
public class InternalForTests {
@@ -29,16 +31,32 @@ public static void writeDependencyLinks(ElasticsearchStorage es, List DEPENDENCY_LINK_BULK_INDEX_SUPPORT =
+ new BulkIndexWriter() {
+ @Override public String writeDocument(DependencyLink link, BufferedSink sink) {
+ JsonWriter writer = JsonWriter.of(sink);
+ try {
+ writer.beginObject();
+ writer.name("parent").value(link.parent());
+ writer.name("child").value(link.child());
+ writer.name("callCount").value(link.callCount());
+ if (link.errorCount() > 0) writer.name("errorCount").value(link.errorCount());
+ writer.endObject();
+ } catch (IOException e) {
+ throw new AssertionError(e); // No I/O writing to a Buffer.
+ }
+ return link.parent() + "|" + link.child();
+ }
+ };
}
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java
index 03731d5018b..a50ec5c8dae 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV6.java
@@ -55,9 +55,6 @@ public static class ITSpanStore extends zipkin2.storage.ITSpanStore {
return storage;
}
- @Override @Test @Ignore("No consumer-side span deduplication") public void deduplicates() {
- }
-
@Before @Override public void clear() throws IOException {
storage.clear();
}
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java
index c6a436760c2..fe0cd9ca587 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ITElasticsearchStorageV7.java
@@ -20,9 +20,7 @@
import java.util.List;
import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
@@ -55,9 +53,6 @@ public static class ITSpanStore extends zipkin2.storage.ITSpanStore {
return storage;
}
- @Override @Test @Ignore("No consumer-side span deduplication") public void deduplicates() {
- }
-
@Before @Override public void clear() throws IOException {
storage.clear();
}
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkCallBuilderTest.java
similarity index 95%
rename from zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java
rename to zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkCallBuilderTest.java
index 0f8589dc970..36e3ecd10bc 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/HttpBulkIndexerTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkCallBuilderTest.java
@@ -23,9 +23,9 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import zipkin2.elasticsearch.internal.HttpBulkIndexer.CheckForErrors;
+import zipkin2.elasticsearch.internal.BulkCallBuilder.CheckForErrors;
-public class HttpBulkIndexerTest {
+public class BulkCallBuilderTest {
@Rule public ExpectedException expectedException = ExpectedException.none();
@Test
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexWriterTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexWriterTest.java
new file mode 100644
index 00000000000..7c7857db8e3
--- /dev/null
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/BulkIndexWriterTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.elasticsearch.internal;
+
+import okio.Buffer;
+import org.junit.Test;
+import zipkin2.Span;
+import zipkin2.Span.Kind;
+import zipkin2.codec.SpanBytesDecoder;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.FRONTEND;
+import static zipkin2.TestObjects.TODAY;
+
+public class BulkIndexWriterTest {
+ Buffer buffer = new Buffer();
+
+ @Test public void span_addsDocumentId() {
+ String id = BulkIndexWriter.SPAN.writeDocument(CLIENT_SPAN, buffer);
+ assertThat(id)
+ .isEqualTo(CLIENT_SPAN.traceId() + "-" + buffer.readByteString().md5().hex());
+ }
+
+ @Test public void spanSearchDisabled_addsDocumentId() {
+ String id = BulkIndexWriter.SPAN_SEARCH_DISABLED.writeDocument(CLIENT_SPAN, buffer);
+ assertThat(id)
+ .isEqualTo(CLIENT_SPAN.traceId() + "-" + buffer.readByteString().md5().hex());
+ }
+
+ @Test public void spanSearchFields_skipsWhenNoData() {
+ Span span = Span.newBuilder()
+ .traceId("20")
+ .id("22")
+ .parentId("21")
+ .timestamp(0L)
+ .localEndpoint(FRONTEND)
+ .kind(Kind.CLIENT)
+ .build();
+
+ BulkIndexWriter.SPAN.writeDocument(span, buffer);
+
+ assertThat(buffer.readUtf8()).startsWith("{\"traceId\":\"");
+ }
+
+ @Test public void spanSearchFields_addsTimestampFieldWhenNoTags() {
+ Span span =
+ Span.newBuilder()
+ .traceId("20")
+ .id("22")
+ .name("")
+ .parentId("21")
+ .timestamp(1000L)
+ .localEndpoint(FRONTEND)
+ .kind(Kind.CLIENT)
+ .build();
+
+ BulkIndexWriter.SPAN.writeDocument(span, buffer);
+
+ assertThat(buffer.readUtf8()).startsWith("{\"timestamp_millis\":1,\"traceId\":");
+ }
+
+ @Test public void spanSearchFields_addsQueryFieldForAnnotations() {
+ Span span = Span.newBuilder()
+ .traceId("20")
+ .id("22")
+ .name("")
+ .parentId("21")
+ .localEndpoint(FRONTEND)
+ .addAnnotation(1L, "\"foo")
+ .build();
+
+ BulkIndexWriter.SPAN.writeDocument(span, buffer);
+
+ assertThat(buffer.readUtf8()).startsWith("{\"_q\":[\"\\\"foo\"],\"traceId");
+ }
+
+ @Test public void spanSearchFields_addsQueryFieldForTags() {
+ Span span = Span.newBuilder()
+ .traceId("20")
+ .id("22")
+ .parentId("21")
+ .localEndpoint(FRONTEND)
+ .putTag("\"foo", "\"bar")
+ .build();
+
+ BulkIndexWriter.SPAN.writeDocument(span, buffer);
+
+ assertThat(buffer.readUtf8()).startsWith("{\"_q\":[\"\\\"foo\",\"\\\"foo=\\\"bar\"],\"traceId");
+ }
+
+ @Test public void spanSearchFields_readableByNormalJsonCodec() {
+ Span span =
+ Span.newBuilder().traceId("20").id("20").name("get").timestamp(TODAY * 1000).build();
+
+ BulkIndexWriter.SPAN.writeDocument(span, buffer);
+
+ assertThat(SpanBytesDecoder.JSON_V2.decodeOne(buffer.readByteArray()))
+ .isEqualTo(span); // ignores timestamp_millis field
+ }
+
+ @Test public void spanSearchDisabled_doesntAddQueryFields() {
+ BulkIndexWriter.SPAN_SEARCH_DISABLED.writeDocument(CLIENT_SPAN, buffer);
+
+ assertThat(buffer.readUtf8()).startsWith("{\"traceId\":\"");
+ }
+}