zipkin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adrianc...@apache.org
Subject [incubator-zipkin] 01/01: Fixes logging and metrics for collectors
Date Wed, 01 May 2019 03:58:54 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch collector-normalization
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit de519ab764ef62b6b7b5a71aa1fbe105eaeacecd
Author: Adrian Cole <acole@pivotal.io>
AuthorDate: Wed May 1 11:55:44 2019 +0800

    Fixes logging and metrics for collectors
    
    Before, we were inconsistent about incrementing metrics, and in the worst
    case, we double-incremented. This adds tests for the messaging-based
    collectors and backfills tests to bring the base Collector class to 100%
    coverage.
    
    Separately, we have recently changed our web endpoint to produce its
    own error messages for the client. In other words, we no longer rely on
    (often unnecessary) wrapping of exceptions to correct the message being
    sent back. This changes the logic around message formatting so that
    formatting is only performed when logging is enabled.
---
 .travis.yml                                        |   1 +
 .../src/main/java/zipkin2/collector/Collector.java | 105 ++++++-----
 .../test/java/zipkin2/collector/CollectorTest.java | 210 +++++++++++++++------
 .../zipkin2/collector/kafka/KafkaCollector.java    |   5 +-
 .../collector/kafka/KafkaCollectorWorker.java      |  13 +-
 ...fkaCollectorTest.java => ITKafkaCollector.java} | 107 +++++------
 .../collector/kafka08/KafkaStreamProcessor.java    |   9 +-
 ...fkaCollectorTest.java => ITKafkaCollector.java} | 108 +++++------
 .../collector/rabbitmq/RabbitMQCollector.java      |  11 +-
 .../collector/rabbitmq/ITRabbitMQCollector.java    |  38 ++--
 .../collector/scribe/ScribeSpanConsumer.java       |  24 +--
 .../collector/scribe/ScribeSpanConsumerTest.java   |  42 ++---
 .../server/internal/ZipkinGrpcCollector.java       |   5 +
 .../server/internal/ZipkinHttpCollector.java       |   1 +
 .../server/internal/ITZipkinGrpcCollector.kt       |  10 +-
 .../src/main/java/zipkin2/internal/JsonCodec.java  |   4 +-
 .../test/java/zipkin2/internal/JsonCodecTest.java  |  28 ++-
 17 files changed, 421 insertions(+), 300 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3727799..3660f64 100755
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,6 +15,7 @@ cache:
 language: java
 
 jdk:
+  # needs to be JDK 1.8 as long as we start Kafka 0.8
   - oraclejdk8
 
 services:
diff --git a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
index 7a7d63e..9ddf348 100644
--- a/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
+++ b/zipkin-collector/core/src/main/java/zipkin2/collector/Collector.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.logging.Logger;
 import zipkin2.Callback;
 import zipkin2.Span;
@@ -29,6 +30,7 @@ import zipkin2.storage.StorageComponent;
 
 import static java.lang.String.format;
 import static java.util.logging.Level.FINE;
