zipkin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adrianc...@apache.org
Subject [incubator-zipkin] branch master updated: Adding storage-throttle module to address "over capacity" issues (#2502)
Date Fri, 10 May 2019 08:10:41 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new b3eefbe  Adding storage-throttle module to address "over capacity" issues (#2502)
b3eefbe is described below

commit b3eefbee1aec34b1036b61913149d805712ea2fc
Author: Logic-32 <Logic-32@users.noreply.github.com>
AuthorDate: Fri May 10 02:10:35 2019 -0600

    Adding storage-throttle module to address "over capacity" issues (#2502)
    
    Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
    
    Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is
    enabled.
    
    Inspired by work done on #2169.
---
 .../test/java/zipkin2/collector/CollectorTest.java |  11 +
 zipkin-server/README.md                            |  10 +
 zipkin-server/pom.xml                              |  17 +-
 .../internal/ConditionalOnThrottledStorage.java    |  46 ++++
 .../server/internal/ZipkinServerConfiguration.java |  44 ++++
 .../ZipkinElasticsearchStorageProperties.java      |  15 +-
 .../internal/throttle/ActuateThrottleMetrics.java  |  49 ++++
 .../server/internal/throttle/ThrottledCall.java    | 233 +++++++++++++++++
 .../throttle/ThrottledStorageComponent.java        | 203 +++++++++++++++
 .../throttle/ZipkinStorageThrottleProperties.java  |  69 +++++
 .../src/main/resources/zipkin-server-shared.yml    |   5 +
 .../elasticsearch/BasicAuthInterceptorTest.kt      |   2 +-
 .../server/internal/throttle/ThrottledCallTest.kt  | 289 +++++++++++++++++++++
 .../throttle/ThrottledStorageComponentTest.kt      |  51 ++++
 .../elasticsearch/internal/client/HttpCall.java    |   5 +
 .../main/java/zipkin2/storage/InMemoryStorage.java |   4 +
 16 files changed, 1049 insertions(+), 4 deletions(-)

diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
index aabcbcb..56f988c 100644
--- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
+++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
@@ -31,6 +31,7 @@ import zipkin2.storage.InMemoryStorage;
 import zipkin2.storage.StorageComponent;
 
 import static java.util.Arrays.asList;
+import java.util.concurrent.RejectedExecutionException;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -186,6 +187,16 @@ public class CollectorTest {
   }
 
   @Test
+  public void errorAcceptingSpans_onErrorRejectedExecution() {
+    RuntimeException error = new RejectedExecutionException("slow down");
+    collector.handleStorageError(TRACE, error, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to RejectedExecutionException(slow down)");
+    verify(metrics).incrementSpansDropped(4);
+  }
+
   public void handleStorageError_onErrorWithNullMessage() {
     RuntimeException error = new RuntimeException();
     collector.handleStorageError(TRACE, error, callback);
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 7e64055..5863d7b 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -157,6 +157,16 @@ Defaults to true
 * `AUTOCOMPLETE_KEYS`: list of span tag keys which will be returned by the `/api/v2/autocompleteTags` endpoint
 * `AUTOCOMPLETE_TTL`: How long in milliseconds to suppress calls to write the same autocomplete key/value pair. Default 3600000 (1 hr)
 
+### Throttled Storage (Experimental)
+These settings can be used to help tune the rate at which Zipkin flushes data to another, underlying `StorageComponent` (such as Elasticsearch):
+
+    * `STORAGE_THROTTLE_ENABLED`: Enables throttling
+    * `STORAGE_THROTTLE_MIN_CONCURRENCY`: Minimum number of Threads to use for writing to storage.
+    * `STORAGE_THROTTLE_MAX_CONCURRENCY`: Maximum number of Threads to use for writing to storage.  In order to avoid configuration drift, this value may override other, storage-specific values such as Elasticsearch's `ES_MAX_REQUESTS`.
+    * `STORAGE_THROTTLE_MAX_QUEUE_SIZE`: How many messages to buffer while all Threads are writing data before abandoning a message (0 = no buffering).
+
+As this feature is experimental, it is not recommended to run this in production environments.
+
 ### Cassandra Storage
 Zipkin's [Cassandra storage component](../zipkin-storage/cassandra)
 supports version 3.11+ and applies when `STORAGE_TYPE` is set to `cassandra3`:
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index f17dd89..a456ddd 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -250,6 +250,17 @@
       <version>${micrometer.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.netflix.concurrency-limits</groupId>
+      <artifactId>concurrency-limits-core</artifactId>
+      <version>0.2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>io.micrometer</groupId>
+      <artifactId>micrometer-core</artifactId>
+      <version>${micrometer.version}</version>
+    </dependency>
+
     <!-- Trace api controller activity with Brave -->
     <dependency>
       <groupId>io.zipkin.brave</groupId>
@@ -299,6 +310,11 @@
       <version>2.4.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
@@ -372,7 +388,6 @@
         <version>${kotlin.version}</version>
         <configuration>
           <jvmTarget>${main.java.version}</jvmTarget>
-          <experimentalCoroutines>enable</experimentalCoroutines>
         </configuration>
         <executions>
           <execution>
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java
new file mode 100644
index 0000000..0d7cb4e
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnThrottledStorage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
+import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+@Conditional(ConditionalOnThrottledStorage.ThrottledStorageCondition.class)
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@interface ConditionalOnThrottledStorage {
+  class ThrottledStorageCondition extends SpringBootCondition {
+    @Override
+    public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
+      String throttleEnabled = context.getEnvironment()
+              .getProperty("zipkin.storage.throttle.enabled");
+
+      if (!Boolean.valueOf(throttleEnabled)) {
+        return ConditionOutcome.noMatch("zipkin.storage.throttle.enabled isn't true");
+      }
+
+      return ConditionOutcome.match();
+    }
+  }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
index cb381a7..b3d4b90 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinServerConfiguration.java
@@ -26,6 +26,9 @@ import com.linecorp.armeria.spring.actuate.ArmeriaSpringActuatorAutoConfiguratio
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.config.MeterFilter;
 import java.util.List;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.beans.factory.config.BeanPostProcessor;
@@ -33,20 +36,24 @@ import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCusto
 import org.springframework.boot.actuate.health.HealthAggregator;
 import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Condition;
 import org.springframework.context.annotation.ConditionContext;
 import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.core.annotation.Order;
 import org.springframework.core.type.AnnotatedTypeMetadata;
 import org.springframework.web.servlet.config.annotation.ViewControllerRegistry;
 import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import zipkin2.server.internal.throttle.ZipkinStorageThrottleProperties;
 import zipkin2.collector.CollectorMetrics;
 import zipkin2.collector.CollectorSampler;
 import zipkin2.server.internal.brave.TracingStorageComponent;
 import zipkin2.storage.InMemoryStorage;
 import zipkin2.storage.StorageComponent;
+import zipkin2.server.internal.throttle.ThrottledStorageComponent;
 
 @Configuration
 @ImportAutoConfiguration(ArmeriaSpringActuatorAutoConfiguration.class)
@@ -157,10 +164,47 @@ public class ZipkinServerConfiguration implements WebMvcConfigurer {
     }
   }
 
+  @Configuration
+  @EnableConfigurationProperties(ZipkinStorageThrottleProperties.class)
+  @ConditionalOnThrottledStorage
+  static class ThrottledStorageComponentEnhancer implements BeanPostProcessor, BeanFactoryAware {
+
+    /**
+     * Needed to resolve a cyclic instantiation issue with Spring.  Mostly, this is for MeterRegistry, as really
+     * bad things happen if you try to Autowire it (loss of JVM metrics); it is also used for properties just to make
+     * sure no cycles exist at all as a result of turning throttling on.
+     * 
+     * <p>Ref: <a href="https://stackoverflow.com/a/19688634">Tracking down cause of Spring's "not eligible for auto-proxying"</a></p>
+     */
+    private BeanFactory beanFactory;
+
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) {
+      if (bean instanceof StorageComponent) {
+        ZipkinStorageThrottleProperties throttleProperties = beanFactory.getBean(ZipkinStorageThrottleProperties.class);
+        return new ThrottledStorageComponent((StorageComponent) bean,
+                                             beanFactory.getBean(MeterRegistry.class),
+                                             throttleProperties.getMinConcurrency(),
+                                             throttleProperties.getMaxConcurrency(),
+                                             throttleProperties.getMaxQueueSize());
+      }
+      return bean;
+    }
+
+    @Override
+    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
+      this.beanFactory = beanFactory;
+    }
+  }
+
   /**
    * This is a special-case configuration if there's no StorageComponent of any kind. In-Mem can
    * supply both read apis, so we add two beans here.
+   *
+   * <p>Note: this needs to be {@link Lazy} to avoid circular dependency issues when used with
+   * {@link ThrottledStorageComponentEnhancer}.
    */
+  @Lazy
   @Configuration
   @Conditional(StorageTypeMemAbsentOrEmpty.class)
   @ConditionalOnMissingBean(StorageComponent.class)
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
index 8ba574f..a2fb450 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 import okhttp3.HttpUrl;
 import okhttp3.OkHttpClient;
 import okhttp3.logging.HttpLoggingInterceptor;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import zipkin2.elasticsearch.ElasticsearchStorage;
 
@@ -40,8 +41,10 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
   private String index = "zipkin";
   /** The date separator used to create the index name. Default to -. */
   private String dateSeparator = "-";
-  /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 */
+  /** Sets maximum in-flight requests from this process to any Elasticsearch host. Defaults to 64 (overridden by throttle settings) */
   private int maxRequests = 64;
+  /** Overrides maximum in-flight requests to match throttling settings if throttling is enabled. */
+  private Integer throttleMaxConcurrency;
   /** Number of shards (horizontal scaling factor) per index. Defaults to 5. */
   private int indexShards = 5;
   /** Number of replicas (redundancy factor) per index. Defaults to 1.` */
@@ -61,6 +64,14 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
    */
   private int timeout = 10_000;
 
+  ZipkinElasticsearchStorageProperties(
+    @Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
+    @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
+    if (throttleEnabled) {
+      this.throttleMaxConcurrency = throttleMaxConcurrency;
+    }
+  }
+
   public String getPipeline() {
     return pipeline;
   }
@@ -180,7 +191,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { // for Spar
         .index(index)
         .dateSeparator(dateSeparator.isEmpty() ? 0 : dateSeparator.charAt(0))
         .pipeline(pipeline)
-        .maxRequests(maxRequests)
+        .maxRequests(throttleMaxConcurrency == null ? maxRequests : throttleMaxConcurrency)
         .indexShards(indexShards)
         .indexReplicas(indexReplicas);
   }
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java
new file mode 100644
index 0000000..55e7c36
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ActuateThrottleMetrics.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.limiter.AbstractLimiter;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.util.concurrent.ThreadPoolExecutor;
+import zipkin2.server.internal.ActuateCollectorMetrics;
+
+/** Follows the same naming convention as {@link ActuateCollectorMetrics} */
+final class ActuateThrottleMetrics {
+  final MeterRegistry registryInstance;
+
+  ActuateThrottleMetrics(MeterRegistry registryInstance) {
+    this.registryInstance = registryInstance;
+  }
+
+  void bind(ThreadPoolExecutor pool) {
+    Gauge.builder("zipkin_storage.throttle.concurrency", pool::getCorePoolSize)
+      .description("number of threads running storage requests")
+      .register(registryInstance);
+    Gauge.builder("zipkin_storage.throttle.queue_size", pool.getQueue()::size)
+      .description("number of items queued waiting for access to storage")
+      .register(registryInstance);
+  }
+
+  void bind(AbstractLimiter limiter) {
+    // This value should parallel (zipkin_storage.throttle.queue_size + zipkin_storage.throttle.concurrency)
+    // It is tracked to make sure it doesn't perpetually increase.  If it does then we're not resolving LimitListeners.
+    Gauge.builder("zipkin_storage.throttle.in_flight_requests", limiter::getInflight)
+      .description("number of requests the limiter thinks are active")
+      .register(registryInstance);
+  }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
new file mode 100644
index 0000000..f43d61e
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.Limiter.Listener;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+import zipkin2.Call;
+import zipkin2.Callback;
+import zipkin2.storage.InMemoryStorage;
+
+/**
+ * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
+ * serves two purposes:
+ * <ol>
+ * <li>Limits the number of requests that can run in parallel.</li>
+ * <li>Depending on configuration, can queue up requests to make sure we don't aggressively drop
+ * requests that would otherwise succeed if given a moment. Bounded queues are safest for this as
+ * unbounded ones can lead to heap exhaustion and {@link OutOfMemoryError OOM errors}.</li>
+ * </ol>
+ *
+ * @see ThrottledStorageComponent
+ */
+final class ThrottledCall<V> extends Call<V> {
+  final ExecutorService executor;
+  final Limiter<Void> limiter;
+  final Listener limitListener;
+  /**
+   * The delegate call is supplied lazily so that it cannot take action when it is created
+   * (like {@link InMemoryStorage} does) and thus avoid being throttled.
+   */
+  final Supplier<? extends Call<V>> supplier;
+  volatile Call<V> delegate;
+  volatile boolean canceled;
+
+  public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
+    Supplier<? extends Call<V>> supplier) {
+    this.executor = executor;
+    this.limiter = limiter;
+    this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+    this.supplier = supplier;
+  }
+
+  // TODO: refactor this when in-memory no longer executes storage ops during assembly time
+  ThrottledCall(ThrottledCall<V> other) {
+    this(other.executor, other.limiter,
+      other.delegate == null ? other.supplier : () -> other.delegate.clone());
+  }
+
+  // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
+  // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out
+  // the need for assembly time throttling (fix to in-memory storage)
+  @Override public V execute() throws IOException {
+    try {
+      delegate = supplier.get();
+
+      // Make sure we throttle
+      Future<V> future = executor.submit(() -> {
+        String oldName = setCurrentThreadName(delegate.toString());
+        try {
+          return delegate.execute();
+        } finally {
+          setCurrentThreadName(oldName);
+        }
+      });
+      V result = future.get(); // Still block for the response
+
+      limitListener.onSuccess();
+      return result;
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RejectedExecutionException) {
+        // Storage rejected us, throttle back
+        limitListener.onDropped();
+      } else {
+        limitListener.onIgnore();
+      }
+
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new RuntimeException("Issue while executing on a throttled call", cause);
+      }
+    } catch (InterruptedException e) {
+      limitListener.onIgnore();
+      throw new RuntimeException("Interrupted while blocking on a throttled call", e);
+    } catch (RuntimeException | Error e) {
+      propagateIfFatal(e);
+      // Ignoring in all cases here because storage itself isn't saying we need to throttle.  Though, we may still be
+      // write bound, but a drop in concurrency won't necessarily help.
+      limitListener.onIgnore();
+      throw e;
+    }
+  }
+
+  @Override public void enqueue(Callback<V> callback) {
+    try {
+      executor.execute(new QueuedCall(callback));
+    } catch (RuntimeException | Error e) {
+      propagateIfFatal(e);
+      // Ignoring in all cases here because storage itself isn't saying we need to throttle.  Though, we may still be
+      // write bound, but a drop in concurrency won't necessarily help.
+      limitListener.onIgnore();
+      throw e;
+    }
+  }
+
+  @Override public void cancel() {
+    canceled = true;
+    if (delegate != null) delegate.cancel();
+  }
+
+  @Override public boolean isCanceled() {
+    return canceled || (delegate != null && delegate.isCanceled());
+  }
+
+  @Override public Call<V> clone() {
+    return new ThrottledCall<>(this);
+  }
+
+  @Override public String toString() {
+    return "Throttled" + supplier;
+  }
+
+  static String setCurrentThreadName(String name) {
+    Thread thread = Thread.currentThread();
+    String originalName = thread.getName();
+    thread.setName(name);
+    return originalName;
+  }
+
+  final class QueuedCall implements Runnable {
+    final Callback<V> callback;
+
+    QueuedCall(Callback<V> callback) {
+      this.callback = callback;
+    }
+
+    @Override public void run() {
+      try {
+        if (isCanceled()) return;
+
+        delegate = ThrottledCall.this.supplier.get();
+
+        String oldName = setCurrentThreadName(delegate.toString());
+        try {
+          enqueueAndWait();
+        } finally {
+          setCurrentThreadName(oldName);
+        }
+      } catch (RuntimeException | Error e) {
+        propagateIfFatal(e);
+        limitListener.onIgnore();
+        callback.onError(e);
+      }
+    }
+
+    void enqueueAndWait() {
+      ThrottledCallback<V> throttleCallback = new ThrottledCallback<>(callback, limitListener);
+      delegate.enqueue(throttleCallback);
+
+      // Need to wait here since the callback call will run asynchronously also.
+      // This ensures we don't exceed our throttle/queue limits.
+      throttleCallback.await();
+    }
+  }
+
+  static final class ThrottledCallback<V> implements Callback<V> {
+    final Callback<V> supplier;
+    final Listener limitListener;
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    ThrottledCallback(Callback<V> supplier, Listener limitListener) {
+      this.supplier = supplier;
+      this.limitListener = limitListener;
+    }
+
+    void await() {
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        limitListener.onIgnore();
+        throw new RuntimeException("Interrupted while blocking on a throttled call", e);
+      }
+    }
+
+    @Override public void onSuccess(V value) {
+      try {
+        limitListener.onSuccess();
+        supplier.onSuccess(value);
+      } finally {
+        latch.countDown();
+      }
+    }
+
+    @Override public void onError(Throwable t) {
+      try {
+        if (t instanceof RejectedExecutionException) {
+          limitListener.onDropped();
+        } else {
+          limitListener.onIgnore();
+        }
+
+        supplier.onError(t);
+      } finally {
+        latch.countDown();
+      }
+    }
+  }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
new file mode 100644
index 0000000..91e7b78
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import com.netflix.concurrency.limits.Limit;
+import com.netflix.concurrency.limits.Limiter;
+import com.netflix.concurrency.limits.limit.Gradient2Limit;
+import com.netflix.concurrency.limits.limiter.AbstractLimiter;
+import io.micrometer.core.instrument.MeterRegistry;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import zipkin2.Call;
+import zipkin2.Span;
+import zipkin2.storage.SpanConsumer;
+import zipkin2.storage.SpanStore;
+import zipkin2.storage.StorageComponent;
+
+/**
+ * Delegating implementation that limits requests to the {@link #spanConsumer()} of another {@link
+ * StorageComponent}.  The theory here is that this class can be used to:
+ * <ul>
+ * <li>Prevent spamming the storage engine with excessive, spiky requests when they come in; thus
+ * preserving its life.</li>
+ * <li>Optionally act as a buffer so that a fixed number of requests can be queued for execution when
+ * the throttle allows for it.  This optional queue must be bounded in order to avoid running out of
+ * memory from infinitely queueing.</li>
+ * </ul>
+ *
+ * @see ThrottledSpanConsumer
+ */
+public final class ThrottledStorageComponent extends StorageComponent {
+  final StorageComponent delegate;
+  final AbstractLimiter<Void> limiter;
+  final ThreadPoolExecutor executor;
+
+  public ThrottledStorageComponent(StorageComponent delegate, MeterRegistry registry,
+    int minConcurrency,
+    int maxConcurrency,
+    int maxQueueSize) {
+    this.delegate = Objects.requireNonNull(delegate);
+
+    Limit limit = Gradient2Limit.newBuilder()
+      .minLimit(minConcurrency)
+      .initialLimit(
+        minConcurrency) // Limiter will trend towards min until otherwise necessary so may as well start there
+      .maxConcurrency(maxConcurrency)
+      .queueSize(0)
+      .build();
+    this.limiter = new Builder().limit(limit).build();
+
+    // TODO: explain these parameters
+    this.executor = new ThreadPoolExecutor(limit.getLimit(),
+      limit.getLimit(),
+      0,
+      TimeUnit.DAYS,
+      createQueue(maxQueueSize),
+      new ThottledThreadFactory(),
+      new ThreadPoolExecutor.AbortPolicy());
+
+    limit.notifyOnChange(new ThreadPoolExecutorResizer(executor));
+
+    ActuateThrottleMetrics metrics = new ActuateThrottleMetrics(registry);
+    metrics.bind(executor);
+    metrics.bind(limiter);
+  }
+
+  @Override public SpanStore spanStore() {
+    return delegate.spanStore();
+  }
+
+  @Override public SpanConsumer spanConsumer() {
+    return new ThrottledSpanConsumer(delegate.spanConsumer(), limiter, executor);
+  }
+
+  @Override public void close() throws IOException {
+    executor.shutdownNow();
+    delegate.close();
+  }
+
+  @Override public String toString() {
+    return "Throttled" + delegate;
+  }
+
+  final class ThrottledSpanConsumer implements SpanConsumer {
+    final SpanConsumer delegate;
+    final Limiter<Void> limiter;
+    final ExecutorService executor;
+
+    ThrottledSpanConsumer(SpanConsumer delegate, Limiter<Void> limiter, ExecutorService executor) {
+      this.delegate = delegate;
+      this.limiter = limiter;
+      this.executor = executor;
+    }
+
+    @Override public Call<Void> accept(List<Span> spans) {
+      return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+    }
+
+    @Override public String toString() {
+      return "Throttled" + delegate;
+    }
+  }
+
+  static BlockingQueue<Runnable> createQueue(int maxSize) {
+    if (maxSize < 0) throw new IllegalArgumentException("maxSize < 0");
+
+    if (maxSize == 0) {
+      // 0 means we should be bounded but we can't create a queue with that size so use 1 instead.
+      maxSize = 1;
+    }
+
+    return new LinkedBlockingQueue<>(maxSize);
+  }
+
+  static final class ThottledThreadFactory implements ThreadFactory {
+    @Override public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r);
+      thread.setDaemon(true);
+      thread.setName("zipkin-throttle-pool-" + thread.getId());
+      return thread;
+    }
+  }
+
+  static final class ThreadPoolExecutorResizer implements Consumer<Integer> {
+    final ThreadPoolExecutor executor;
+
+    ThreadPoolExecutorResizer(ThreadPoolExecutor executor) {
+      this.executor = executor;
+    }
+
+    /**
+     * This is {@code synchronized} to ensure that we don't let the core/max pool sizes get out of
+     * sync; even for an instant.  The two need to be tightly coupled together to ensure that when
+     * our queue fills up we don't spin up extra Threads beyond our calculated limit.
+     *
+     * <p>There is also an unfortunate aspect where the {@code max} must never be less than
+     * {@code core} or an exception will be thrown.  So they have to be adjusted in the appropriate
+     * order relative to the direction the size is going.
+     */
+    @Override public synchronized void accept(Integer newValue) {
+      int previousValue = executor.getCorePoolSize();
+
+      int newValueInt = newValue;
+      if (previousValue < newValueInt) {
+        executor.setMaximumPoolSize(newValueInt);
+        executor.setCorePoolSize(newValueInt);
+      } else if (previousValue > newValueInt) {
+        executor.setCorePoolSize(newValueInt);
+        executor.setMaximumPoolSize(newValueInt);
+      }
+      // Note: no case for equals.  Why modify something that doesn't need to be modified?
+    }
+  }
+
+  static final class Builder extends AbstractLimiter.Builder<Builder> {
+    NonLimitingLimiter build() {
+      return new NonLimitingLimiter(this);
+    }
+
+    @Override protected Builder self() {
+      return this;
+    }
+  }
+
+  /**
+   * Unlike a normal Limiter, this will actually not prevent the creation of a {@link Listener} in
+   * {@link #acquire(java.lang.Void)}.  The point of this is to ensure that we can always derive an
+   * appropriate {@link Limit#getLimit() Limit} while the {@link #executor} handles actually
+   * limiting running requests.
+   */
+  static final class NonLimitingLimiter extends AbstractLimiter<Void> {
+    NonLimitingLimiter(AbstractLimiter.Builder<?> builder) {
+      super(builder);
+    }
+
+    @Override public Optional<Listener> acquire(Void context) {
+      return Optional.of(createListener());
+    }
+  }
+}
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java
new file mode 100644
index 0000000..fd344db
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ZipkinStorageThrottleProperties.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("zipkin.storage.throttle")
+public final class ZipkinStorageThrottleProperties {
+  /** Whether storage requests should be throttled at all. */
+  private boolean enabled;
+  /** Minimum number of storage requests to allow through at a given time. */
+  private int minConcurrency;
+  /**
+   * Maximum number of storage requests to allow through at a given time. Should be tuned to
+   * (bulk_index_pool_size / num_servers_in_cluster). e.g. 200 (default pool size in Elasticsearch)
+   * / 2 (number of load balanced zipkin-server instances) = 100.
+   */
+  private int maxConcurrency;
+  /**
+   * Maximum number of storage requests to buffer while waiting for an open thread. 0 = no buffering.
+   */
+  private int maxQueueSize;
+
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  public void setEnabled(boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  public int getMinConcurrency() {
+    return minConcurrency;
+  }
+
+  public void setMinConcurrency(int minConcurrency) {
+    this.minConcurrency = minConcurrency;
+  }
+
+  public int getMaxConcurrency() {
+    return maxConcurrency;
+  }
+
+  public void setMaxConcurrency(int maxConcurrency) {
+    this.maxConcurrency = maxConcurrency;
+  }
+
+  public int getMaxQueueSize() {
+    return maxQueueSize;
+  }
+
+  public void setMaxQueueSize(int maxQueueSize) {
+    this.maxQueueSize = maxQueueSize;
+  }
+}
diff --git a/zipkin-server/src/main/resources/zipkin-server-shared.yml b/zipkin-server/src/main/resources/zipkin-server-shared.yml
index 72b0fe8..009e3a0 100644
--- a/zipkin-server/src/main/resources/zipkin-server-shared.yml
+++ b/zipkin-server/src/main/resources/zipkin-server-shared.yml
@@ -53,6 +53,11 @@ zipkin:
     autocomplete-ttl: ${AUTOCOMPLETE_TTL:3600000}
     autocomplete-cardinality: 20000
     type: ${STORAGE_TYPE:mem}
+    throttle:
+      enabled: ${STORAGE_THROTTLE_ENABLED:false}
+      min-concurrency: ${STORAGE_THROTTLE_MIN_CONCURRENCY:10}
+      max-concurrency: ${STORAGE_THROTTLE_MAX_CONCURRENCY:200}
+      max-queue-size: ${STORAGE_THROTTLE_MAX_QUEUE_SIZE:1000}
     mem:
       # Maximum number of spans to keep in memory.  When exceeded, oldest traces (and their spans) will be purged.
       # A safe estimate is 1K of memory per span (each span with 2 annotations + 1 binary annotation), plus
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt
index 40ec38c..6a5f5f3 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/elasticsearch/BasicAuthInterceptorTest.kt
@@ -29,7 +29,7 @@ class BasicAuthInterceptorTest {
   @Rule @JvmField val thrown: ExpectedException = ExpectedException.none()
 
   var client: OkHttpClient = OkHttpClient.Builder()
-    .addNetworkInterceptor(BasicAuthInterceptor(ZipkinElasticsearchStorageProperties()))
+    .addNetworkInterceptor(BasicAuthInterceptor(ZipkinElasticsearchStorageProperties(false, 0)))
     .build()
 
   @Test fun intercept_whenESReturns403AndJsonBody_throwsWithResponseBodyMessage() {
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
new file mode 100644
index 0000000..00eb02f
--- /dev/null
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle
+
+import com.netflix.concurrency.limits.Limiter
+import com.netflix.concurrency.limits.Limiter.Listener
+import com.netflix.concurrency.limits.limit.SettableLimit
+import com.netflix.concurrency.limits.limiter.SimpleLimiter
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.Test
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito
+import org.mockito.Mockito.`when`
+import org.mockito.Mockito.doThrow
+import org.mockito.Mockito.verify
+import zipkin2.Call
+import zipkin2.Callback
+import java.io.IOException
+import java.util.Optional
+import java.util.concurrent.Callable
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.Semaphore
+import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.TimeUnit
+import java.util.function.Supplier
+
+// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be
+//  refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way)
+class ThrottledCallTest {
+  var limit = SettableLimit.startingAt(0)
+  var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+
+  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+
+  @Test fun callCreation_isDeferred() {
+    val created = booleanArrayOf(false)
+
+    val throttle = createThrottle(Supplier {
+      created[0] = true
+      Call.create<Void>(null)
+    })
+
+    assertThat(created).contains(false)
+    throttle.execute()
+    assertThat(created).contains(true)
+  }
+
+  @Test fun execute_isThrottled() {
+    val numThreads = 1
+    val queueSize = 1
+    val totalTasks = numThreads + queueSize
+
+    val startLock = Semaphore(numThreads)
+    val waitLock = Semaphore(totalTasks)
+    val failLock = Semaphore(1)
+    val throttle =
+      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+
+    // Step 1: drain appropriate locks
+    startLock.drainPermits()
+    waitLock.drainPermits()
+    failLock.drainPermits()
+
+    // Step 2: saturate threads and fill queue
+    val backgroundPool = Executors.newCachedThreadPool()
+    for (i in 0 until totalTasks) {
+      backgroundPool.submit(Callable { throttle.execute() })
+    }
+
+    try {
+      // Step 3: make sure the threads actually started
+      startLock.acquire(numThreads)
+
+      // Step 4: submit something beyond our limits
+      val future = backgroundPool.submit(Callable {
+        try {
+          throttle.execute()
+        } catch (e: IOException) {
+          throw RuntimeException(e)
+        } finally {
+          // Step 6: signal that we tripped the limit
+          failLock.release()
+        }
+      })
+
+      // Step 5: wait to make sure our limit actually tripped
+      failLock.acquire()
+
+      future.get()
+
+      // Step 7: Expect great things
+      assertThat(true).isFalse() // should raise a RejectedExecutionException
+    } catch (t: Throwable) {
+      assertThat(t)
+        .isInstanceOf(ExecutionException::class.java) // from future.get
+        .hasCauseInstanceOf(RejectedExecutionException::class.java)
+    } finally {
+      waitLock.release(totalTasks)
+      startLock.release(totalTasks)
+      backgroundPool.shutdownNow()
+    }
+  }
+
+  @Test fun execute_trottlesBack_whenStorageRejects() {
+    val listener: Listener = mock()
+    val call = FakeCall()
+    call.overCapacity = true
+
+    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+    try {
+      throttle.execute()
+      assertThat(true).isFalse() // should raise a RejectedExecutionException
+    } catch (e: RejectedExecutionException) {
+      verify(listener).onDropped()
+    }
+  }
+
+  @Test fun execute_ignoresLimit_whenPoolFull() {
+    val listener: Listener = mock()
+
+    val throttle =
+      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+    try {
+      throttle.execute()
+      assertThat(true).isFalse() // should raise a RejectedExecutionException
+    } catch (e: RejectedExecutionException) {
+      verify(listener).onIgnore()
+    }
+  }
+
+  @Test fun enqueue_isThrottled() {
+    val numThreads = 1
+    val queueSize = 1
+    val totalTasks = numThreads + queueSize
+
+    val startLock = Semaphore(numThreads)
+    val waitLock = Semaphore(totalTasks)
+    val throttle =
+      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+
+    // Step 1: drain appropriate locks
+    startLock.drainPermits()
+    waitLock.drainPermits()
+
+    // Step 2: saturate threads and fill queue
+    val callback: Callback<Void> = mock()
+    for (i in 0 until totalTasks) {
+      throttle.enqueue(callback)
+    }
+
+    // Step 3: make sure the threads actually started
+    startLock.acquire(numThreads)
+
+    try {
+      // Step 4: submit something beyond our limits and make sure it fails
+      throttle.enqueue(callback)
+
+      assertThat(true).isFalse() // should raise a RejectedExecutionException
+    } catch (e: RejectedExecutionException) {
+    } finally {
+      waitLock.release(totalTasks)
+      startLock.release(totalTasks)
+    }
+  }
+
+  @Test fun enqueue_throttlesBack_whenStorageRejects() {
+    val listener: Listener = mock()
+    val call = FakeCall()
+    call.overCapacity = true
+
+    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call })
+    val latch = CountDownLatch(1)
+    throttle.enqueue(object : Callback<Void> {
+      override fun onSuccess(value: Void) {
+        latch.countDown()
+      }
+
+      override fun onError(t: Throwable) {
+        latch.countDown()
+      }
+    })
+
+    latch.await(1, TimeUnit.MINUTES)
+    verify(listener).onDropped()
+  }
+
+  @Test fun enqueue_ignoresLimit_whenPoolFull() {
+    val listener: Listener = mock()
+
+    val throttle =
+      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+    try {
+      throttle.enqueue(null)
+      assertThat(true).isFalse() // should raise a RejectedExecutionException
+    } catch (e: RejectedExecutionException) {
+      verify(listener).onIgnore()
+    }
+  }
+
+  private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> {
+    return createThrottle(1, 1, delegate)
+  }
+
+  private fun createThrottle(
+    poolSize: Int,
+    queueSize: Int,
+    delegate: Supplier<Call<Void>>
+  ): ThrottledCall<Void> {
+    limit.setLimit(limit.getLimit() + 1)
+    return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
+  }
+
+  private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() {
+    override fun doExecute(): Void? {
+      try {
+        startLock.release() // signal the test that this call has started running
+        waitLock.acquire() // then block until the test releases a permit
+        return null
+      } catch (e: InterruptedException) {
+        Thread.currentThread().interrupt() // preserve the interrupt status before failing
+        throw AssertionError(e)
+      }
+    }
+
+    override fun doEnqueue(callback: Callback<Void>) {
+      try {
+        callback.onSuccess(doExecute())
+      } catch (t: Throwable) {
+        callback.onError(t)
+      }
+    }
+
+    override fun clone() = LockedCall(startLock, waitLock) // clones share the same semaphores
+  }
+
+  private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
+    return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
+      LinkedBlockingQueue(queueSize))
+  }
+
+  private fun mockExhaustedPool(): ExecutorService {
+    val mock: ExecutorService = mock()
+    doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
+    doThrow(RejectedExecutionException::class.java).`when`(mock).submit(any(Callable::class.java))
+    return mock
+  }
+
+  private fun mockLimiter(listener: Listener): Limiter<Void> {
+    val mock: Limiter<Void> = mock()
+    `when`(mock.acquire(any())).thenReturn(Optional.of(listener))
+    return mock
+  }
+
+  private class FakeCall(var overCapacity: Boolean = false) : Call.Base<Void>() {
+    override fun doExecute(): Void? {
+      if (overCapacity) throw RejectedExecutionException() // simulates storage rejecting the request
+      return null
+    }
+
+    override fun doEnqueue(callback: Callback<Void>) {
+      if (overCapacity) {
+        callback.onError(RejectedExecutionException())
+      } else {
+        callback.onSuccess(null)
+      }
+    }
+
+    override fun clone() = FakeCall(overCapacity) // a clone must keep the over-capacity flag
+  }
+}
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
new file mode 100644
index 0000000..705967a
--- /dev/null
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.server.internal.throttle
+
+import com.linecorp.armeria.common.metric.NoopMeterRegistry
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.Test
+import zipkin2.storage.InMemoryStorage
+
+class ThrottledStorageComponentTest {
+  val delegate = InMemoryStorage.newBuilder().build()
+  val registry = NoopMeterRegistry.get()
+
+  @Test fun spanConsumer_isProxied() {
+    val throttle = ThrottledStorageComponent(delegate, registry, 1, 2, 1)
+
+    assertThat(throttle.spanConsumer().accept(listOf()))
+      .isInstanceOf(ThrottledCall::class.java)
+  }
+
+  @Test fun createComponent_withZeroSizedQueue() {
+    val queueSize = 0
+    ThrottledStorageComponent(delegate, registry, 1, 2, queueSize)
+    // no exception == pass
+  }
+
+  @Test(expected = IllegalArgumentException::class)
+  fun createComponent_withNegativeQueue() {
+    val queueSize = -1
+    ThrottledStorageComponent(delegate, registry, 1, 2, queueSize)
+  }
+
+  @Test fun niceToString() {
+    assertThat(ThrottledStorageComponent(delegate, registry, 1, 2, 1))
+      .hasToString("ThrottledInMemoryStorage{traceCount=0}") // "Throttled" + the delegate's toString
+  }
+}
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
index 6c5e4b3..507ad8e 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
@@ -105,6 +105,11 @@ public final class HttpCall<V> extends Call<V> {
     return new HttpCall<V>(call.clone(), semaphore, bodyConverter);
   }
 
+  @Override
+  public String toString() {
+    return "HttpCall(" + call + ")";
+  }
+
   static class V2CallbackAdapter<V> implements okhttp3.Callback {
     final Semaphore semaphore;
     final BodyConverter<V> bodyConverter;
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index 3057940..ce72238 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -574,4 +574,8 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore
       return h$;
     }
   }
+
+  @Override public String toString() {
+    return "InMemoryStorage{traceCount=" + traceIdToTraceIdTimeStamps.size() + "}";
+  }
 }


Mime
View raw message