zipkin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adrianc...@apache.org
Subject [incubator-zipkin] branch master updated: Makes it an error to store during assembly of a call (#2580)
Date Sat, 11 May 2019 02:25:30 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new 862e3f3  Makes it an error to store during assembly of a call (#2580)
862e3f3 is described below

commit 862e3f3d6979cd66fa75f83e4def1e3bbcf1f3b0
Author: Adrian Cole <adriancole@users.noreply.github.com>
AuthorDate: Sat May 11 10:25:25 2019 +0800

    Makes it an error to store during assembly of a call (#2580)
    
    Before this, there was some extra code in the throttle package handling
    a bug in our in memory storage. This fixes that and removes the extra
    code.
    
    See #2502
---
 .../internal/BulkRequestBenchmarks.java            |  11 +-
 .../ZipkinElasticsearchStorageProperties.java      |   2 +-
 .../server/internal/throttle/ThrottledCall.java    |  83 ++++++---------
 .../throttle/ThrottledStorageComponent.java        |   8 +-
 .../server/internal/throttle/ThrottledCallTest.kt  |  72 +++++--------
 .../throttle/ThrottledStorageComponentTest.kt      |   2 +-
 .../elasticsearch/internal/BulkCallBuilder.java    | 113 ++++++++++++---------
 .../elasticsearch/internal/client/HttpCall.java    |  17 ++--
 .../zipkin2/elasticsearch/InternalForTests.java    |   9 +-
 .../internal/client/HttpCallTest.java              |  19 ++++
 .../src/main/java/zipkin2/storage/ITSpanStore.java |  56 ++++++++++
 .../main/java/zipkin2/storage/InMemoryStorage.java |  39 ++++++-
 .../java/zipkin2/storage/InMemoryStorageTest.java  |  14 +--
 13 files changed, 266 insertions(+), 179 deletions(-)

diff --git a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
index 210edac..ea2b5cb 100644
--- a/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
+++ b/benchmarks/src/main/java/zipkin2/elasticsearch/internal/BulkRequestBenchmarks.java
@@ -38,6 +38,7 @@ import zipkin2.Span;
 import zipkin2.codec.CodecBenchmarks;
 import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.elasticsearch.ElasticsearchStorage;
+import zipkin2.elasticsearch.internal.BulkCallBuilder.IndexEntry;
 
 @Measurement(iterations = 5, time = 1)
 @Warmup(iterations = 10, time = 1)
@@ -50,18 +51,24 @@ public class BulkRequestBenchmarks {
   static final Span CLIENT_SPAN = SpanBytesDecoder.JSON_V2.decodeOne(read("/zipkin2-client.json"));
 
   final ElasticsearchStorage es = ElasticsearchStorage.newBuilder().build();
-  final BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
-
   final long indexTimestamp = CLIENT_SPAN.timestampAsLong() / 1000L;
   final String spanIndex =
     es.indexNameFormatter().formatTypeAndTimestampForInsert("span", '-', indexTimestamp);
+  final IndexEntry<Span> entry =
+    BulkCallBuilder.newIndexEntry(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
+
+  @Benchmark public void writeRequest_singleSpan() throws IOException {
+    BulkCallBuilder.write(Okio.buffer(Okio.blackhole()), entry, true);
+  }
 
   @Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
+    BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
     builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
     builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
   }
 
   @Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
+    BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
     for (int i = 0; i < 10; i++) {
       builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
     }
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
index a2fb450..4f367ff 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/elasticsearch/ZipkinElasticsearchStorageProperties.java
@@ -66,7 +66,7 @@ class ZipkinElasticsearchStorageProperties implements Serializable { //
for Spar
 
   ZipkinElasticsearchStorageProperties(
     @Value("${zipkin.storage.throttle.enabled:false}") boolean throttleEnabled,
-    @Value("${zipkin.storage.throttle.maxConcurrency:200}") int throttleMaxConcurrency) {
+    @Value("${zipkin.storage.throttle.max-concurrency:200}") int throttleMaxConcurrency)
{
     if (throttleEnabled) {
       this.throttleMaxConcurrency = throttleMaxConcurrency;
     }
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
index f43d61e..066042c 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java
@@ -24,10 +24,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
 import zipkin2.Call;
 import zipkin2.Callback;
-import zipkin2.storage.InMemoryStorage;
 
 /**
  * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService
@@ -41,39 +39,21 @@ import zipkin2.storage.InMemoryStorage;
  *
  * @see ThrottledStorageComponent
  */
-final class ThrottledCall<V> extends Call<V> {
+final class ThrottledCall<V> extends Call.Base<V> {
   final ExecutorService executor;
   final Limiter<Void> limiter;
-  final Listener limitListener;
-  /**
-   * supplier call needs to be supplied later to avoid having it take action when it is created
-   * (like {@link InMemoryStorage} and thus avoid being throttled.
-   */
-  final Supplier<? extends Call<V>> supplier;
-  volatile Call<V> delegate;
-  volatile boolean canceled;
-
-  public ThrottledCall(ExecutorService executor, Limiter<Void> limiter,
-    Supplier<? extends Call<V>> supplier) {
+  final Call<V> delegate;
+
+  ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate)
{
     this.executor = executor;
     this.limiter = limiter;
-    this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
-    this.supplier = supplier;
+    this.delegate = delegate;
   }
 
-  // TODO: refactor this when in-memory no longer executes storage ops during assembly time
-  ThrottledCall(ThrottledCall<V> other) {
-    this(other.executor, other.limiter,
-      other.delegate == null ? other.supplier : () -> other.delegate.clone());
-  }
+  @Override protected V doExecute() throws IOException {
+    Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
 
-  // TODO: we cannot currently extend Call.Base as tests execute the call multiple times,
-  // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor
out
-  // the need for assembly time throttling (fix to in-memory storage)
-  @Override public V execute() throws IOException {
     try {
-      delegate = supplier.get();
-
       // Make sure we throttle
       Future<V> future = executor.submit(() -> {
         String oldName = setCurrentThreadName(delegate.toString());
@@ -115,9 +95,11 @@ final class ThrottledCall<V> extends Call<V> {
     }
   }
 
-  @Override public void enqueue(Callback<V> callback) {
+  @Override protected void doEnqueue(Callback<V> callback) {
+    Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new);
+
     try {
-      executor.execute(new QueuedCall(callback));
+      executor.execute(new QueuedCall<>(delegate, callback, limitListener));
     } catch (RuntimeException | Error e) {
       propagateIfFatal(e);
       // Ignoring in all cases here because storage itself isn't saying we need to throttle.
 Though, we may still be
@@ -127,21 +109,12 @@ final class ThrottledCall<V> extends Call<V> {
     }
   }
 
-  @Override public void cancel() {
-    canceled = true;
-    if (delegate != null) delegate.cancel();
-  }
-
-  @Override public boolean isCanceled() {
-    return canceled || (delegate != null && delegate.isCanceled());
-  }
-
   @Override public Call<V> clone() {
-    return new ThrottledCall<>(this);
+    return new ThrottledCall<>(executor, limiter, delegate.clone());
   }
 
   @Override public String toString() {
-    return "Throttled" + supplier;
+    return "Throttled(" + delegate + ")";
   }
 
   static String setCurrentThreadName(String name) {
@@ -151,18 +124,20 @@ final class ThrottledCall<V> extends Call<V> {
     return originalName;
   }
 
-  final class QueuedCall implements Runnable {
+  static final class QueuedCall<V> implements Runnable {
+    final Call<V> delegate;
     final Callback<V> callback;
+    final Listener limitListener;
 
-    QueuedCall(Callback<V> callback) {
+    QueuedCall(Call<V> delegate, Callback<V> callback, Listener limitListener)
{
+      this.delegate = delegate;
       this.callback = callback;
+      this.limitListener = limitListener;
     }
 
     @Override public void run() {
       try {
-        if (isCanceled()) return;
-
-        delegate = ThrottledCall.this.supplier.get();
+        if (delegate.isCanceled()) return;
 
         String oldName = setCurrentThreadName(delegate.toString());
         try {
@@ -185,15 +160,19 @@ final class ThrottledCall<V> extends Call<V> {
       // This ensures we don't exceed our throttle/queue limits.
       throttleCallback.await();
     }
+
+    @Override public String toString() {
+      return "QueuedCall{delegate=" + delegate + ", callback=" + callback + "}";
+    }
   }
 
   static final class ThrottledCallback<V> implements Callback<V> {
-    final Callback<V> supplier;
+    final Callback<V> delegate;
     final Listener limitListener;
     final CountDownLatch latch = new CountDownLatch(1);
 
-    ThrottledCallback(Callback<V> supplier, Listener limitListener) {
-      this.supplier = supplier;
+    ThrottledCallback(Callback<V> delegate, Listener limitListener) {
+      this.delegate = delegate;
       this.limitListener = limitListener;
     }
 
@@ -210,7 +189,7 @@ final class ThrottledCall<V> extends Call<V> {
     @Override public void onSuccess(V value) {
       try {
         limitListener.onSuccess();
-        supplier.onSuccess(value);
+        delegate.onSuccess(value);
       } finally {
         latch.countDown();
       }
@@ -224,10 +203,14 @@ final class ThrottledCall<V> extends Call<V> {
           limitListener.onIgnore();
         }
 
-        supplier.onError(t);
+        delegate.onError(t);
       } finally {
         latch.countDown();
       }
     }
+
+    @Override public String toString() {
+      return "Throttled(" + delegate + ")";
+    }
   }
 }
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
index 91e7b78..42dc85c 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java
@@ -101,10 +101,10 @@ public final class ThrottledStorageComponent extends StorageComponent
{
   }
 
   @Override public String toString() {
-    return "Throttled" + delegate;
+    return "Throttled(" + delegate + ")";
   }
 
-  final class ThrottledSpanConsumer implements SpanConsumer {
+  static final class ThrottledSpanConsumer implements SpanConsumer {
     final SpanConsumer delegate;
     final Limiter<Void> limiter;
     final ExecutorService executor;
@@ -116,11 +116,11 @@ public final class ThrottledStorageComponent extends StorageComponent
{
     }
 
     @Override public Call<Void> accept(List<Span> spans) {
-      return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans));
+      return new ThrottledCall<>(executor, limiter, delegate.accept(spans));
     }
 
     @Override public String toString() {
-      return "Throttled" + delegate;
+      return "Throttled(" + delegate + ")";
     }
   }
 
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
index 00eb02f..bce9bf6 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt
@@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener
 import com.netflix.concurrency.limits.limit.SettableLimit
 import com.netflix.concurrency.limits.limiter.SimpleLimiter
 import org.assertj.core.api.Assertions.assertThat
+import org.junit.After
 import org.junit.Test
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
@@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException
 import java.util.concurrent.Semaphore
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit
-import java.util.function.Supplier
 
-// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to
be
-//  refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct
way)
 class ThrottledCallTest {
-  var limit = SettableLimit.startingAt(0)
-  var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
+  val limit = SettableLimit.startingAt(0)
+  val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>()
 
-  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
+  val numThreads = 1
+  val executor = Executors.newSingleThreadExecutor();
+  @After fun shutdownExecutor() = executor.shutdown()
 
-  @Test fun callCreation_isDeferred() {
-    val created = booleanArrayOf(false)
+  inline fun <reified T : Any> mock() = Mockito.mock(T::class.java)
 
-    val throttle = createThrottle(Supplier {
-      created[0] = true
-      Call.create<Void>(null)
-    })
+  @Test fun niceToString() {
+    val delegate: Call<Void> = mock()
+    `when`(delegate.toString()).thenReturn("StoreSpansCall{}")
 
-    assertThat(created).contains(false)
-    throttle.execute()
-    assertThat(created).contains(true)
+    assertThat(ThrottledCall(executor, limiter, delegate))
+      .hasToString("Throttled(StoreSpansCall{})")
   }
 
   @Test fun execute_isThrottled() {
-    val numThreads = 1
     val queueSize = 1
     val totalTasks = numThreads + queueSize
+    limit.limit = totalTasks
 
     val startLock = Semaphore(numThreads)
     val waitLock = Semaphore(totalTasks)
     val failLock = Semaphore(1)
-    val throttle =
-      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+    val throttled = throttle(LockedCall(startLock, waitLock))
 
     // Step 1: drain appropriate locks
     startLock.drainPermits()
@@ -83,7 +79,7 @@ class ThrottledCallTest {
     // Step 2: saturate threads and fill queue
     val backgroundPool = Executors.newCachedThreadPool()
     for (i in 0 until totalTasks) {
-      backgroundPool.submit(Callable { throttle.execute() })
+      backgroundPool.submit(Callable { throttled.clone().execute() })
     }
 
     try {
@@ -93,7 +89,7 @@ class ThrottledCallTest {
       // Step 4: submit something beyond our limits
       val future = backgroundPool.submit(Callable {
         try {
-          throttle.execute()
+          throttled.execute()
         } catch (e: IOException) {
           throw RuntimeException(e)
         } finally {
@@ -125,7 +121,7 @@ class ThrottledCallTest {
     val call = FakeCall()
     call.overCapacity = true
 
-    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call
})
+    val throttle = ThrottledCall(executor, mockLimiter(listener), call)
     try {
       throttle.execute()
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -137,8 +133,7 @@ class ThrottledCallTest {
   @Test fun execute_ignoresLimit_whenPoolFull() {
     val listener: Listener = mock()
 
-    val throttle =
-      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+    val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
     try {
       throttle.execute()
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -148,14 +143,13 @@ class ThrottledCallTest {
   }
 
   @Test fun enqueue_isThrottled() {
-    val numThreads = 1
     val queueSize = 1
     val totalTasks = numThreads + queueSize
+    limit.limit = totalTasks
 
     val startLock = Semaphore(numThreads)
     val waitLock = Semaphore(totalTasks)
-    val throttle =
-      createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) })
+    val throttle = throttle(LockedCall(startLock, waitLock))
 
     // Step 1: drain appropriate locks
     startLock.drainPermits()
@@ -164,7 +158,7 @@ class ThrottledCallTest {
     // Step 2: saturate threads and fill queue
     val callback: Callback<Void> = mock()
     for (i in 0 until totalTasks) {
-      throttle.enqueue(callback)
+      throttle.clone().enqueue(callback)
     }
 
     // Step 3: make sure the threads actually started
@@ -172,7 +166,7 @@ class ThrottledCallTest {
 
     try {
       // Step 4: submit something beyond our limits and make sure it fails
-      throttle.enqueue(callback)
+      throttle.clone().enqueue(callback)
 
       assertThat(true).isFalse() // should raise a RejectedExecutionException
     } catch (e: RejectedExecutionException) {
@@ -187,7 +181,7 @@ class ThrottledCallTest {
     val call = FakeCall()
     call.overCapacity = true
 
-    val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call
})
+    val throttle = ThrottledCall(executor, mockLimiter(listener), call)
     val latch = CountDownLatch(1)
     throttle.enqueue(object : Callback<Void> {
       override fun onSuccess(value: Void) {
@@ -207,7 +201,7 @@ class ThrottledCallTest {
     val listener: Listener = mock()
 
     val throttle =
-      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() })
+      ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall())
     try {
       throttle.enqueue(null)
       assertThat(true).isFalse() // should raise a RejectedExecutionException
@@ -216,18 +210,7 @@ class ThrottledCallTest {
     }
   }
 
-  private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void>
{
-    return createThrottle(1, 1, delegate)
-  }
-
-  private fun createThrottle(
-    poolSize: Int,
-    queueSize: Int,
-    delegate: Supplier<Call<Void>>
-  ): ThrottledCall<Void> {
-    limit.setLimit(limit.getLimit() + 1)
-    return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate)
-  }
+  private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate)
 
   private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>()
{
     override fun doExecute(): Void? {
@@ -252,11 +235,6 @@ class ThrottledCallTest {
     override fun clone() = LockedCall(startLock, waitLock);
   }
 
-  private fun createPool(poolSize: Int, queueSize: Int): ExecutorService {
-    return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS,
-      LinkedBlockingQueue(queueSize))
-  }
-
   private fun mockExhaustedPool(): ExecutorService {
     val mock: ExecutorService = mock()
     doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any())
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
index 705967a..2a804df 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledStorageComponentTest.kt
@@ -46,6 +46,6 @@ class ThrottledStorageComponentTest {
 
   @Test fun niceToString() {
     assertThat(ThrottledStorageComponent(delegate, registry, 1, 2, 1))
-      .hasToString("ThrottledInMemoryStorage{traceCount=0}");
+      .hasToString("Throttled(InMemoryStorage{traceCount=0})");
   }
 }
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
index e5d7c6f..2361907 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/BulkCallBuilder.java
@@ -16,8 +16,11 @@
  */
 package zipkin2.elasticsearch.internal;
 
+import com.google.auto.value.AutoValue;
 import com.squareup.moshi.JsonWriter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.RejectedExecutionException;
 import okhttp3.HttpUrl;
 import okhttp3.MediaType;
@@ -42,7 +45,7 @@ public final class BulkCallBuilder {
   final boolean waitForRefresh;
 
   // Mutated for each call to index
-  final Buffer buffer = new Buffer();
+  final List<IndexEntry<?>> entries = new ArrayList<>();
 
   public BulkCallBuilder(ElasticsearchStorage es, float esVersion, String tag) {
     this.tag = tag;
@@ -52,45 +55,23 @@ public final class BulkCallBuilder {
     waitForRefresh = es.flushOnWrites();
   }
 
-  enum CheckForErrors implements HttpCall.BodyConverter<Void> {
-    INSTANCE;
+  static <T> IndexEntry<T> newIndexEntry(String index, String typeName, T input,
+    BulkIndexWriter<T> writer) {
+    return new AutoValue_BulkCallBuilder_IndexEntry<>(index, typeName, input, writer);
+  }
 
-    @Override public Void convert(BufferedSource b) throws IOException {
-      String content = b.readUtf8();
-      if (content.contains("\"status\":429")) throw new RejectedExecutionException(content);
-      if (content.contains("\"errors\":true")) throw new IllegalStateException(content);
-      return null;
-    }
+  @AutoValue static abstract class IndexEntry<T> {
+    abstract String index();
 
-    @Override public String toString() {
-      return "CheckForErrors";
-    }
-  }
+    abstract String typeName();
 
-  public <T> void index(String index, String typeName, T input, BulkIndexWriter<T>
writer) {
-    Buffer document = new Buffer();
-    String id = writer.writeDocument(input, document);
-    writeIndexMetadata(buffer, index, typeName, id);
-    buffer.writeByte('\n');
-    buffer.write(document, document.size());
-    buffer.writeByte('\n');
+    abstract T input();
+
+    abstract BulkIndexWriter<T> writer();
   }
 
-  void writeIndexMetadata(Buffer indexBuffer, String index, String typeName, String id) {
-    JsonWriter jsonWriter = JsonWriter.of(indexBuffer);
-    try {
-      jsonWriter.beginObject();
-      jsonWriter.name("index");
-      jsonWriter.beginObject();
-      jsonWriter.name("_index").value(index);
-      // the _type parameter is needed for Elasticsearch < 6.x
-      if (shouldAddType) jsonWriter.name("_type").value(typeName);
-      jsonWriter.name("_id").value(id);
-      jsonWriter.endObject();
-      jsonWriter.endObject();
-    } catch (IOException e) {
-      throw new AssertionError(e); // No I/O writing to a Buffer.
-    }
+  public <T> void index(String index, String typeName, T input, BulkIndexWriter<T>
writer) {
+    entries.add(newIndexEntry(index, typeName, input, writer));
   }
 
   /** Creates a bulk request when there is more than one object to store */
@@ -99,36 +80,72 @@ public final class BulkCallBuilder {
     if (pipeline != null) urlBuilder.addQueryParameter("pipeline", pipeline);
     if (waitForRefresh) urlBuilder.addQueryParameter("refresh", "wait_for");
 
-    RequestBody body = new BufferRequestBody(buffer);
+    RequestBody body = new BulkRequestBody(entries, shouldAddType);
 
     Request request = new Request.Builder().url(urlBuilder.build()).tag(tag).post(body).build();
     return http.newCall(request, CheckForErrors.INSTANCE);
   }
 
   /** This avoids allocating a large byte array (by using a poolable buffer instead). */
-  static final class BufferRequestBody extends RequestBody {
-    final long contentLength;
-    final Buffer buffer;
+  static final class BulkRequestBody extends RequestBody {
+    final List<IndexEntry<?>> entries;
+    final boolean shouldAddType;
 
-    BufferRequestBody(Buffer buffer) {
-      this.contentLength = buffer.size();
-      this.buffer = buffer;
+    BulkRequestBody(List<IndexEntry<?>> entries, boolean shouldAddType) {
+      this.entries = entries;
+      this.shouldAddType = shouldAddType;
     }
 
     @Override public MediaType contentType() {
       return APPLICATION_JSON;
     }
 
-    @Override public long contentLength() {
-      return contentLength;
+    @Override public void writeTo(BufferedSink sink) throws IOException {
+      for (int i = 0, length = entries.size(); i < length; i++) {
+        write(sink, entries.get(i), shouldAddType);
+      }
     }
+  }
+
+  static void write(BufferedSink sink, IndexEntry entry, boolean shouldAddType) throws IOException
{
+    Buffer document = new Buffer();
+    String id = entry.writer().writeDocument(entry.input(), document);
+    writeIndexMetadata(sink, entry, id, shouldAddType);
+    sink.writeByte('\n');
+    sink.write(document, document.size());
+    sink.writeByte('\n');
+  }
+
+  static void writeIndexMetadata(BufferedSink sink, IndexEntry entry, String id,
+    boolean shouldAddType) {
+    JsonWriter jsonWriter = JsonWriter.of(sink);
+    try {
+      jsonWriter.beginObject();
+      jsonWriter.name("index");
+      jsonWriter.beginObject();
+      jsonWriter.name("_index").value(entry.index());
+      // the _type parameter is needed for Elasticsearch < 6.x
+      if (shouldAddType) jsonWriter.name("_type").value(entry.typeName());
+      jsonWriter.name("_id").value(id);
+      jsonWriter.endObject();
+      jsonWriter.endObject();
+    } catch (IOException e) {
+      throw new AssertionError(e); // No I/O writing to a Buffer.
+    }
+  }
 
-    @Override public boolean isOneShot() {
-      return true;
+  enum CheckForErrors implements HttpCall.BodyConverter<Void> {
+    INSTANCE;
+
+    @Override public Void convert(BufferedSource b) throws IOException {
+      String content = b.readUtf8();
+      if (content.contains("\"status\":429")) throw new RejectedExecutionException(content);
+      if (content.contains("\"errors\":true")) throw new IllegalStateException(content);
+      return null;
     }
 
-    @Override public void writeTo(BufferedSink sink) throws IOException {
-      sink.write(buffer, contentLength);
+    @Override public String toString() {
+      return "CheckForErrors";
     }
   }
 }
diff --git a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
index 507ad8e..736a960 100644
--- a/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
+++ b/zipkin-storage/elasticsearch/src/main/java/zipkin2/elasticsearch/internal/client/HttpCall.java
@@ -31,7 +31,7 @@ import okio.Okio;
 import zipkin2.Call;
 import zipkin2.Callback;
 
-public final class HttpCall<V> extends Call<V> {
+public final class HttpCall<V> extends Call.Base<V> {
 
   public interface BodyConverter<V> {
     V convert(BufferedSource content) throws IOException;
@@ -61,7 +61,6 @@ public final class HttpCall<V> extends Call<V> {
   public final BodyConverter<V> bodyConverter;
   final Semaphore semaphore;
 
-
   HttpCall(Factory factory, Request request, BodyConverter<V> bodyConverter) {
     this(
       factory.ok.newCall(request),
@@ -76,7 +75,7 @@ public final class HttpCall<V> extends Call<V> {
     this.bodyConverter = bodyConverter;
   }
 
-  @Override public V execute() throws IOException {
+  @Override protected V doExecute() throws IOException {
     if (!semaphore.tryAcquire()) throw new IllegalStateException("over capacity");
     try {
       return parseResponse(call.execute(), bodyConverter);
@@ -85,22 +84,18 @@ public final class HttpCall<V> extends Call<V> {
     }
   }
 
-  @Override public void enqueue(Callback<V> delegate) {
+  @Override protected void doEnqueue(Callback<V> callback) {
     if (!semaphore.tryAcquire()) {
-      delegate.onError(new IllegalStateException("over capacity"));
+      callback.onError(new IllegalStateException("over capacity"));
       return;
     }
-    call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, delegate));
+    call.enqueue(new V2CallbackAdapter<>(semaphore, bodyConverter, callback));
   }
 
-  @Override public void cancel() {
+  @Override protected void doCancel() {
     call.cancel();
   }
 
-  @Override public boolean isCanceled() {
-    return call.isCanceled();
-  }
-
   @Override public HttpCall<V> clone() {
     return new HttpCall<V>(call.clone(), semaphore, bodyConverter);
   }
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
index b221fa0..0847f7d 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/InternalForTests.java
@@ -22,8 +22,8 @@ import java.io.UncheckedIOException;
 import java.util.List;
 import okio.BufferedSink;
 import zipkin2.DependencyLink;
-import zipkin2.elasticsearch.internal.BulkIndexWriter;
 import zipkin2.elasticsearch.internal.BulkCallBuilder;
+import zipkin2.elasticsearch.internal.BulkIndexWriter;
 
 /** Package accessor for integration tests */
 public class InternalForTests {
@@ -32,9 +32,8 @@ public class InternalForTests {
     String index = ((ElasticsearchSpanConsumer) es.spanConsumer())
       .formatTypeAndTimestampForInsert("dependency", midnightUTC);
     BulkCallBuilder indexer = new BulkCallBuilder(es, es.version(), "indexlinks");
-    for (DependencyLink link : links) {
-      indexer.index(index, "dependency", link, DEPENDENCY_LINK_BULK_INDEX_SUPPORT);
-    }
+    for (DependencyLink link : links)
+      indexer.index(index, "dependency", link, DEPENDENCY_LINK_WRITER);
     try {
       indexer.build().execute();
     } catch (IOException e) {
@@ -42,7 +41,7 @@ public class InternalForTests {
     }
   }
 
-  static final BulkIndexWriter<DependencyLink> DEPENDENCY_LINK_BULK_INDEX_SUPPORT =
+  static final BulkIndexWriter<DependencyLink> DEPENDENCY_LINK_WRITER =
     new BulkIndexWriter<DependencyLink>() {
       @Override public String writeDocument(DependencyLink link, BufferedSink sink) {
         JsonWriter writer = JsonWriter.of(sink);
diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
index 18b3c97..9f3ed2e 100644
--- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
+++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/internal/client/HttpCallTest.java
@@ -93,6 +93,25 @@ public class HttpCallTest {
   }
 
   @Test
+  public void cloned() throws Exception {
+    mws.enqueue(new MockResponse());
+
+    Call<?> call = http.newCall(request, b -> null);
+    call.execute();
+
+    try {
+      call.execute();
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IllegalStateException expected) {
+      assertThat(expected).isInstanceOf(IllegalStateException.class);
+    }
+
+    mws.enqueue(new MockResponse());
+
+    call.clone().execute();
+  }
+
+  @Test
   public void executionException_httpFailure() throws Exception {
     mws.enqueue(new MockResponse().setResponseCode(500));
 
diff --git a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
index edac7a5..a883065 100644
--- a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
+++ b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java
@@ -23,17 +23,21 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.junit.Before;
 import org.junit.Test;
+import zipkin2.Call;
+import zipkin2.Callback;
 import zipkin2.Endpoint;
 import zipkin2.Span;
 import zipkin2.internal.Trace;
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.BACKEND;
 import static zipkin2.TestObjects.CLIENT_SPAN;
 import static zipkin2.TestObjects.DAY;
@@ -106,6 +110,58 @@ public abstract class ITSpanStore {
     allShouldWorkWhenEmpty();
   }
 
+  @Test public void consumer_properlyImplementsCallContract_execute() throws IOException
{
+    Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+
+    // Ensure the implementation didn't accidentally do I/O at assembly time.
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+    call.execute();
+
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+      .containsExactly(LOTS_OF_SPANS[0]);
+
+    try {
+      call.execute();
+      failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+    } catch (IllegalStateException e) {
+    }
+
+    // no problem to clone a call
+    call.clone().execute();
+  }
+
+  @Test public void consumer_properlyImplementsCallContract_submit() throws Exception {
+    Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0]));
+    // Ensure the implementation didn't accidentally do I/O at assembly time.
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty();
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Callback<Void> callback = new Callback<Void>() {
+      @Override public void onSuccess(Void value) {
+        latch.countDown();
+      }
+
+      @Override public void onError(Throwable t) {
+        latch.countDown();
+      }
+    };
+
+    call.enqueue(callback);
+    latch.await();
+
+    assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute())
+      .containsExactly(LOTS_OF_SPANS[0]);
+
+    try {
+      call.enqueue(callback);
+      failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+    } catch (IllegalStateException e) {
+    }
+
+    // no problem to clone a call
+    call.clone().execute();
+  }
+
   /**
    * Ideally, storage backends can deduplicate identical documents as this will prevent some
    * analysis problems such as double-counting dependency links or other statistics. While
this test
diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
index ce72238..88d2195 100644
--- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
+++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import zipkin2.Call;
+import zipkin2.Callback;
 import zipkin2.DependencyLink;
 import zipkin2.Endpoint;
 import zipkin2.Span;
@@ -189,8 +190,11 @@ public final class InMemoryStorage extends StorageComponent implements
SpanStore
     autocompleteTags.clear();
   }
 
-  @Override
-  public synchronized Call<Void> accept(List<Span> spans) {
+  @Override public Call<Void> accept(List<Span> spans) {
+    return new StoreSpansCall(spans);
+  }
+
+  synchronized void doAccept(List<Span> spans) {
     int delta = spans.size();
     int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount;
     evictToRecoverSpans(spansToRecover);
@@ -221,7 +225,36 @@ public final class InMemoryStorage extends StorageComponent implements
SpanStore
         }
       }
     }
-    return Call.create(null /* Void == null */);
+  }
+
+  final class StoreSpansCall extends Call.Base<Void> {
+    final List<Span> spans;
+
+    StoreSpansCall(List<Span> spans) {
+      this.spans = spans;
+    }
+
+    @Override protected Void doExecute() {
+      doAccept(spans);
+      return null;
+    }
+
+    @Override protected void doEnqueue(Callback<Void> callback) {
+      try {
+        callback.onSuccess(doExecute());
+      } catch (RuntimeException | Error e) {
+        Call.propagateIfFatal(e);
+        callback.onError(e);
+      }
+    }
+
+    @Override public Call<Void> clone() {
+      return new StoreSpansCall(spans);
+    }
+
+    @Override public String toString() {
+      return "StoreSpansCall{" + spans + "}";
+    }
   }
 
   /** Returns the count of spans evicted. */
diff --git a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
index 24f617f..7ed0d61 100644
--- a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
+++ b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java
@@ -83,9 +83,9 @@ public class InMemoryStorageTest {
   }
 
   /** Ensures we don't overload a partition due to key equality being conflated with order
*/
-  @Test public void differentiatesOnTraceIdWhenTimestampEqual() {
-    storage.accept(asList(CLIENT_SPAN));
-    storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build()));
+  @Test public void differentiatesOnTraceIdWhenTimestampEqual() throws IOException {
+    storage.accept(asList(CLIENT_SPAN)).execute();
+    storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())).execute();
 
     assertThat(storage).extracting("spansByTraceIdTimeStamp.delegate")
       .allSatisfy(map -> assertThat((Map) map).hasSize(2));
@@ -100,8 +100,8 @@ public class InMemoryStorageTest {
       .timestamp(TODAY * 1000)
       .build();
 
-    storage.accept(asList(span));
-    storage.accept(asList(span));
+    storage.accept(asList(span)).execute();
+    storage.accept(asList(span)).execute();
 
     assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly(
       DependencyLink.newBuilder().parent("kafka").child("app").callCount(1L).build()
@@ -119,7 +119,7 @@ public class InMemoryStorageTest {
       .timestamp(TODAY * 1000)
       .build();
 
-    storage.accept(asList(span1, span2));
+    storage.accept(asList(span1, span2)).execute();
 
     assertThat(storage.getSpanNames("app").execute()).containsOnly(
       "root"
@@ -153,7 +153,7 @@ public class InMemoryStorageTest {
       .putTag("http.path", "/users")
       .timestamp(TODAY * 1000)
       .build();
-    storage.accept(asList(span1, span2, span3, span4));
+    storage.accept(asList(span1, span2, span3, span4)).execute();
 
     assertThat(storage.getKeys().execute()).containsOnlyOnce("http.path");
     assertThat(storage.getValues("http.path").execute()).containsOnlyOnce("/users");


Mime
View raw message