Skip to content

Commit

Permalink
Adding storage-throttle module to address "over capacity" issues (ope…
Browse files Browse the repository at this point in the history
…nzipkin#2502)

Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.

Elasticsearch storage's maxRequests can be override by throttle properties if the throttle is 
enabled.

Inspired by work done on openzipkin#2169.
  • Loading branch information
Logic-32 authored and abesto committed Sep 10, 2019
1 parent 7b49a1e commit 311fe2e
Show file tree
Hide file tree
Showing 16 changed files with 1,049 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import zipkin2.storage.StorageComponent;

import static java.util.Arrays.asList;
import java.util.concurrent.RejectedExecutionException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -186,6 +187,16 @@ public void storeSpansCallback_onErrorWithMessage() {
}

@Test
public void errorAcceptingSpans_onErrorRejectedExecution() {
RuntimeException error = new RejectedExecutionException("slow down");
collector.handleStorageError(TRACE, error, callback);

verify(callback).onError(error);
assertThat(messages)
.containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)");
verify(metrics).incrementSpansDropped(4);
}

public void handleStorageError_onErrorWithNullMessage() {
RuntimeException error = new RuntimeException();
collector.handleStorageError(TRACE, error, callback);
Expand Down
10 changes: 10 additions & 0 deletions zipkin-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ Defaults to true
* `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
* `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)

### Throttled Storage (Experimental)
These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):

* `STORAGE_THROTTLE_ENABLED`: Enables throttling
* `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
* `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage. In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
* `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).

As this feature is experimental, it is not recommended to run this in production environments.

### Cassandra Storage
Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
Expand Down
17 changes: 16 additions & 1 deletion zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,17 @@
<version>${micrometer.version}</version>
</dependency>

<dependency>
<groupId>com.netflix.concurrency-limits</groupId>
<artifactId>concurrency-limits-core</artifactId>
<version>0.2.2</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>

<!-- Trace api controller activity with Brave -->
<dependency>
<groupId>io.zipkin.brave</groupId>
Expand Down Expand Up @@ -299,6 +310,11 @@
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down Expand Up @@ -372,7 +388,6 @@
<version>${kotlin.version}</version>
<configuration>
<jvmTarget>${main.java.version}</jvmTarget>
<experimentalCoroutines>enable</experimentalCoroutines>
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.server.internal;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.core.type.AnnotatedTypeMetadata;

@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface ConditionalOnThrottledStorage {
class ThrottledStorageCondition extends SpringBootCondition {
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
String throttleEnabled = context.getEnvironment()
.getProperty("zipkin.storage.throttle.enabled");

if (!Boolean.valueOf(throttleEnabled)) {
return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
}

return ConditionOutcome.match();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,34 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;
import java.util.List;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.boot.actuate.health.HealthAggregator;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.server.internal.brave.TracingStorageComponent;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.StorageComponent;
import zipkin2.server.internal.throttle.ThrottledStorageComponent;

@Configuration
@ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class)
Expand Down Expand Up @@ -157,10 +164,47 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
}
}

@Configuration
@EnableConfigurationProperties(ZipkinStorageThrottleProperties.class)
@ConditionalOnThrottledStorage
static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware {

/**
* Need this to resolve cyclic instantiation issue with spring. Mostly, this is for MeterRegistry as really
* bad things happen if you try to Autowire it (loss of JVM metrics) but also using it for properties just to make
* sure no cycles exist at all as a result of turning throttling on.
*
* <p>Ref: <a href="https://stackoverflow.com/a/19688634">Tracking down cause of Spring's "not eligible for auto-proxying"</a></p>
*/
private BeanFactory beanFactory;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof StorageComponent) {
ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class);
return new ThrottledStorageComponent((StorageComponent) bean,
beanFactory.getBean(MeterRegistry.class),
throttleProperties.getMinConcurrency(),
throttleProperties.getMaxConcurrency(),
throttleProperties.getMaxQueueSize());
}
return bean;
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
}

/**
* This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
* supply both read apis, so we add two beans here.
*
* <p>Note: this needs to be {@link Lazy} to avoid circular dependency issues when using with
* {@link ThrottledStorageComponentEnhancer}.
*/
@Lazy
@Configuration
@Conditional(StorageTypeMemAbsentOrEmpty.class)
@ConditionalOnMissingBean(StorageComponent.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import zipkin2.elasticsearch.ElasticsearchStorage;

Expand All @@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
private String index = "zipkin";
/** The date separator used to create the index name. Default to -. */
private String dateSeparator = "-";
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
/** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overriden by throttle settings) */
private int maxRequests = 64;
/** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */
private Integer throttleMaxConcurrency;
/** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
private int indexShards = 5;
/** Number of replicas (redundancy factor) per index. Defaults to 1.` */
Expand All @@ -61,6 +64,14 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
*/
private int timeout = 10_000;

ZipkinElasticsearchStorageProperties(
@Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
@Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
if (throttleEnabled) {
this.throttleMaxConcurrency = throttleMaxConcurrency;
}
}

public String getPipeline() {
return pipeline;
}
Expand Down Expand Up @@ -180,7 +191,7 @@ public ElasticsearchStorage.Builder toBuilder(OkHttpClient client) {
.index(index)
.dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
.pipeline(pipeline)
.maxRequests(maxRequests)
.maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
.indexShards(indexShards)
.indexReplicas(indexReplicas);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zipkin2.server.internal.throttle;

import com.netflix.concurrency.limits.limiter.AbstractLimiter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.ThreadPoolExecutor;
import zipkin2.server.internal.ActuateCollectorMetrics;

/** Follows the same naming convention as {@link ActuateCollectorMetrics} */
final class ActuateThrottleMetrics {
final MeterRegistry registryInstance;

ActuateThrottleMetrics(MeterRegistry registryInstance) {
this.registryInstance = registryInstance;
}

void bind(ThreadPoolExecutor pool) {
Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
.description("number of threads running storage requests")
.register(registryInstance);
Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
.description("number of items queued waiting for access to storage")
.register(registryInstance);
}

void bind(AbstractLimiter limiter) {
// This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
// It is tracked to make sure it doesn't perpetually increase. If it does then we're not resolving LimitListeners.
Gauge.builder("zipkin_storage.throttle.in_flight_requests", limiter::getInflight)
.description("number of requests the limiter thinks are active")
.register(registryInstance);
}
}
Loading

0 comments on commit 311fe2e

Please sign in to comment.