Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Adding storage-throttle module/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be override by throttle properties if the throttle is enabled.
Making sure RejectedExecutionExceptions are "first class" citizens since they are used to reduce the throttle.
Removing HttpCall's Semaphore in favor of the throttle (same purpose, different implementations).
Inspired by work done on openzipkin#2169.
  • Loading branch information
Logic-32 committed Apr 17, 2019
1 parent a11a2ae commit ad8e922
Show file tree
Hide file tree
Showing 23 changed files with 1,063 additions and 92 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<unpack-proto.directory>${project.build.directory}/test/proto</unpack-proto.directory>
<!-- This is from armeria, but be careful to avoid >= v20 apis -->
<guava.version>27.0.1-jre</guava.version>
<netflix.concurrency.limits.version>0.2.0</netflix.concurrency.limits.version>

<brave.version>5.6.3</brave.version>
<cassandra-driver-core.version>3.7.1</cassandra-driver-core.version>
Expand Down Expand Up @@ -171,6 +172,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-storage-throttle</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-storage-elasticsearch</artifactId>
Expand Down
1 change: 1 addition & 0 deletions zipkin-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<module>collector-rabbitmq</module>
<module>collector-kafka08</module>
<module>collector-scribe</module>
<module>storage-throttle</module>
<module>storage-cassandra3</module>
<module>storage-elasticsearch</module>
<module>storage-mysql</module>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,6 +20,7 @@
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import zipkin2.elasticsearch.ElasticsearchStorage;

Expand All @@ -37,8 +38,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String index = "zipkin";
/** The date separator used to create the index name. Default to -. */
private String dateSeparator = "-";
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */
private int maxRequests = 64;
/** Overrdies maximum in-flight requests to match throttling settings if throttling is enabled. */
private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
/** Number of replicas (redundancy factor) per index. Defaults to 1.` */
Expand All @@ -58,6 +61,13 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;

public ZipkinElasticsearchStorageProperties(@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
}

public String getPipeline() {
return pipeline;
}
Expand Down Expand Up @@ -177,7 +187,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
.index(index)
.dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
.pipeline(pipeline)
.maxRequests(maxRequests)
.maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
.indexShards(indexShards)
.indexReplicas(indexReplicas);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -34,7 +34,7 @@ public class BasicAuthInterceptorTest {
@Before
public void beforeEach() {
BasicAuthInterceptor interceptor =
new BasicAuthInterceptor(new ZipkinElasticsearchStorageProperties());
new BasicAuthInterceptor(new ZipkinElasticsearchStorageProperties(false, 0));
client = new OkHttpClient.Builder().addNetworkInterceptor(interceptor).build();
mockWebServer = new MockWebServer();
}
Expand Down
38 changes: 38 additions & 0 deletions zipkin-autoconfigure/storage-throttle/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2015-2019 The OpenZipkin Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure</artifactId>
<version>2.12.10-SNAPSHOT</version>
</parent>

<artifactId>zipkin-autoconfigure-storage-throttle</artifactId>
<name>Auto Configuration: Storage Throttle</name>

<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-storage-throttle</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.autoconfigure.storage.throttle;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("zipkin.storage.throttle")
public class ZipkinStorageThrottleProperties {
/** Should we throttle at all? */
private boolean enabled;
/** Minimum number of storage requests to allow through at a given time. */
private int minConcurrency;
/** Maximum number of storage requests to allow through at a given time. */
private int maxConcurrency;
/**
* Maximum number of storage requests to buffer while waiting for open Thread.
* 0 = no buffering. */
private int maxQueueSize;

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public int getMinConcurrency() {
return minConcurrency;
}

public void setMinConcurrency(int minConcurrency) {
this.minConcurrency = minConcurrency;
}

public int getMaxConcurrency() {
return maxConcurrency;
}

public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}

public int getMaxQueueSize() {
return maxQueueSize;
}

public void setMaxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -26,6 +26,7 @@

import static java.lang.String.format;
import static java.util.logging.Level.FINE;
import java.util.concurrent.RejectedExecutionException;

/**
* This component takes action on spans received from a transport. This includes deserializing,
Expand Down Expand Up @@ -208,8 +209,13 @@ RuntimeException errorReading(String message, Throwable e) {
RuntimeException errorStoringSpans(List<Span> spans, Throwable e) {
metrics.incrementSpansDropped(spans.size());
// The exception could be related to a span being huge. Instead of filling logs,
// print trace id, span id pairs
StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans "));
// print trace id/span id pairs, and a service name
StringBuilder messagePrefix = new StringBuilder("Cannot store spans ");
if (spans.size() > 0) {
messagePrefix.append("from ").append(spans.get(0).localServiceName()).append(' ');
}

StringBuilder msg = appendSpanIds(spans, messagePrefix);
return doError(msg.toString(), e);
}

Expand All @@ -224,7 +230,13 @@ RuntimeException doError(String message, Throwable e) {
message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), error);
warn(message, e);
}
return new RuntimeException(message, e);

if (e instanceof RejectedExecutionException) {
// This can indicate to a higher layer that we need to slow down ingestion
return (RejectedExecutionException) e;
} else {
return new RuntimeException(message, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 The OpenZipkin Authors
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -23,6 +23,7 @@

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void acceptSpansCallback_onErrorWithNullMessage() {
RuntimeException exception = new RuntimeException();
callback.onError(exception);

verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception);
verify(collector).warn(contains("Cannot store spans"), eq(exception));
}

@Test
Expand All @@ -108,24 +109,25 @@ public void acceptSpansCallback_onErrorWithMessage() {
callback.onError(exception);

verify(collector)
.warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception);
.warn(contains("due to IllegalArgumentException(no beer)"), eq(exception));
}

@Test
public void errorAcceptingSpans_onErrorWithNullMessage() {
String message =
collector.errorStoringSpans(asList(CLIENT_SPAN), new RuntimeException()).getMessage();

assertThat(message).isEqualTo("Cannot store spans [1] due to RuntimeException()");
assertThat(message).contains("Cannot store spans");
assertThat(message).contains("due to RuntimeException()");
}

@Test
public void errorAcceptingSpans_onErrorWithMessage() {
RuntimeException exception = new IllegalArgumentException("no beer");
String message = collector.errorStoringSpans(asList(CLIENT_SPAN), exception).getMessage();

assertThat(message)
.isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)");
assertThat(message).contains("Cannot store spans");
assertThat(message).contains("due to IllegalArgumentException(no beer)");
}

@Test
Expand Down
8 changes: 8 additions & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@
<optional>true</optional>
</dependency>

<!-- Storage throttle -->
<dependency>
<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-storage-throttle</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>

<!-- Cassandra backend -->
<dependency>
<groupId>io.zipkin.java</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-2019 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.server.internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.type.AnnotatedTypeMetadata;

@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface ConditionalOnThrottledStorage {
class ThrottledStorageCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
String throttleEnabled = context.getEnvironment()
.getProperty("zipkin.storage.throttle.enabled");

if (!Boolean.valueOf(throttleEnabled)) {
return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
}

return ConditionOutcome.match();
}
}
}
Loading

0 comments on commit ad8e922

Please sign in to comment.