diff --git a/pom.xml b/pom.xml index fa57ab00d2e..3b6b9275905 100755 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ ${project.build.directory}/test/proto 27.0.1-jre + 0.2.0 5.6.3 3.7.1 @@ -171,6 +172,12 @@ ${project.version} + + io.zipkin.zipkin2 + zipkin-storage-throttle + ${project.version} + + io.zipkin.zipkin2 zipkin-storage-elasticsearch diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml index f35e27a5a4f..c11fa5eeba7 100644 --- a/zipkin-autoconfigure/pom.xml +++ b/zipkin-autoconfigure/pom.xml @@ -41,6 +41,7 @@ collector-rabbitmq collector-kafka08 collector-scribe + storage-throttle storage-cassandra3 storage-elasticsearch storage-mysql diff --git a/zipkin-autoconfigure/storage-elasticsearch/src/main/java/zipkin2/autoconfigure/storage/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-autoconfigure/storage-elasticsearch/src/main/java/zipkin2/autoconfigure/storage/elasticsearch/ZipkinElasticsearchStorageProperties.java index e27032e7d3d..5747bdb3272 100644 --- a/zipkin-autoconfigure/storage-elasticsearch/src/main/java/zipkin2/autoconfigure/storage/elasticsearch/ZipkinElasticsearchStorageProperties.java +++ b/zipkin-autoconfigure/storage-elasticsearch/src/main/java/zipkin2/autoconfigure/storage/elasticsearch/ZipkinElasticsearchStorageProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -20,6 +20,7 @@ import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.logging.HttpLoggingInterceptor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import zipkin2.elasticsearch.ElasticsearchStorage; @@ -37,8 +38,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar private String index = "zipkin"; /** The date separator used to create the index name. Default to -. */ private String dateSeparator = "-"; - /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */ + /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */ private int maxRequests = 64; + /** Overrdies maximum in-flight requests to match throttling settings if throttling is enabled. */ + private Integer throttleMaxConcurrency; /** Number of shards (horizontal scaling factor) per index. Defaults to 5. */ private int indexShards = 5; /** Number of replicas (redundancy factor) per index. Defaults to 1.` */ @@ -58,6 +61,13 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar */ private int timeout = 10_000; + public ZipkinElasticsearchStorageProperties(@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled, + @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) { + if (throttleEnabled) { + this.throttleMaxConcurrency = throttleMaxConcurrency; + } + } + public String getPipeline() { return pipeline; } @@ -177,7 +187,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) { .index(index) .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0)) .pipeline(pipeline) - .maxRequests(maxRequests) + .maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency) .indexShards(indexShards) .indexReplicas(indexReplicas); } diff --git a/zipkin-autoconfigure/storage-elasticsearch/src/test/java/zipkin2/autoconfigure/storage/elasticsearch/BasicAuthInterceptorTest.java b/zipkin-autoconfigure/storage-elasticsearch/src/test/java/zipkin2/autoconfigure/storage/elasticsearch/BasicAuthInterceptorTest.java index b8cf6022743..46edfdfd2d7 100644 --- a/zipkin-autoconfigure/storage-elasticsearch/src/test/java/zipkin2/autoconfigure/storage/elasticsearch/BasicAuthInterceptorTest.java +++ b/zipkin-autoconfigure/storage-elasticsearch/src/test/java/zipkin2/autoconfigure/storage/elasticsearch/BasicAuthInterceptorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -34,7 +34,7 @@ public class BasicAuthInterceptorTest { @Before public void beforeEach() { BasicAuthInterceptor interceptor = - new BasicAuthInterceptor(new ZipkinElasticsearchStorageProperties()); + new BasicAuthInterceptor(new ZipkinElasticsearchStorageProperties(false, 0)); client = new OkHttpClient.Builder().addNetworkInterceptor(interceptor).build(); mockWebServer = new MockWebServer(); } diff --git a/zipkin-autoconfigure/storage-throttle/pom.xml b/zipkin-autoconfigure/storage-throttle/pom.xml new file mode 100644 index 00000000000..4c45415f413 --- /dev/null +++ b/zipkin-autoconfigure/storage-throttle/pom.xml @@ -0,0 +1,38 @@ + + + + 4.0.0 + + io.zipkin.java + zipkin-autoconfigure + 2.12.10-SNAPSHOT + + + zipkin-autoconfigure-storage-throttle + Auto Configuration: Storage Throttle + + + ${project.basedir}/../.. + + + + + io.zipkin.zipkin2 + zipkin-storage-throttle + + + diff --git a/zipkin-autoconfigure/storage-throttle/src/main/java/zipkin2/autoconfigure/storage/throttle/ZipkinStorageThrottleProperties.java b/zipkin-autoconfigure/storage-throttle/src/main/java/zipkin2/autoconfigure/storage/throttle/ZipkinStorageThrottleProperties.java new file mode 100644 index 00000000000..04d171b0eb3 --- /dev/null +++ b/zipkin-autoconfigure/storage-throttle/src/main/java/zipkin2/autoconfigure/storage/throttle/ZipkinStorageThrottleProperties.java @@ -0,0 +1,62 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.autoconfigure.storage.throttle; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("zipkin.storage.throttle") +public class ZipkinStorageThrottleProperties { + /** Should we throttle at all? */ + private boolean enabled; + /** Minimum number of storage requests to allow through at a given time. */ + private int minConcurrency; + /** Maximum number of storage requests to allow through at a given time. */ + private int maxConcurrency; + /** + * Maximum number of storage requests to buffer while waiting for open Thread. + * 0 = no buffering. */ + private int maxQueueSize; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getMinConcurrency() { + return minConcurrency; + } + + public void setMinConcurrency(int minConcurrency) { + this.minConcurrency = minConcurrency; + } + + public int getMaxConcurrency() { + return maxConcurrency; + } + + public void setMaxConcurrency(int maxConcurrency) { + this.maxConcurrency = maxConcurrency; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } +} diff --git a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java index 464922debcb..1bb0d708078 100644 --- a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java +++ b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -26,6 +26,7 @@ import static java.lang.String.format; import static java.util.logging.Level.FINE; +import java.util.concurrent.RejectedExecutionException; /** * This component takes action on spans received from a transport. This includes deserializing, @@ -208,8 +209,13 @@ RuntimeException errorReading(String message, Throwable e) { RuntimeException errorStoringSpans(List spans, Throwable e) { metrics.incrementSpansDropped(spans.size()); // The exception could be related to a span being huge. Instead of filling logs, - // print trace id, span id pairs - StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans ")); + // print trace id/span id pairs, and a service name + StringBuilder messagePrefix = new StringBuilder("Cannot store spans "); + if (spans.size() > 0) { + messagePrefix.append("from ").append(spans.get(0).localServiceName()).append(' '); + } + + StringBuilder msg = appendSpanIds(spans, messagePrefix); return doError(msg.toString(), e); } @@ -224,7 +230,13 @@ RuntimeException doError(String message, Throwable e) { message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), error); warn(message, e); } - return new RuntimeException(message, e); + + if (e instanceof RejectedExecutionException) { + // This can indicate to a higher layer that we need to slow down ingestion + return (RejectedExecutionException) e; + } else { + return new RuntimeException(message, e); + } } } diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java index a28105a21bb..8bc993bad4e 100644 --- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java +++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -23,6 +23,7 @@ import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -98,7 +99,7 @@ public void acceptSpansCallback_onErrorWithNullMessage() { RuntimeException exception = new RuntimeException(); callback.onError(exception); - verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception); + verify(collector).warn(contains("Cannot store spans"), eq(exception)); } @Test @@ -108,7 +109,7 @@ public void acceptSpansCallback_onErrorWithMessage() { callback.onError(exception); verify(collector) - .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception); + .warn(contains("due to IllegalArgumentException(no beer)"), eq(exception)); } @Test @@ -116,7 +117,8 @@ public void errorAcceptingSpans_onErrorWithNullMessage() { String message = collector.errorStoringSpans(asList(CLIENT_SPAN), new RuntimeException()).getMessage(); - assertThat(message).isEqualTo("Cannot store spans [1] due to RuntimeException()"); + assertThat(message).contains("Cannot store spans"); + assertThat(message).contains("due to RuntimeException()"); } @Test @@ -124,8 +126,8 @@ public void errorAcceptingSpans_onErrorWithMessage() { RuntimeException exception = new IllegalArgumentException("no beer"); String message = collector.errorStoringSpans(asList(CLIENT_SPAN), exception).getMessage(); - assertThat(message) - .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)"); + assertThat(message).contains("Cannot store spans"); + assertThat(message).contains("due to IllegalArgumentException(no beer)"); } @Test diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml index c87db2c51e2..a69fd23af2c 100644 --- a/zipkin-server/pom.xml +++ b/zipkin-server/pom.xml @@ -112,6 +112,14 @@ true + + + io.zipkin.java + zipkin-autoconfigure-storage-throttle + ${project.version} + true + + io.zipkin.java diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java new file mode 100644 index 00000000000..afd189022be --- /dev/null +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.server.internal; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.springframework.boot.autoconfigure.condition.ConditionOutcome; +import org.springframework.boot.autoconfigure.condition.SpringBootCondition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.core.type.AnnotatedTypeMetadata; + +@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class) +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface ConditionalOnThrottledStorage { + class ThrottledStorageCondition extends SpringBootCondition { + @Override + public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) { + String throttleEnabled = context.getEnvironment() + .getProperty("zipkin.storage.throttle.enabled"); + + if (!Boolean.valueOf(throttleEnabled)) { + return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true"); + } + + return ConditionOutcome.match(); + } + } +} diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java index c3229cd6860..d4b44b9785e 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java @@ -23,6 +23,9 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.config.MeterFilter; import java.util.List; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -30,6 +33,7 @@ import org.springframework.boot.actuate.health.HealthAggregator; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; @@ -39,11 +43,13 @@ import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.web.servlet.config.annotation.ViewControllerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; +import zipkin2.autoconfigure.storage.throttle.ZipkinStorageThrottleProperties; import zipkin2.collector.CollectorMetrics; import zipkin2.collector.CollectorSampler; import zipkin2.server.internal.brave.TracingStorageComponent; import zipkin2.storage.InMemoryStorage; import zipkin2.storage.StorageComponent; +import zipkin2.storage.throttle.ThrottledStorageComponent; @Configuration @ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class) @@ -151,6 +157,39 @@ public Object postProcessAfterInitialization(Object bean, String beanName) { } } + @Configuration + @EnableConfigurationProperties(ZipkinStorageThrottleProperties.class) + @ConditionalOnThrottledStorage + static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware { + + /** + * Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really + * bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make + * sure no cycles exist at all as a result of turning throttling on. + * + *

Ref: Tracking down cause of Spring's "not eligible for auto-proxying"

+ */ + private BeanFactory beanFactory; + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) { + if (bean instanceof StorageComponent) { + ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class); + return new ThrottledStorageComponent((StorageComponent) bean, + beanFactory.getBean(MeterRegistry.class), + throttleProperties.getMinConcurrency(), + throttleProperties.getMaxConcurrency(), + throttleProperties.getMaxQueueSize()); + } + return bean; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + } + /** * This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can * supply both read apis, so we add two beans here. diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml index b64e415d550..83692277206 100644 --- a/zipkin-server/src/main/resources/zipkin-server-shared.yml +++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml @@ -50,6 +50,11 @@ zipkin: autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000} autocomplete-cardinality: 20000 type: ${STORAGE_TYPE:mem} + throttle: + enabled: ${THROTTLE_ENABLED:false} + minConcurrency: ${THROTTLE_MIN_CONCURRENCY:10} + maxConcurrency: ${THROTTLE_MAX_CONCURRENCY:200} + maxQueueSize: ${THROTTLE_MAX_QUEUE_SIZE:1000} mem: # Maximum number of spans to keep in memory. When exceeded, oldest traces (and their spans) will be purged. # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java index c563673d602..e39186f989a 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/HttpBulkIndexer.java @@ -18,6 +18,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; import okhttp3.HttpUrl; import okhttp3.MediaType; import okhttp3.Request; @@ -74,6 +75,7 @@ enum CheckForErrors implements HttpCall.BodyConverter { @Override public Void convert(BufferedSource b) throws IOException { String content = b.readUtf8(); + if (content.contains("\"status\":429")) throw new RejectedExecutionException(content); if (content.contains("\"errors\":true")) throw new IllegalStateException(content); return null; } diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java index 5e59550c11c..5c06e304179 100644 --- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java +++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 The OpenZipkin Authors + * Copyright 2015-2019 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -15,7 +15,6 @@ import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.Semaphore; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -36,12 +35,10 @@ public interface BodyConverter { public static class Factory implements Closeable { final OkHttpClient ok; - final Semaphore semaphore; public final HttpUrl baseUrl; public Factory(OkHttpClient ok, HttpUrl baseUrl) { this.ok = ok; - this.semaphore = new Semaphore(ok.dispatcher().getMaxRequests()); this.baseUrl = baseUrl; } @@ -56,38 +53,24 @@ public HttpCall newCall(Request request, BodyConverter bodyConverter) public final okhttp3.Call call; public final BodyConverter bodyConverter; - final Semaphore semaphore; HttpCall(Factory factory, Request request, BodyConverter bodyConverter) { - this( - factory.ok.newCall(request), - factory.semaphore, - bodyConverter - ); + this.call = factory.ok.newCall(request); + this.bodyConverter = bodyConverter; } - HttpCall(okhttp3.Call call, Semaphore semaphore, BodyConverter bodyConverter) { - this.call = call; - this.semaphore = semaphore; - this.bodyConverter = bodyConverter; + HttpCall(HttpCall other) { + this.call = other.call.clone(); + this.bodyConverter = other.bodyConverter; } @Override public V execute() throws IOException { - if (!semaphore.tryAcquire()) throw new IllegalStateException("over capacity"); - try { - return parseResponse(call.execute(), bodyConverter); - } finally { - semaphore.release(); - } + return parseResponse(call.execute(), bodyConverter); } @Override public void enqueue(Callback delegate) { - if (!semaphore.tryAcquire()) { - delegate.onError(new IllegalStateException("over capacity")); - return; - } - call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate)); + call.enqueue(new V2CallbackAdapter<>(bodyConverter, delegate)); } @Override public void cancel() { @@ -99,28 +82,24 @@ public HttpCall newCall(Request request, BodyConverter bodyConverter) } @Override public HttpCall clone() { - return new HttpCall(call.clone(), semaphore, bodyConverter); + return new HttpCall(this); } static class V2CallbackAdapter implements okhttp3.Callback { - final Semaphore semaphore; final BodyConverter bodyConverter; final Callback delegate; - V2CallbackAdapter(Semaphore semaphore, BodyConverter bodyConverter, Callback delegate) { - this.semaphore = semaphore; + V2CallbackAdapter(BodyConverter bodyConverter, Callback delegate) { this.bodyConverter = bodyConverter; this.delegate = delegate; } @Override public void onFailure(okhttp3.Call call, IOException e) { - semaphore.release(); delegate.onError(e); } /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */ @Override public void onResponse(okhttp3.Call call, Response response) { - semaphore.release(); try { delegate.onSuccess(parseResponse(response, bodyConverter)); } catch (Throwable e) { diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java index fd8c3a50542..aece732fba6 100644 --- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/ElasticsearchSpanConsumerTest.java @@ -14,7 +14,6 @@ package zipkin2.elasticsearch; import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; @@ -23,14 +22,11 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import zipkin2.Callback; import zipkin2.Endpoint; import zipkin2.Span; import zipkin2.Span.Kind; -import zipkin2.TestObjects; import zipkin2.codec.SpanBytesDecoder; import zipkin2.codec.SpanBytesEncoder; -import zipkin2.internal.Nullable; import zipkin2.storage.SpanConsumer; import static java.util.Arrays.asList; @@ -244,48 +240,6 @@ public void addsPipelineId() throws Exception { assertThat(request.getPath()).isEqualTo("/_bulk?pipeline=zipkin"); } - @Test - public void dropsWhenBacklog() throws Exception { - close(); - - storage = - ElasticsearchStorage.newBuilder() - .hosts(asList(es.url("").toString())) - .maxRequests(1) - .build(); - ensureIndexTemplate(); - - es.enqueue(new MockResponse().setBodyDelay(1, TimeUnit.SECONDS)); - - final LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - Callback callback = - new Callback() { - @Override - public void onSuccess(@Nullable Void value) { - q.add("success"); - } - - @Override - public void onError(Throwable t) { - q.add(t); - } - }; - // one request is delayed - storage.spanConsumer().accept(asList(TestObjects.CLIENT_SPAN)).enqueue(callback); - - // synchronous requests fail on backlog - try { - storage.spanConsumer().accept(asList(TestObjects.CLIENT_SPAN)).execute(); - failBecauseExceptionWasNotThrown(IllegalStateException.class); - } catch (IllegalStateException e) { - } - - // asynchronous requests fail on backlog - storage.spanConsumer().accept(asList(TestObjects.CLIENT_SPAN)).enqueue(callback); - - assertThat(q.take()).isInstanceOf(IllegalStateException.class); - } - @Test public void choosesTypeSpecificIndex() throws Exception { es.enqueue(new MockResponse()); diff --git a/zipkin-storage/pom.xml b/zipkin-storage/pom.xml index bd495777e32..7b4e9b01f17 100644 --- a/zipkin-storage/pom.xml +++ b/zipkin-storage/pom.xml @@ -34,6 +34,7 @@ + throttle cassandra-v1 cassandra mysql-v1 diff --git a/zipkin-storage/throttle/pom.xml b/zipkin-storage/throttle/pom.xml new file mode 100644 index 00000000000..43901de4e1b --- /dev/null +++ b/zipkin-storage/throttle/pom.xml @@ -0,0 +1,59 @@ + + + + 4.0.0 + + io.zipkin.zipkin2 + zipkin-storage-parent + 2.12.10-SNAPSHOT + + + zipkin-storage-throttle + Storage: Throttle + + + ${project.basedir}/../.. + + + -XepDisableWarningsInGeneratedCode + + + + + + ${project.groupId} + zipkin + + + + com.netflix.concurrency-limits + concurrency-limits-core + ${netflix.concurrency.limits.version} + + + io.micrometer + micrometer-core + ${micrometer.version} + + + + org.mockito + mockito-core + test + + + diff --git a/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ActuateThrottleMetrics.java b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ActuateThrottleMetrics.java new file mode 100644 index 00000000000..a69eb7e0213 --- /dev/null +++ b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ActuateThrottleMetrics.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limiter.AbstractLimiter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +class ActuateThrottleMetrics { + private final MeterRegistry registryInstance; + + public ActuateThrottleMetrics(MeterRegistry registryInstance) { + this.registryInstance = registryInstance; + } + + public void bind(ExecutorService executor) { + if(!(executor instanceof ThreadPoolExecutor)){ + return; + } + + ThreadPoolExecutor pool = (ThreadPoolExecutor) executor; + Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize) + .description("number of threads running storage requests") + .register(registryInstance); + Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size) + .description("number of items queued waiting for access to storage") + .register(registryInstance); + } + + public void bind(Limiter limiter) { + if(!(limiter instanceof AbstractLimiter)){ + return; + } + + AbstractLimiter abstractLimiter = (AbstractLimiter) limiter; + + // If this value never goes down there is a leak somewhere + Gauge.builder("zipkin_storage.throttle.in_flight_requests", abstractLimiter::getInflight) + .description("number of requests the limiter thinks are active") + .register(registryInstance); + } +} diff --git a/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledCall.java b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledCall.java new file mode 100644 index 00000000000..8ea4e7f3e8f --- /dev/null +++ b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledCall.java @@ -0,0 +1,184 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.Limiter.Listener; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; +import zipkin2.Call; +import zipkin2.Callback; +import zipkin2.storage.InMemoryStorage; + +/** + * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService serves two + * purposes: + *
    + *
  1. Limits the number of requests that can run in parallel.
  2. + *
  3. Depending on configuration, can queue up requests to make sure we don't aggressively drop requests that would + * otherwise succeed if given a moment. Bounded queues are safest for this as unbounded ones can lead to heap + * exhaustion and {@link OutOfMemoryError OOM errors}.
  4. + *
+ * + * @see ThrottledStorageComponent + */ +class ThrottledCall extends Call { + private final ExecutorService executor; + private final Limiter limiter; + private final Listener limitListener; + /** + * Delegate call needs to be supplied later to avoid having it take action when it is created (like + * {@link InMemoryStorage} and thus avoid being throttled. + */ + private final Supplier> delegate; + private Call call; + private boolean canceled; + + public ThrottledCall(ExecutorService executor, Limiter limiter, Supplier> delegate) { + this.executor = executor; + this.limiter = limiter; + this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new); + this.delegate = delegate; + } + + private ThrottledCall(ThrottledCall other) { + this(other.executor, other.limiter, other.call == null ? other.delegate : () -> other.call.clone()); + } + + @Override + public V execute() throws IOException { + try { + call = delegate.get(); + + Future future = executor.submit(call::execute); // Make sure we throttle + V result = future.get(); // Still block for the response + + limitListener.onSuccess(); + return result; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RejectedExecutionException) { + // Storage rejected us, throttle back + limitListener.onDropped(); + } else { + limitListener.onIgnore(); + } + + throw new ThrottleException(cause); + } catch (RuntimeException e) { + limitListener.onIgnore(); + throw e; // E.g. RejectedExecutionException + } catch (Exception e) { + limitListener.onIgnore(); + throw new ThrottleException(e); + } + } + + @Override + public void enqueue(Callback callback) { + try { + executor.execute(() -> { + if (canceled) { + return; + } + + call = delegate.get(); + ThrottledCallback throttleCallback = new ThrottledCallback<>(callback, limitListener); + call.enqueue(throttleCallback); + + // Need to wait here since the delegate call will run asynchronously also. + // This ensures we don't exceed our throttle limit. + throttleCallback.await(); + }); + } catch (RuntimeException e) { + // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may be write + // bound, but a drop in concurrency won't neccessarily help. + limitListener.onIgnore(); + throw e; // E.g. RejectedExecutionException + } catch (Exception e) { + limitListener.onIgnore(); + throw e; + } + } + + @Override + public void cancel() { + canceled = true; + if (call != null) { + call.cancel(); + } + } + + @Override + public boolean isCanceled() { + return canceled || (call != null && call.isCanceled()); + } + + @Override + public Call clone() { + return new ThrottledCall<>(this); + } + + private static class ThrottledCallback implements Callback { + private final Callback delegate; + private final Listener limitListener; + private final CountDownLatch latch; + + public ThrottledCallback(Callback delegate, Listener limitListener) { + this.delegate = delegate; + this.limitListener = limitListener; + this.latch = new CountDownLatch(1); + } + + public void await() { + try { + latch.await(); + } catch (InterruptedException ex) { + limitListener.onIgnore(); + throw new ThrottleException(ex); + } + } + + @Override + public void onSuccess(V value) { + try { + limitListener.onSuccess(); + delegate.onSuccess(value); + } finally { + latch.countDown(); + } + } + + @Override + public void onError(Throwable t) { + try { + limitListener.onDropped(); + delegate.onError(t); + } finally { + latch.countDown(); + } + } + } + + public static class ThrottleException extends RuntimeException { + public ThrottleException(Throwable cause) { + super(cause); + } + } +} diff --git a/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledSpanConsumer.java b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledSpanConsumer.java new file mode 100644 index 00000000000..35e82b96b2b --- /dev/null +++ b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledSpanConsumer.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.throttle; + +import com.netflix.concurrency.limits.Limiter; +import java.util.List; +import java.util.concurrent.ExecutorService; +import zipkin2.Call; +import zipkin2.Span; +import zipkin2.storage.SpanConsumer; + +/** + * Delegating implementation that wraps another {@link SpanConsumer} and ensures that only so many requests + * can get through to it at a given time. + * + * @see ThrottledCall + */ +class ThrottledSpanConsumer implements SpanConsumer { + private final SpanConsumer delegate; + private final Limiter limiter; + private final ExecutorService executor; + + ThrottledSpanConsumer(SpanConsumer delegate, Limiter limiter, ExecutorService executor) { + this.delegate = delegate; + this.limiter = limiter; + this.executor = executor; + } + + @Override + public Call accept(List spans) { + return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans)); + } +} diff --git a/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledStorageComponent.java b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledStorageComponent.java new file mode 100644 index 00000000000..55c516fe307 --- /dev/null +++ b/zipkin-storage/throttle/src/main/java/zipkin2/storage/throttle/ThrottledStorageComponent.java @@ -0,0 +1,175 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.throttle; + +import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.limit.Gradient2Limit; +import com.netflix.concurrency.limits.limiter.AbstractLimiter; +import io.micrometer.core.instrument.MeterRegistry; +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import zipkin2.storage.SpanConsumer; +import zipkin2.storage.SpanStore; +import zipkin2.storage.StorageComponent; + +/** + * Delegating implementation that limits requests to the {@link #spanConsumer()} of another + * {@link StorageComponent}. The theory here is that this class can be used to: + *
    + *
  • Prevent spamming the storage engine with excessive, spike requests when they come in; thus preserving it's life.
  • + *
  • Optionally act as a buffer so that a fixed number requests can be queued for execution when the throttle allows + * for it. This optional queue must be bounded in order to avoid running out of memory from infinitely queueing.
  • + *
+ * + * @see ThrottledSpanConsumer + */ +public class ThrottledStorageComponent extends StorageComponent { + private final StorageComponent delegate; + private final AbstractLimiter limiter; + private final ThreadPoolExecutor executor; + + public ThrottledStorageComponent(StorageComponent delegate, + MeterRegistry registry, + int minConcurrency, + int maxConcurrency, + int maxQueueSize) { + this.delegate = Objects.requireNonNull(delegate); + + Limit limit = Gradient2Limit.newBuilder() + .minLimit(minConcurrency) + .initialLimit(minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there + .maxConcurrency(maxConcurrency) + .build(); + this.limiter = new Builder().limit(limit).build(); + + this.executor = new ThreadPoolExecutor(limit.getLimit(), + limit.getLimit(), + 0, + TimeUnit.DAYS, + createQueue(maxQueueSize), + new ThottledThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + limit.notifyOnChange(new PoolSizeConsumer(executor)); + + if (registry != null) { + ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry); + metrics.bind(executor); + metrics.bind(limiter); + } + } + + @Override + public SpanStore spanStore() { + return delegate.spanStore(); + } + + @Override + public SpanConsumer spanConsumer() { + return new ThrottledSpanConsumer(delegate.spanConsumer(), limiter, executor); + } + + @Override + public void close() throws IOException { + executor.shutdownNow(); + delegate.close(); + } + + private static BlockingQueue createQueue(int maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("Invalid max queue size; must be >= 0 but was: " + maxSize); + } + + if (maxSize == 0) { + // 0 means we should be bounded but we can't create a queue with that size so use 1 instead. + maxSize = 1; + } + + return new LinkedBlockingQueue<>(maxSize); + } + + private static class ThottledThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + thread.setName("throttle-pool-" + thread.getId()); + return thread; + } + } + + private static class PoolSizeConsumer implements Consumer { + private final ThreadPoolExecutor executor; + + public PoolSizeConsumer(ThreadPoolExecutor executor) { + this.executor = executor; + } + + /** + * This is {@code synchronized} to ensure that we don't let the core/max pool sizes get out of sync; even + * for an instant. The two need to be tightly coupled together to ensure that when our queue fills up we don't spin + * up extra Threads beyond our calculated limit. + * + *

There is also an unfortunate aspect where the {@code max} has to always be greater than {@code core} or an + * exception will be thrown. So they have to be adjust appropriately relative to the direction the size is going. + */ + @Override + public synchronized void accept(Integer newValue) { + int previousValue = executor.getCorePoolSize(); + + int newValueInt = newValue; + if (previousValue < newValueInt) { + executor.setMaximumPoolSize(newValueInt); + executor.setCorePoolSize(newValueInt); + } else if (previousValue > newValueInt) { + executor.setCorePoolSize(newValueInt); + executor.setMaximumPoolSize(newValueInt); + } + // Note: no case for equals. Why modify something that doesn't need modified? + } + } + + private static class Builder extends AbstractLimiter.Builder { + public NonLimitingLimiter build() { + return new NonLimitingLimiter(this); + } + + @Override + protected Builder self() { + return this; + } + } + + /** + * Unlike a normal Limiter, this will actually not prevent the creation of a {@link Listener} in + * {@link #acquire(java.lang.Void)}. The point of this is to ensure that we can always derive an appropriate + * {@link Limit#getLimit() Limit} while the {@link #executor} handles actually limiting running requests. + */ + private static class NonLimitingLimiter extends AbstractLimiter { + public NonLimitingLimiter(AbstractLimiter.Builder builder) { + super(builder); + } + + @Override + public Optional acquire(Void context) { + return Optional.of(createListener()); + } + } +} diff --git a/zipkin-storage/throttle/src/test/java/zipkin2/storage/throttle/ThrottledCallTest.java b/zipkin-storage/throttle/src/test/java/zipkin2/storage/throttle/ThrottledCallTest.java new file mode 100644 index 00000000000..d25696258d1 --- /dev/null +++ b/zipkin-storage/throttle/src/test/java/zipkin2/storage/throttle/ThrottledCallTest.java @@ -0,0 +1,225 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.throttle; + +import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.limit.SettableLimit; +import com.netflix.concurrency.limits.limiter.SimpleLimiter; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import zipkin2.Call; +import zipkin2.Callback; + +public class ThrottledCallTest { + private SettableLimit limit; + private Limiter limiter; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() { + this.limit = SettableLimit.startingAt(0); + this.limiter = SimpleLimiter.newBuilder().limit(limit).build(); + } + + @Test + public void callCreation_isDeferred() throws IOException { + boolean[] created = new boolean[] {false}; + Supplier> delegate = () -> { + created[0] = true; + return Call.create(null); + }; + + ThrottledCall throttle = createThrottle(delegate); + + assertFalse(created[0]); + throttle.execute(); + assertTrue(created[0]); + } + + @Test + public void execute_isThrottled() throws Throwable { + int numThreads = 1; + int queueSize = 1; + int totalTasks = numThreads + queueSize; + + Semaphore startLock = new Semaphore(numThreads); + Semaphore waitLock = new Semaphore(totalTasks); + Semaphore failLock = new Semaphore(1); + Supplier> delegate = () -> new LockedCall(startLock, waitLock); + ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); + + // Step 1: drain appropriate locks + startLock.drainPermits(); + waitLock.drainPermits(); + failLock.drainPermits(); + + // Step 2: saturate threads and fill queue + ExecutorService backgroundPool = Executors.newCachedThreadPool(); + for (int i = 0; i < totalTasks; i++) { + backgroundPool.submit(throttle::execute); + } + + try { + // Step 3: make sure the threads actually started + startLock.acquire(numThreads); + + // Step 4: submit something beyond our limits + Future future = backgroundPool.submit(() -> { + try { + return throttle.execute(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + // Step 6: signal that we tripped the limit + failLock.release(); + } + }); + + // Step 5: wait to make sure our limit actually tripped + failLock.acquire(); + + // Step 7: Expect great things + expectedException.expect(RejectedExecutionException.class); + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } finally { + waitLock.release(totalTasks); + startLock.release(totalTasks); + backgroundPool.shutdownNow(); + } + } + + @Test + public void enqueue_isThrottled() throws Throwable { + int numThreads = 1; + int queueSize = 1; + int totalTasks = numThreads + queueSize; + + Semaphore startLock = new Semaphore(numThreads); + Semaphore waitLock = new Semaphore(totalTasks); + Supplier> delegate = () -> new LockedCall(startLock, waitLock); + ThrottledCall throttle = createThrottle(numThreads, queueSize, delegate); + + // Step 1: drain appropriate locks + startLock.drainPermits(); + waitLock.drainPermits(); + + // Step 2: saturate threads and fill queue + Callback callback = new NoopCallback(); + for (int i = 0; i < totalTasks; i++) { + throttle.enqueue(callback); + } + + // Step 3: make sure the threads actually started + startLock.acquire(numThreads); + + try { + // Step 4: submit something beyond our limits and make sure it fails + expectedException.expect(RejectedExecutionException.class); + throttle.enqueue(callback); + } catch (Exception e) { + throw e; + } finally { + waitLock.release(totalTasks); + startLock.release(totalTasks); + } + } + + private ThrottledCall createThrottle(Supplier> delegate) { + return createThrottle(1, 1, delegate); + } + + private ThrottledCall createThrottle(int poolSize, int queueSize, Supplier> delegate) { + limit.setLimit(limit.getLimit() + 1); + return new ThrottledCall(createPool(poolSize, queueSize), limiter, delegate); + } + + private static ExecutorService createPool(int poolSize, int queueSize) { + return new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS, new LinkedBlockingQueue<>(queueSize)); + } + + private static class LockedCall extends Call { + private final Semaphore startLock; + private final Semaphore waitLock; + + public LockedCall(Semaphore startLock, Semaphore waitLock) { + this.startLock = startLock; + this.waitLock = waitLock; + } + + @Override + public Void execute() throws IOException { + try { + startLock.release(); + waitLock.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return null; + } + + @Override + public void enqueue(Callback callback) { + try { + startLock.release(); + waitLock.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void cancel() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public boolean isCanceled() { + return false; + } + + @Override + public Call clone() { + throw new UnsupportedOperationException("Not supported yet."); + } + } + + private static class NoopCallback implements Callback { + @Override + public void onSuccess(Void value) { + } + + @Override + public void onError(Throwable t) { + } + } +} diff --git a/zipkin-storage/throttle/src/test/java/zipkin2/storage/throttle/ThrottledStorageComponentTest.java b/zipkin-storage/throttle/src/test/java/zipkin2/storage/throttle/ThrottledStorageComponentTest.java new file mode 100644 index 00000000000..45ffe782da4 --- /dev/null +++ b/zipkin-storage/throttle/src/test/java/zipkin2/storage/throttle/ThrottledStorageComponentTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2015-2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.storage.throttle; + +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import static org.junit.Assert.*; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import static org.mockito.Mockito.*; +import zipkin2.storage.StorageComponent; + +public class ThrottledStorageComponentTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void spanConsumer_isProxied() { + StorageComponent delegate = mock(StorageComponent.class); + CompositeMeterRegistry registry = new CompositeMeterRegistry(); + + ThrottledStorageComponent throttle = new ThrottledStorageComponent(delegate, registry, 1, 2, 1); + + assertSame(ThrottledSpanConsumer.class, throttle.spanConsumer().getClass()); + } + + @Test + public void createComponent_withoutMeter() { + StorageComponent delegate = mock(StorageComponent.class); + + new ThrottledStorageComponent(delegate, null, 1, 2, 1); + // no exception == pass + } + + @Test + public void createComponent_withZeroSizedQueue() { + StorageComponent delegate = mock(StorageComponent.class); + CompositeMeterRegistry registry = new CompositeMeterRegistry(); + + int queueSize = 0; + new ThrottledStorageComponent(delegate, registry, 1, 2, queueSize); + // no exception == pass + } + + @Test + public void createComponent_withNegativeQueue() { + StorageComponent delegate = mock(StorageComponent.class); + CompositeMeterRegistry registry = new CompositeMeterRegistry(); + + expectedException.expect(IllegalArgumentException.class); + int queueSize = -1; + new ThrottledStorageComponent(delegate, registry, 1, 2, queueSize); + } +}