zipkin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From adrianc...@apache.org
Subject [incubator-zipkin-brave] 01/01: Refactors to reduce injector complexity and duplicate code
Date Fri, 31 May 2019 07:13:27 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch messaging-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin-brave.git

commit 9c21af4597f89ceef7d6158b7054a05213117680
Author: Adrian Cole <acole@pivotal.io>
AuthorDate: Fri May 31 15:12:34 2019 +0800

    Refactors to reduce injector complexity and duplicate code
    
    this adds messaging processor code and some cleanups. there's
    more to do and tests don't pass.
---
 .../jms/src/main/java/brave/jms/JmsAdapter.java    | 121 +++++----------
 .../jms/src/main/java/brave/jms/JmsTracing.java    | 121 ++++++++++-----
 .../java/brave/jms/TracingCompletionListener.java  |  59 +++++--
 .../java/brave/jms/TracingExceptionListener.java   |   4 +-
 .../main/java/brave/jms/TracingJMSConsumer.java    |  21 ++-
 .../main/java/brave/jms/TracingJMSProducer.java    | 105 ++++++-------
 .../java/brave/jms/TracingMessageConsumer.java     |  29 ++--
 .../java/brave/jms/TracingMessageListener.java     |  62 ++------
 .../java/brave/jms/TracingMessageProducer.java     | 170 +++++++++++----------
 .../jms/src/test/java/brave/jms/JmsTest.java       |  11 --
 .../brave/jms/TracingCompletionListenerTest.java   |  47 +++---
 .../java/brave/kafka/clients/KafkaTracing.java     | 150 ++++++++----------
 .../java/brave/kafka/clients/TracingConsumer.java  |  99 +++++-------
 .../java/brave/kafka/clients/TracingProducer.java  | 153 +++++++++----------
 .../brave/kafka/clients/TracingCallbackTest.java   |  30 ++--
 instrumentation/messaging/pom.xml                  |  12 ++
 .../main/java/brave/messaging/ConsumerHandler.java | 143 +++++++++++++++++
 .../main/java/brave/messaging/MessageAdapter.java  |  36 -----
 .../brave/messaging/MessageConsumerAdapter.java    |  36 -----
 .../brave/messaging/MessageProducerAdapter.java    |  36 -----
 .../java/brave/messaging/MessagingAdapter.java     |  73 +++++++++
 .../brave/messaging/MessagingConsumerHandler.java  | 133 ----------------
 .../brave/messaging/MessagingConsumerParser.java   |  44 ------
 .../java/brave/messaging/MessagingHandler.java     |  45 ------
 ...ChannelAdapter.java => MessagingOperation.java} |  17 +--
 .../main/java/brave/messaging/MessagingParser.java |  79 ++++++----
 .../brave/messaging/MessagingProducerHandler.java  |  83 ----------
 .../brave/messaging/MessagingProducerParser.java   |  42 -----
 .../java/brave/messaging/MessagingTracing.java     |  51 ++-----
 .../java/brave/messaging/ProcessorHandler.java     |  71 +++++++++
 .../main/java/brave/messaging/ProducerHandler.java |  94 ++++++++++++
 31 files changed, 1013 insertions(+), 1164 deletions(-)

diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java b/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
index 5c87218..a5252be 100644
--- a/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
+++ b/instrumentation/jms/src/main/java/brave/jms/JmsAdapter.java
@@ -16,116 +16,67 @@
  */
 package brave.jms;
 
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageConsumerAdapter;
-import brave.messaging.MessageProducerAdapter;
+import brave.messaging.MessagingAdapter;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
 import javax.jms.Topic;
 
