zipkin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adrianc...@apache.org
Subject [incubator-zipkin] branch master updated: Fixes logging and metrics for collectors (#2552)
Date Thu, 02 May 2019 04:01:55 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git


The following commit(s) were added to refs/heads/master by this push:
     new 134b53c  Fixes logging and metrics for collectors (#2552)
134b53c is described below

commit 134b53c4abb1b3209a534799b8cdc901478dbb56
Author: Adrian Cole <adriancole@users.noreply.github.com>
AuthorDate: Thu May 2 12:01:47 2019 +0800

    Fixes logging and metrics for collectors (#2552)
    
    Before, we were inconsistent about incrementing metrics, and in the worst
    case, we double-incremented. This adds tests for the messaging-based
    collectors and backfills tests to bring the base Collector class to 100%
    coverage.
    
    Separately, we have recently changed our web endpoint to produce its
    own error messages to the client. In other words, we no longer rely on
    (often unnecessary) wrapping of exceptions to correct the message being
    sent back. This changes the logic around message formatting so it is
    only used when logging is enabled.
---
 .travis.yml                                        |   1 +
 .../src/main/java/zipkin2/collector/Collector.java | 126 ++++++------
 .../test/java/zipkin2/collector/CollectorTest.java | 218 +++++++++++++++------
 .../zipkin2/collector/kafka/KafkaCollector.java    |   5 +-
 .../collector/kafka/KafkaCollectorWorker.java      |  13 +-
 ...fkaCollectorTest.java => ITKafkaCollector.java} | 107 +++++-----
 .../collector/kafka08/KafkaStreamProcessor.java    |   9 +-
 ...fkaCollectorTest.java => ITKafkaCollector.java} | 108 +++++-----
 .../collector/rabbitmq/RabbitMQCollector.java      |  11 +-
 .../collector/rabbitmq/ITRabbitMQCollector.java    |  40 ++--
 .../collector/scribe/ScribeSpanConsumer.java       |  24 +--
 .../collector/scribe/ScribeSpanConsumerTest.java   | 142 ++++++++------
 .../main/java/zipkin2/junit/ZipkinDispatcher.java  |  28 ++-
 .../test/java/zipkin2/junit/ZipkinRuleTest.java    |  13 +-
 .../server/internal/ZipkinGrpcCollector.java       |   5 +
 .../server/internal/ZipkinHttpCollector.java       |   1 +
 .../server/internal/ITZipkinGrpcCollector.kt       |  10 +-
 .../src/main/java/zipkin2/internal/JsonCodec.java  |   4 +-
 .../test/java/zipkin2/internal/JsonCodecTest.java  |  28 ++-
 19 files changed, 516 insertions(+), 377 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3727799..3660f64 100755
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,6 +15,7 @@ cache:
 language: java
 
 jdk:
+  # needs to be JDK 1.8 as long as we start Kafka 0.8
   - oraclejdk8
 
 services:
diff --git a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
index 7a7d63e..dd54d39 100644
--- a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
+++ b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.logging.Logger;
 import zipkin2.Callback;
 import zipkin2.Span;
@@ -29,6 +30,7 @@ import zipkin2.storage.StorageComponent;
 
 import static java.lang.String.format;
 import static java.util.logging.Level.FINE;
+import static zipkin2.Call.propagateIfFatal;
 
 /**
  * This component takes action on spans received from a transport. This includes deserializing,
@@ -39,6 +41,13 @@ import static java.util.logging.Level.FINE;
  * threads.
  */
 public class Collector { // not final for mock
+  static final Callback<Void> NOOP_CALLBACK = new Callback<Void>() {
+    @Override public void onSuccess(Void value) {
+    }
+
+    @Override public void onError(Throwable t) {
+    }
+  };
 
   /** Needed to scope this to the correct logging category */
   public static Builder newBuilder(Class<?> loggingClass) {
@@ -103,41 +112,60 @@ public class Collector { // not final for mock
     }
     metrics.incrementSpans(spans.size());
 
-    List<Span> sampled = sample(spans);
-    if (sampled.isEmpty()) {
+    List<Span> sampledSpans = sample(spans);
+    if (sampledSpans.isEmpty()) {
       callback.onSuccess(null);
       return;
     }
 
+    // In order to ensure callers are not blocked, we swap callbacks when we get to the storage
+    // phase of this process. Here, we create a callback whose sole purpose is classifying later
+    // errors on this bundle of spans in the same log category. This allows people to only turn on
+    // debug logging in one place.
+    Callback<Void> logOnErrorCallback = storeSpansCallback(sampledSpans);
     try {
-      record(sampled, acceptSpansCallback(sampled));
-      callback.onSuccess(null);
-    } catch (RuntimeException e) {
-      callback.onError(errorStoringSpans(sampled, e));
-      return;
+      store(sampledSpans, logOnErrorCallback);
+      callback.onSuccess(null); // release the callback given to the collector
+    } catch (RuntimeException | Error e) {
+      // While unexpected, invoking the storage command could raise an error synchronously. When
+      // that's the case, we wouldn't have invoked callback.onSuccess, so we need to handle the
+      // error here.
+      handleStorageError(spans, e, callback);
     }
   }
 
+  /**
+   * Before calling this, call {@link CollectorMetrics#incrementMessages()}, and {@link
+   * CollectorMetrics#incrementBytes(int)}. Do not call any other metrics callbacks as those are
+   * handled internal to this method.
+   *
+   * @param serialized not empty message
+   */
   public void acceptSpans(byte[] serialized, Callback<Void> callback) {
     BytesDecoder<Span> decoder;
     try {
       decoder = SpanBytesDecoderDetector.decoderForListMessage(serialized);
-    } catch (RuntimeException e) {
-      metrics.incrementBytes(serialized.length);
-      callback.onError(errorReading(e));
+    } catch (RuntimeException | Error e) {
+      handleDecodeError(e, callback);
       return;
     }
     acceptSpans(serialized, decoder, callback);
   }
 
+  /**
+   * Before calling this, call {@link CollectorMetrics#incrementMessages()}, and {@link
+   * CollectorMetrics#incrementBytes(int)}. Do not call any other metrics callbacks as those are
+   * handled internal to this method.
+   *
+   * @param serializedSpans not empty message
+   */
   public void acceptSpans(
-      byte[] serializedSpans, BytesDecoder<Span> decoder, Callback<Void> callback) {
-    metrics.incrementBytes(serializedSpans.length);
+    byte[] serializedSpans, BytesDecoder<Span> decoder, Callback<Void> callback) {
     List<Span> spans;
     try {
       spans = decodeList(decoder, serializedSpans);
-    } catch (RuntimeException e) {
-      callback.onError(errorReading(e));
+    } catch (RuntimeException | Error e) {
+      handleDecodeError(e, callback);
       return;
     }
     accept(spans, callback);
@@ -149,22 +177,14 @@ public class Collector { // not final for mock
     return out;
   }
 
-  void record(List<Span> sampled, Callback<Void> callback) {
-    storage.spanConsumer().accept(sampled).enqueue(callback);
+  void store(List<Span> sampledSpans, Callback<Void> callback) {
+    storage.spanConsumer().accept(sampledSpans).enqueue(callback);
   }
 
   String idString(Span span) {
     return span.traceId() + "/" + span.id();
   }
 
-  boolean shouldWarn() {
-    return logger.isLoggable(FINE);
-  }
-
-  void warn(String message, Throwable e) {
-    logger.log(FINE, message, e);
-  }
-
   List<Span> sample(List<Span> input) {
     List<Span> sampled = new ArrayList<>(input.size());
     for (int i = 0, length = input.size(); i < length; i++) {
@@ -178,60 +198,56 @@ public class Collector { // not final for mock
     return sampled;
   }
 
-  Callback<Void> acceptSpansCallback(final List<Span> spans) {
+  Callback<Void> storeSpansCallback(final List<Span> spans) {
     return new Callback<Void>() {
-      @Override
-      public void onSuccess(Void value) {}
+      @Override public void onSuccess(Void value) {
+      }
 
-      @Override
-      public void onError(Throwable t) {
-        errorStoringSpans(spans, t);
+      @Override public void onError(Throwable t) {
+        handleStorageError(spans, t, NOOP_CALLBACK);
       }
 
-      @Override
-      public String toString() {
-        return appendSpanIds(spans, new StringBuilder("AcceptSpans(")).append(")").toString();
+      @Override public String toString() {
+        return appendSpanIds(spans, new StringBuilder("StoreSpans(")) + ")";
       }
     };
   }
 
-  RuntimeException errorReading(Throwable e) {
-    return errorReading("Cannot decode spans", e);
-  }
-
-  RuntimeException errorReading(String message, Throwable e) {
+  void handleDecodeError(Throwable e, Callback<Void> callback) {
     metrics.incrementMessagesDropped();
-    return doError(message, e);
+    handleError(e, "Cannot decode spans"::toString, callback);
   }
 
   /**
    * When storing spans, an exception can be raised before or after the fact. This adds context of
    * span ids to give logs more relevance.
    */
-  RuntimeException errorStoringSpans(List<Span> spans, Throwable e) {
+  void handleStorageError(List<Span> spans, Throwable e, Callback<Void> callback) {
     metrics.incrementSpansDropped(spans.size());
     // The exception could be related to a span being huge. Instead of filling logs,
     // print trace id, span id pairs
-    StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans "));
-    return doError(msg.toString(), e);
+    handleError(e, () -> appendSpanIds(spans, new StringBuilder("Cannot store spans ")), callback);
   }
 
-  RuntimeException doError(String message, Throwable e) {
+  void handleError(Throwable e, Supplier<String> defaultLogMessage, Callback<Void> callback) {
+    propagateIfFatal(e);
+    callback.onError(e);
+    if (!logger.isLoggable(FINE)) return;
+
     String error = e.getMessage() != null ? e.getMessage() : "";
-    if (e instanceof RuntimeException
-        && (error.startsWith("Malformed") || error.startsWith("Truncated"))) {
-      if (shouldWarn()) warn(error, e);
-      return (RuntimeException) e;
-    } else {
-      if (shouldWarn()) {
-        message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), error);
-        warn(message, e);
-      }
-      return new RuntimeException(message, e);
+    // We have specific code that customizes log messages. Use this when the case.
+    if (error.startsWith("Malformed") || error.startsWith("Truncated")) {
+      logger.log(FINE, error, e);
+    } else { // otherwise, beautify the message
+      String message =
+        format("%s due to %s(%s)", defaultLogMessage.get(), e.getClass().getSimpleName(), error);
+      logger.log(FINE, message, e);
     }
   }
 
-  StringBuilder appendSpanIds(List<Span> spans, StringBuilder message) {
+  // TODO: this logic needs to be redone as service names are more important than span IDs. Also,
+  // span IDs repeat between client and server!
+  String appendSpanIds(List<Span> spans, StringBuilder message) {
     message.append("[");
     int i = 0;
     Iterator<Span> iterator = spans.iterator();
@@ -241,6 +257,6 @@ public class Collector { // not final for mock
     }
     if (iterator.hasNext()) message.append("...");
 
-    return message.append("]");
+    return message.append("]").toString();
   }
 }
diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
index 1f7ec8a..aabcbcb 100644
--- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
+++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
@@ -16,151 +16,239 @@
  */
 package zipkin2.collector;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import zipkin2.Callback;
 import zipkin2.Span;
 import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.storage.InMemoryStorage;
 import zipkin2.storage.StorageComponent;
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.TRACE;
+import static zipkin2.TestObjects.UTF_8;
 
 public class CollectorTest {
-  StorageComponent storage = mock(StorageComponent.class);
-  Callback callback = mock(Callback.class);
+  InMemoryStorage storage = InMemoryStorage.newBuilder().build();
+  Callback<Void> callback = mock(Callback.class);
+  CollectorMetrics metrics = mock(CollectorMetrics.class);
   Collector collector;
+  List<String> messages = new ArrayList<>();
+
+  Logger logger = new Logger("", null) {
+    {
+      setLevel(Level.ALL);
+    }
+
+    @Override public void log(Level level, String msg, Throwable thrown) {
+      assertThat(level).isEqualTo(Level.FINE);
+      messages.add(unprefixIdString(msg));
+    }
+  };
 
   @Before
   public void setup() {
-    collector = spy(Collector.newBuilder(Collector.class).storage(storage).build());
-    when(collector.shouldWarn()).thenReturn(true);
+    collector = spy(new Collector.Builder(logger).metrics(metrics).storage(storage).build());
     when(collector.idString(CLIENT_SPAN)).thenReturn("1"); // to make expectations easier to read
   }
 
+  @After
+  public void after() {
+    verifyNoMoreInteractions(metrics, callback);
+  }
+
   @Test
   public void unsampledSpansArentStored() {
-    when(storage.spanConsumer()).thenThrow(new AssertionError());
-
-    collector =
-        Collector.newBuilder(Collector.class)
-            .sampler(CollectorSampler.create(0.0f))
-            .storage(storage)
-            .build();
-
-    collector.accept(asList(CLIENT_SPAN), callback);
+    collector = new Collector.Builder(logger)
+      .sampler(CollectorSampler.create(0.0f))
+      .metrics(metrics)
+      .storage(storage)
+      .build();
+
+    collector.accept(TRACE, callback);
+
+    verify(callback).onSuccess(null);
+    assertThat(messages).isEmpty();
+    verify(metrics).incrementSpans(4);
+    verify(metrics).incrementSpansDropped(4);
+    assertThat(storage.getTraces()).isEmpty();
   }
 
   @Test
   public void errorDetectingFormat() {
-    CollectorMetrics metrics = mock(CollectorMetrics.class);
-
-    collector = Collector.newBuilder(Collector.class).metrics(metrics).storage(storage).build();
-
     collector.acceptSpans(new byte[] {'f', 'o', 'o'}, callback);
 
+    verify(callback).onError(any(RuntimeException.class));
     verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void convertsSpan2Format() {
-    byte[] bytes = SpanBytesEncoder.JSON_V2.encodeList(asList(CLIENT_SPAN));
+  public void acceptSpans_jsonV2() {
+    byte[] bytes = SpanBytesEncoder.JSON_V2.encodeList(TRACE);
     collector.acceptSpans(bytes, callback);
 
     verify(collector).acceptSpans(bytes, SpanBytesDecoder.JSON_V2, callback);
-    verify(collector).accept(asList(CLIENT_SPAN), callback);
+
+    verify(callback).onSuccess(null);
+    assertThat(messages).isEmpty();
+    verify(metrics).incrementSpans(4);
+    assertThat(storage.getTraces()).containsOnly(TRACE);
   }
 
   @Test
-  public void acceptSpansCallback_toStringIncludesSpanIds() {
-    Span span2 = CLIENT_SPAN.toBuilder().id("3").build();
-    when(collector.idString(span2)).thenReturn("3");
+  public void acceptSpans_decodingError() {
+    byte[] bytes = "[\"='".getBytes(UTF_8); // screwed up json
+    collector.acceptSpans(bytes, SpanBytesDecoder.JSON_V2, callback);
 
-    assertThat(collector.acceptSpansCallback(asList(CLIENT_SPAN, span2)))
-        .hasToString("AcceptSpans([1, 3])");
+    verify(callback).onError(any(IllegalArgumentException.class));
+    assertThat(messages).containsOnly("Malformed reading List<Span> from json");
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void acceptSpansCallback_toStringIncludesSpanIds_noMoreThan3() {
-    assertThat(
-            collector.acceptSpansCallback(
-                asList(CLIENT_SPAN, CLIENT_SPAN, CLIENT_SPAN, CLIENT_SPAN)))
-        .hasToString("AcceptSpans([1, 1, 1, ...])");
+  public void accept_storageError() {
+    StorageComponent storage = mock(StorageComponent.class);
+    RuntimeException error = new RuntimeException("storage disabled");
+    when(storage.spanConsumer()).thenThrow(error);
+    collector = new Collector.Builder(logger)
+      .metrics(metrics)
+      .storage(storage)
+      .build();
+
+    collector.accept(TRACE, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 2, 2, ...] due to RuntimeException(storage disabled)");
+    verify(metrics).incrementSpans(4);
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void acceptSpansCallback_onErrorWithNullMessage() {
-    Callback<Void> callback = collector.acceptSpansCallback(asList(CLIENT_SPAN));
+  public void acceptSpans_emptyMessageOk() {
+    byte[] bytes = new byte[] {'[', ']'};
+    collector.acceptSpans(bytes, callback);
 
-    RuntimeException exception = new RuntimeException();
-    callback.onError(exception);
+    verify(collector).acceptSpans(bytes, SpanBytesDecoder.JSON_V1, callback);
 
-    verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception);
+    verify(callback).onSuccess(null);
+    assertThat(messages).isEmpty();
+    assertThat(storage.getTraces()).isEmpty();
   }
 
   @Test
-  public void acceptSpansCallback_onErrorWithMessage() {
-    Callback<Void> callback = collector.acceptSpansCallback(asList(CLIENT_SPAN));
-    RuntimeException exception = new IllegalArgumentException("no beer");
-    callback.onError(exception);
+  public void storeSpansCallback_toStringIncludesSpanIds() {
+    Span span2 = CLIENT_SPAN.toBuilder().id("3").build();
+    when(collector.idString(span2)).thenReturn("3");
+
+    assertThat(collector.storeSpansCallback(asList(CLIENT_SPAN, span2)))
+      .hasToString("StoreSpans([1, 3])");
+  }
 
-    verify(collector)
-        .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception);
+  @Test
+  public void storeSpansCallback_toStringIncludesSpanIds_noMoreThan3() {
+    assertThat(unprefixIdString(collector.storeSpansCallback(TRACE).toString()))
+      .hasToString("StoreSpans([1, 1, 2, ...])");
   }
 
   @Test
-  public void errorAcceptingSpans_onErrorWithNullMessage() {
-    String message =
-        collector.errorStoringSpans(asList(CLIENT_SPAN), new RuntimeException()).getMessage();
+  public void storeSpansCallback_onErrorWithNullMessage() {
+    Callback<Void> callback = collector.storeSpansCallback(TRACE);
+    callback.onError(new RuntimeException());
 
-    assertThat(message).isEqualTo("Cannot store spans [1] due to RuntimeException()");
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to RuntimeException()");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void errorAcceptingSpans_onErrorWithMessage() {
-    RuntimeException exception = new IllegalArgumentException("no beer");
-    String message = collector.errorStoringSpans(asList(CLIENT_SPAN), exception).getMessage();
+  public void storeSpansCallback_onErrorWithMessage() {
+    Callback<Void> callback = collector.storeSpansCallback(TRACE);
+    callback.onError(new IllegalArgumentException("no beer"));
 
-    assertThat(message)
-        .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)");
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to IllegalArgumentException(no beer)");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void errorDecoding_onErrorWithNullMessage() {
-    String message = collector.errorReading(new RuntimeException()).getMessage();
+  public void handleStorageError_onErrorWithNullMessage() {
+    RuntimeException error = new RuntimeException();
+    collector.handleStorageError(TRACE, error, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to RuntimeException()");
+    verify(metrics).incrementSpansDropped(4);
+  }
 
-    assertThat(message).isEqualTo("Cannot decode spans due to RuntimeException()");
+  @Test
+  public void handleStorageError_onErrorWithMessage() {
+    RuntimeException error = new IllegalArgumentException("no beer");
+    collector.handleStorageError(TRACE, error, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to IllegalArgumentException(no beer)");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void errorDecoding_onErrorWithMessage() {
-    RuntimeException exception = new IllegalArgumentException("no beer");
-    String message = collector.errorReading(exception).getMessage();
+  public void handleDecodeError_onErrorWithNullMessage() {
+    RuntimeException error = new RuntimeException();
+    collector.handleDecodeError(error, callback);
 
-    assertThat(message).isEqualTo("Cannot decode spans due to IllegalArgumentException(no beer)");
+    verify(callback).onError(error);
+    assertThat(messages).containsOnly("Cannot decode spans due to RuntimeException()");
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void errorDecoding_doesntWrapMalformedException() {
-    RuntimeException exception = new IllegalArgumentException("Malformed reading spans");
+  public void handleDecodeError_onErrorWithMessage() {
+    IllegalArgumentException error = new IllegalArgumentException("no beer");
+    collector.handleDecodeError(error, callback);
 
-    String message = collector.errorReading(exception).getMessage();
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot decode spans due to IllegalArgumentException(no beer)");
+    verify(metrics).incrementMessagesDropped();
+  }
+
+  @Test
+  public void handleDecodeError_doesntWrapMessageOnMalformedException() {
+    IllegalArgumentException error = new IllegalArgumentException("Malformed reading spans");
+    collector.handleDecodeError(error, callback);
 
-    assertThat(message).isEqualTo("Malformed reading spans");
+    verify(callback).onError(error);
+    assertThat(messages).containsOnly("Malformed reading spans");
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void errorDecoding_doesntWrapTruncatedException() {
-    RuntimeException exception = new IllegalArgumentException("Truncated reading spans");
+  public void handleDecodeError_doesntWrapMessageOnTruncatedException() {
+    IllegalArgumentException error = new IllegalArgumentException("Truncated reading spans");
+    collector.handleDecodeError(error, callback);
 
-    String message = collector.errorReading(exception).getMessage();
+    verify(callback).onError(error);
+    assertThat(messages).containsOnly("Truncated reading spans");
+    verify(metrics).incrementMessagesDropped();
+  }
 
-    assertThat(message).isEqualTo("Truncated reading spans");
+  String unprefixIdString(String msg) {
+    return msg.replaceAll("7180c278b62e8f6a216a2aea45d08fc9/000000000000000", "");
   }
 }
diff --git a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
index 0e3a42b..0757877 100644
--- a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
+++ b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
@@ -19,13 +19,10 @@ package zipkin2.collector.kafka;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -226,7 +223,7 @@ public final class KafkaCollector extends CollectorComponent {
     void close() {
       ExecutorService maybePool = pool;
       if (maybePool == null) return;
-      for(KafkaCollectorWorker worker: workers) {
+      for (KafkaCollectorWorker worker : workers) {
         worker.stop();
       }
       maybePool.shutdown();
diff --git a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
index b60fe55..713a7eb 100644
--- a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
+++ b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import zipkin2.Callback;
@@ -89,21 +88,25 @@ final class KafkaCollectorWorker implements Runnable {
         final ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.of(1000, ChronoUnit.MILLIS));
         LOG.debug("Kafka polling returned batch of {} messages.", consumerRecords.count());
         for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-          metrics.incrementMessages();
           final byte[] bytes = record.value();
+          metrics.incrementMessages();
+          metrics.incrementBytes(bytes.length);
+
+          if (bytes.length == 0) continue; // lenient on empty messages
 
           if (bytes.length < 2) { // need two bytes to check if protobuf
             metrics.incrementMessagesDropped();
           } else {
             // If we received legacy single-span encoding, decode it into a singleton list
             if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not list */) {
-              metrics.incrementBytes(bytes.length);
+              Span span;
               try {
-                Span span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
-                collector.accept(Collections.singletonList(span), NOOP);
+                span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
               } catch (RuntimeException e) {
                 metrics.incrementMessagesDropped();
+                continue;
               }
+              collector.accept(Collections.singletonList(span), NOOP);
             } else {
               collector.acceptSpans(bytes, NOOP);
             }
diff --git a/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java b/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/ITKafkaCollector.java
similarity index 84%
rename from zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java
rename to zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/ITKafkaCollector.java
index a131d11..0990128 100644
--- a/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java
+++ b/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/ITKafkaCollector.java
@@ -49,9 +49,11 @@ import zipkin2.storage.StorageComponent;
 import static org.assertj.core.api.Assertions.assertThat;
 import static zipkin2.TestObjects.CLIENT_SPAN;
 import static zipkin2.TestObjects.LOTS_OF_SPANS;
+import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.codec.SpanBytesEncoder.JSON_V2;
 import static zipkin2.codec.SpanBytesEncoder.THRIFT;
 
-public class KafkaCollectorTest {
+public class ITKafkaCollector {
 
   static final int RANDOM_PORT = -1;
   static final EphemeralKafkaBroker broker =
@@ -156,44 +158,22 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(1);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
   @Test
   public void messageWithMultipleSpans_thrift() throws Exception {
-    KafkaCollector.Builder builder = builder("multiple_spans_thrift");
-
-    byte[] bytes = THRIFT.encodeList(spans);
-    produceSpans(bytes, builder.topic);
-
-    try (KafkaCollector collector = builder.build()) {
-      collector.start();
-      assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(2);
+    messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
   }
 
   /** Ensures list encoding works: a json encoded list of spans */
   @Test
   public void messageWithMultipleSpans_json() throws Exception {
-    KafkaCollector.Builder builder = builder("multiple_spans_json");
-
-    byte[] bytes = SpanBytesEncoder.JSON_V1.encodeList(spans);
-    produceSpans(bytes, builder.topic);
-
-    try (KafkaCollector collector = builder.build()) {
-      collector.start();
-      assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(2);
+    messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
   }
 
   /** Ensures list encoding works: a version 2 json list of spans */
@@ -220,8 +200,10 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures malformed spans don't hang the collector */
@@ -229,10 +211,12 @@ public class KafkaCollectorTest {
   public void skipsMalformedData() throws Exception {
     KafkaCollector.Builder builder = builder("decoder_exception");
 
+    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+    byte[] malformed2 = "malformed".getBytes(UTF_8);
     produceSpans(THRIFT.encodeList(spans), builder.topic);
     produceSpans(new byte[0], builder.topic);
-    produceSpans("[\"='".getBytes(), builder.topic); // screwed up json
-    produceSpans("malformed".getBytes(), builder.topic);
+    produceSpans(malformed1, builder.topic);
+    produceSpans(malformed2, builder.topic);
     produceSpans(THRIFT.encodeList(spans), builder.topic);
 
     try (KafkaCollector collector = builder.build()) {
@@ -242,39 +226,38 @@ public class KafkaCollectorTest {
       assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(3);
+    assertThat(kafkaMetrics.messages()).isEqualTo(5);
+    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+    assertThat(kafkaMetrics.bytes())
+      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Guards against errors that leak from storage, such as InvalidQueryException */
   @Test
-  public void skipsOnSpanConsumerException() throws Exception {
+  public void skipsOnSpanStorageException() throws Exception {
     AtomicInteger counter = new AtomicInteger();
-    consumer =
-        (input) ->
-            new Call.Base<Void>() {
-
-              @Override
-              protected Void doExecute() {
-                throw new AssertionError();
-              }
-
-              @Override
-              protected void doEnqueue(Callback callback) {
-                if (counter.getAndIncrement() == 1) {
-                  callback.onError(new RuntimeException("storage fell over"));
-                } else {
-                  receivedSpans.add(spans);
-                  callback.onSuccess(null);
-                }
-              }
-
-              @Override
-              public Call clone() {
-                throw new AssertionError();
-              }
-            };
+    consumer = (input) -> new Call.Base<Void>() {
+      @Override protected Void doExecute() {
+        throw new AssertionError();
+      }
+
+      @Override protected void doEnqueue(Callback<Void> callback) {
+        if (counter.getAndIncrement() == 1) {
+          callback.onError(new RuntimeException("storage fell over"));
+        } else {
+          receivedSpans.add(spans);
+          callback.onSuccess(null);
+        }
+      }
+
+      @Override public Call<Void> clone() {
+        throw new AssertionError();
+      }
+    };
     final StorageComponent storage = buildStorage(consumer);
-    KafkaCollector.Builder builder = builder("consumer_exception").storage(storage);
+    KafkaCollector.Builder builder = builder("storage_exception").storage(storage);
 
     produceSpans(THRIFT.encodeList(spans), builder.topic);
     produceSpans(THRIFT.encodeList(spans), builder.topic); // tossed on error
@@ -287,7 +270,11 @@ public class KafkaCollectorTest {
       assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.messages()).isEqualTo(3);
+    assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
+    assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
+    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped
   }
 
   @Test
@@ -296,7 +283,7 @@ public class KafkaCollectorTest {
 
     warmUpTopic(builder.topic);
 
-    final byte[] traceBytes = THRIFT.encodeList(spans);
+    final byte[] traceBytes = JSON_V2.encodeList(spans);
     try (KafkaCollector collector = builder.build()) {
       collector.start();
       waitForPartitionAssignments(collector);
@@ -308,9 +295,11 @@ public class KafkaCollectorTest {
 
     assertThat(threadsProvidingSpans.size()).isEqualTo(2);
 
-    assertThat(kafkaMetrics.messages()).isEqualTo(3);
+    assertThat(kafkaMetrics.messages()).isEqualTo(3); // 2 + empty body for warmup
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(traceBytes.length * 2);
     assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   @Test
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
index acd6198..4232c2e 100644
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
+++ b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
@@ -53,6 +53,8 @@ final class KafkaStreamProcessor implements Runnable {
     while (messages.hasNext()) {
       byte[] bytes = messages.next().message();
       metrics.incrementMessages();
+      metrics.incrementBytes(bytes.length);
+      if (bytes.length == 0) continue; // lenient on empty messages
 
       if (bytes.length < 2) { // need two bytes to check if protobuf
         metrics.incrementMessagesDropped();
@@ -61,13 +63,14 @@ final class KafkaStreamProcessor implements Runnable {
 
       // If we received legacy single-span encoding, decode it into a singleton list
       if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
+        Span span;
         try {
-          metrics.incrementBytes(bytes.length);
-          Span span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
-          collector.accept(Collections.singletonList(span), NOOP);
+          span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
         } catch (RuntimeException e) {
           metrics.incrementMessagesDropped();
+          continue;
         }
+        collector.accept(Collections.singletonList(span), NOOP);
       } else {
         collector.acceptSpans(bytes, NOOP);
       }
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaCollectorTest.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
similarity index 76%
rename from zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaCollectorTest.java
rename to zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
index c02c89c..290a71e 100644
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaCollectorTest.java
+++ b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
@@ -42,9 +42,10 @@ import zipkin2.storage.StorageComponent;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.UTF_8;
 import static zipkin2.codec.SpanBytesEncoder.THRIFT;
 
-public class KafkaCollectorTest {
+public class ITKafkaCollector {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @ClassRule public static Timeout globalTimeout = Timeout.seconds(20);
 
@@ -55,11 +56,10 @@ public class KafkaCollectorTest {
   InMemoryCollectorMetrics kafkaMetrics = metrics.forTransport("kafka");
 
   LinkedBlockingQueue<List<Span>> recvdSpans = new LinkedBlockingQueue<>();
-  SpanConsumer consumer =
-      (spans) -> {
-        recvdSpans.add(spans);
-        return Call.create(null);
-      };
+  SpanConsumer consumer = (spans) -> {
+    recvdSpans.add(spans);
+    return Call.create(null);
+  };
 
   @Test
   public void checkPasses() {
@@ -100,42 +100,22 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(1);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
   @Test
   public void messageWithMultipleSpans_thrift() throws Exception {
-    Builder builder = builder("multiple_spans_thrift");
-
-    byte[] bytes = THRIFT.encodeList(spans);
-    producer.send(new KeyedMessage<>(builder.topic, bytes));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
   }
 
   /** Ensures list encoding works: a json encoded list of spans */
   @Test
   public void messageWithMultipleSpans_json() throws Exception {
-    Builder builder = builder("multiple_spans_json");
-
-    byte[] bytes = SpanBytesEncoder.JSON_V1.encodeList(spans);
-    producer.send(new KeyedMessage<>(builder.topic, bytes));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
   }
 
   /** Ensures list encoding works: a version 2 json list of spans */
@@ -160,8 +140,10 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures malformed spans don't hang the collector */
@@ -169,10 +151,12 @@ public class KafkaCollectorTest {
   public void skipsMalformedData() throws Exception {
     Builder builder = builder("decoder_exception");
 
+    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+    byte[] malformed2 = "malformed".getBytes(UTF_8);
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
     producer.send(new KeyedMessage<>(builder.topic, new byte[0]));
-    producer.send(new KeyedMessage<>(builder.topic, "[\"='".getBytes())); // screwed up json
-    producer.send(new KeyedMessage<>(builder.topic, "malformed".getBytes()));
+    producer.send(new KeyedMessage<>(builder.topic, malformed1));
+    producer.send(new KeyedMessage<>(builder.topic, malformed2));
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
 
     try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
@@ -181,39 +165,39 @@ public class KafkaCollectorTest {
       assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(3);
+    assertThat(kafkaMetrics.messages()).isEqualTo(5);
+    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+    assertThat(kafkaMetrics.bytes())
+      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Guards against errors that leak from storage, such as InvalidQueryException */
   @Test
-  public void skipsOnConsumerException() throws Exception {
-    Builder builder = builder("consumer_exception");
+  public void skipsOnStorageException() throws Exception {
+    Builder builder = builder("storage_exception");
 
     AtomicInteger counter = new AtomicInteger();
-    consumer =
-        (input) ->
-            new Call.Base<Void>() {
-
-              @Override
-              protected Void doExecute() {
-                throw new AssertionError();
-              }
-
-              @Override
-              protected void doEnqueue(Callback callback) {
-                if (counter.getAndIncrement() == 1) {
-                  callback.onError(new RuntimeException("storage fell over"));
-                } else {
-                  recvdSpans.add(spans);
-                  callback.onSuccess(null);
-                }
-              }
-
-              @Override
-              public Call clone() {
-                throw new AssertionError();
-              }
-            };
+    consumer = (input) -> new Call.Base<Void>() {
+
+      @Override protected Void doExecute() {
+        throw new AssertionError();
+      }
+
+      @Override protected void doEnqueue(Callback<Void> callback) {
+        if (counter.getAndIncrement() == 1) {
+          callback.onError(new RuntimeException("storage fell over"));
+        } else {
+          recvdSpans.add(spans);
+          callback.onSuccess(null);
+        }
+      }
+
+      @Override public Call<Void> clone() {
+        throw new AssertionError();
+      }
+    };
 
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); // tossed on error
@@ -225,7 +209,11 @@ public class KafkaCollectorTest {
       assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.messages()).isEqualTo(3);
+    assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
+    assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
+    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped
   }
 
   Builder builder(String topic) {
diff --git a/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java b/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java
index dcff870..ec754cf 100644
--- a/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java
+++ b/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java
@@ -16,7 +16,7 @@
  */
 package zipkin2.collector.rabbitmq;
 
-import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Address;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -226,10 +226,13 @@ public final class RabbitMQCollector extends CollectorComponent {
     }
 
     @Override
-    public void handleDelivery(
-        String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+    public void handleDelivery(String tag, Envelope envelope, BasicProperties props, byte[] body) {
       metrics.incrementMessages();
-      this.collector.acceptSpans(body, NOOP);
+      metrics.incrementBytes(body.length);
+
+      if (body.length == 0) return; // lenient on empty messages
+
+      collector.acceptSpans(body, NOOP);
     }
   }
 
diff --git a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java
index fdcebaf..45359dc 100644
--- a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java
+++ b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java
@@ -16,13 +16,12 @@
  */
 package zipkin2.collector.rabbitmq;
 
+import com.rabbitmq.client.Channel;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-
-import com.rabbitmq.client.Channel;
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -33,8 +32,10 @@ import zipkin2.codec.SpanBytesEncoder;
 import zipkin2.collector.CollectorMetrics;
 import zipkin2.storage.InMemoryStorage;
 
-import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static zipkin2.TestObjects.LOTS_OF_SPANS;
+import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.codec.SpanBytesEncoder.THRIFT;
 import static zipkin2.collector.rabbitmq.RabbitMQCollector.builder;
 
 public class ITRabbitMQCollector {
@@ -71,15 +72,7 @@ public class ITRabbitMQCollector {
   /** Ensures list encoding works: a json encoded list of spans */
   @Test
   public void messageWithMultipleSpans_json() throws Exception {
-    byte[] message = SpanBytesEncoder.JSON_V1.encodeList(spans);
-    rabbit.publish(message);
-
-    Thread.sleep(1000);
-    assertThat(rabbit.storage.acceptedSpanCount()).isEqualTo(spans.size());
-
-    assertThat(rabbit.rabbitmqMetrics.messages()).isEqualTo(1);
-    assertThat(rabbit.rabbitmqMetrics.bytes()).isEqualTo(message.length);
-    assertThat(rabbit.rabbitmqMetrics.spans()).isEqualTo(spans.size());
+    messageWithMultipleSpans(SpanBytesEncoder.JSON_V1);
   }
 
   /** Ensures list encoding works: a version 2 json list of spans */
@@ -94,9 +87,7 @@ public class ITRabbitMQCollector {
     messageWithMultipleSpans(SpanBytesEncoder.PROTO3);
   }
 
-  void messageWithMultipleSpans(SpanBytesEncoder encoder)
-      throws IOException, TimeoutException, InterruptedException {
-
+  void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
     byte[] message = encoder.encodeList(spans);
     rabbit.publish(message);
 
@@ -104,22 +95,31 @@ public class ITRabbitMQCollector {
     assertThat(rabbit.storage.acceptedSpanCount()).isEqualTo(spans.size());
 
     assertThat(rabbit.rabbitmqMetrics.messages()).isEqualTo(1);
+    assertThat(rabbit.rabbitmqMetrics.messagesDropped()).isZero();
     assertThat(rabbit.rabbitmqMetrics.bytes()).isEqualTo(message.length);
     assertThat(rabbit.rabbitmqMetrics.spans()).isEqualTo(spans.size());
+    assertThat(rabbit.rabbitmqMetrics.spansDropped()).isZero();
   }
 
   /** Ensures malformed spans don't hang the collector */
   @Test
   public void skipsMalformedData() throws Exception {
-    rabbit.publish(SpanBytesEncoder.JSON_V2.encodeList(spans));
+    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+    byte[] malformed2 = "malformed".getBytes(UTF_8);
+    rabbit.publish(THRIFT.encodeList(spans));
     rabbit.publish(new byte[0]);
-    rabbit.publish("[\"='".getBytes()); // screwed up json
-    rabbit.publish("malformed".getBytes());
-    rabbit.publish(SpanBytesEncoder.JSON_V2.encodeList(spans));
+    rabbit.publish(malformed1);
+    rabbit.publish(malformed2);
+    rabbit.publish(THRIFT.encodeList(spans));
 
     Thread.sleep(1000);
+
     assertThat(rabbit.rabbitmqMetrics.messages()).isEqualTo(5);
-    assertThat(rabbit.rabbitmqMetrics.messagesDropped()).isEqualTo(3);
+    assertThat(rabbit.rabbitmqMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+    assertThat(rabbit.rabbitmqMetrics.bytes())
+      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+    assertThat(rabbit.rabbitmqMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(rabbit.rabbitmqMetrics.spansDropped()).isZero();
   }
 
   /** See GitHub issue #2068 */
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
index 4e7cc6e..644514a 100644
--- a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -42,22 +43,23 @@ final class ScribeSpanConsumer implements Scribe {
   }
 
   @Override
-  public ListenableFuture<ResultCode> log(List<LogEntry> messages) {
+  public ListenableFuture<ResultCode> log(List<LogEntry> logEntries) {
     metrics.incrementMessages();
-    List<Span> spans;
+    List<Span> spans = new ArrayList<>();
+    int byteCount = 0;
     try {
-      spans =
-          messages
-              .stream()
-              .filter(m -> m.category.equals(category))
-              .map(m -> m.message.getBytes(StandardCharsets.ISO_8859_1))
-              .map(b -> Base64.getMimeDecoder().decode(b)) // finagle-zipkin uses mime encoding
-              .peek(b -> metrics.incrementBytes(b.length))
-              .map(SpanBytesDecoder.THRIFT::decodeOne)
-              .collect(Collectors.toList());
+      for (LogEntry logEntry : logEntries) {
+        if (!category.equals(logEntry.category)) continue;
+        byte[] bytes = logEntry.message.getBytes(StandardCharsets.ISO_8859_1);
+        bytes = Base64.getMimeDecoder().decode(bytes); // finagle-zipkin uses mime encoding
+        byteCount += bytes.length;
+        spans.add(SpanBytesDecoder.THRIFT.decodeOne(bytes));
+      }
     } catch (RuntimeException e) {
       metrics.incrementMessagesDropped();
       return Futures.immediateFailedFuture(e);
+    } finally {
+      metrics.incrementBytes(byteCount);
     }
 
     SettableFuture<ResultCode> result = SettableFuture.create();
diff --git a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
index 22320a4..567b87f 100644
--- a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
+++ b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
@@ -19,9 +19,7 @@ package zipkin2.collector.scribe;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.concurrent.ExecutionException;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import zipkin2.Call;
 import zipkin2.Callback;
 import zipkin2.CheckResult;
@@ -39,11 +37,9 @@ import zipkin2.v1.V1SpanConverter;
 import static com.google.common.base.Charsets.UTF_8;
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.core.Is.isA;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 
 public class ScribeSpanConsumerTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
   // scope to scribe as we aren't creating the consumer with the builder.
   InMemoryCollectorMetrics scribeMetrics = new InMemoryCollectorMetrics().forTransport("scribe");
 
@@ -98,16 +94,17 @@ public class ScribeSpanConsumerTest {
     assertThat(storage.getTraces()).containsExactly(asList(v2));
 
     assertThat(scribeMetrics.messages()).isEqualTo(1);
+    assertThat(scribeMetrics.messagesDropped()).isZero();
     assertThat(scribeMetrics.bytes()).isEqualTo(bytes.length);
     assertThat(scribeMetrics.spans()).isEqualTo(1);
+    assertThat(scribeMetrics.spansDropped()).isZero();
   }
 
   @Test
   public void entriesWithoutSpansAreSkipped() throws Exception {
-    SpanConsumer consumer =
-        (callback) -> {
-          throw new AssertionError(); // as we shouldn't get here.
-        };
+    SpanConsumer consumer = (callback) -> {
+      throw new AssertionError(); // as we shouldn't get here.
+    };
 
     ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
 
@@ -117,8 +114,11 @@ public class ScribeSpanConsumerTest {
 
     scribe.log(asList(entry)).get();
 
+    assertThat(scribeMetrics.messages()).isEqualTo(1);
+    assertThat(scribeMetrics.messagesDropped()).isZero();
     assertThat(scribeMetrics.bytes()).isZero();
     assertThat(scribeMetrics.spans()).isZero();
+    assertThat(scribeMetrics.spansDropped()).isZero();
   }
 
   @Test
@@ -129,18 +129,25 @@ public class ScribeSpanConsumerTest {
     entry.category = "zipkin";
     entry.message = "notbase64";
 
-    thrown.expect(ExecutionException.class); // from dereferenced future
-    thrown.expectCause(isA(IllegalArgumentException.class));
+    try {
+      scribe.log(asList(entry)).get();
+      failBecauseExceptionWasNotThrown(ExecutionException.class);
+    } catch (ExecutionException e) {
+      assertThat(e.getCause()).isInstanceOf(IllegalArgumentException.class);
+    }
 
-    scribe.log(asList(entry)).get();
+    assertThat(scribeMetrics.messages()).isEqualTo(1);
+    assertThat(scribeMetrics.messagesDropped()).isEqualTo(1);
+    assertThat(scribeMetrics.bytes()).isZero();
+    assertThat(scribeMetrics.spans()).isZero();
+    assertThat(scribeMetrics.spansDropped()).isZero();
   }
 
   @Test
   public void consumerExceptionBeforeCallbackSetsFutureException() throws Exception {
-    consumer =
-        (input) -> {
-          throw new NullPointerException();
-        };
+    consumer = (input) -> {
+      throw new NullPointerException("endpoint was null");
+    };
 
     ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
 
@@ -148,10 +155,18 @@ public class ScribeSpanConsumerTest {
     entry.category = "zipkin";
     entry.message = encodedSpan;
 
-    thrown.expect(ExecutionException.class); // from dereferenced future
-    thrown.expectMessage("Cannot store spans [abfb01327cc4d38f/cdd29fb81067d374]");
+    try {
+      scribe.log(asList(entry)).get();
+      failBecauseExceptionWasNotThrown(ExecutionException.class);
+    } catch (ExecutionException e) {
+      assertThat(e.getCause()).hasMessage("endpoint was null");
+    }
 
-    scribe.log(asList(entry)).get();
+    assertThat(scribeMetrics.messages()).isEqualTo(1);
+    assertThat(scribeMetrics.messagesDropped()).isZero();
+    assertThat(scribeMetrics.bytes()).isEqualTo(bytes.length);
+    assertThat(scribeMetrics.spans()).isEqualTo(1);
+    assertThat(scribeMetrics.spansDropped()).isEqualTo(1);
   }
 
   /**
@@ -160,25 +175,20 @@ public class ScribeSpanConsumerTest {
    */
   @Test
   public void callbackExceptionDoesntThrow() throws Exception {
-    consumer =
-        (input) ->
-            new Call.Base<Void>() {
-
-              @Override
-              protected Void doExecute() {
-                throw new AssertionError();
-              }
-
-              @Override
-              protected void doEnqueue(Callback callback) {
-                callback.onError(new NullPointerException());
-              }
-
-              @Override
-              public Call clone() {
-                throw new AssertionError();
-              }
-            };
+    consumer = (input) -> new Call.Base<Void>() {
+      @Override protected Void doExecute() {
+        throw new AssertionError();
+      }
+
+      @Override protected void doEnqueue(Callback<Void> callback) {
+        callback.onError(new NullPointerException());
+      }
+
+      @Override
+      public Call<Void> clone() {
+        throw new AssertionError();
+      }
+    };
 
     ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
 
@@ -188,6 +198,10 @@ public class ScribeSpanConsumerTest {
 
     scribe.log(asList(entry)).get();
 
+    assertThat(scribeMetrics.messages()).isEqualTo(1);
+    assertThat(scribeMetrics.messagesDropped()).isZero();
+    assertThat(scribeMetrics.bytes()).isEqualTo(bytes.length);
+    assertThat(scribeMetrics.spans()).isEqualTo(1);
     assertThat(scribeMetrics.spansDropped()).isEqualTo(1);
   }
 
@@ -203,34 +217,36 @@ public class ScribeSpanConsumerTest {
     newScribeSpanConsumer(entry.category, consumer).log(asList(entry)).get();
 
     assertThat(storage.getTraces()).containsExactly(asList(v2));
+
+    assertThat(scribeMetrics.messages()).isEqualTo(1);
+    assertThat(scribeMetrics.messagesDropped()).isZero();
+    assertThat(scribeMetrics.bytes())
+      .isEqualTo(Base64.getMimeDecoder().decode(entry.message).length);
+    assertThat(scribeMetrics.spans()).isEqualTo(1);
+    assertThat(scribeMetrics.spansDropped()).isZero();
   }
 
   ScribeSpanConsumer newScribeSpanConsumer(String category, SpanConsumer consumer) {
     return new ScribeSpanConsumer(
-        ScribeCollector.newBuilder()
-            .category(category)
-            .metrics(scribeMetrics)
-            .storage(
-                new StorageComponent() {
-                  @Override
-                  public SpanStore spanStore() {
-                    throw new AssertionError();
-                  }
-
-                  @Override
-                  public SpanConsumer spanConsumer() {
-                    return consumer;
-                  }
-
-                  @Override
-                  public CheckResult check() {
-                    return CheckResult.OK;
-                  }
-
-                  @Override
-                  public void close() {
-                    throw new AssertionError();
-                  }
-                }));
+      ScribeCollector.newBuilder()
+        .category(category)
+        .metrics(scribeMetrics)
+        .storage(new StorageComponent() {
+          @Override public SpanStore spanStore() {
+            throw new AssertionError();
+          }
+
+          @Override public SpanConsumer spanConsumer() {
+            return consumer;
+          }
+
+          @Override public CheckResult check() {
+            return CheckResult.OK;
+          }
+
+          @Override public void close() {
+            throw new AssertionError();
+          }
+        }));
   }
 }
diff --git a/zipkin-junit/src/main/java/zipkin2/junit/ZipkinDispatcher.java b/zipkin-junit/src/main/java/zipkin2/junit/ZipkinDispatcher.java
index 7f1ccda..a79d22b 100644
--- a/zipkin-junit/src/main/java/zipkin2/junit/ZipkinDispatcher.java
+++ b/zipkin-junit/src/main/java/zipkin2/junit/ZipkinDispatcher.java
@@ -66,8 +66,8 @@ final class ZipkinDispatcher extends Dispatcher {
   }
 
   MockResponse acceptSpans(RecordedRequest request, SpanBytesDecoder decoder) {
-    metrics.incrementMessages();
     byte[] body = request.getBody().readByteArray();
+    metrics.incrementMessages();
     String encoding = request.getHeader("Content-Encoding");
     if (encoding != null && encoding.contains("gzip")) {
       try {
@@ -80,23 +80,21 @@ final class ZipkinDispatcher extends Dispatcher {
         return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
       }
     }
+    metrics.incrementBytes(body.length);
 
     final MockResponse result = new MockResponse();
-    consumer.acceptSpans(
-        body,
-        decoder,
-        new Callback<Void>() {
-          @Override
-          public void onSuccess(Void value) {
-            result.setResponseCode(202);
-          }
+    if (body.length == 0) return result.setResponseCode(202); // lenient on empty
 
-          @Override
-          public void onError(Throwable t) {
-            String message = t.getMessage();
-            result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
-          }
-        });
+    consumer.acceptSpans(body, decoder, new Callback<Void>() {
+      @Override public void onSuccess(Void value) {
+        result.setResponseCode(202);
+      }
+
+      @Override public void onError(Throwable t) {
+        String message = t.getMessage();
+        result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
+      }
+    });
     return result;
   }
 }
diff --git a/zipkin-junit/src/test/java/zipkin2/junit/ZipkinRuleTest.java b/zipkin-junit/src/test/java/zipkin2/junit/ZipkinRuleTest.java
index 0e01c38..be379c7 100644
--- a/zipkin-junit/src/test/java/zipkin2/junit/ZipkinRuleTest.java
+++ b/zipkin-junit/src/test/java/zipkin2/junit/ZipkinRuleTest.java
@@ -178,14 +178,11 @@ public class ZipkinRuleTest {
     gzipSink.close();
     ByteString gzippedJson = sink.readByteString();
 
-    client
-        .newCall(
-            new Request.Builder()
-                .url(zipkin.httpUrl() + "/api/v1/spans")
-                .addHeader("Content-Encoding", "gzip")
-                .post(RequestBody.create(MediaType.parse("application/json"), gzippedJson))
-                .build())
-        .execute();
+    client.newCall(new Request.Builder()
+      .url(zipkin.httpUrl() + "/api/v1/spans")
+      .addHeader("Content-Encoding", "gzip")
+      .post(RequestBody.create(MediaType.parse("application/json"), gzippedJson))
+      .build()).execute();
 
     assertThat(zipkin.collectorMetrics().bytes()).isEqualTo(spansInJson.length);
   }
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
index 62c3f80..9ae9a94 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
@@ -58,6 +58,11 @@ final class ZipkinGrpcCollector {
 
     @Override protected CompletableFuture<byte[]> handleMessage(byte[] bytes) {
       metrics.incrementMessages();
+      metrics.incrementBytes(bytes.length);
+
+      if (bytes.length == 0) {
+        return CompletableFuture.completedFuture(bytes); // lenient on empty messages
+      }
       CompletableFutureCallback result = new CompletableFutureCallback();
       collector.acceptSpans(bytes, SpanBytesDecoder.PROTO3, result);
       return result;
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
index 5c1769d..251c641 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
@@ -132,6 +132,7 @@ public class ZipkinHttpCollector {
     if (!decoder.decodeList(serializedSpans, spans)) {
       throw new IllegalArgumentException("Empty " + decoder.name() + " message");
     }
+    // UnzippingBytesRequestConverter handles incrementing message and bytes
     collector.accept(spans, result);
     return HttpResponse.from(result);
   }
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
index 85c5e06..d4dddf7 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
@@ -40,6 +40,7 @@ import zipkin2.codec.SpanBytesEncoder
 import zipkin2.proto3.ListOfSpans
 import zipkin2.storage.InMemoryStorage
 
+/** This tests that we accept messages constructed by other clients. */
 @SpringBootTest(classes = [ZipkinServer::class],
   webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
   properties = ["spring.config.name=zipkin-server", "zipkin.collector.grpc.enabled=true"])
@@ -74,12 +75,17 @@ class ITZipkinGrpcCollector {
       .build().create(SpanService::class)
   }
 
-  /** This tests that we accept messages constructed by other clients. */
-  @Test fun report_withWireGrpcLibrary() {
+  @Test fun report_trace() {
     runBlocking {
       spanService.Report(request) // Result is effectively void
     }
     assertThat<List<Span>>(storage.traces)
       .containsExactly(TestObjects.TRACE)
   }
+
+  @Test fun report_emptyIsOk() {
+    runBlocking {
+      spanService.Report(ListOfSpans.Builder().build())
+    }
+  }
 }
diff --git a/zipkin/src/main/java/zipkin2/internal/JsonCodec.java b/zipkin/src/main/java/zipkin2/internal/JsonCodec.java
index ecc2934..4577111 100644
--- a/zipkin/src/main/java/zipkin2/internal/JsonCodec.java
+++ b/zipkin/src/main/java/zipkin2/internal/JsonCodec.java
@@ -236,7 +236,9 @@ public final class JsonCodec {
 
   static IllegalArgumentException exceptionReading(String type, Exception e) {
     String cause = e.getMessage() == null ? "Error" : e.getMessage();
-    if (cause.indexOf("malformed") != -1) cause = "Malformed";
+    if (cause.indexOf("Expected BEGIN_OBJECT") != -1 || cause.indexOf("malformed") != -1) {
+      cause = "Malformed";
+    }
     String message = format("%s reading %s from json", cause, type);
     throw new IllegalArgumentException(message, e);
   }
diff --git a/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java b/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java
index 94e79ba..af8d253 100644
--- a/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java
+++ b/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java
@@ -16,11 +16,16 @@
  */
 package zipkin2.internal;
 
+import java.io.IOException;
+import org.assertj.core.api.Assertions;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.internal.JsonCodec.exceptionReading;
 
 public class JsonCodecTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
@@ -46,7 +51,7 @@ public class JsonCodecTest {
       }
     }
 
-    new Foo().toString();
+    new Foo().toString(); // cause the exception
   }
 
   @Test public void doesntStackOverflowOnToBufferWriterBug_Overflow() {
@@ -72,6 +77,25 @@ public class JsonCodecTest {
       }
     }
 
-    new Foo().toString();
+    new Foo().toString(); // cause the exception
+  }
+
+  @Test public void exceptionReading_malformedJsonWraps() {
+    // grab a real exception from the gson library
+    Exception error = null;
+    byte[] bytes = "[\"='".getBytes(UTF_8);
+    try {
+      new JsonCodec.JsonReader(bytes).beginObject();
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IOException | IllegalStateException e) {
+      error = e;
+    }
+
+    try {
+      exceptionReading("List<Span>", error);
+      failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+    } catch (IllegalArgumentException e) {
+      assertThat(e).hasMessage("Malformed reading List<Span> from json");
+    }
   }
 }


Mime
View raw message