+import static zipkin2.Call.propagateIfFatal;
 
 /**
  * This component takes action on spans received from a transport. This includes deserializing,
@@ -39,6 +41,13 @@ import static java.util.logging.Level.FINE;
  * threads.
  */
 public class Collector { // not final for mock
+  static final Callback<Void> NOOP_CALLBACK = new Callback<Void>() {
+    @Override public void onSuccess(Void value) {
+    }
+
+    @Override public void onError(Throwable t) {
+    }
+  };
 
   /** Needed to scope this to the correct logging category */
   public static Builder newBuilder(Class<?> loggingClass) {
@@ -110,34 +119,46 @@ public class Collector { // not final for mock
     }
 
     try {
+      // adding a separate callback intentionally decouples collection from storage
       record(sampled, acceptSpansCallback(sampled));
       callback.onSuccess(null);
-    } catch (RuntimeException e) {
-      callback.onError(errorStoringSpans(sampled, e));
-      return;
+    } catch (RuntimeException | Error e) {
+      handleStorageError(spans, e, callback);
     }
   }
 
+  /**
+   * Before calling this, call {@link CollectorMetrics#incrementMessages()}, and {@link
+   * CollectorMetrics#incrementBytes(int)}. Do not call any other metrics callbacks as those are
+   * handled internal to this method.
+   *
+   * @param serialized not empty message
+   */
   public void acceptSpans(byte[] serialized, Callback<Void> callback) {
     BytesDecoder<Span> decoder;
     try {
       decoder = SpanBytesDecoderDetector.decoderForListMessage(serialized);
-    } catch (RuntimeException e) {
-      metrics.incrementBytes(serialized.length);
-      callback.onError(errorReading(e));
+    } catch (RuntimeException | Error e) {
+      handleDecodeError(e, callback);
       return;
     }
     acceptSpans(serialized, decoder, callback);
   }
 
+  /**
+   * Before calling this, call {@link CollectorMetrics#incrementMessages()}, and {@link
+   * CollectorMetrics#incrementBytes(int)}. Do not call any other metrics callbacks as those are
+   * handled internal to this method.
+   *
+   * @param serializedSpans not empty message
+   */
   public void acceptSpans(
-      byte[] serializedSpans, BytesDecoder<Span> decoder, Callback<Void> callback) {
-    metrics.incrementBytes(serializedSpans.length);
+    byte[] serializedSpans, BytesDecoder<Span> decoder, Callback<Void> callback) {
     List<Span> spans;
     try {
       spans = decodeList(decoder, serializedSpans);
-    } catch (RuntimeException e) {
-      callback.onError(errorReading(e));
+    } catch (RuntimeException | Error e) {
+      handleDecodeError(e, callback);
       return;
     }
     accept(spans, callback);
@@ -157,14 +178,6 @@ public class Collector { // not final for mock
     return span.traceId() + "/" + span.id();
   }
 
-  boolean shouldWarn() {
-    return logger.isLoggable(FINE);
-  }
-
-  void warn(String message, Throwable e) {
-    logger.log(FINE, message, e);
-  }
-
   List<Span> sample(List<Span> input) {
     List<Span> sampled = new ArrayList<>(input.size());
     for (int i = 0, length = input.size(); i < length; i++) {
@@ -180,58 +193,54 @@ public class Collector { // not final for mock
 
   Callback<Void> acceptSpansCallback(final List<Span> spans) {
     return new Callback<Void>() {
-      @Override
-      public void onSuccess(Void value) {}
+      @Override public void onSuccess(Void value) {
+      }
 
-      @Override
-      public void onError(Throwable t) {
-        errorStoringSpans(spans, t);
+      @Override public void onError(Throwable t) {
+        handleStorageError(spans, t, NOOP_CALLBACK);
       }
 
-      @Override
-      public String toString() {
-        return appendSpanIds(spans, new StringBuilder("AcceptSpans(")).append(")").toString();
+      @Override public String toString() {
+        return appendSpanIds(spans, new StringBuilder("AcceptSpans(")) + ")";
       }
     };
   }
 
-  RuntimeException errorReading(Throwable e) {
-    return errorReading("Cannot decode spans", e);
-  }
-
-  RuntimeException errorReading(String message, Throwable e) {
+  void handleDecodeError(Throwable e, Callback<Void> callback) {
     metrics.incrementMessagesDropped();
-    return doError(message, e);
+    handleError(e, "Cannot decode spans"::toString, callback);
   }
 
   /**
    * When storing spans, an exception can be raised before or after the fact. This adds context of
    * span ids to give logs more relevance.
    */
-  RuntimeException errorStoringSpans(List<Span> spans, Throwable e) {
+  void handleStorageError(List<Span> spans, Throwable e, Callback<Void> callback) {
     metrics.incrementSpansDropped(spans.size());
     // The exception could be related to a span being huge. Instead of filling logs,
     // print trace id, span id pairs
-    StringBuilder msg = appendSpanIds(spans, new StringBuilder("Cannot store spans "));
-    return doError(msg.toString(), e);
+    handleError(e, () -> appendSpanIds(spans, new StringBuilder("Cannot store spans ")), callback);
   }
 
-  RuntimeException doError(String message, Throwable e) {
+  void handleError(Throwable e, Supplier<String> defaultLogMessage, Callback<Void> callback) {
+    propagateIfFatal(e);
+    callback.onError(e);
+    if (!logger.isLoggable(FINE)) return;
+
     String error = e.getMessage() != null ? e.getMessage() : "";
-    if (e instanceof RuntimeException
-        && (error.startsWith("Malformed") || error.startsWith("Truncated"))) {
-      if (shouldWarn()) warn(error, e);
-      return (RuntimeException) e;
-    } else {
-      if (shouldWarn()) {
-        message = format("%s due to %s(%s)", message, e.getClass().getSimpleName(), error);
-        warn(message, e);
-      }
-      return new RuntimeException(message, e);
+    // We have specific code that customizes log messages. Use this when the case.
+    if (error.startsWith("Malformed") || error.startsWith("Truncated")) {
+      logger.log(FINE, error, e);
+    } else { // otherwise, beautify the message
+      String message =
+        format("%s due to %s(%s)", defaultLogMessage.get(), e.getClass().getSimpleName(), error);
+      logger.log(FINE, message, e);
     }
   }
 
-  StringBuilder appendSpanIds(List<Span> spans, StringBuilder message) {
+  // TODO: this logic needs to be redone as service names are more important than span IDs. Also,
+  // span IDs repeat between client and server!
+  String appendSpanIds(List<Span> spans, StringBuilder message) {
     message.append("[");
     int i = 0;
     Iterator<Span> iterator = spans.iterator();
@@ -241,6 +250,6 @@ public class Collector { // not final for mock
     }
     if (iterator.hasNext()) message.append("...");
 
-    return message.append("]");
+    return message.append("]").toString();
   }
 }
diff --git a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
index 1f7ec8a..d0ee433 100644
--- a/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
+++ b/zipkin-collector/core/src/test/java/zipkin2/collector/CollectorTest.java
@@ -16,65 +16,138 @@
  */
 package zipkin2.collector;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import zipkin2.Callback;
 import zipkin2.Span;
 import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.storage.InMemoryStorage;
 import zipkin2.storage.StorageComponent;
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.TRACE;
+import static zipkin2.TestObjects.UTF_8;
 
 public class CollectorTest {
-  StorageComponent storage = mock(StorageComponent.class);
-  Callback callback = mock(Callback.class);
+  InMemoryStorage storage = InMemoryStorage.newBuilder().build();
+  Callback<Void> callback = mock(Callback.class);
+  CollectorMetrics metrics = mock(CollectorMetrics.class);
   Collector collector;
+  List<String> messages = new ArrayList<>();
+
+  Logger logger = new Logger("", null) {
+    {
+      setLevel(Level.ALL);
+    }
+
+    @Override public void log(Level level, String msg, Throwable thrown) {
+      assertThat(level).isEqualTo(Level.FINE);
+      messages.add(unprefixIdString(msg));
+    }
+  };
 
   @Before
   public void setup() {
-    collector = spy(Collector.newBuilder(Collector.class).storage(storage).build());
-    when(collector.shouldWarn()).thenReturn(true);
+    collector = spy(new Collector.Builder(logger).metrics(metrics).storage(storage).build());
     when(collector.idString(CLIENT_SPAN)).thenReturn("1"); // to make expectations easier to read
   }
 
+  @After
+  public void after() {
+    verifyNoMoreInteractions(metrics, callback);
+  }
+
   @Test
   public void unsampledSpansArentStored() {
-    when(storage.spanConsumer()).thenThrow(new AssertionError());
+    collector = new Collector.Builder(logger)
+      .sampler(CollectorSampler.create(0.0f))
+      .metrics(metrics)
+      .storage(storage)
+      .build();
+
+    collector.accept(TRACE, callback);
+
+    verify(callback).onSuccess(null);
+    assertThat(messages).isEmpty();
+    verify(metrics).incrementSpans(4);
+    verify(metrics).incrementSpansDropped(4);
+    assertThat(storage.getTraces()).isEmpty();
+  }
 
-    collector =
-        Collector.newBuilder(Collector.class)
-            .sampler(CollectorSampler.create(0.0f))
-            .storage(storage)
-            .build();
+  @Test
+  public void errorDetectingFormat() {
+    collector.acceptSpans(new byte[] {'f', 'o', 'o'}, callback);
 
-    collector.accept(asList(CLIENT_SPAN), callback);
+    verify(callback).onError(any(RuntimeException.class));
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void errorDetectingFormat() {
-    CollectorMetrics metrics = mock(CollectorMetrics.class);
+  public void acceptSpans_jsonV2() {
+    byte[] bytes = SpanBytesEncoder.JSON_V2.encodeList(TRACE);
+    collector.acceptSpans(bytes, callback);
 
-    collector = Collector.newBuilder(Collector.class).metrics(metrics).storage(storage).build();
+    verify(collector).acceptSpans(bytes, SpanBytesDecoder.JSON_V2, callback);
 
-    collector.acceptSpans(new byte[] {'f', 'o', 'o'}, callback);
+    verify(callback).onSuccess(null);
+    assertThat(messages).isEmpty();
+    verify(metrics).incrementSpans(4);
+    assertThat(storage.getTraces()).containsOnly(TRACE);
+  }
+
+  @Test
+  public void acceptSpans_decodingError() {
+    byte[] bytes = "[\"='".getBytes(UTF_8); // screwed up json
+    collector.acceptSpans(bytes, SpanBytesDecoder.JSON_V2, callback);
 
+    verify(callback).onError(any(IllegalArgumentException.class));
+    assertThat(messages).containsOnly("Malformed reading List<Span> from json");
     verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void convertsSpan2Format() {
-    byte[] bytes = SpanBytesEncoder.JSON_V2.encodeList(asList(CLIENT_SPAN));
+  public void accept_storageError() {
+    StorageComponent storage = mock(StorageComponent.class);
+    RuntimeException error = new RuntimeException("storage disabled");
+    when(storage.spanConsumer()).thenThrow(error);
+    collector = new Collector.Builder(logger)
+      .metrics(metrics)
+      .storage(storage)
+      .build();
+
+    collector.accept(TRACE, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 2, 2, ...] due to RuntimeException(storage disabled)");
+    verify(metrics).incrementSpans(4);
+    verify(metrics).incrementSpansDropped(4);
+  }
+
+  @Test
+  public void acceptSpans_emptyMessageOk() {
+    byte[] bytes = new byte[] {'[', ']'};
     collector.acceptSpans(bytes, callback);
 
-    verify(collector).acceptSpans(bytes, SpanBytesDecoder.JSON_V2, callback);
-    verify(collector).accept(asList(CLIENT_SPAN), callback);
+    verify(collector).acceptSpans(bytes, SpanBytesDecoder.JSON_V1, callback);
+
+    verify(callback).onSuccess(null);
+    assertThat(messages).isEmpty();
+    assertThat(storage.getTraces()).isEmpty();
   }
 
   @Test
@@ -83,84 +156,99 @@ public class CollectorTest {
     when(collector.idString(span2)).thenReturn("3");
 
     assertThat(collector.acceptSpansCallback(asList(CLIENT_SPAN, span2)))
-        .hasToString("AcceptSpans([1, 3])");
+      .hasToString("AcceptSpans([1, 3])");
   }
 
   @Test
   public void acceptSpansCallback_toStringIncludesSpanIds_noMoreThan3() {
-    assertThat(
-            collector.acceptSpansCallback(
-                asList(CLIENT_SPAN, CLIENT_SPAN, CLIENT_SPAN, CLIENT_SPAN)))
-        .hasToString("AcceptSpans([1, 1, 1, ...])");
+    assertThat(unprefixIdString(collector.acceptSpansCallback(TRACE).toString()))
+      .hasToString("AcceptSpans([1, 1, 2, ...])");
   }
 
   @Test
   public void acceptSpansCallback_onErrorWithNullMessage() {
-    Callback<Void> callback = collector.acceptSpansCallback(asList(CLIENT_SPAN));
+    Callback<Void> callback = collector.acceptSpansCallback(TRACE);
+    callback.onError(new RuntimeException());
 
-    RuntimeException exception = new RuntimeException();
-    callback.onError(exception);
-
-    verify(collector).warn("Cannot store spans [1] due to RuntimeException()", exception);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to RuntimeException()");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
   public void acceptSpansCallback_onErrorWithMessage() {
-    Callback<Void> callback = collector.acceptSpansCallback(asList(CLIENT_SPAN));
-    RuntimeException exception = new IllegalArgumentException("no beer");
-    callback.onError(exception);
+    Callback<Void> callback = collector.acceptSpansCallback(TRACE);
+    callback.onError(new IllegalArgumentException("no beer"));
 
-    verify(collector)
-        .warn("Cannot store spans [1] due to IllegalArgumentException(no beer)", exception);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to IllegalArgumentException(no beer)");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void errorAcceptingSpans_onErrorWithNullMessage() {
-    String message =
-        collector.errorStoringSpans(asList(CLIENT_SPAN), new RuntimeException()).getMessage();
-
-    assertThat(message).isEqualTo("Cannot store spans [1] due to RuntimeException()");
+  public void handleStorageError_onErrorWithNullMessage() {
+    RuntimeException error = new RuntimeException();
+    collector.handleStorageError(TRACE, error, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to RuntimeException()");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void errorAcceptingSpans_onErrorWithMessage() {
-    RuntimeException exception = new IllegalArgumentException("no beer");
-    String message = collector.errorStoringSpans(asList(CLIENT_SPAN), exception).getMessage();
-
-    assertThat(message)
-        .isEqualTo("Cannot store spans [1] due to IllegalArgumentException(no beer)");
+  public void handleStorageError_onErrorWithMessage() {
+    RuntimeException error = new IllegalArgumentException("no beer");
+    collector.handleStorageError(TRACE, error, callback);
+
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot store spans [1, 1, 2, ...] due to IllegalArgumentException(no beer)");
+    verify(metrics).incrementSpansDropped(4);
   }
 
   @Test
-  public void errorDecoding_onErrorWithNullMessage() {
-    String message = collector.errorReading(new RuntimeException()).getMessage();
+  public void handleDecodeError_onErrorWithNullMessage() {
+    RuntimeException error = new RuntimeException();
+    collector.handleDecodeError(error, callback);
 
-    assertThat(message).isEqualTo("Cannot decode spans due to RuntimeException()");
+    verify(callback).onError(error);
+    assertThat(messages).containsOnly("Cannot decode spans due to RuntimeException()");
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void errorDecoding_onErrorWithMessage() {
-    RuntimeException exception = new IllegalArgumentException("no beer");
-    String message = collector.errorReading(exception).getMessage();
+  public void handleDecodeError_onErrorWithMessage() {
+    IllegalArgumentException error = new IllegalArgumentException("no beer");
+    collector.handleDecodeError(error, callback);
 
-    assertThat(message).isEqualTo("Cannot decode spans due to IllegalArgumentException(no beer)");
+    verify(callback).onError(error);
+    assertThat(messages)
+      .containsOnly("Cannot decode spans due to IllegalArgumentException(no beer)");
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void errorDecoding_doesntWrapMalformedException() {
-    RuntimeException exception = new IllegalArgumentException("Malformed reading spans");
-
-    String message = collector.errorReading(exception).getMessage();
+  public void handleDecodeError_doesntWrapMessageOnMalformedException() {
+    IllegalArgumentException error = new IllegalArgumentException("Malformed reading spans");
+    collector.handleDecodeError(error, callback);
 
-    assertThat(message).isEqualTo("Malformed reading spans");
+    verify(callback).onError(error);
+    assertThat(messages).containsOnly("Malformed reading spans");
+    verify(metrics).incrementMessagesDropped();
   }
 
   @Test
-  public void errorDecoding_doesntWrapTruncatedException() {
-    RuntimeException exception = new IllegalArgumentException("Truncated reading spans");
+  public void handleDecodeError_doesntWrapMessageOnTruncatedException() {
+    IllegalArgumentException error = new IllegalArgumentException("Truncated reading spans");
+    collector.handleDecodeError(error, callback);
 
-    String message = collector.errorReading(exception).getMessage();
+    verify(callback).onError(error);
+    assertThat(messages).containsOnly("Truncated reading spans");
+    verify(metrics).incrementMessagesDropped();
+  }
 
-    assertThat(message).isEqualTo("Truncated reading spans");
+  String unprefixIdString(String msg) {
+    return msg.replaceAll("7180c278b62e8f6a216a2aea45d08fc9/000000000000000", "");
   }
 }
diff --git a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
index 0e3a42b..0757877 100644
--- a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
+++ b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
@@ -19,13 +19,10 @@ package zipkin2.collector.kafka;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -226,7 +223,7 @@ public final class KafkaCollector extends CollectorComponent {
     void close() {
       ExecutorService maybePool = pool;
       if (maybePool == null) return;
-      for(KafkaCollectorWorker worker: workers) {
+      for (KafkaCollectorWorker worker : workers) {
         worker.stop();
       }
       maybePool.shutdown();
diff --git a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
index b60fe55..713a7eb 100644
--- a/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
+++ b/zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import zipkin2.Callback;
@@ -89,21 +88,25 @@ final class KafkaCollectorWorker implements Runnable {
         final ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.of(1000, ChronoUnit.MILLIS));
         LOG.debug("Kafka polling returned batch of {} messages.", consumerRecords.count());
         for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-          metrics.incrementMessages();
           final byte[] bytes = record.value();
+          metrics.incrementMessages();
+          metrics.incrementBytes(bytes.length);
+
+          if (bytes.length == 0) continue; // lenient on empty messages
 
           if (bytes.length < 2) { // need two bytes to check if protobuf
             metrics.incrementMessagesDropped();
           } else {
             // If we received legacy single-span encoding, decode it into a singleton list
             if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not list */) {
-              metrics.incrementBytes(bytes.length);
+              Span span;
               try {
-                Span span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
-                collector.accept(Collections.singletonList(span), NOOP);
+                span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
               } catch (RuntimeException e) {
                 metrics.incrementMessagesDropped();
+                continue;
               }
+              collector.accept(Collections.singletonList(span), NOOP);
             } else {
               collector.acceptSpans(bytes, NOOP);
             }
diff --git a/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java b/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/ITKafkaCollector.java
similarity index 84%
rename from zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java
rename to zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/ITKafkaCollector.java
index a131d11..0990128 100644
--- a/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaCollectorTest.java
+++ b/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/ITKafkaCollector.java
@@ -49,9 +49,11 @@ import zipkin2.storage.StorageComponent;
 import static org.assertj.core.api.Assertions.assertThat;
 import static zipkin2.TestObjects.CLIENT_SPAN;
 import static zipkin2.TestObjects.LOTS_OF_SPANS;
+import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.codec.SpanBytesEncoder.JSON_V2;
 import static zipkin2.codec.SpanBytesEncoder.THRIFT;
 
-public class KafkaCollectorTest {
+public class ITKafkaCollector {
 
   static final int RANDOM_PORT = -1;
   static final EphemeralKafkaBroker broker =
@@ -156,44 +158,22 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(1);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
   @Test
   public void messageWithMultipleSpans_thrift() throws Exception {
-    KafkaCollector.Builder builder = builder("multiple_spans_thrift");
-
-    byte[] bytes = THRIFT.encodeList(spans);
-    produceSpans(bytes, builder.topic);
-
-    try (KafkaCollector collector = builder.build()) {
-      collector.start();
-      assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(2);
+    messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
   }
 
   /** Ensures list encoding works: a json encoded list of spans */
   @Test
   public void messageWithMultipleSpans_json() throws Exception {
-    KafkaCollector.Builder builder = builder("multiple_spans_json");
-
-    byte[] bytes = SpanBytesEncoder.JSON_V1.encodeList(spans);
-    produceSpans(bytes, builder.topic);
-
-    try (KafkaCollector collector = builder.build()) {
-      collector.start();
-      assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(2);
+    messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
   }
 
   /** Ensures list encoding works: a version 2 json list of spans */
@@ -220,8 +200,10 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures malformed spans don't hang the collector */
@@ -229,10 +211,12 @@ public class KafkaCollectorTest {
   public void skipsMalformedData() throws Exception {
     KafkaCollector.Builder builder = builder("decoder_exception");
 
+    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+    byte[] malformed2 = "malformed".getBytes(UTF_8);
     produceSpans(THRIFT.encodeList(spans), builder.topic);
     produceSpans(new byte[0], builder.topic);
-    produceSpans("[\"='".getBytes(), builder.topic); // screwed up json
-    produceSpans("malformed".getBytes(), builder.topic);
+    produceSpans(malformed1, builder.topic);
+    produceSpans(malformed2, builder.topic);
     produceSpans(THRIFT.encodeList(spans), builder.topic);
 
     try (KafkaCollector collector = builder.build()) {
@@ -242,39 +226,38 @@ public class KafkaCollectorTest {
       assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(3);
+    assertThat(kafkaMetrics.messages()).isEqualTo(5);
+    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+    assertThat(kafkaMetrics.bytes())
+      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Guards against errors that leak from storage, such as InvalidQueryException */
   @Test
-  public void skipsOnSpanConsumerException() throws Exception {
+  public void skipsOnSpanStorageException() throws Exception {
     AtomicInteger counter = new AtomicInteger();
-    consumer =
-        (input) ->
-            new Call.Base<Void>() {
-
-              @Override
-              protected Void doExecute() {
-                throw new AssertionError();
-              }
-
-              @Override
-              protected void doEnqueue(Callback callback) {
-                if (counter.getAndIncrement() == 1) {
-                  callback.onError(new RuntimeException("storage fell over"));
-                } else {
-                  receivedSpans.add(spans);
-                  callback.onSuccess(null);
-                }
-              }
-
-              @Override
-              public Call clone() {
-                throw new AssertionError();
-              }
-            };
+    consumer = (input) -> new Call.Base<Void>() {
+      @Override protected Void doExecute() {
+        throw new AssertionError();
+      }
+
+      @Override protected void doEnqueue(Callback<Void> callback) {
+        if (counter.getAndIncrement() == 1) {
+          callback.onError(new RuntimeException("storage fell over"));
+        } else {
+          receivedSpans.add(spans);
+          callback.onSuccess(null);
+        }
+      }
+
+      @Override public Call<Void> clone() {
+        throw new AssertionError();
+      }
+    };
     final StorageComponent storage = buildStorage(consumer);
-    KafkaCollector.Builder builder = builder("consumer_exception").storage(storage);
+    KafkaCollector.Builder builder = builder("storage_exception").storage(storage);
 
     produceSpans(THRIFT.encodeList(spans), builder.topic);
     produceSpans(THRIFT.encodeList(spans), builder.topic); // tossed on error
@@ -287,7 +270,11 @@ public class KafkaCollectorTest {
       assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.messages()).isEqualTo(3);
+    assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
+    assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
+    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // one message's spans
   }
 
   @Test
@@ -296,7 +283,7 @@ public class KafkaCollectorTest {
 
     warmUpTopic(builder.topic);
 
-    final byte[] traceBytes = THRIFT.encodeList(spans);
+    final byte[] traceBytes = JSON_V2.encodeList(spans);
     try (KafkaCollector collector = builder.build()) {
       collector.start();
       waitForPartitionAssignments(collector);
@@ -308,9 +295,11 @@ public class KafkaCollectorTest {
 
     assertThat(threadsProvidingSpans.size()).isEqualTo(2);
 
-    assertThat(kafkaMetrics.messages()).isEqualTo(3);
+    assertThat(kafkaMetrics.messages()).isEqualTo(3); // 2 + empty body for warmup
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(traceBytes.length * 2);
     assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   @Test
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
index acd6198..4232c2e 100644
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
+++ b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
@@ -53,6 +53,8 @@ final class KafkaStreamProcessor implements Runnable {
     while (messages.hasNext()) {
       byte[] bytes = messages.next().message();
       metrics.incrementMessages();
+      metrics.incrementBytes(bytes.length);
+      if (bytes.length == 0) continue; // lenient on empty messages
 
       if (bytes.length < 2) { // need two bytes to check if protobuf
         metrics.incrementMessagesDropped();
@@ -61,13 +63,14 @@ final class KafkaStreamProcessor implements Runnable {
 
       // If we received legacy single-span encoding, decode it into a singleton list
       if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
+        Span span;
         try {
-          metrics.incrementBytes(bytes.length);
-          Span span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
-          collector.accept(Collections.singletonList(span), NOOP);
+          span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
         } catch (RuntimeException e) {
           metrics.incrementMessagesDropped();
+          continue;
         }
+        collector.accept(Collections.singletonList(span), NOOP);
       } else {
         collector.acceptSpans(bytes, NOOP);
       }
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaCollectorTest.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
similarity index 76%
rename from zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaCollectorTest.java
rename to zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
index c02c89c..290a71e 100644
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaCollectorTest.java
+++ b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
@@ -42,9 +42,10 @@ import zipkin2.storage.StorageComponent;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static zipkin2.TestObjects.CLIENT_SPAN;
+import static zipkin2.TestObjects.UTF_8;
 import static zipkin2.codec.SpanBytesEncoder.THRIFT;
 
-public class KafkaCollectorTest {
+public class ITKafkaCollector {
   @Rule public ExpectedException thrown = ExpectedException.none();
   @ClassRule public static Timeout globalTimeout = Timeout.seconds(20);
 
@@ -55,11 +56,10 @@ public class KafkaCollectorTest {
   InMemoryCollectorMetrics kafkaMetrics = metrics.forTransport("kafka");
 
   LinkedBlockingQueue<List<Span>> recvdSpans = new LinkedBlockingQueue<>();
-  SpanConsumer consumer =
-      (spans) -> {
-        recvdSpans.add(spans);
-        return Call.create(null);
-      };
+  SpanConsumer consumer = (spans) -> {
+    recvdSpans.add(spans);
+    return Call.create(null);
+  };
 
   @Test
   public void checkPasses() {
@@ -100,42 +100,22 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(1);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
   @Test
   public void messageWithMultipleSpans_thrift() throws Exception {
-    Builder builder = builder("multiple_spans_thrift");
-
-    byte[] bytes = THRIFT.encodeList(spans);
-    producer.send(new KeyedMessage<>(builder.topic, bytes));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
   }
 
   /** Ensures list encoding works: a json encoded list of spans */
   @Test
   public void messageWithMultipleSpans_json() throws Exception {
-    Builder builder = builder("multiple_spans_json");
-
-    byte[] bytes = SpanBytesEncoder.JSON_V1.encodeList(spans);
-    producer.send(new KeyedMessage<>(builder.topic, bytes));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
   }
 
   /** Ensures list encoding works: a version 2 json list of spans */
@@ -160,8 +140,10 @@ public class KafkaCollectorTest {
     }
 
     assertThat(kafkaMetrics.messages()).isEqualTo(1);
+    assertThat(kafkaMetrics.messagesDropped()).isZero();
     assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
     assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Ensures malformed spans don't hang the collector */
@@ -169,10 +151,12 @@ public class KafkaCollectorTest {
   public void skipsMalformedData() throws Exception {
     Builder builder = builder("decoder_exception");
 
+    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+    byte[] malformed2 = "malformed".getBytes(UTF_8);
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
     producer.send(new KeyedMessage<>(builder.topic, new byte[0]));
-    producer.send(new KeyedMessage<>(builder.topic, "[\"='".getBytes())); // screwed up json
-    producer.send(new KeyedMessage<>(builder.topic, "malformed".getBytes()));
+    producer.send(new KeyedMessage<>(builder.topic, malformed1));
+    producer.send(new KeyedMessage<>(builder.topic, malformed2));
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
 
     try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
@@ -181,39 +165,39 @@ public class KafkaCollectorTest {
       assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(3);
+    assertThat(kafkaMetrics.messages()).isEqualTo(5);
+    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+    assertThat(kafkaMetrics.bytes())
+      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(kafkaMetrics.spansDropped()).isZero();
   }
 
   /** Guards against errors that leak from storage, such as InvalidQueryException */
   @Test
-  public void skipsOnConsumerException() throws Exception {
-    Builder builder = builder("consumer_exception");
+  public void skipsOnStorageException() throws Exception {
+    Builder builder = builder("storage_exception");
 
     AtomicInteger counter = new AtomicInteger();
-    consumer =
-        (input) ->
-            new Call.Base<Void>() {
-
-              @Override
-              protected Void doExecute() {
-                throw new AssertionError();
-              }
-
-              @Override
-              protected void doEnqueue(Callback callback) {
-                if (counter.getAndIncrement() == 1) {
-                  callback.onError(new RuntimeException("storage fell over"));
-                } else {
-                  recvdSpans.add(spans);
-                  callback.onSuccess(null);
-                }
-              }
-
-              @Override
-              public Call clone() {
-                throw new AssertionError();
-              }
-            };
+    consumer = (input) -> new Call.Base<Void>() {
+
+      @Override protected Void doExecute() {
+        throw new AssertionError();
+      }
+
+      @Override protected void doEnqueue(Callback<Void> callback) {
+        if (counter.getAndIncrement() == 1) {
+          callback.onError(new RuntimeException("storage fell over"));
+        } else {
+          recvdSpans.add(spans);
+          callback.onSuccess(null);
+        }
+      }
+
+      @Override public Call<Void> clone() {
+        throw new AssertionError();
+      }
+    };
 
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
     producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); // tossed on error
@@ -225,7 +209,11 @@ public class KafkaCollectorTest {
       assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
     }
 
-    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size());
+    assertThat(kafkaMetrics.messages()).isEqualTo(3);
+    assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
+    assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
+    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
+    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // one message's spans
   }
 
   Builder builder(String topic) {
diff --git a/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java b/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java
index dcff870..ec754cf 100644
--- a/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java
+++ b/zipkin-collector/rabbitmq/src/main/java/zipkin2/collector/rabbitmq/RabbitMQCollector.java
@@ -16,7 +16,7 @@
  */
 package zipkin2.collector.rabbitmq;
 
-import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Address;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -226,10 +226,13 @@ public final class RabbitMQCollector extends CollectorComponent {
     }
 
     @Override
-    public void handleDelivery(
-        String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+    public void handleDelivery(String tag, Envelope envelope, BasicProperties props, byte[] body) {
       metrics.incrementMessages();
-      this.collector.acceptSpans(body, NOOP);
+      metrics.incrementBytes(body.length);
+
+      if (body.length == 0) return; // lenient on empty messages
+
+      collector.acceptSpans(body, NOOP);
     }
   }
 
diff --git a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java
index fdcebaf..e53f471 100644
--- a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java
+++ b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/ITRabbitMQCollector.java
@@ -16,13 +16,12 @@
  */
 package zipkin2.collector.rabbitmq;
 
+import com.rabbitmq.client.Channel;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-
-import com.rabbitmq.client.Channel;
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -33,8 +32,10 @@ import zipkin2.codec.SpanBytesEncoder;
 import zipkin2.collector.CollectorMetrics;
 import zipkin2.storage.InMemoryStorage;
 
-import static org.assertj.core.api.Java6Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 import static zipkin2.TestObjects.LOTS_OF_SPANS;
+import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.codec.SpanBytesEncoder.THRIFT;
 import static zipkin2.collector.rabbitmq.RabbitMQCollector.builder;
 
 public class ITRabbitMQCollector {
@@ -71,15 +72,7 @@ public class ITRabbitMQCollector {
   /** Ensures list encoding works: a json encoded list of spans */
   @Test
   public void messageWithMultipleSpans_json() throws Exception {
-    byte[] message = SpanBytesEncoder.JSON_V1.encodeList(spans);
-    rabbit.publish(message);
-
-    Thread.sleep(1000);
-    assertThat(rabbit.storage.acceptedSpanCount()).isEqualTo(spans.size());
-
-    assertThat(rabbit.rabbitmqMetrics.messages()).isEqualTo(1);
-    assertThat(rabbit.rabbitmqMetrics.bytes()).isEqualTo(message.length);
-    assertThat(rabbit.rabbitmqMetrics.spans()).isEqualTo(spans.size());
+    messageWithMultipleSpans(SpanBytesEncoder.JSON_V1);
   }
 
   /** Ensures list encoding works: a version 2 json list of spans */
@@ -94,9 +87,7 @@ public class ITRabbitMQCollector {
     messageWithMultipleSpans(SpanBytesEncoder.PROTO3);
   }
 
-  void messageWithMultipleSpans(SpanBytesEncoder encoder)
-      throws IOException, TimeoutException, InterruptedException {
-
+  void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
     byte[] message = encoder.encodeList(spans);
     rabbit.publish(message);
 
@@ -104,13 +95,23 @@ public class ITRabbitMQCollector {
     assertThat(rabbit.storage.acceptedSpanCount()).isEqualTo(spans.size());
 
     assertThat(rabbit.rabbitmqMetrics.messages()).isEqualTo(1);
+    assertThat(rabbit.rabbitmqMetrics.messagesDropped()).isZero();
     assertThat(rabbit.rabbitmqMetrics.bytes()).isEqualTo(message.length);
     assertThat(rabbit.rabbitmqMetrics.spans()).isEqualTo(spans.size());
+    assertThat(rabbit.rabbitmqMetrics.spansDropped()).isZero();
   }
 
   /** Ensures malformed spans don't hang the collector */
   @Test
   public void skipsMalformedData() throws Exception {
+    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+    byte[] malformed2 = "malformed".getBytes(UTF_8);
+    rabbit.publish(THRIFT.encodeList(spans));
+    rabbit.publish(new byte[0]);
+    rabbit.publish(malformed1);
+    rabbit.publish(malformed2);
+    rabbit.publish(THRIFT.encodeList(spans));
+
     rabbit.publish(SpanBytesEncoder.JSON_V2.encodeList(spans));
     rabbit.publish(new byte[0]);
     rabbit.publish("[\"='".getBytes()); // screwed up json
@@ -118,8 +119,13 @@ public class ITRabbitMQCollector {
     rabbit.publish(SpanBytesEncoder.JSON_V2.encodeList(spans));
 
     Thread.sleep(1000);
+
     assertThat(rabbit.rabbitmqMetrics.messages()).isEqualTo(5);
-    assertThat(rabbit.rabbitmqMetrics.messagesDropped()).isEqualTo(3);
+    assertThat(rabbit.rabbitmqMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+    assertThat(rabbit.rabbitmqMetrics.bytes())
+      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+    assertThat(rabbit.rabbitmqMetrics.spans()).isEqualTo(spans.size() * 2);
+    assertThat(rabbit.rabbitmqMetrics.spansDropped()).isZero();
   }
 
   /** See GitHub issue #2068 */
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
index 4e7cc6e..644514a 100644
--- a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
@@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -42,22 +43,23 @@ final class ScribeSpanConsumer implements Scribe {
   }
 
   @Override
-  public ListenableFuture<ResultCode> log(List<LogEntry> messages) {
+  public ListenableFuture<ResultCode> log(List<LogEntry> logEntries) {
     metrics.incrementMessages();
-    List<Span> spans;
+    List<Span> spans = new ArrayList<>();
+    int byteCount = 0;
     try {
-      spans =
-          messages
-              .stream()
-              .filter(m -> m.category.equals(category))
-              .map(m -> m.message.getBytes(StandardCharsets.ISO_8859_1))
-              .map(b -> Base64.getMimeDecoder().decode(b)) // finagle-zipkin uses mime encoding
-              .peek(b -> metrics.incrementBytes(b.length))
-              .map(SpanBytesDecoder.THRIFT::decodeOne)
-              .collect(Collectors.toList());
+      for (LogEntry logEntry : logEntries) {
+        if (!category.equals(logEntry.category)) continue;
+        byte[] bytes = logEntry.message.getBytes(StandardCharsets.ISO_8859_1);
+        bytes = Base64.getMimeDecoder().decode(bytes); // finagle-zipkin uses mime encoding
+        byteCount += bytes.length;
+        spans.add(SpanBytesDecoder.THRIFT.decodeOne(bytes));
+      }
     } catch (RuntimeException e) {
       metrics.incrementMessagesDropped();
       return Futures.immediateFailedFuture(e);
+    } finally {
+      metrics.incrementBytes(byteCount);
     }
 
     SettableFuture<ResultCode> result = SettableFuture.create();
diff --git a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
index 22320a4..10fa92b 100644
--- a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
+++ b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
@@ -137,10 +137,9 @@ public class ScribeSpanConsumerTest {
 
   @Test
   public void consumerExceptionBeforeCallbackSetsFutureException() throws Exception {
-    consumer =
-        (input) -> {
-          throw new NullPointerException();
-        };
+    consumer = (input) -> {
+      throw new NullPointerException("endpoint was null");
+    };
 
     ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
 
@@ -149,7 +148,7 @@ public class ScribeSpanConsumerTest {
     entry.message = encodedSpan;
 
     thrown.expect(ExecutionException.class); // from dereferenced future
-    thrown.expectMessage("Cannot store spans [abfb01327cc4d38f/cdd29fb81067d374]");
+    thrown.expectMessage("endpoint was null");
 
     scribe.log(asList(entry)).get();
   }
@@ -160,25 +159,20 @@ public class ScribeSpanConsumerTest {
    */
   @Test
   public void callbackExceptionDoesntThrow() throws Exception {
-    consumer =
-        (input) ->
-            new Call.Base<Void>() {
-
-              @Override
-              protected Void doExecute() {
-                throw new AssertionError();
-              }
-
-              @Override
-              protected void doEnqueue(Callback callback) {
-                callback.onError(new NullPointerException());
-              }
-
-              @Override
-              public Call clone() {
-                throw new AssertionError();
-              }
-            };
+    consumer = (input) -> new Call.Base<Void>() {
+      @Override protected Void doExecute() {
+        throw new AssertionError();
+      }
+
+      @Override protected void doEnqueue(Callback<Void> callback) {
+        callback.onError(new NullPointerException());
+      }
+
+      @Override
+      public Call<Void> clone() {
+        throw new AssertionError();
+      }
+    };
 
     ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
 
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
index 62c3f80..9ae9a94 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinGrpcCollector.java
@@ -58,6 +58,11 @@ final class ZipkinGrpcCollector {
 
     @Override protected CompletableFuture<byte[]> handleMessage(byte[] bytes) {
       metrics.incrementMessages();
+      metrics.incrementBytes(bytes.length);
+
+      if (bytes.length == 0) {
+        return CompletableFuture.completedFuture(bytes); // lenient on empty messages
+      }
       CompletableFutureCallback result = new CompletableFutureCallback();
       collector.acceptSpans(bytes, SpanBytesDecoder.PROTO3, result);
       return result;
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
index 5c1769d..251c641 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/ZipkinHttpCollector.java
@@ -132,6 +132,7 @@ public class ZipkinHttpCollector {
     if (!decoder.decodeList(serializedSpans, spans)) {
       throw new IllegalArgumentException("Empty " + decoder.name() + " message");
     }
+    // UnzippingBytesRequestConverter handles incrementing message and bytes
     collector.accept(spans, result);
     return HttpResponse.from(result);
   }
diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
index 85c5e06..d4dddf7 100644
--- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
+++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/ITZipkinGrpcCollector.kt
@@ -40,6 +40,7 @@ import zipkin2.codec.SpanBytesEncoder
 import zipkin2.proto3.ListOfSpans
 import zipkin2.storage.InMemoryStorage
 
+/** This tests that we accept messages constructed by other clients. */
 @SpringBootTest(classes = [ZipkinServer::class],
   webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
   properties = ["spring.config.name=zipkin-server", "zipkin.collector.grpc.enabled=true"])
@@ -74,12 +75,17 @@ class ITZipkinGrpcCollector {
       .build().create(SpanService::class)
   }
 
-  /** This tests that we accept messages constructed by other clients. */
-  @Test fun report_withWireGrpcLibrary() {
+  @Test fun report_trace() {
     runBlocking {
       spanService.Report(request) // Result is effectively void
     }
     assertThat<List<Span>>(storage.traces)
       .containsExactly(TestObjects.TRACE)
   }
+
+  @Test fun report_emptyIsOk() {
+    runBlocking {
+      spanService.Report(ListOfSpans.Builder().build())
+    }
+  }
 }
diff --git a/zipkin/src/main/java/zipkin2/internal/JsonCodec.java b/zipkin/src/main/java/zipkin2/internal/JsonCodec.java
index ecc2934..4577111 100644
--- a/zipkin/src/main/java/zipkin2/internal/JsonCodec.java
+++ b/zipkin/src/main/java/zipkin2/internal/JsonCodec.java
@@ -236,7 +236,9 @@ public final class JsonCodec {
 
   static IllegalArgumentException exceptionReading(String type, Exception e) {
     String cause = e.getMessage() == null ? "Error" : e.getMessage();
-    if (cause.indexOf("malformed") != -1) cause = "Malformed";
+    if (cause.indexOf("Expected BEGIN_OBJECT") != -1 || cause.indexOf("malformed") != -1) {
+      cause = "Malformed";
+    }
     String message = format("%s reading %s from json", cause, type);
     throw new IllegalArgumentException(message, e);
   }
diff --git a/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java b/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java
index 94e79ba..af8d253 100644
--- a/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java
+++ b/zipkin/src/test/java/zipkin2/internal/JsonCodecTest.java
@@ -16,11 +16,16 @@
  */
 package zipkin2.internal;
 
+import java.io.IOException;
+import org.assertj.core.api.Assertions;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.internal.JsonCodec.exceptionReading;
 
 public class JsonCodecTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
@@ -46,7 +51,7 @@ public class JsonCodecTest {
       }
     }
 
-    new Foo().toString();
+    new Foo().toString(); // cause the exception
   }
 
   @Test public void doesntStackOverflowOnToBufferWriterBug_Overflow() {
@@ -72,6 +77,25 @@ public class JsonCodecTest {
       }
     }
 
-    new Foo().toString();
+    new Foo().toString(); // cause the exception
+  }
+
+  @Test public void exceptionReading_malformedJsonWraps() {
+    // grab a real exception from the gson library
+    Exception error = null;
+    byte[] bytes = "[\"='".getBytes(UTF_8);
+    try {
+      new JsonCodec.JsonReader(bytes).beginObject();
+      failBecauseExceptionWasNotThrown(IllegalStateException.class);
+    } catch (IOException | IllegalStateException e) {
+      error = e;
+    }
+
+    try {
+      exceptionReading("List<Span>", error);
+      failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
+    } catch (IllegalArgumentException e) {
+      assertThat(e).hasMessage("Malformed reading List<Span> from json");
+    }
   }
 }


Mime
View raw message