-import static brave.jms.JmsTracing.JMS_QUEUE;
-import static brave.jms.JmsTracing.JMS_TOPIC;
+abstract class JmsAdapter<T> extends MessagingAdapter<Destination, T, T> {
+  final String remoteServiceName;
 
-class JmsAdapter {
-
-  static class JmsMessageConsumerAdapter implements MessageConsumerAdapter<Message> {
-
-    final JmsTracing jmsTracing;
-
-    JmsMessageConsumerAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
-    }
-
-    static JmsMessageConsumerAdapter create(JmsTracing jmsTracing) {
-      return new JmsMessageConsumerAdapter(jmsTracing);
-    }
+  JmsAdapter(JmsTracing jmsTracing) {
+    remoteServiceName = jmsTracing.remoteServiceName;
+  }
 
-    @Override public String operation(Message message) {
-      return "receive";
+  static final class MessageAdapter extends JmsAdapter<Message> {
+    MessageAdapter(JmsTracing jmsTracing) {
+      super(jmsTracing);
     }
 
-    @Override public String identifier(Message message) {
+    @Override public String correlationId(Message message) {
       try {
         return message.getJMSMessageID();
-      } catch (JMSException e) {
+      } catch (JMSException ignored) {
         // don't crash on wonky exceptions!
       }
       return null;
     }
-
-    @Override public String identifierTagKey() {
-      return "jms.message_id";
-    }
   }
 
-  static class JmsMessageProducerAdapter implements MessageProducerAdapter<Message> {
-
-    final JmsTracing jmsTracing;
-
-    JmsMessageProducerAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
-    }
-
-    static JmsMessageProducerAdapter create(JmsTracing jmsTracing) {
-      return new JmsMessageProducerAdapter(jmsTracing);
-    }
-
-    @Override public String operation(Message message) {
-      return "send";
-    }
+  @Override public T carrier(T message) {
+    return message;
+  }
 
-    @Override public String identifier(Message message) {
-      try {
-        return message.getJMSMessageID();
-      } catch (JMSException e) {
-        // don't crash on wonky exceptions!
+  @Override public String channel(Destination channel) {
+    try {
+      if (channel instanceof Queue) {
+        return ((Queue) channel).getQueueName();
+      } else if (channel instanceof Topic) {
+        return ((Topic) channel).getTopicName();
       }
-      return null;
-    }
-
-    @Override public String identifierTagKey() {
-      return null;
+      // TODO: we could use toString here..
+    } catch (JMSException ignored) {
+      // don't crash on wonky exceptions!
     }
+    return null;
   }
 
-  static class JmsChannelAdapter implements ChannelAdapter<Destination> {
-
-    final JmsTracing jmsTracing;
-
-    JmsChannelAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
-    }
-
-    static JmsChannelAdapter create(JmsTracing jmsTracing) {
-      return new JmsChannelAdapter(jmsTracing);
-    }
-
-    @Override public String channel(Destination destination) {
-      try {
-        if (destination instanceof Queue) {
-          return ((Queue) destination).getQueueName();
-        } else if (destination instanceof Topic) {
-          return ((Topic) destination).getTopicName();
-        }
-      } catch (JMSException ignored) {
-        // don't crash on wonky exceptions!
-      }
-      return null;
+  @Override public String channelType(Destination channel) {
+    if (channel instanceof Queue) {
+      return "queue";
+    } else if (channel instanceof Topic) {
+      return "topic";
     }
+    return null;
+  }
 
-    @Override public String channelTagKey(Destination destination) {
-      if (destination instanceof Queue) {
-        return JMS_QUEUE;
-      } else if (destination instanceof Topic) {
-        return JMS_TOPIC;
-      }
-      return null;
-    }
+  @Override public String messageKey(T message) {
+    return null;
+  }
 
-    @Override public String remoteServiceName(Destination message) {
-      return jmsTracing.remoteServiceName;
-    }
+  @Override public String brokerName(Destination channel) {
+    return remoteServiceName;
   }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
index bc0531a..383bbf2 100644
--- a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
+++ b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
@@ -17,9 +17,18 @@
 package brave.jms;
 
 import brave.Span;
+import brave.SpanCustomizer;
 import brave.Tracing;
+import brave.internal.Nullable;
+import brave.jms.JmsAdapter.MessageAdapter;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
 import brave.messaging.MessagingTracing;
+import brave.messaging.ProcessorHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.Propagation.Getter;
+import brave.propagation.Propagation.Setter;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContext.Extractor;
 import brave.propagation.TraceContext.Injector;
@@ -31,7 +40,9 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.Queue;
 import javax.jms.QueueConnection;
+import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.XAConnection;
 import javax.jms.XAConnectionFactory;
@@ -42,8 +53,19 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
 
 /** Use this class to decorate your Jms consumer / producer and enable Tracing. */
 public final class JmsTracing {
-  static final String JMS_QUEUE = "jms.queue";
-  static final String JMS_TOPIC = "jms.topic";
+  static final Setter<Message, String> SETTER = new Setter<Message, String>() {
+    @Override public void put(Message carrier, String key, String value) {
+      try {
+        carrier.setStringProperty(key, value);
+      } catch (JMSException e) {
+        // don't crash on wonky exceptions!
+      }
+    }
+
+    @Override public String toString() {
+      return "Message::setStringProperty";
+    }
+  };
 
   static final Getter<Message, String> GETTER = new Getter<Message, String>() {
     @Override public String get(Message carrier, String key) {
@@ -69,17 +91,18 @@ public final class JmsTracing {
   }
 
   public static final class Builder {
-    final MessagingTracing msgTracing;
+    final MessagingTracing messageTracing;
     String remoteServiceName = "jms";
 
     Builder(Tracing tracing) {
       if (tracing == null) throw new NullPointerException("tracing == null");
-      this.msgTracing = MessagingTracing.create(tracing);
+      this.messageTracing = MessagingTracing.newBuilder(tracing)
+        .parser(new LegacyMessagingParser()).build();
     }
 
-    Builder(MessagingTracing msgTracing) {
-      if (msgTracing == null) throw new NullPointerException("msgTracing == null");
-      this.msgTracing = msgTracing;
+    Builder(MessagingTracing messageTracing) {
+      if (messageTracing == null) throw new NullPointerException("messageTracing == null");
+      this.messageTracing = messageTracing;
     }
 
     /**
@@ -95,38 +118,28 @@ public final class JmsTracing {
     }
   }
 
-  final MessagingTracing msgTracing;
-  final Extractor<Message> extractor;
-  final Injector<Message> injector;
-  final JmsAdapter.JmsChannelAdapter channelAdapter;
-  final JmsAdapter.JmsMessageConsumerAdapter consumerMessageAdapter;
-  final JmsAdapter.JmsMessageProducerAdapter producerMessageAdapter;
+  final MessagingTracing messageTracing;
+  final ConsumerHandler<Destination, Message, Message> consumerHandler;
+  final ProcessorHandler<Destination, Message, Message> processorHandler;
+  final ProducerHandler<Destination, Message, Message> producerHandler;
+  final Extractor<Message> messageExtractor;
+  final Injector<Message> messageInjector;
+  final MessageAdapter messageAdapter;
   final String remoteServiceName;
   final Set<String> propagationKeys;
 
   JmsTracing(Builder builder) { // intentionally hidden constructor
-    this.msgTracing = builder.msgTracing;
-    this.extractor = msgTracing.tracing().propagation().extractor(GETTER);
+    this.messageTracing = builder.messageTracing;
+    this.messageExtractor = messageTracing.tracing().propagation().extractor(GETTER);
     this.remoteServiceName = builder.remoteServiceName;
-    this.propagationKeys = new LinkedHashSet<>(msgTracing.tracing().propagation().keys());
-    this.consumerMessageAdapter = JmsAdapter.JmsMessageConsumerAdapter.create(this);
-    this.channelAdapter = JmsAdapter.JmsChannelAdapter.create(this);
-    this.producerMessageAdapter = JmsAdapter.JmsMessageProducerAdapter.create(this);
-    this.injector = new Injector<Message>() {
-      @Override public void inject(TraceContext traceContext, Message carrier) {
-        try {
-          PropertyFilter.MESSAGE.filterProperties(carrier, propagationKeys);
-          carrier.setStringProperty("b3", writeB3SingleFormatWithoutParentId(traceContext));
-        } catch (JMSException e) {
-          // don't crash on wonky exceptions!
-        }
-      }
-
-      @Override
-      public String toString() {
-        return "Message::setStringProperty(\"b3\",singleHeaderFormatWithoutParent)";
-      }
-    };
+    this.propagationKeys = new LinkedHashSet<>(messageTracing.tracing().propagation().keys());
+    this.messageAdapter = new MessageAdapter(this);
+    this.messageInjector = new FilteringInjector<>(PropertyFilter.MESSAGE, propagationKeys, SETTER);
+    consumerHandler =
+      ConsumerHandler.create(messageTracing, messageAdapter, messageExtractor, messageInjector);
+    processorHandler = ProcessorHandler.create(messageTracing, consumerHandler);
+    producerHandler =
+      ProducerHandler.create(messageTracing, messageAdapter, messageExtractor, messageInjector);
   }
 
   public Connection connection(Connection connection) {
@@ -200,8 +213,7 @@ public final class JmsTracing {
    * one couldn't be extracted.
    */
   public Span nextSpan(Message message) {
-    return msgTracing.nextSpan(channelAdapter, consumerMessageAdapter, extractor, message,
-      destination(message));
+    return processorHandler.startProcessor(destination(message), message, false);
   }
 
   //TraceContextOrSamplingFlags extractAndClearMessage(Message message) {
@@ -212,7 +224,7 @@ public final class JmsTracing {
   //  return extracted;
   //}
 
-  Destination destination(Message message) {
+  @Nullable static Destination destination(Message message) {
     try {
       return message.getJMSDestination();
     } catch (JMSException e) {
@@ -220,4 +232,39 @@ public final class JmsTracing {
     }
     return null;
   }
+
+  static class FilteringInjector<C> implements TraceContext.Injector<C> {
+    final PropertyFilter filter;
+    final Set<String> namesToClear;
+    final Setter<C, String> setter;
+
+    FilteringInjector(PropertyFilter filter, Set<String> namesToClear, Setter<C, String> setter) {
+      this.filter = filter;
+      this.namesToClear = namesToClear;
+      this.setter = setter;
+    }
+
+    @Override public void inject(TraceContext traceContext, C carrier) {
+      filter.filterProperties(carrier, namesToClear);
+      setter.put(carrier, "b3", writeB3SingleFormatWithoutParentId(traceContext));
+    }
+
+    @Override public String toString() {
+      return setter + "(\"b3\",singleHeaderFormatWithoutParent)";
+    }
+  }
+
+  static class LegacyMessagingParser extends MessagingParser {
+    @Override
+    protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+      Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+      String channelName = adapter.channel(channel);
+      if (channelName == null) return;
+      if (channel instanceof Queue) {
+        customizer.tag("jms.queue", channelName);
+      } else if (channel instanceof Topic) {
+        customizer.tag("jms.topic", channelName);
+      }
+    }
+  }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
index 3a7d50a..3df209a 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingCompletionListener.java
@@ -17,28 +17,64 @@
 package brave.jms;
 
 import brave.Span;
+import brave.internal.Nullable;
+import brave.messaging.ProducerHandler;
 import brave.propagation.CurrentTraceContext;
 import brave.propagation.CurrentTraceContext.Scope;
 import javax.jms.CompletionListener;
+import javax.jms.Destination;
 import javax.jms.Message;
 
 /**
- * Decorates, then finishes a producer span. Allows tracing to record the duration between batching
+ * Decorates, then finishes a producer span. Allows tracing to message the duration between batching
  * for send and actual send.
  */
-@JMS2_0 final class TracingCompletionListener implements CompletionListener {
-  static CompletionListener create(CompletionListener delegate, Span span,
-      CurrentTraceContext current) {
-    if (span.isNoop()) return delegate; // save allocation overhead
-    return new TracingCompletionListener(delegate, span, current);
+@JMS2_0 class TracingCompletionListener<Msg> implements CompletionListener {
+  static <Msg> TracingCompletionListener<Msg> create(@Nullable CompletionListener delegate,
+    ProducerHandler<Destination, Msg, Msg> handler, CurrentTraceContext current,
+    Destination destination, Msg message, Span span) {
+    if (delegate == null) {
+      return new TracingCompletionListener<>(handler, destination, message, span);
+    }
+    return new TracingForwardingCompletionListener<>(delegate, handler, current, destination,
+      message, span);
   }
 
+  final ProducerHandler<Destination, Msg, Msg> handler;
+  final Destination destination;
+  final Msg message;
   final Span span;
+
+  TracingCompletionListener(ProducerHandler<Destination, Msg, Msg> handler, Destination destination,
+    Msg message, Span span) {
+    this.handler = handler;
+    this.destination = destination;
+    this.message = message;
+    this.span = span;
+  }
+
+  @Override public void onCompletion(Message message) {
+    finish(null);
+  }
+
+  @Override public void onException(Message message, Exception exception) {
+    finish(exception);
+  }
+
+  void finish(@Nullable Throwable error) {
+    if (error != null) span.error(error);
+    handler.finishSend(destination, message, span);
+  }
+}
+
+final class TracingForwardingCompletionListener<Msg> extends TracingCompletionListener<Msg> {
   final CompletionListener delegate;
   final CurrentTraceContext current;
 
-  TracingCompletionListener(CompletionListener delegate, Span span, CurrentTraceContext current) {
-    this.span = span;
+  TracingForwardingCompletionListener(CompletionListener delegate,
+    ProducerHandler<Destination, Msg, Msg> handler, CurrentTraceContext current,
+    Destination destination, Msg message, Span span) {
+    super(handler, destination, message, span);
     this.delegate = delegate;
     this.current = current;
   }
@@ -47,16 +83,15 @@ import javax.jms.Message;
     try (Scope ws = current.maybeScope(span.context())) {
       delegate.onCompletion(message);
     } finally {
-      span.finish();
+      finish(null);
     }
   }
 
   @Override public void onException(Message message, Exception exception) {
-    try {
+    try (Scope ws = current.maybeScope(span.context())) {
       delegate.onException(message, exception);
-      span.error(exception);
     } finally {
-      span.finish();
+      finish(exception);
     }
   }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
index 9397a5f..8f801f0 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingExceptionListener.java
@@ -24,13 +24,13 @@ import javax.jms.JMSException;
 
 final class TracingExceptionListener {
   static ExceptionListener create(JmsTracing jmsTracing) {
-    return new TagError(jmsTracing.msgTracing.tracing().tracer());
+    return new TagError(jmsTracing.messageTracing.tracing().tracer());
   }
 
   static ExceptionListener create(ExceptionListener delegate, JmsTracing jmsTracing) {
     if (delegate == null) throw new NullPointerException("exceptionListener == null");
     if (delegate instanceof TagError) return delegate;
-    return new DelegateAndTagError(delegate, jmsTracing.msgTracing.tracing().tracer());
+    return new DelegateAndTagError(delegate, jmsTracing.messageTracing.tracing().tracer());
   }
 
   static class TagError implements ExceptionListener {
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
index 4a4275b..939789e 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingJMSConsumer.java
@@ -16,25 +16,22 @@
  */
 package brave.jms;
 
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
 import javax.jms.Destination;
 import javax.jms.JMSConsumer;
 import javax.jms.JMSRuntimeException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
-@JMS2_0 final class TracingJMSConsumer extends MessagingConsumerHandler<JMSConsumer, Destination, Message> implements JMSConsumer {
+@JMS2_0 final class TracingJMSConsumer implements JMSConsumer {
+  final JMSConsumer delegate;
   final JmsTracing jmsTracing;
   final Destination destination;
+  final ConsumerHandler<Destination, Message, Message> handler;
 
   TracingJMSConsumer(JMSConsumer delegate, Destination destination, JmsTracing jmsTracing) {
-    super(
-        delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        jmsTracing.consumerMessageAdapter,
-        jmsTracing.extractor,
-        jmsTracing.injector);
+    this.delegate = delegate;
+    this.handler = jmsTracing.consumerHandler;
     this.destination = destination;
     this.jmsTracing = jmsTracing;
   }
@@ -53,19 +50,19 @@ import javax.jms.MessageListener;
 
   @Override public Message receive() {
     Message message = delegate.receive();
-    handleConsume(destination, message);
+    handler.handleReceive(destination, message);
     return message;
   }
 
   @Override public Message receive(long timeout) {
     Message message = delegate.receive(timeout);
-    handleConsume(destination, message);
+    handler.handleReceive(destination, message);
     return message;
   }
 
   @Override public Message receiveNoWait() {
     Message message = delegate.receiveNoWait();
-    handleConsume(destination, message);
+    handler.handleReceive(destination, message);
     return message;
   }
 
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
index e380cf2..2482aea 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingJMSProducer.java
@@ -19,11 +19,10 @@ package brave.jms;
 import brave.Span;
 import brave.Tracer;
 import brave.Tracer.SpanInScope;
-import brave.messaging.MessageProducerAdapter;
-import brave.messaging.MessagingProducerHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.CurrentTraceContext;
 import brave.propagation.Propagation.Getter;
-import brave.propagation.TraceContext;
+import brave.propagation.Propagation.Setter;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
@@ -32,45 +31,24 @@ import javax.jms.Destination;
 import javax.jms.JMSProducer;
 import javax.jms.Message;
 
-import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentId;
-
-@JMS2_0 final class TracingJMSProducer
-    extends MessagingProducerHandler<JMSProducer, Destination, JMSProducer>
-    implements JMSProducer {
-
-  static final Getter<JMSProducer, String> GETTER = new Getter<JMSProducer, String>() {
-    @Override public String get(JMSProducer carrier, String key) {
-      return carrier.getStringProperty(key);
-    }
-
-    @Override public String toString() {
-      return "JMSProducer::getStringProperty";
-    }
-  };
+@JMS2_0 final class TracingJMSProducer implements JMSProducer {
 
+  final JMSProducer delegate;
+  final ProducerHandler<Destination, JMSProducer, JMSProducer> handler;
   final Tracer tracer;
   final CurrentTraceContext current;
 
   TracingJMSProducer(JMSProducer delegate, JmsTracing jmsTracing) {
-    super(
-        delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        JmsProducerAdapter.create(jmsTracing),
-        jmsTracing.msgTracing.tracing().propagation().extractor(GETTER),
-        new TraceContext.Injector<JMSProducer>() {
-          @Override public void inject(TraceContext traceContext, JMSProducer carrier) {
-            PropertyFilter.JMS_PRODUCER.filterProperties(carrier, jmsTracing.propagationKeys);
-            carrier.setProperty("b3", writeB3SingleFormatWithoutParentId(traceContext));
-          }
-
-          @Override
-          public String toString() {
-            return "Message::setStringProperty(\"b3\",singleHeaderFormatWithoutParent)";
-          }
-        });
-    this.tracer = jmsTracing.msgTracing.tracing().tracer();
-    this.current = jmsTracing.msgTracing.tracing().currentTraceContext();
+    this.delegate = delegate;
+    this.handler = ProducerHandler.create(
+      jmsTracing.messageTracing,
+      new JMSProducerAdapter(jmsTracing),
+      jmsTracing.messageTracing.tracing().propagation().extractor(GETTER),
+      new JmsTracing.FilteringInjector<>(PropertyFilter.JMS_PRODUCER, jmsTracing.propagationKeys,
+        SETTER)
+    );
+    tracer = jmsTracing.messageTracing.tracing().tracer();
+    current = jmsTracing.messageTracing.tracing().currentTraceContext();
   }
 
   // Partial function pattern as this needs to work before java 8 method references
@@ -130,24 +108,30 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
   }
 
   void send(Send send, Destination destination, Object message) {
-    Span span = handleProduce(destination, this);
+    Span span = handler.startSend(destination, this);
+    // TODO: document what's going on with the getAsync dance here as it makes the lifecycle complex
+    // it isn't clear why a synchronous method would imply an async call, for example.
     final CompletionListener oldCompletionListener = getAsync();
+    boolean shouldFinish = true;
     if (oldCompletionListener != null) {
-      delegate.setAsync(TracingCompletionListener.create(oldCompletionListener, span, current));
+      shouldFinish = false;
+      delegate.setAsync(TracingCompletionListener.create(oldCompletionListener, handler, current,
+        destination, delegate, span));
     }
     SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
     try {
       send.apply(delegate, destination, message);
     } catch (RuntimeException | Error e) {
       span.error(e);
-      span.finish();
+      shouldFinish = true;
       throw e;
     } finally {
       ws.close();
-      if (oldCompletionListener != null) {
+      if (oldCompletionListener != null) { // TODO: why are we doing this
         delegate.setAsync(oldCompletionListener);
-      } else {
-        span.finish();
+      }
+      if (shouldFinish) {
+        handler.finishSend(destination, this, span);
       }
     }
   }
@@ -345,31 +329,34 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
     return delegate.getJMSReplyTo();
   }
 
-  static class JmsProducerAdapter implements MessageProducerAdapter<JMSProducer> {
-    final JmsTracing jmsTracing;
-
-    JmsProducerAdapter(JmsTracing jmsTracing) {
-      this.jmsTracing = jmsTracing;
+  // helper types inlined here to avoid having JmsTracing access JMS 2.0 types
+  static final Getter<JMSProducer, String> GETTER = new Getter<JMSProducer, String>() {
+    @Override public String get(JMSProducer carrier, String key) {
+      return carrier.getStringProperty(key);
     }
 
-    static JmsProducerAdapter create(JmsTracing jmsTracing) {
-      return new JmsProducerAdapter(jmsTracing);
+    @Override public String toString() {
+      return "JMSProducer::getStringProperty";
     }
+  };
 
-    @Override public String operation(JMSProducer message) {
-      return "send";
+  static final Setter<JMSProducer, String> SETTER = new Setter<JMSProducer, String>() {
+    @Override public void put(JMSProducer carrier, String key, String value) {
+      carrier.setProperty(key, value);
     }
 
-    @Override public String identifier(JMSProducer message) {
-      return message.getJMSCorrelationID();
+    @Override public String toString() {
+      return "JMSProducer::setProperty";
     }
+  };
 
-    //@Override public void clearPropagation(JMSProducer message) {
-    //  PropertyFilter.JMS_PRODUCER.filterProperties(message, jmsTracing.propagationKeys);
-    //}
+  static final class JMSProducerAdapter extends JmsAdapter<JMSProducer> {
+    JMSProducerAdapter(JmsTracing jmsTracing) {
+      super(jmsTracing);
+    }
 
-    @Override public String identifierTagKey() {
-      return "jms.correlation_id";
+    @Override public String correlationId(JMSProducer message) {
+      return message.getJMSCorrelationID();
     }
   }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
index b489cc0..8b33646 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageConsumer.java
@@ -16,7 +16,7 @@
  */
 package brave.jms;
 
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -29,30 +29,31 @@ import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSubscriber;
 
+import static brave.jms.JmsTracing.destination;
 import static brave.jms.TracingConnection.TYPE_QUEUE;
 import static brave.jms.TracingConnection.TYPE_TOPIC;
 
 /** Implements all interfaces as according to ActiveMQ, this is typical of JMS 1.1. */
-final class TracingMessageConsumer
-    extends MessagingConsumerHandler<MessageConsumer, Destination, Message>
-    implements QueueReceiver, TopicSubscriber {
+final class TracingMessageConsumer implements QueueReceiver, TopicSubscriber {
 
   static TracingMessageConsumer create(MessageConsumer delegate, JmsTracing jmsTracing) {
     if (delegate instanceof TracingMessageConsumer) return (TracingMessageConsumer) delegate;
     return new TracingMessageConsumer(delegate, jmsTracing);
   }
 
+  final MessageConsumer delegate;
   final JmsTracing jmsTracing;
+  final ConsumerHandler<Destination, Message, Message> handler;
   final int types;
 
   TracingMessageConsumer(MessageConsumer delegate, JmsTracing jmsTracing) {
-    super(
-        delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        jmsTracing.consumerMessageAdapter,
-        jmsTracing.extractor,
-        jmsTracing.injector);
+    this.delegate = delegate;
+    this.handler = ConsumerHandler.create(
+      jmsTracing.messageTracing,
+      jmsTracing.messageAdapter,
+      jmsTracing.messageExtractor,
+      jmsTracing.messageInjector
+    );
     this.jmsTracing = jmsTracing;
     int types = 0;
     if (delegate instanceof QueueSender) types |= TYPE_QUEUE;
@@ -74,19 +75,19 @@ final class TracingMessageConsumer
 
   @Override public Message receive() throws JMSException {
     Message message = delegate.receive();
-    handleConsume(message.getJMSDestination(), message);
+    handler.handleReceive(destination(message), message);
     return message;
   }
 
   @Override public Message receive(long timeout) throws JMSException {
     Message message = delegate.receive(timeout);
-    handleConsume(message.getJMSDestination(), message);
+    handler.handleReceive(destination(message), message);
     return message;
   }
 
   @Override public Message receiveNoWait() throws JMSException {
     Message message = delegate.receiveNoWait();
-    handleConsume(message.getJMSDestination(), message);
+    handler.handleReceive(destination(message), message);
     return message;
   }
 
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
index 3f733cc..93b6bd3 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java
@@ -17,28 +17,25 @@
 package brave.jms;
 
 import brave.Span;
-import brave.Tracer;
-import brave.Tracer.SpanInScope;
-import brave.Tracing;
-import brave.messaging.ChannelAdapter;
-import brave.propagation.TraceContextOrSamplingFlags;
+import brave.messaging.ProcessorHandler;
+import brave.propagation.CurrentTraceContext;
+import brave.propagation.CurrentTraceContext.Scope;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
-import static brave.Span.Kind.CONSUMER;
+import static brave.jms.JmsTracing.destination;
 
 /**
  * When {@link #addConsumerSpan} this creates 2 spans:
  * <ol>
- *   <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the destination</li>
- *   <li>A child span with the duration of the delegated listener</li>
+ * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the destination</li>
+ * <li>A child span with the duration of the delegated listener</li>
  * </ol>
  *
  * <p>{@link #addConsumerSpan} should only be set when the message consumer is not traced.
  */
 final class TracingMessageListener implements MessageListener {
-
   /** Creates a message listener which also adds a consumer span. */
   static MessageListener create(MessageListener delegate, JmsTracing jmsTracing) {
     if (delegate instanceof TracingMessageListener) return delegate;
@@ -46,54 +43,27 @@ final class TracingMessageListener implements MessageListener {
   }
 
   final MessageListener delegate;
-  final JmsTracing jmsTracing;
-  final Tracing tracing;
-  final Tracer tracer;
-  final String remoteServiceName;
+  final CurrentTraceContext current;
+  final ProcessorHandler<Destination, Message, Message> handler;
   final boolean addConsumerSpan;
-  final ChannelAdapter<Destination> channelAdapter;
 
   TracingMessageListener(MessageListener delegate, JmsTracing jmsTracing, boolean addConsumerSpan) {
     this.delegate = delegate;
-    this.jmsTracing = jmsTracing;
-    this.tracing = jmsTracing.msgTracing.tracing();
-    this.tracer = jmsTracing.msgTracing.tracing().tracer();
-    this.remoteServiceName = jmsTracing.remoteServiceName;
+    this.current = jmsTracing.messageTracing.tracing().currentTraceContext();
+    this.handler = jmsTracing.processorHandler;
     this.addConsumerSpan = addConsumerSpan;
-    channelAdapter = JmsAdapter.JmsChannelAdapter.create(jmsTracing);
   }
 
   @Override public void onMessage(Message message) {
-    Span listenerSpan = startMessageListenerSpan(message);
-    try (SpanInScope ws = tracer.withSpanInScope(listenerSpan)) {
+    Span listenerSpan =
+      handler.startProcessor(destination(message), message, addConsumerSpan).name("on-message");
+    try (Scope ws = current.newScope(listenerSpan.context())) {
       delegate.onMessage(message);
-    } catch (Throwable t) {
-      listenerSpan.error(t);
-      throw t;
+    } catch (RuntimeException | Error e) {
+      listenerSpan.error(e);
+      throw e;
     } finally {
       listenerSpan.finish();
     }
   }
-
-  Span startMessageListenerSpan(Message message) {
-    if (!addConsumerSpan) return jmsTracing.nextSpan(message).name("on-message").start();
-    TraceContextOrSamplingFlags extracted = jmsTracing.extractor.extract(message);
-
-    // JMS has no visibility of the incoming message, which incidentally could be local!
-    Span consumerSpan = tracer.nextSpan(extracted).kind(CONSUMER).name("receive");
-    Span listenerSpan = tracer.newChild(consumerSpan.context());
-
-    if (!consumerSpan.isNoop()) {
-      long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
-      consumerSpan.start(timestamp);
-      if (remoteServiceName != null) consumerSpan.remoteServiceName(remoteServiceName);
-      jmsTracing.msgTracing.consumerParser().channel(channelAdapter, jmsTracing.destination(message), consumerSpan);
-      long consumerFinish = timestamp + 1L; // save a clock reading
-      consumerSpan.finish(consumerFinish);
-
-      // not using scoped span as we want to start late
-      listenerSpan.name("on-message").start(consumerFinish);
-    }
-    return listenerSpan;
-  }
 }
diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
index f79b8b3..8aaf910 100644
--- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
+++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageProducer.java
@@ -18,9 +18,9 @@ package brave.jms;
 
 import brave.Span;
 import brave.Tracer;
-import brave.Tracer.SpanInScope;
-import brave.messaging.MessagingProducerHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.CurrentTraceContext;
+import brave.propagation.CurrentTraceContext.Scope;
 import javax.jms.CompletionListener;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -31,46 +31,32 @@ import javax.jms.QueueSender;
 import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 
+import static brave.jms.JmsTracing.destination;
 import static brave.jms.TracingConnection.TYPE_QUEUE;
 import static brave.jms.TracingConnection.TYPE_TOPIC;
 
 /** Implements all interfaces as according to ActiveMQ, this is typical of JMS 1.1. */
-final class TracingMessageProducer extends MessagingProducerHandler<MessageProducer, Destination, Message>
-    implements QueueSender, TopicPublisher {
-
+final class TracingMessageProducer implements QueueSender, TopicPublisher {
   static TracingMessageProducer create(MessageProducer delegate, JmsTracing jmsTracing) {
     if (delegate instanceof TracingMessageProducer) return (TracingMessageProducer) delegate;
     return new TracingMessageProducer(delegate, jmsTracing);
   }
 
+  final MessageProducer delegate;
   final int types;
+  final ProducerHandler<Destination, Message, Message> handler;
   final CurrentTraceContext current;
   final Tracer tracer;
 
   TracingMessageProducer(MessageProducer delegate, JmsTracing jmsTracing) {
-    super(delegate,
-        jmsTracing.msgTracing,
-        jmsTracing.channelAdapter,
-        jmsTracing.producerMessageAdapter,
-        jmsTracing.extractor,
-        jmsTracing.injector);
+    this.delegate = delegate;
     int types = 0;
     if (delegate instanceof QueueSender) types |= TYPE_QUEUE;
     if (delegate instanceof TopicPublisher) types |= TYPE_TOPIC;
     this.types = types;
-    this.current = jmsTracing.msgTracing.tracing().currentTraceContext();
-    this.tracer = jmsTracing.msgTracing.tracing().tracer();
-  }
-
-  Destination destination(Message message) {
-    try {
-      Destination result = message.getJMSDestination();
-      if (result != null) return result;
-      return delegate.getDestination();
-    } catch (JMSException ignored) {
-      // don't crash on wonky exceptions!
-    }
-    return null;
+    this.handler = jmsTracing.producerHandler;
+    this.current = jmsTracing.messageTracing.tracing().currentTraceContext();
+    this.tracer = jmsTracing.messageTracing.tracing().tracer();
   }
 
   @Override public void setDisableMessageID(boolean value) throws JMSException {
@@ -132,8 +118,10 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
   }
 
   @Override public void send(Message message) throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(message);
     } catch (RuntimeException | JMSException | Error e) {
@@ -141,14 +129,16 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   @Override public void send(Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    throws JMSException {
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -156,32 +146,32 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   enum SendDestination {
     DESTINATION {
       @Override void apply(MessageProducer producer, Destination destination, Message message)
-          throws JMSException {
+        throws JMSException {
         producer.send(destination, message);
       }
     },
     QUEUE {
       @Override void apply(MessageProducer producer, Destination destination, Message message)
-          throws JMSException {
+        throws JMSException {
         ((QueueSender) producer).send((Queue) destination, message);
       }
     },
     TOPIC {
       @Override void apply(MessageProducer producer, Destination destination, Message message)
-          throws JMSException {
+        throws JMSException {
         ((TopicPublisher) producer).publish((Topic) destination, message);
       }
     };
 
     abstract void apply(MessageProducer producer, Destination destination, Message message)
-        throws JMSException;
+      throws JMSException;
   }
 
   @Override public void send(Destination destination, Message message) throws JMSException {
@@ -189,9 +179,9 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
   }
 
   void send(SendDestination sendDestination, Destination destination, Message message)
-      throws JMSException {
-    Span span = handleProduce(destination, message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    throws JMSException {
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       sendDestination.apply(delegate, destination, message);
     } catch (RuntimeException | JMSException | Error e) {
@@ -199,15 +189,15 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   @Override
   public void send(Destination destination, Message message, int deliveryMode, int priority,
-      long timeToLive) throws JMSException {
-    Span span = handleProduce(destination, message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    long timeToLive) throws JMSException {
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(destination, message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -215,20 +205,24 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0
   public void send(Message message, CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    completionListener =
+      tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
-      delegate.send(message, TracingCompletionListener.create(completionListener, span, current));
+      delegate.send(message, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
@@ -237,32 +231,44 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0 public void send(Message message, int deliveryMode, int priority, long timeToLive,
-      CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination(message), message);
-    completionListener = TracingCompletionListener.create(completionListener, span, current);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    CompletionListener completionListener) throws JMSException {
+    Destination destination = destination(message);
+
+    Span span = handler.startSend(destination, message);
+    completionListener =
+      tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(message, deliveryMode, priority, timeToLive, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
     }
   }
 
+  private CompletionListener tracingCompletionListener(Message message,
+    CompletionListener completionListener,
+    Span span, Destination destination) {
+    completionListener =
+      TracingCompletionListener.create(completionListener, handler, current, destination, message,
+        span);
+    return completionListener;
+  }
+
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0 public void send(Destination destination, Message message,
-      CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination, message);
-    completionListener = TracingCompletionListener.create(completionListener, span, current);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    CompletionListener completionListener) throws JMSException {
+    Span span = handler.startSend(destination, message);
+    completionListener = tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(destination, message, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
@@ -271,15 +277,15 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   /* @Override JMS 2.0 method: Intentionally no override to ensure JMS 1.1 works! */
   @JMS2_0 public void send(Destination destination, Message message, int deliveryMode, int priority,
-      long timeToLive, CompletionListener completionListener) throws JMSException {
-    Span span = handleProduce(destination, message);
-    completionListener = TracingCompletionListener.create(completionListener, span, current);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    long timeToLive, CompletionListener completionListener) throws JMSException {
+    Span span = handler.startSend(destination, message);
+    completionListener = tracingCompletionListener(message, completionListener, span, destination);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       delegate.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
     } catch (RuntimeException | JMSException | Error e) {
       span.error(e);
-      span.finish();
+      handler.finishSend(destination, message, span);
       throw e;
     } finally {
       ws.close();
@@ -300,11 +306,14 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   @Override
   public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
+    throws JMSException {
     checkQueueSender();
     QueueSender qs = (QueueSender) delegate;
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Destination destination = destination(message);
+    if (destination == null) destination = qs.getDestination();
+
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       qs.send(queue, message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -312,7 +321,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
@@ -332,9 +341,11 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
   @Override public void publish(Message message) throws JMSException {
     checkTopicPublisher();
     TopicPublisher tp = (TopicPublisher) delegate;
+    Destination destination = destination(message);
+    if (destination == null) destination = tp.getDestination();
 
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       tp.publish(message);
     } catch (RuntimeException | JMSException | Error e) {
@@ -342,17 +353,19 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
   @Override public void publish(Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
+    throws JMSException {
     checkTopicPublisher();
     TopicPublisher tp = (TopicPublisher) delegate;
+    Destination destination = destination(message);
+    if (destination == null) destination = tp.getDestination();
 
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       tp.publish(message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -360,7 +373,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
@@ -371,12 +384,14 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
 
   @Override
   public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
-      throws JMSException {
+    throws JMSException {
     checkTopicPublisher();
     TopicPublisher tp = (TopicPublisher) delegate;
+    Destination destination = destination(message);
+    if (destination == null) destination = tp.getDestination();
 
-    Span span = handleProduce(destination(message), message);
-    SpanInScope ws = tracer.withSpanInScope(span); // animal-sniffer mistakes this for AutoCloseable
+    Span span = handler.startSend(destination, message);
+    Scope ws = current.newScope(span.context()); // animal-sniffer mistakes this for AutoCloseable
     try {
       tp.publish(topic, message, deliveryMode, priority, timeToLive);
     } catch (RuntimeException | JMSException | Error e) {
@@ -384,7 +399,7 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw e;
     } finally {
       ws.close();
-      span.finish();
+      handler.finishSend(destination, message, span);
     }
   }
 
@@ -393,5 +408,4 @@ final class TracingMessageProducer extends MessagingProducerHandler<MessageProdu
       throw new IllegalStateException(delegate + " is not a TopicPublisher");
     }
   }
-
 }
diff --git a/instrumentation/jms/src/test/java/brave/jms/JmsTest.java b/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
index 67637ce..9acbc4c 100644
--- a/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
+++ b/instrumentation/jms/src/test/java/brave/jms/JmsTest.java
@@ -39,17 +39,6 @@ import zipkin2.Span;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public abstract class JmsTest {
-  static final Propagation.Setter<Message, String> SETTER =
-      new Propagation.Setter<Message, String>() {
-        @Override public void put(Message carrier, String key, String value) {
-          try {
-            carrier.setStringProperty(key, value);
-          } catch (JMSException e) {
-            throw new AssertionError(e);
-          }
-        }
-      };
-
   @After public void tearDown() {
     tracing.close();
   }
diff --git a/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java b/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
index a5f2d2d..bf35264 100644
--- a/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
+++ b/instrumentation/jms/src/test/java/brave/jms/TracingCompletionListenerTest.java
@@ -17,8 +17,8 @@
 package brave.jms;
 
 import brave.Span;
-import brave.sampler.Sampler;
 import javax.jms.CompletionListener;
+import javax.jms.Destination;
 import javax.jms.Message;
 import org.junit.Test;
 
@@ -27,46 +27,36 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 public class TracingCompletionListenerTest extends JmsTest {
-  @Test public void create_returns_input_on_noop() {
-    Span span = tracing.tracer().withSampler(Sampler.NEVER_SAMPLE).nextSpan();
-
-    CompletionListener delegate = mock(CompletionListener.class);
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(delegate, span, current);
-
-    assertThat(tracingCompletionListener).isSameAs(delegate);
-  }
+  Destination destination = mock(Destination.class);
+  Message message = mock(Message.class);
+  CompletionListener delegate = mock(CompletionListener.class);
 
   @Test public void on_completion_should_finish_span() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(mock(CompletionListener.class), span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
     tracingCompletionListener.onCompletion(message);
 
     assertThat(takeSpan()).isNotNull();
   }
 
   @Test public void on_exception_should_tag_if_exception() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(mock(CompletionListener.class), span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
     tracingCompletionListener.onException(message, new Exception("Test exception"));
 
     assertThat(takeSpan().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 
   @Test public void on_completion_should_forward_then_finish_span() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener delegate = mock(CompletionListener.class);
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(delegate, span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
     tracingCompletionListener.onCompletion(message);
 
     verify(delegate).onCompletion(message);
@@ -74,7 +64,6 @@ public class TracingCompletionListenerTest extends JmsTest {
   }
 
   @Test public void on_completion_should_have_span_in_scope() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
     CompletionListener delegate = new CompletionListener() {
@@ -87,23 +76,25 @@ public class TracingCompletionListenerTest extends JmsTest {
       }
     };
 
-    TracingCompletionListener.create(delegate, span, current).onCompletion(message);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
+
+    tracingCompletionListener.onCompletion(message);
 
     takeSpan(); // consumer reported span
   }
 
   @Test public void on_exception_should_forward_then_tag() throws Exception {
-    Message message = mock(Message.class);
     Span span = tracing.tracer().nextSpan().start();
 
-    CompletionListener delegate = mock(CompletionListener.class);
-    CompletionListener tracingCompletionListener =
-        TracingCompletionListener.create(delegate, span, current);
+    CompletionListener tracingCompletionListener = TracingCompletionListener.create(
+      delegate, jmsTracing.producerHandler, current, destination, message, span);
+
     Exception e = new Exception("Test exception");
     tracingCompletionListener.onException(message, e);
 
     verify(delegate).onException(message, e);
     assertThat(takeSpan().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 }
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
index ae1359b..46159ac 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java
@@ -17,21 +17,27 @@
 package brave.kafka.clients;
 
 import brave.Span;
+import brave.SpanCustomizer;
 import brave.Tracing;
+import brave.internal.Nullable;
+import brave.kafka.clients.TracingConsumer.ConsumerRecordAdapter;
+import brave.kafka.clients.TracingProducer.ProducerRecordAdapter;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
 import brave.messaging.MessagingTracing;
+import brave.messaging.ProcessorHandler;
+import brave.messaging.ProducerHandler;
 import brave.propagation.B3SingleFormat;
 import brave.propagation.Propagation;
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContext.Extractor;
 import brave.propagation.TraceContext.Injector;
-import brave.propagation.TraceContextOrSamplingFlags;
-import java.util.Iterator;
 import java.util.List;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 
 import static brave.kafka.clients.KafkaPropagation.B3_SINGLE_TEST_HEADERS;
@@ -42,11 +48,6 @@ import static brave.propagation.B3SingleFormat.writeB3SingleFormatWithoutParentI
 
 /** Use this class to decorate your Kafka consumer / producer and enable Tracing. */
 public final class KafkaTracing {
-
-  static final String PROTOCOL = "kafka";
-  static final String PRODUCER_OPERATION = "send";
-  static final String CONSUMER_OPERATION = "poll";
-
   public static KafkaTracing create(Tracing tracing) {
     return new Builder(tracing).build();
   }
@@ -56,18 +57,19 @@ public final class KafkaTracing {
   }
 
   public static final class Builder {
-    final MessagingTracing msgTracing;
+    final MessagingTracing messagingTracing;
     String remoteServiceName = "kafka";
     boolean writeB3SingleFormat;
 
     Builder(Tracing tracing) {
       if (tracing == null) throw new NullPointerException("tracing == null");
-      this.msgTracing = MessagingTracing.create(tracing);
+      this.messagingTracing =
+        MessagingTracing.newBuilder(tracing).parser(new LegacyMessagingParser()).build();
     }
 
-    Builder(MessagingTracing msgTracing) {
-      if (msgTracing == null) throw new NullPointerException("msgTracing == null");
-      this.msgTracing = msgTracing;
+    Builder(MessagingTracing messagingTracing) {
+      if (messagingTracing == null) throw new NullPointerException("messagingTracing == null");
+      this.messagingTracing = messagingTracing;
     }
 
     /**
@@ -95,72 +97,40 @@ public final class KafkaTracing {
     }
   }
 
-  final MessagingTracing msgTracing;
-  final String remoteServiceName;
-  final List<String> propagationKeys;
-
-  boolean singleFormat;
+  final MessagingTracing messagingTracing;
+  final ConsumerHandler<String, ConsumerRecord, Headers> consumerHandler;
+  final ProcessorHandler<String, ConsumerRecord, Headers> processorHandler;
+  final ProducerHandler<String, ProducerRecord, Headers> producerHandler;
 
   KafkaTracing(Builder builder) { // intentionally hidden constructor
-    this.msgTracing = builder.msgTracing;
-    this.remoteServiceName = builder.remoteServiceName;
-    this.propagationKeys = msgTracing.tracing().propagation().keys();
-    final Extractor<Headers> extractor =
-        msgTracing.tracing().propagation().extractor(HEADERS_GETTER);
-    List<String> keyList = msgTracing.tracing().propagation().keys();
-    singleFormat = false;
+    this.messagingTracing = builder.messagingTracing;
+    Extractor<Headers> extractor =
+      messagingTracing.tracing().propagation().extractor(HEADERS_GETTER);
+    Injector<Headers> injector;
+    List<String> keyList = messagingTracing.tracing().propagation().keys();
     if (builder.writeB3SingleFormat || keyList.equals(Propagation.B3_SINGLE_STRING.keys())) {
       TraceContext testExtraction = extractor.extract(B3_SINGLE_TEST_HEADERS).context();
       if (!TEST_CONTEXT.equals(testExtraction)) {
         throw new IllegalArgumentException(
-            "KafkaTracing.Builder.writeB3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
+          "KafkaTracing.Builder.writeB3SingleFormat set, but Tracing.Builder.propagationFactory cannot parse this format!");
       }
-      singleFormat = true;
-    }
-  }
-
-  <K, V> Extractor<ProducerRecord<K, V>> producerRecordExtractor() {
-    return msgTracing.tracing()
-        .propagation()
-        .extractor((record, key) -> HEADERS_GETTER.get(record.headers(), key));
-  }
-
-  <K, V> Injector<ProducerRecord<K, V>> producerRecordInjector() {
-    return singleFormat ?
-        new Injector<ProducerRecord<K, V>>() {
-          @Override public void inject(TraceContext traceContext, ProducerRecord<K, V> carrier) {
-            carrier.headers().add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
-          }
-
-          @Override public String toString() {
-            return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
-          }
+      injector = new Injector<Headers>() {
+        @Override public void inject(TraceContext traceContext, Headers headers) {
+          headers.add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
         }
-        : msgTracing.tracing().propagation().injector((record, key, value) -> {
-          HEADERS_SETTER.put(record.headers(), key, value);
-        });
-  }
 
-  <K, V> Extractor<ConsumerRecord<K, V>> consumerRecordExtractor() {
-    return msgTracing.tracing()
-        .propagation()
-        .extractor((record, key) -> HEADERS_GETTER.get(record.headers(), key));
-  }
-
-  <K, V> Injector<ConsumerRecord<K, V>> consumerRecordInjector() {
-    return singleFormat ?
-        new Injector<ConsumerRecord<K, V>>() {
-          @Override public void inject(TraceContext traceContext, ConsumerRecord<K, V> carrier) {
-            carrier.headers().add("b3", writeB3SingleFormatWithoutParentIdAsBytes(traceContext));
-          }
-
-          @Override public String toString() {
-            return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
-          }
+        @Override public String toString() {
+          return "Headers::add(\"b3\",singleHeaderFormatWithoutParent)";
         }
-        : msgTracing.tracing().propagation().injector((record, key, value) -> {
-          HEADERS_SETTER.put(record.headers(), key, value);
-        });
+      };
+    } else {
+      injector = messagingTracing.tracing().propagation().injector(HEADERS_SETTER);
+    }
+    consumerHandler = ConsumerHandler.create(
+      messagingTracing, new ConsumerRecordAdapter(builder.remoteServiceName), extractor, injector);
+    processorHandler = ProcessorHandler.create(messagingTracing, consumerHandler);
+    producerHandler = ProducerHandler.create(
+      messagingTracing, new ProducerRecordAdapter(builder.remoteServiceName), extractor, injector);
   }
 
   /**
@@ -185,35 +155,37 @@ public final class KafkaTracing {
    * one couldn't be extracted.
    */
   public <K, V> Span nextSpan(ConsumerRecord<K, V> record) {
-    final TracingConsumer.KafkaConsumerAdapter<K, V> adapter =
-        TracingConsumer.KafkaConsumerAdapter.create(this);
-    return msgTracing.nextSpan(adapter, adapter, consumerRecordExtractor(), record, record);
-  }
-
-  <Record> String channelTagKey(Record record) {
-    return String.format("%s.topic", PROTOCOL);
+    return processorHandler.startProcessor(record.topic(), record, false);
   }
 
-  String recordKey(Object key) {
+  @Nullable static String recordKey(Object key) {
     if (key instanceof String && !"".equals(key)) {
       return key.toString();
     }
     return null;
   }
 
-  String identifierTagKey() {
-    return String.format("%s.key", PROTOCOL);
-  }
+  static class LegacyMessagingParser extends MessagingParser {
+    @Override
+    protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+      Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+      String channelName = adapter.channel(channel);
+      if (channelName != null) customizer.tag("kafka.topic", channelName);
+      if (msg != null && context.parentId() == null) { // is a root span
+        String messageKey = adapter.messageKey(msg);
+        if (messageKey != null) customizer.tag("kafka.key", messageKey);
+      }
+    }
 
-  // BRAVE6: consider a messaging variant of extraction which clears headers as they are read.
-  // this could prevent having to go back and clear them later. Another option is to encourage,
-  // then special-case single header propagation. When there's only 1 propagation key, you don't
-  // need to do a loop!
-  void clearHeaders(Headers headers) {
-    // Headers::remove creates and consumes an iterator each time. This does one loop instead.
-    for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
-      Header next = i.next();
-      if (propagationKeys.contains(next.key())) i.remove();
+    /** Returns the span name of a message operation. Defaults to the operation name. */
+    @Override protected <Chan, Msg, C> String spanName(String operation,
+      MessagingAdapter<Chan, Msg, C> adapter, Chan channel, @Nullable Msg msg) {
+      switch (operation) {
+        case "receive":
+        case "receive-batch":
+          return "poll";
+      }
+      return super.spanName(operation, adapter, channel, msg);
     }
   }
 }
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
index 7f17d30..de2b06d 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java
@@ -18,9 +18,8 @@ package brave.kafka.clients;
 
 import brave.Span;
 import brave.Tracing;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageConsumerAdapter;
-import brave.messaging.MessagingConsumerHandler;
+import brave.messaging.ConsumerHandler;
+import brave.messaging.MessagingAdapter;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -40,39 +39,31 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 
 /**
  * Kafka Consumer decorator. Read records headers to create and complete a child of the incoming
  * producers span if possible.
  */
-final class TracingConsumer<K, V>
-    extends MessagingConsumerHandler<Consumer<K, V>, ConsumerRecord<K, V>, ConsumerRecord<K, V>>
-    implements Consumer<K, V> {
-
-  final KafkaTracing kafkaTracing;
-  final Tracing tracing;
-  final String remoteServiceName;
-
+final class TracingConsumer<K, V> implements Consumer<K, V> {
   // replicate org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener behaviour
   static final ConsumerRebalanceListener NO_OP_CONSUMER_REBALANCE_LISTENER =
-      new ConsumerRebalanceListener() {
-        @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        }
+    new ConsumerRebalanceListener() {
+      @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+      }
 
-        @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-        }
-      };
+      @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+      }
+    };
+
+  final Consumer<K, V> delegate;
+  final Tracing tracing;
+  final ConsumerHandler<String, ConsumerRecord, Headers> handler;
 
   TracingConsumer(Consumer<K, V> delegate, KafkaTracing kafkaTracing) {
-    super(delegate,
-        kafkaTracing.msgTracing,
-        KafkaConsumerAdapter.create(kafkaTracing),
-        KafkaConsumerAdapter.create(kafkaTracing),
-        kafkaTracing.consumerRecordExtractor(),
-        kafkaTracing.consumerRecordInjector());
-    this.kafkaTracing = kafkaTracing;
-    this.tracing = kafkaTracing.msgTracing.tracing();
-    this.remoteServiceName = kafkaTracing.remoteServiceName;
+    this.delegate = delegate;
+    this.handler = kafkaTracing.consumerHandler;
+    this.tracing = kafkaTracing.messagingTracing.tracing();
   }
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
@@ -84,15 +75,13 @@ final class TracingConsumer<K, V>
   @Override public ConsumerRecords<K, V> poll(long timeout) {
     ConsumerRecords<K, V> records = delegate.poll(timeout);
     if (records.isEmpty() || tracing.isNoop()) return records;
-    long timestamp = 0L;
     Map<String, Span> consumerSpansForTopic = new LinkedHashMap<>();
     for (TopicPartition partition : records.partitions()) {
-      List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
+      List<? extends ConsumerRecord> recordsInPartition = records.records(partition);
       consumerSpansForTopic =
-          handleConsume(recordsInPartition.size() > 0 ? recordsInPartition.get(0) : null,
-              recordsInPartition, consumerSpansForTopic);
+        handler.startBulkReceive(partition.topic(), recordsInPartition, consumerSpansForTopic);
     }
-    for (Span span : consumerSpansForTopic.values()) span.finish(timestamp);
+    handler.finishBulkReceive(consumerSpansForTopic);
     return records;
   }
 
@@ -156,7 +145,7 @@ final class TracingConsumer<K, V>
   }
 
   @Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
-      OffsetCommitCallback callback) {
+    OffsetCommitCallback callback) {
     delegate.commitAsync(offsets, callback);
   }
 
@@ -228,13 +217,13 @@ final class TracingConsumer<K, V>
   }
 
   @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
-      Map<TopicPartition, Long> timestampsToSearch) {
+    Map<TopicPartition, Long> timestampsToSearch) {
     return delegate.offsetsForTimes(timestampsToSearch);
   }
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
   public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
-      Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
+    Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
     return delegate.offsetsForTimes(timestampsToSearch, timeout);
   }
 
@@ -245,7 +234,7 @@ final class TracingConsumer<K, V>
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
   public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions,
-      Duration timeout) {
+    Duration timeout) {
     return delegate.beginningOffsets(partitions, timeout);
   }
 
@@ -255,7 +244,7 @@ final class TracingConsumer<K, V>
 
   // Do not use @Override annotation to avoid compatibility issue version < 2.0
   public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions,
-      Duration timeout) {
+    Duration timeout) {
     return delegate.endOffsets(partitions, timeout);
   }
 
@@ -276,40 +265,36 @@ final class TracingConsumer<K, V>
     delegate.wakeup();
   }
 
-  static class KafkaConsumerAdapter<K, V> implements MessageConsumerAdapter<ConsumerRecord<K, V>>,
-      ChannelAdapter<ConsumerRecord<K, V>> {
-    final KafkaTracing kafkaTracing;
-
-    KafkaConsumerAdapter(KafkaTracing kafkaTracing) {
-      this.kafkaTracing = kafkaTracing;
-    }
+  static final class ConsumerRecordAdapter
+    extends MessagingAdapter<String, ConsumerRecord, Headers> {
+    final String remoteServiceName;
 
-    static <K, V> KafkaConsumerAdapter<K, V> create(KafkaTracing kafkaTracing) {
-      return new KafkaConsumerAdapter<>(kafkaTracing);
+    ConsumerRecordAdapter(String remoteServiceName) {
+      this.remoteServiceName = remoteServiceName;
     }
 
-    @Override public String channel(ConsumerRecord message) {
-      return message.topic();
+    @Override public Headers carrier(ConsumerRecord message) {
+      return message.headers();
     }
 
-    @Override public String operation(ConsumerRecord message) {
-      return KafkaTracing.CONSUMER_OPERATION;
+    @Override public String channel(String topic) {
+      return topic;
     }
 
-    @Override public String identifier(ConsumerRecord message) {
-      return kafkaTracing.recordKey(message.key());
+    @Override public String channelType(String channel) {
+      return "topic";
     }
 
-    @Override public String remoteServiceName(ConsumerRecord message) {
-      return kafkaTracing.remoteServiceName;
+    @Override public String messageKey(ConsumerRecord message) {
+      return KafkaTracing.recordKey(message.key());
     }
 
-    @Override public String channelTagKey(ConsumerRecord<K, V> message) {
-      return kafkaTracing.channelTagKey(message);
+    @Override public String correlationId(ConsumerRecord message) {
+      return null;
     }
 
-    @Override public String identifierTagKey() {
-      return kafkaTracing.identifierTagKey();
+    @Override public String brokerName(String topic) {
+      return remoteServiceName;
     }
   }
 }
diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
index 9ce4092..6186e3e 100644
--- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
+++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java
@@ -18,11 +18,12 @@ package brave.kafka.clients;
 
 import brave.Span;
 import brave.Tracer;
+import brave.Tracing;
 import brave.internal.Nullable;
-import brave.messaging.ChannelAdapter;
-import brave.messaging.MessageProducerAdapter;
-import brave.messaging.MessagingProducerHandler;
-import brave.propagation.CurrentTraceContext;
+import brave.messaging.MessagingAdapter;
+import brave.messaging.MessagingParser;
+import brave.messaging.ProducerHandler;
+import brave.propagation.CurrentTraceContext.Scope;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -37,28 +38,22 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 
-final class TracingProducer<K, V>
-    extends MessagingProducerHandler<Producer<K, V>, ProducerRecord<K, V>, ProducerRecord<K, V>>
-    implements Producer<K, V> {
+final class TracingProducer<K, V> implements Producer<K, V> {
 
+  final Producer<K, V> delegate;
   final KafkaTracing kafkaTracing;
-  final CurrentTraceContext current;
-  final Tracer tracer;
-  @Nullable final String remoteServiceName;
+  final Tracing tracing;
+  final ProducerHandler<String, ProducerRecord, Headers> handler;
+  final MessagingParser parser;
 
   TracingProducer(Producer<K, V> delegate, KafkaTracing kafkaTracing) {
-    super(
-        delegate,
-        kafkaTracing.msgTracing,
-        KafkaProducerAdapter.create(kafkaTracing),
-        KafkaProducerAdapter.create(kafkaTracing),
-        kafkaTracing.producerRecordExtractor(),
-        kafkaTracing.producerRecordInjector());
+    this.delegate = delegate;
+    this.handler = kafkaTracing.producerHandler;
     this.kafkaTracing = kafkaTracing;
-    this.current = kafkaTracing.msgTracing.tracing().currentTraceContext();
-    this.tracer = kafkaTracing.msgTracing.tracing().tracer();
-    this.remoteServiceName = kafkaTracing.remoteServiceName;
+    this.tracing = kafkaTracing.messagingTracing.tracing();
+    this.parser = kafkaTracing.messagingTracing.parser();
   }
 
   @Override public void initTransactions() {
@@ -82,25 +77,27 @@ final class TracingProducer<K, V>
    * tracing.
    */
   @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-    return this.send(record, null);
+    return send(record, null);
   }
 
   /**
    * This wraps the send method to add tracing.
    *
    * <p>When there is no current span, this attempts to extract one from headers. This is possible
-   * when a call to produce a message happens directly after a tracing consumer received a span. One
+   * when a call to produce a message happens directly after a tracing producer received a span. One
    * example scenario is Kafka Streams instrumentation.
    */
   // TODO: make b3single an option and then note how using this minimizes overhead
   @Override
   public Future<RecordMetadata> send(ProducerRecord<K, V> record, @Nullable Callback callback) {
-    Span span = handleProduce(record, record);
+    Span span = handler.startSend(record.topic(), record);
+    if (span.isNoop()) return delegate.send(record, callback);
 
-    try (Tracer.SpanInScope ws = tracer.withSpanInScope(span)) {
-      return delegate.send(record, TracingCallback.create(callback, span, current));
+    FinishSpanCallback finishSpanCallback = tracingCallback(record, callback, span);
+    try (Tracer.SpanInScope ws = tracing.tracer().withSpanInScope(span)) {
+      return delegate.send(record, finishSpanCallback);
     } catch (RuntimeException | Error e) {
-      span.error(e).finish(); // finish as an exception means the callback won't finish the span
+      finishSpanCallback.finish(e); // an exception might imply the callback was not invoked
       throw e;
     }
   }
@@ -132,89 +129,87 @@ final class TracingProducer<K, V>
 
   @Override
   public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
-      String consumerGroupId) {
-    delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
+    String producerGroupId) {
+    delegate.sendOffsetsToTransaction(offsets, producerGroupId);
   }
 
   /**
    * Decorates, then finishes a producer span. Allows tracing to record the duration between
    * batching for send and actual send.
    */
-  static final class TracingCallback {
-    static Callback create(@Nullable Callback delegate, Span span, CurrentTraceContext current) {
-      if (span.isNoop()) return delegate; // save allocation overhead
-      if (delegate == null) return new FinishSpan(span);
-      return new DelegateAndFinishSpan(delegate, span, current);
-    }
+  FinishSpanCallback tracingCallback(ProducerRecord<K, V> record, @Nullable Callback delegate,
+    Span span) {
+    if (delegate == null) return new FinishSpanCallback(record, span);
+    return new DelegateAndFinishSpanCallback(record, delegate, span);
+  }
 
-    static class FinishSpan implements Callback {
-      final Span span;
+  final class DelegateAndFinishSpanCallback extends FinishSpanCallback {
+    final Callback delegate;
 
-      FinishSpan(Span span) {
-        this.span = span;
-      }
+    DelegateAndFinishSpanCallback(ProducerRecord<K, V> record, Callback delegate, Span span) {
+      super(record, span);
+      this.delegate = delegate;
+    }
 
-      @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
-        if (exception != null) span.error(exception);
-        span.finish();
+    @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
+      try (Scope ws = tracing.currentTraceContext().maybeScope(span.context())) {
+        delegate.onCompletion(metadata, exception);
+      } finally {
+        finish(exception);
       }
     }
+  }
 
-    static final class DelegateAndFinishSpan extends FinishSpan {
-      final Callback delegate;
-      final CurrentTraceContext current;
-
-      DelegateAndFinishSpan(Callback delegate, Span span, CurrentTraceContext current) {
-        super(span);
-        this.delegate = delegate;
-        this.current = current;
-      }
+  class FinishSpanCallback implements Callback {
+    final ProducerRecord<K, V> record;
+    final Span span;
 
-      @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
-        try (CurrentTraceContext.Scope ws = current.maybeScope(span.context())) {
-          delegate.onCompletion(metadata, exception);
-        } finally {
-          super.onCompletion(metadata, exception);
-        }
-      }
+    FinishSpanCallback(ProducerRecord<K, V> record, Span span) {
+      this.record = record;
+      this.span = span;
     }
-  }
 
-  static final class KafkaProducerAdapter<K, V> implements
-      MessageProducerAdapter<ProducerRecord<K, V>>,
-      ChannelAdapter<ProducerRecord<K, V>> {
-    final KafkaTracing kafkaTracing;
+    @Override public void onCompletion(RecordMetadata metadata, @Nullable Exception exception) {
+      finish(exception);
+    }
 
-    KafkaProducerAdapter(KafkaTracing kafkaTracing) {
-      this.kafkaTracing = kafkaTracing;
+    void finish(@Nullable Throwable error) {
+      if (error != null) span.error(error);
+      handler.finishSend(record.topic(), record, span);
+      span.finish();
     }
+  }
+
+  static final class ProducerRecordAdapter
+    extends MessagingAdapter<String, ProducerRecord, Headers> {
+    final String remoteServiceName;
 
-    static <K, V> KafkaProducerAdapter<K, V> create(KafkaTracing kafkaTracing) {
-      return new KafkaProducerAdapter<>(kafkaTracing);
+    ProducerRecordAdapter(String remoteServiceName) {
+      this.remoteServiceName = remoteServiceName;
     }
 
-    @Override public String channel(ProducerRecord message) {
-      return message.topic();
+    @Override public Headers carrier(ProducerRecord message) {
+      return message.headers();
     }
 
-    @Override public String operation(ProducerRecord message) {
-      return KafkaTracing.PRODUCER_OPERATION;
+    @Override public String channel(String topic) {
+      return topic;
     }
 
-    @Override public String identifier(ProducerRecord message) {
-      return kafkaTracing.recordKey(message.key());
+    @Override public String channelType(String channel) {
+      return "topic";
     }
 
-    @Override public String remoteServiceName(ProducerRecord message) {
-      return kafkaTracing.remoteServiceName;
+    @Override public String messageKey(ProducerRecord message) {
+      return KafkaTracing.recordKey(message.key());
     }
 
-    @Override public String channelTagKey(ProducerRecord<K, V> message) {
-      return kafkaTracing.channelTagKey(message);
+    @Override public String correlationId(ProducerRecord message) {
+      return null;
     }
 
-    @Override public String identifierTagKey() {
-      return kafkaTracing.identifierTagKey();
+    @Override public String brokerName(String topic) {
+      return remoteServiceName;
     }
   }
 }
diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
index 7cf6caa..e1a0dab 100644
--- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
+++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/TracingCallbackTest.java
@@ -17,8 +17,9 @@
 package brave.kafka.clients;
 
 import brave.Span;
-import brave.sampler.Sampler;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
@@ -28,19 +29,14 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 public class TracingCallbackTest extends BaseTracingTest {
-  @Test public void create_returns_input_on_noop() {
-    Span span = tracing.tracer().withSampler(Sampler.NEVER_SAMPLE).nextSpan();
-
-    Callback delegate = mock(Callback.class);
-    Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
-
-    assertThat(tracingCallback).isSameAs(delegate);
-  }
+  Producer<String, String> producer = mock(Producer.class);
+  ProducerRecord<String, String> record = mock(ProducerRecord.class);
+  TracingProducer<String, String> tracingProducer = new TracingProducer<>(producer, kafkaTracing);
 
   @Test public void on_completion_should_finish_span() {
     Span span = tracing.tracer().nextSpan().start();
 
-    Callback tracingCallback = TracingProducer.TracingCallback.create(null, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, null, span);
     tracingCallback.onCompletion(createRecordMetadata(), null);
 
     assertThat(spans.getFirst()).isNotNull();
@@ -49,18 +45,18 @@ public class TracingCallbackTest extends BaseTracingTest {
   @Test public void on_completion_should_tag_if_exception() {
     Span span = tracing.tracer().nextSpan().start();
 
-    Callback tracingCallback = TracingProducer.TracingCallback.create(null, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, null, span);
     tracingCallback.onCompletion(null, new Exception("Test exception"));
 
     assertThat(spans.getFirst().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 
   @Test public void on_completion_should_forward_then_finish_span() {
     Span span = tracing.tracer().nextSpan().start();
 
     Callback delegate = mock(Callback.class);
-    Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
     RecordMetadata md = createRecordMetadata();
     tracingCallback.onCompletion(md, null);
 
@@ -73,14 +69,16 @@ public class TracingCallbackTest extends BaseTracingTest {
 
     Callback delegate = (metadata, exception) -> assertThat(current.get()).isSameAs(span.context());
 
-    TracingProducer.TracingCallback.create(delegate, span, current).onCompletion(createRecordMetadata(), null);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
+
+    tracingCallback.onCompletion(createRecordMetadata(), null);
   }
 
   @Test public void on_completion_should_forward_then_tag_if_exception() {
     Span span = tracing.tracer().nextSpan().start();
 
     Callback delegate = mock(Callback.class);
-    Callback tracingCallback = TracingProducer.TracingCallback.create(delegate, span, current);
+    Callback tracingCallback = tracingProducer.tracingCallback(record, delegate, span);
     RecordMetadata md = createRecordMetadata();
     Exception e = new Exception("Test exception");
     tracingCallback.onCompletion(md, e);
@@ -88,7 +86,7 @@ public class TracingCallbackTest extends BaseTracingTest {
     verify(delegate).onCompletion(md, e);
 
     assertThat(spans.getFirst().tags())
-        .containsEntry("error", "Test exception");
+      .containsEntry("error", "Test exception");
   }
 
   RecordMetadata createRecordMetadata() {
diff --git a/instrumentation/messaging/pom.xml b/instrumentation/messaging/pom.xml
index 310ac2b..1f6ed5b 100644
--- a/instrumentation/messaging/pom.xml
+++ b/instrumentation/messaging/pom.xml
@@ -38,6 +38,18 @@
     <errorprone.args>-Xep:MissingOverride:OFF</errorprone.args>
   </properties>
 
+  <dependencies>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sqs</artifactId>
+      <version>2.5.52</version>
+    </dependency>
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>5.7.1</version>
+    </dependency>
+  </dependencies>
   <build>
     <plugins>
       <plugin>
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java
new file mode 100644
index 0000000..7456b16
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ConsumerHandler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracing;
+import brave.internal.Nullable;
+import brave.propagation.TraceContext;
+import brave.propagation.TraceContextOrSamplingFlags;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carriers the trace context, usually headers
+ */
+public final class ConsumerHandler<Chan, Msg, C> {
+
+  public static <Chan, Msg, C> ConsumerHandler<Chan, Msg, C> create(
+    MessagingTracing messagingTracing,
+    MessagingAdapter<Chan, Msg, C> adapter,
+    TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector
+  ) {
+    return new ConsumerHandler<>(messagingTracing, adapter, extractor, injector);
+  }
+
+  final Tracing tracing;
+  final TraceContext.Extractor<C> extractor;
+  final TraceContext.Injector<C> injector;
+  final MessagingParser parser;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ConsumerHandler(MessagingTracing messagingTracing,
+    MessagingAdapter<Chan, Msg, C> adapter,
+    TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector
+  ) {
+    this.tracing = messagingTracing.tracing;
+    this.extractor = extractor;
+    this.injector = injector;
+    this.parser = messagingTracing.parser;
+    this.adapter = adapter;
+  }
+
+  public void handleReceive(Chan channel, Msg message) {
+    if (message == null || tracing.isNoop()) return;
+    C carrier = adapter.carrier(message);
+    TraceContextOrSamplingFlags extracted = extractor.extract(carrier);
+    handleReceive(channel, message, carrier, adapter.brokerName(channel), extracted, false);
+  }
+
+  /** Returns a started processor span if {@code createProcessor} is true */
+  @Nullable Span handleReceive(Chan channel, Msg message, C carrier, String remoteServiceName,
+    TraceContextOrSamplingFlags extracted, boolean createProcessor) {
+    Span span = tracing.tracer().nextSpan(extracted);
+    // Creating the processor while the consumer is not finished ensures clocks are the same. This
+    // allows the processor to start later, but not be subject to clock drift relative to the parent.
+    Span processorSpan = createProcessor ? tracing.tracer().newChild(span.context()) : null;
+    if (!span.isNoop()) {
+      span.kind(Span.Kind.CONSUMER);
+      parser.start("receive", adapter, channel, message, span.context(), span.customizer());
+      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+
+      // incur timestamp overhead only once
+      long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+      span.start(timestamp);
+      parser.finish("receive", adapter, channel, message, span.context(), span.customizer());
+      span.finish(timestamp + 1);
+
+      // eventhough we are setting the timestamp here, start timestamp is allowed to be overwritten
+      // later as needed. Doing so here avoids the overhead of a tick reading
+      if (processorSpan != null) processorSpan.start(timestamp + 1);
+    }
+    injector.inject(createProcessor ? processorSpan.context() : span.context(), carrier);
+    return processorSpan;
+  }
+
+  public Map<Chan, Span> startBulkReceive(Chan channel, List<? extends Msg> messages,
+    Map<Chan, Span> spanForChannel) {
+    long timestamp = 0L;
+    for (int i = 0, length = messages.size(); i < length; i++) {
+      Msg message = messages.get(i);
+      C carrier = adapter.carrier(message);
+      TraceContextOrSamplingFlags extracted = extractor.extract(carrier);
+      String remoteServiceName = adapter.brokerName(channel);
+
+      // If we extracted neither a trace context, nor request-scoped data (extra),
+      // make or reuse a span for this topic
+      if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
+        Span span = spanForChannel.get(channel);
+        if (span == null) {
+          span = tracing.tracer().nextSpan(extracted);
+          if (!span.isNoop()) {
+            span.kind(Span.Kind.CONSUMER);
+            parser.start("receive-batch", adapter, channel, null, span.context(),
+              span.customizer());
+            if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+            // incur timestamp overhead only once
+            if (timestamp == 0L) {
+              timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+            }
+            span.start(timestamp);
+          }
+          spanForChannel.put(channel, span);
+        }
+        injector.inject(span.context(), carrier);
+      } else { // we extracted request-scoped data, so cannot share a consumer span.
+        handleReceive(channel, message, carrier, remoteServiceName, extracted, false);
+      }
+    }
+    return spanForChannel;
+  }
+
+  public void finishBulkReceive(Map<Chan, Span> spanForChannel) {
+    long timestamp = 0L;
+    for (Map.Entry<Chan, Span> entry : spanForChannel.entrySet()) {
+      Span span = entry.getValue();
+      // incur timestamp overhead only once
+      if (timestamp == 0L) {
+        timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
+      }
+      parser.finish("receive-batch", adapter, entry.getKey(), null, span.context(),
+        span.customizer());
+      span.finish(timestamp);
+    }
+  }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java
deleted file mode 100644
index 502890b..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-interface MessageAdapter<Msg> {
-  /**
-   * Messaging operation semantics, e.g. pull, push, send, receive, etc.
-   */
-  String operation(Msg message);
-
-  /**
-   * Message identifier, e.g. kafka record key, jms message correlation id.
-   */
-  String identifier(Msg message);
-
-  /**
-   * Removes propagation context from Message context carrier.
-   */
-  //void clearPropagation(Msg message);
-
-  String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java
deleted file mode 100644
index f31e327..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageConsumerAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-public interface MessageConsumerAdapter<Msg> extends MessageAdapter<Msg> {
-  /**
-   * Messaging operation semantics, e.g. pull, push, send, receive, etc.
-   */
-  String operation(Msg message);
-
-  /**
-   * Message identifier, e.g. kafka record key, jms message correlation id.
-   */
-  String identifier(Msg message);
-
-  /**
-   * Removes propagation context from Message context carrier.
-   */
-  //void clearPropagation(Msg message);
-
-  String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java
deleted file mode 100644
index 2bfbde6..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessageProducerAdapter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-public interface MessageProducerAdapter<Msg> extends MessageAdapter<Msg> {
-  /**
-   * Messaging operation semantics, e.g. pull, push, send, receive, etc.
-   */
-  String operation(Msg message);
-
-  /**
-   * Message identifier, e.g. kafka record key, jms message correlation id.
-   */
-  String identifier(Msg message);
-
-  /**
-   * Removes propagation context from Message context carrier.
-   */
-  //void clearPropagation(Msg message);
-
-  String identifierTagKey();
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java
new file mode 100644
index 0000000..11a4e6c
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.internal.Nullable;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carriers the trace context, usually headers
+ */
+// abstract class instead of interface to allow method adds before Java 1.8
+public abstract class MessagingAdapter<Chan, Msg, C> {
+  // TODO: make some of these methods not abstract as they don't have meaning for all impls
+
+  /** Returns the trace context carrier from the message. Usually, this is headers. */
+  public abstract C carrier(Msg message);
+
+  /**
+   * Messaging channel, e.g. kafka queue or JMS topic name. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.channel"
+   */
+  @Nullable public abstract String channel(Chan channel);
+
+  /**
+   * Type of channel, e.g. queue or topic. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.channel_type"
+   */
+  // TODO: naming is arbitrary.. we once used "kind" for Span but only because stackdriver did..
+  // Not sure we should re-use kind for parity with that or not..
+  @Nullable public abstract String channelType(Chan channel);
+
+  /**
+   * Key used to identity or partition messages. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.key"
+   */
+  @Nullable public abstract String messageKey(Msg message);
+
+  /**
+   * Identifier used to correlate logs. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with the key "message.correlation_id"
+   */
+  @Nullable public abstract String correlationId(Msg message);
+
+  /**
+   * Message broker name. {@code null} if unreadable.
+   *
+   * <p>Conventionally associated with {@link Span#remoteServiceName(String)}
+   */
+  @Nullable public abstract String brokerName(Chan channel);
+
+  protected MessagingAdapter() {
+  }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java
deleted file mode 100644
index 629e3b9..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.Span;
-import brave.SpanCustomizer;
-import brave.Tracing;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
-import java.util.List;
-import java.util.Map;
-
-public class MessagingConsumerHandler<C, Chan, Msg>
-    extends MessagingHandler<Chan, Msg, ChannelAdapter<Chan>, MessageAdapter<Msg>> {
-
-  static public <C, Chan, Msg> MessagingConsumerHandler<C, Chan, Msg> create(
-      C delegate,
-      MessagingTracing tracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageConsumerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    return new MessagingConsumerHandler<>(delegate, tracing, channelAdapter, messageAdapter,
-        extractor, injector);
-  }
-
-  public final C delegate;
-  final Tracing tracing;
-
-  public MessagingConsumerHandler(
-      C delegate,
-      MessagingTracing messagingTracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageConsumerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    super(messagingTracing.tracing.currentTraceContext(), channelAdapter, messageAdapter,
-        messagingTracing.consumerParser, extractor, injector);
-    this.delegate = delegate;
-    this.tracing = messagingTracing.tracing;
-  }
-
-  public Span nextSpan(Chan channel, Msg message) {
-    TraceContextOrSamplingFlags extracted = extractor.extract(message);
-    Span result = tracing.tracer().nextSpan(extracted);
-    if (extracted.context() == null && !result.isNoop()) {
-      addTags(channel, result);
-    }
-    return result;
-  }
-
-  /** When an upstream context was not present, lookup keys are unlikely added */
-  void addTags(Chan channel, SpanCustomizer result) {
-    parser.channel(channelAdapter, channel, result);
-    //parser.identifier(messageAdapter, message, result);
-  }
-
-  public void handleConsume(Chan channel, Msg message) {
-    if (message == null || tracing.isNoop()) return;
-    // remove prior propagation headers from the message
-    Span span = nextSpan(channel, message);
-    if (!span.isNoop()) {
-      span.kind(Span.Kind.CONSUMER);
-      parser.message(channelAdapter, messageAdapter, channel, message, span);
-
-      // incur timestamp overhead only once
-      long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
-      span.start(timestamp).finish(timestamp);
-    }
-    injector.inject(span.context(), message);
-  }
-
-  public Map<String, Span> handleConsume(Chan chan, List<Msg> messages,
-      Map<String, Span> spanForChannel) {
-    long timestamp = 0L;
-    for (int i = 0, length = messages.size(); i < length; i++) {
-      Msg message = messages.get(i);
-      TraceContextOrSamplingFlags extracted = extractor.extract(message);
-
-      // If we extracted neither a trace context, nor request-scoped data (extra),
-      // make or reuse a span for this topic
-      if (extracted.samplingFlags() != null && extracted.extra().isEmpty()) {
-        String channel = channelAdapter.channel(chan);
-        Span span = spanForChannel.get(channel);
-        if (span == null) {
-          span = tracing.tracer().nextSpan(extracted);
-          if (!span.isNoop()) {
-            span.name(messageAdapter.operation(message)).kind(Span.Kind.CONSUMER);
-            parser.message(channelAdapter, messageAdapter, chan, message, span);
-            String remoteServiceName = channelAdapter.remoteServiceName(chan);
-            if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
-            // incur timestamp overhead only once
-            if (timestamp == 0L) {
-              timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
-            }
-            span.start(timestamp);
-          }
-          spanForChannel.put(channel, span);
-        }
-        injector.inject(span.context(), message);
-      } else { // we extracted request-scoped data, so cannot share a consumer span.
-        Span span = tracing.tracer().nextSpan(extracted);
-        if (!span.isNoop()) {
-          span.kind(Span.Kind.CONSUMER);
-          parser.message(channelAdapter, messageAdapter, chan, message, span);
-          String remoteServiceName = channelAdapter.remoteServiceName(chan);
-          if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
-          // incur timestamp overhead only once
-          if (timestamp == 0L) {
-            timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
-          }
-          span.start(timestamp).finish(timestamp); // span won't be shared by other records
-        }
-        injector.inject(span.context(), message);
-      }
-    }
-    return spanForChannel;
-  }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java
deleted file mode 100644
index c463860..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingConsumerParser.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.SpanCustomizer;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
-
-public class MessagingConsumerParser extends MessagingParser {
-
-  public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      Chan channel, Msg message, SpanCustomizer customizer) {
-    customizer.name(messageAdapter.operation(message));
-    channel(channelAdapter, channel, customizer);
-    //identifier(messageAdapter, message, customizer);
-  }
-
-  //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
-  //    MessageAdapter<Msg> adapter,
-  //    TraceContext.Extractor<Msg> extractor,
-  //    Msg message) {
-  //  // clear propagation headers if we were able to extract a span
-  //  //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
-  //  //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
-  //  //adapter.clearPropagation(message);
-  //  //}
-  //  return extractor.extract(message);
-  //}
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java
deleted file mode 100644
index 0d22086..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.propagation.CurrentTraceContext;
-import brave.propagation.TraceContext;
-
-abstract class MessagingHandler<Chan, Msg, CA extends ChannelAdapter<Chan>, MA extends MessageAdapter<Msg>> {
-
-  final CurrentTraceContext currentTraceContext;
-  final CA channelAdapter;
-  final MA messageAdapter;
-  final MessagingParser parser;
-  final TraceContext.Extractor<Msg> extractor;
-  final TraceContext.Injector<Msg> injector;
-
-  MessagingHandler(
-      CurrentTraceContext currentTraceContext,
-      CA channelAdapter,
-      MA adapter,
-      MessagingParser parser,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    this.currentTraceContext = currentTraceContext;
-    this.channelAdapter = channelAdapter;
-    this.messageAdapter = adapter;
-    this.parser = parser;
-    this.extractor = extractor;
-    this.injector = injector;
-  }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
similarity index 72%
rename from instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java
rename to instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
index 601410e..8194791 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/ChannelAdapter.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingOperation.java
@@ -16,16 +16,9 @@
  */
 package brave.messaging;
 
-public interface ChannelAdapter<Channel> {
-  /**
-   * Messaging channel, e.g. kafka topic, jms queue, jms topic, etc.
-   */
-  String channel(Channel channel);
-
-  String channelTagKey(Channel channel);
-
-  /**
-   * Messaging broker service, e.g. kafka-cluster, jms-server.
-   */
-  String remoteServiceName(Channel channel);
+public enum MessagingOperation {
+  SEND,
+  BULK_SEND,
+  RECEIVE,
+  BULK_RECEIVE;
 }
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
index 73ae644..87ccbed 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingParser.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *     messaging://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,42 +17,61 @@
 package brave.messaging;
 
 import brave.SpanCustomizer;
+import brave.Tracing;
+import brave.internal.Nullable;
+import brave.propagation.ExtraFieldPropagation;
+import brave.propagation.TraceContext;
 
+/**
+ * <p>Methods will not be invoked with a span in scope. Please use the explicit {@link
+ * TraceContext} if you need to create tags based on propagated data like {@link
+ * ExtraFieldPropagation}.
+ */
+// there are no producer/consumer subtypes because it is simpler than multiple inheritance when the
+// same libary handles producer and consumer relationships.
 public class MessagingParser {
 
-  public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      Chan channel, Msg message, SpanCustomizer customizer) {
-    customizer.name(messageAdapter.operation(message));
-    channel(channelAdapter, channel, customizer);
-    identifier(messageAdapter, message, customizer);
+  /**
+   * Override to change what data to add to the span when a message operation starts. By default,
+   * this sets the span name to the operation name the tag "messaging.channel" if available.
+   *
+   * <p>If you only want to change the span name, you can override {@link
+   * #spanName(String, MessagingAdapter, Object, Object)} instead.
+   *
+   * @param msg null when a bulk operation
+   * @see #spanName(String, MessagingAdapter, Object, Object)
+   * @see #finish(String, MessagingAdapter, Object, Object, TraceContext, SpanCustomizer)
+   */
+  // Context is here so that people can decide to add tags based on local root etc.
+  public <Chan, Msg, C> void start(String operation, MessagingAdapter<Chan, Msg, C> adapter,
+    Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+    customizer.name(spanName(operation, adapter, channel, msg));
+    addMessageTags(adapter, channel, msg, context, customizer);
   }
 
-  public <Chan> void channel(ChannelAdapter<Chan> adapter, Chan chan,
-      SpanCustomizer customizer) {
-    String channel = adapter.channel(chan);
-    if (chan != null) customizer.tag(adapter.channelTagKey(chan), channel);
+  // channel is nullable as JMS could have an exception getting it from the message
+  protected <Chan, Msg, C> void addMessageTags(MessagingAdapter<Chan, Msg, C> adapter,
+    @Nullable Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
+    String channelName = adapter.channel(channel);
+    if (channelName != null) customizer.tag("messaging.channel", channelName);
   }
 
-  public <Msg> void identifier(MessageAdapter<Msg> adapter, Msg message,
-      SpanCustomizer customizer) {
-    String identifier = adapter.identifier(message);
-    if (identifier != null) {
-      customizer.tag(adapter.identifierTagKey(), identifier);
-    }
+  /**
+   * Override to change what data to add to the span when a message operation completes.
+   *
+   * <p>This adds no tags by default. Error tagging is delegated to {@link Tracing#errorParser()}.
+   *
+   * @param msg null when a bulk operation
+   * @see #start(String, MessagingAdapter, Object, Object, TraceContext, SpanCustomizer)
+   */
+  // Context is here so that people can add tags based on extra fields without the cost of a scoping
+  public <Chan, Msg, C> void finish(String operation, MessagingAdapter<Chan, Msg, C> adapter,
+    Chan channel, @Nullable Msg msg, TraceContext context, SpanCustomizer customizer) {
   }
 
-  //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
-  //    MessageAdapter<Msg> adapter,
-  //    TraceContext.Extractor<Msg> extractor,
-  //    Msg message) {
-  //  TraceContextOrSamplingFlags extracted = extractor.extract(message);
-  //  // clear propagation headers if we were able to extract a span
-  //  //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
-  //  //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
-  //  //TODO check we dont need this
-  //  //  adapter.clearPropagation(message);
-  //  //}
-  //  return extracted;
-  //}
+  /** Returns the span name of a message operation. Defaults to the operation name. */
+  protected <Chan, Msg, C> String spanName(String operation,
+    MessagingAdapter<Chan, Msg, C> adapter, Chan channel, @Nullable Msg msg) {
+    return operation;
+  }
 }
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java
deleted file mode 100644
index d330fc3..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.Span;
-import brave.Tracer;
-import brave.propagation.TraceContext;
-
-public class MessagingProducerHandler<P, Chan, Msg>
-    extends MessagingHandler<Chan, Msg, ChannelAdapter<Chan>, MessageProducerAdapter<Msg>> {
-
-  public static <P, Chan, Msg> MessagingProducerHandler<P, Chan, Msg> create(
-      P delegate,
-      MessagingTracing tracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageProducerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    return new MessagingProducerHandler<>(delegate, tracing, channelAdapter, messageAdapter,
-        extractor, injector);
-  }
-
-  public final P delegate;
-  final Tracer tracer;
-
-  public MessagingProducerHandler(
-      P delegate,
-      MessagingTracing messagingTracing,
-      ChannelAdapter<Chan> channelAdapter,
-      MessageProducerAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      TraceContext.Injector<Msg> injector) {
-    super(messagingTracing.tracing.currentTraceContext(), channelAdapter, messageAdapter,
-        messagingTracing.producerParser, extractor, injector);
-    this.delegate = delegate;
-    this.tracer = messagingTracing.tracing.tracer();
-  }
-
-  public Span handleProduce(Chan channel, Msg message) {
-    TraceContext maybeParent = currentTraceContext.get();
-    // Unlike message consumers, we try current span before trying extraction. This is the proper
-    // order because the span in scope should take precedence over a potentially stale header entry.
-    //
-    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we
-    // always clear message headers after reading.
-    Span span;
-    if (maybeParent == null) {
-      span = tracer.nextSpan(extractor.extract(message));
-    } else {
-      // As JMS is sensitive about write access to headers, we  defensively clear even if it seems
-      // upstream would have cleared (because there is a span in scope!).
-      span = tracer.newChild(maybeParent);
-      //TODO check we dont need this
-      // messageAdapter.clearPropagation(message);
-    }
-
-    if (!span.isNoop()) {
-      span.kind(Span.Kind.PRODUCER).name(messageAdapter.operation(message));
-      parser.message(channelAdapter, messageAdapter, channel, message, span);
-      String remoteServiceName = channelAdapter.remoteServiceName(channel);
-      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
-      span.start();
-    }
-
-    injector.inject(span.context(), message);
-
-    return span;
-  }
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java
deleted file mode 100644
index 84723b5..0000000
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingProducerParser.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package brave.messaging;
-
-import brave.SpanCustomizer;
-
-public class MessagingProducerParser extends MessagingParser {
-
-  public <Chan, Msg> void message(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      Chan channel, Msg message, SpanCustomizer customizer) {
-    customizer.name(messageAdapter.operation(message));
-    channel(channelAdapter, channel, customizer);
-    identifier(messageAdapter, message, customizer);
-  }
-
-  //public <Msg> TraceContextOrSamplingFlags extractContextAndClearMessage(
-  //    MessageAdapter<Msg> adapter,
-  //    TraceContext.Extractor<Msg> extractor,
-  //    Msg message) {
-  //  // clear propagation headers if we were able to extract a span
-  //  //TODO check if correct to not filter on empty flags. Diff between kafka and jms instrumentation
-  //  //if (!extracted.equals(TraceContextOrSamplingFlags.EMPTY)) {
-  //  //adapter.clearPropagation(message);
-  //  //}
-  //  return extractor.extract(message);
-  //}
-}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java b/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
index a2d6e66..109ddaa 100644
--- a/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
+++ b/instrumentation/messaging/src/main/java/brave/messaging/MessagingTracing.java
@@ -16,13 +16,9 @@
  */
 package brave.messaging;
 
-import brave.Span;
 import brave.Tracing;
-import brave.propagation.TraceContext;
-import brave.propagation.TraceContextOrSamplingFlags;
 
 public class MessagingTracing {
-
   public static MessagingTracing create(Tracing tracing) {
     return newBuilder(tracing).build();
   }
@@ -32,66 +28,37 @@ public class MessagingTracing {
   }
 
   final Tracing tracing;
-  final MessagingConsumerParser consumerParser;
-  final MessagingProducerParser producerParser;
+  final MessagingParser parser;
 
   MessagingTracing(Builder builder) {
     this.tracing = builder.tracing;
-    this.consumerParser = builder.consumerParser;
-    this.producerParser = builder.producerParser;
+    this.parser = builder.parser;
   }
 
   public Tracing tracing() {
     return tracing;
   }
 
-  public MessagingProducerParser producerParser() {
-    return producerParser;
-  }
-
-  public MessagingConsumerParser consumerParser() {
-    return consumerParser;
-  }
-
-  public <Chan, Msg> Span nextSpan(ChannelAdapter<Chan> channelAdapter,
-      MessageAdapter<Msg> messageAdapter,
-      TraceContext.Extractor<Msg> extractor,
-      Msg message,
-      Chan channel) {
-    TraceContextOrSamplingFlags extracted = extractor.extract(message);
-    Span result = tracing.tracer().nextSpan(extracted);
-
-    // When an upstream context was not present, lookup keys are unlikely added
-    if (extracted.context() == null && !result.isNoop()) {
-      consumerParser.channel(channelAdapter, channel, result);
-      consumerParser.identifier(messageAdapter, message, result);
-    }
-    return result;
+  public MessagingParser parser() {
+    return parser;
   }
 
   public static class Builder {
     final Tracing tracing;
-    MessagingConsumerParser consumerParser = new MessagingConsumerParser();
-    MessagingProducerParser producerParser = new MessagingProducerParser();
+    MessagingParser parser = new MessagingParser();
 
     Builder(Tracing tracing) {
       if (tracing == null) throw new NullPointerException("tracing == null");
       this.tracing = tracing;
     }
 
-    public Builder consumerParser(MessagingConsumerParser consumerParser) {
-      if (producerParser == null) throw new NullPointerException("consumerParser == null");
-      this.consumerParser = consumerParser;
-      return this;
-    }
-
-    public Builder producerParser(MessagingProducerParser producerParser) {
-      if (producerParser == null) throw new NullPointerException("producerParser == null");
-      this.producerParser = producerParser;
+    public Builder parser(MessagingParser parser) {
+      if (parser == null) throw new NullPointerException("parser == null");
+      this.parser = parser;
       return this;
     }
 
-    MessagingTracing build() {
+    public MessagingTracing build() {
       return new MessagingTracing(this);
     }
   }
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java
new file mode 100644
index 0000000..fa5da4c
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ProcessorHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracer;
+import brave.internal.Nullable;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carriers the trace context, usually headers
+ */
+public final class ProcessorHandler<Chan, Msg, C> {
+  public static <Chan, Msg, C> ProcessorHandler<Chan, Msg, C> create(
+    MessagingTracing messagingTracing,
+    ConsumerHandler<Chan, Msg, C> consumerHandler
+  ) {
+    return new ProcessorHandler<>(messagingTracing, consumerHandler);
+  }
+
+  final Tracer tracer;
+  final ConsumerHandler<Chan, Msg, C> consumerHandler;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ProcessorHandler(MessagingTracing messagingTracing,
+    ConsumerHandler<Chan, Msg, C> consumerHandler) {
+    this.tracer = messagingTracing.tracing.tracer();
+    this.consumerHandler = consumerHandler;
+    this.adapter = consumerHandler.adapter;
+  }
+
+  /**
+   * When {@code addConsumerSpan} is true, this creates 2 spans:
+   * <ol>
+   * <li>A duration 1 {@link Span.Kind#CONSUMER} span to represent receipt from the
+   * destination</li>
+   * <li>A child span with the duration of the delegated listener</li>
+   * </ol>
+   *
+   * <p>{@code addConsumerSpan} should only be set when the message consumer is not traced.
+   */
+  // channel is nullable as JMS could have an exception getting it from the message
+  public Span startProcessor(@Nullable Chan channel, Msg message, boolean addConsumerSpan) {
+    if (!addConsumerSpan) {
+      Span result = tracer.nextSpan().start();
+      if (!result.isNoop()) {
+        consumerHandler.parser.addMessageTags(adapter, channel, message,
+          result.context(), result.customizer());
+      }
+      return result;
+    }
+    C carrier = adapter.carrier(message);
+    return consumerHandler.handleReceive(channel, message, carrier,
+      adapter.brokerName(channel), consumerHandler.extractor.extract(carrier), true);
+  }
+}
diff --git a/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java b/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java
new file mode 100644
index 0000000..81d9c72
--- /dev/null
+++ b/instrumentation/messaging/src/main/java/brave/messaging/ProducerHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brave.messaging;
+
+import brave.Span;
+import brave.Tracing;
+import brave.propagation.TraceContext;
+
+/**
+ * @param <Chan> the type of the channel
+ * @param <Msg> the type of the message
+ * @param <C> the type that carriers the trace context, usually headers
+ */
+public final class ProducerHandler<Chan, Msg, C> {
+
+  public static <Chan, Msg, C> ProducerHandler<Chan, Msg, C> create(
+    MessagingTracing messagingTracing,
+    MessagingAdapter<Chan, Msg, C> adapter,
+    TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector
+  ) {
+    return new ProducerHandler<>(
+      messagingTracing.tracing, messagingTracing.parser(), adapter, extractor, injector);
+  }
+
+  final Tracing tracing;
+  final TraceContext.Extractor<C> extractor;
+  final TraceContext.Injector<C> injector;
+  final MessagingParser parser;
+  final MessagingAdapter<Chan, Msg, C> adapter;
+
+  ProducerHandler(Tracing tracing, MessagingParser parser,
+    MessagingAdapter<Chan, Msg, C> adapter, TraceContext.Extractor<C> extractor,
+    TraceContext.Injector<C> injector) {
+    this.tracing = tracing;
+    this.extractor = extractor;
+    this.injector = injector;
+    this.parser = parser;
+    this.adapter = adapter;
+  }
+
+  /**
+   * Attempts to resume a trace from the current span, falling back to extracting context from the
+   * carrier. Tags are added before the span is started.
+   *
+   * <p>This is typically called before the send is processed by the actual library.
+   */
+  public Span startSend(Chan channel, Msg message) {
+    C carrier = adapter.carrier(message);
+    TraceContext maybeParent = tracing.currentTraceContext().get();
+    // Unlike message consumers, we try current span before trying extraction. This is the proper
+    // order because the span in scope should take precedence over a potentially stale header entry.
+    //
+    // NOTE: Brave instrumentation used properly does not result in stale header entries, as we use
+    // propagation formats that can always be overwritten.
+    Span span;
+    if (maybeParent == null) {
+      span = tracing.tracer().nextSpan(extractor.extract(carrier));
+    } else {
+      span = tracing.tracer().newChild(maybeParent);
+    }
+
+    if (!span.isNoop()) {
+      span.kind(Span.Kind.PRODUCER);
+      parser.start("send", adapter, channel, message, span.context(), span.customizer());
+      String remoteServiceName = adapter.brokerName(channel);
+      if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
+      span.start();
+    }
+
+    injector.inject(span.context(), carrier);
+
+    return span;
+  }
+
+  public void finishSend(Chan channel, Msg message, Span span) {
+    parser.finish("send", adapter, channel, message, span.context(), span.customizer());
+    span.finish();
+  }
+}


Mime
View raw message