zipkin-commits mailing list archives

From adrianc...@apache.org
Subject [incubator-zipkin] 01/01: Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)
Date Tue, 07 May 2019 08:35:52 GMT
This is an automated email from the ASF dual-hosted git repository.

adriancole pushed a commit to branch no-kafka08
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git

commit 0bd0144aeea4bb85afb1755d897c79ae1489d88f
Author: Adrian Cole <acole@pivotal.io>
AuthorDate: Tue May 7 16:34:52 2019 +0800

    Removes Kafka 0.8 support (KAFKA_ZOOKEEPER)
    
    This removes support for Kafka 0.8 (last updated almost 4 years ago).
    Notably, this means those using `KAFKA_ZOOKEEPER` to configure their
    broker need to switch to `KAFKA_BOOTSTRAP_SERVERS` instead.
    
    Note: Kafka 0.8 was not packaged into zipkin-server, it was an optional
    add-on. However, our docker image was created in such a way that it felt
    like it was available by default.
    
    See https://lists.apache.org/thread.html/432df5a806ee27dd959ded5ebf5e7cc6bd4370f6b1b1daf7bf594e80@%3Cdev.zipkin.apache.org%3E
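    For operators, the migration amounts to swapping one environment variable. A sketch (the broker address `127.0.0.1:9092` is illustrative; adjust to your cluster):

    ```bash
    # Before (Kafka 0.8 via ZooKeeper, removed by this commit):
    KAFKA_ZOOKEEPER=127.0.0.1:2181 java -jar zipkin.jar

    # After (Kafka 0.10+, brokers addressed directly, no ZooKeeper):
    KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 java -jar zipkin.jar
    ```

    Note the new variable points at Kafka brokers, not ZooKeeper nodes.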
---
 .travis.yml                                        |   6 -
 zipkin-autoconfigure/collector-kafka08/README.md   | 135 ----------
 zipkin-autoconfigure/collector-kafka08/pom.xml     | 104 --------
 .../kafka08/KafkaZooKeeperSetCondition.java        |  45 ----
 .../ZipkinKafka08CollectorAutoConfiguration.java   |  63 -----
 .../kafka08/ZipkinKafkaCollectorProperties.java    |  90 -------
 .../src/main/resources/META-INF/spring.factories   |   2 -
 .../src/main/resources/zipkin-server-kafka08.yml   |   7 -
 .../autoconfigure/collector/kafka08/Access.java    |  42 ----
 ...ipkinKafka08CollectorAutoConfigurationTest.java | 111 ---------
 .../kafka/v1/NestedPropertyOverrideTest.java       |  46 ----
 zipkin-autoconfigure/pom.xml                       |   1 -
 zipkin-collector/kafka/README.md                   |   2 +-
 zipkin-collector/kafka08/README.md                 |  41 ---
 zipkin-collector/kafka08/pom.xml                   |  59 -----
 .../zipkin2/collector/kafka08/KafkaCollector.java  | 277 ---------------------
 .../collector/kafka08/KafkaStreamProcessor.java    |  84 -------
 .../collector/kafka08/ITKafkaCollector.java        | 250 -------------------
 .../zipkin2/collector/kafka08/KafkaTestGraph.java  |  51 ----
 .../kafka08/src/test/resources/log4j.properties    |   7 -
 .../kafka08/src/test/resources/log4j2.properties   |  11 -
 zipkin-collector/pom.xml                           |   1 -
 zipkin-server/README.md                            |   8 -
 23 files changed, 1 insertion(+), 1442 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 9747dd9..74e4a50f 100755
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,7 +11,6 @@ cache:
   # zipkin-ui gets dependencies via NPM
   - $HOME/.npm
   - $HOME/.m2
-  - $HOME/kafka_2.11-0.8.2.2
 
 language: java
 
@@ -28,11 +27,6 @@ before_install:
   # Required for Elasticsearch 5 (See https://github.com/docker-library/docs/tree/master/elasticsearch#host-setup)
   - sudo sysctl -w vm.max_map_count=262144
 
-  # Manually install and run zk+kafka as it isn't an available service
-  - test -d $HOME/kafka_2.11-0.8.2.2/bin || curl -sSL https://archive.apache.org/dist/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz | bash -c '(cd $HOME; tar -xzf -)'
-  - nohup bash -c "cd $HOME/kafka_2.11-0.8.2.2 && bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &"
-  - nohup bash -c "cd $HOME/kafka_2.11-0.8.2.2 && bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &"
-
   # Quiet Maven invoker logs (Downloading... when running zipkin-server/src/it)
   - echo "MAVEN_OPTS='-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn'" > ~/.mavenrc
 
diff --git a/zipkin-autoconfigure/collector-kafka08/README.md b/zipkin-autoconfigure/collector-kafka08/README.md
deleted file mode 100644
index 558b74d..0000000
--- a/zipkin-autoconfigure/collector-kafka08/README.md
+++ /dev/null
@@ -1,135 +0,0 @@
-# Kafka 0.8 Collector Auto-configure Module
-
-This module provides support for running the kafka 0.8 collector as a
-component of Zipkin server. To activate this collector, reference the
-module jar when running the Zipkin server and configure the ZooKeeper
-connection string via the `KAFKA_ZOOKEEPER` environment
-variable or `zipkin.collector.kafka.zookeeper` property.
-
-## Quick start
-
-JRE 8 is required to run Zipkin server.
-
-Fetch the latest released
-[executable jar for Zipkin server](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec)
-and
-[autoconfigure module jar for the kafka collector](https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-autoconfigure-collector-kafka08&v=LATEST&c=module).
-Run Zipkin server with the Kafka 0.8 collector enabled.
-
-For example:
-
-```bash
-$ curl -sSL https://zipkin.io/quickstart.sh | bash -s
-$ curl -sSL https://zipkin.io/quickstart.sh | bash -s io.zipkin.java:zipkin-autoconfigure-collector-kafka08:LATEST:module kafka08.jar
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-After executing these steps, the Zipkin UI will be available at
-[http://localhost:9411](http://localhost:9411), or on port 9411 of the remote host
-the Zipkin server was started on.
-
-The Zipkin server can be further configured as described in the
-[Zipkin server documentation](../../zipkin-server/README.md).
-
-## How this works
-
-The Zipkin server executable jar and the autoconfigure module jar for
-the kafka collector are required. The module jar contains the code for
-loading and configuring the kafka collector, and any dependencies that
-are not already packaged in the Zipkin server jar
-(e.g. zipkin-collector-kafka08, kafka-clients).
-
-Using PropertiesLauncher as the main class runs the Zipkin server
-executable jar just as `java -jar zipkin.jar` would, except that it can load resources
-from outside the executable jar into the classpath. Those external
-resources are specified using the `loader.path` system property. In this
-case, it is configured to load the kafka collector module jar
-(`zipkin-autoconfigure-collector-kafka08-module.jar`) and the jar files
-contained in the `lib/` directory within that module jar
-(`zipkin-autoconfigure-collector-kafka08-module.jar!/lib`).
-
-The `spring.profiles=kafka08` system property causes configuration from
-[zipkin-server-kafka08.yml](src/main/resources/zipkin-server-kafka08.yml)
-to be loaded.
-
-For more information on how this works, see [Spring Boot's documentation
-on the executable jar format](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html). The
-[section on PropertiesLauncher](https://docs.spring.io/spring-boot/docs/current/reference/html/executable-jar.html#executable-jar-property-launcher-features)
-has more detail on how the external module jar and the libraries it
-contains are loaded.
-
-## Configuration
-
-The following configuration points apply when `KAFKA_ZOOKEEPER` or
-`zipkin.collector.kafka.zookeeper` is set. They can be configured by
-setting an environment variable or by setting a java system property
-using the `-Dproperty.name=value` command line argument. Some settings
-correspond to "Consumer Configs" in [Kafka 0.8 documentation](https://kafka.apache.org/082/documentation.html#consumerconfigs).
-
-Environment Variable | Property | Consumer Config | Description
---- | --- | --- | ---
-`KAFKA_ZOOKEEPER` | `zipkin.collector.kafka.zookeeper` | zookeeper.connect | Comma-separated list of zookeeper host/ports, ex. 127.0.0.1:2181. No default
-`KAFKA_GROUP_ID` | `zipkin.collector.kafka.group-id` | group.id | The consumer group this process is consuming on behalf of. Defaults to `zipkin`
-`KAFKA_TOPIC` | `zipkin.collector.kafka.topic` | N/A | The topic that zipkin spans will be consumed from. Defaults to `zipkin`
-`KAFKA_STREAMS` | `zipkin.collector.kafka.streams` | N/A | Count of threads consuming the topic. Defaults to `1`
-
-### Other Kafka consumer properties
-You may need to set other [Kafka consumer properties](https://kafka.apache.org/082/documentation.html#consumerconfigs), in
-addition to the ones with explicit properties defined by the collector.
-In this case, you need to prefix that property name with
-`zipkin.collector.kafka.overrides` and pass it as a system property argument.
-
-For example, to override `auto.offset.reset`, you can set a system property named
-`zipkin.collector.kafka.overrides.auto.offset.reset`:
-
-```bash
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -Dzipkin.collector.kafka.overrides.auto.offset.reset=latest \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-### Examples
-
-Multiple ZooKeeper servers:
-
-```bash
-$ KAFKA_ZOOKEEPER=zk1:2181,zk2:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-Alternate topic name(s):
-
-```bash
-$ KAFKA_ZOOKEEPER=127.0.0.1:2181 \
-    java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -Dzipkin.collector.kafka.topic=zapkin,zipken \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
-
-Specifying ZooKeeper as a system property, instead of an environment variable:
-
-```bash
-$ java \
-    -Dloader.path='kafka08.jar,kafka08.jar!/lib' \
-    -Dspring.profiles.active=kafka08 \
-    -Dzipkin.collector.kafka.zookeeper=127.0.0.1:2181 \
-    -cp zipkin.jar \
-    org.springframework.boot.loader.PropertiesLauncher
-```
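The removed README above documented passing arbitrary consumer settings via a `zipkin.collector.kafka.overrides.` prefix. A hypothetical sketch (not Zipkin code) of how such a prefixed property maps back to a raw Kafka consumer config key, using POSIX parameter expansion:

```shell
# Hypothetical illustration: strip the override prefix to recover the
# underlying Kafka consumer config key.
prop="zipkin.collector.kafka.overrides.auto.offset.reset"
prefix="zipkin.collector.kafka.overrides."
consumer_key="${prop#"$prefix"}"   # remove the prefix from the front
echo "$consumer_key"               # auto.offset.reset
```

Everything after the prefix, dots included, is the consumer property name, which is why nested keys like `auto.offset.reset` pass through unchanged.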
diff --git a/zipkin-autoconfigure/collector-kafka08/pom.xml b/zipkin-autoconfigure/collector-kafka08/pom.xml
deleted file mode 100644
index 7d384ab..0000000
--- a/zipkin-autoconfigure/collector-kafka08/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.zipkin</groupId>
-    <artifactId>zipkin-autoconfigure-parent</artifactId>
-    <version>2.13.1-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>zipkin-autoconfigure-collector-kafka08</artifactId>
-  <name>Auto Configuration: Kafka Collector</name>
-
-  <properties>
-    <main.basedir>${project.basedir}/../..</main.basedir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>${project.groupId}.zipkin2</groupId>
-      <artifactId>zipkin-collector-kafka08</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <!-- com.101tec:zkclient has a log4j dep, re-route it with the bridge
-         https://logging.apache.org/log4j/2.x/manual/migration.html -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-1.2-api</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.springframework.boot</groupId>
-        <artifactId>spring-boot-maven-plugin</artifactId>
-        <version>${spring-boot.version}</version>
-        <configuration>
-          <layoutFactory implementation="zipkin.layout.ZipkinLayoutFactory">
-            <name>zipkin</name>
-          </layoutFactory>
-          <classifier>module</classifier>
-          <!-- exclude dependencies already packaged in zipkin-server -->
-          <!-- https://github.com/spring-projects/spring-boot/issues/3426 transitive exclude doesn't work -->
-          <excludeGroupIds>
-            org.springframework.boot,org.springframework,org.slf4j,commons-logging,com.google.code.gson
-          </excludeGroupIds>
-          <excludes>
-            <!-- excludes direct dependency instead of the group id, as otherwise we'd exclude ourselves -->
-            <exclude>
-              <groupId>${project.groupId}.zipkin2</groupId>
-              <artifactId>zipkin</artifactId>
-            </exclude>
-            <exclude>
-              <groupId>${project.groupId}.zipkin2</groupId>
-              <artifactId>zipkin-collector</artifactId>
-            </exclude>
-            <!-- excludes already packaged logging libraries in the server. Can't use group ID or we
-                 would miss the api bridge -->
-            <exclude>
-              <groupId>org.apache.logging.log4j</groupId>
-              <artifactId>log4j-core</artifactId>
-            </exclude>
-            <exclude>
-              <groupId>org.apache.logging.log4j</groupId>
-              <artifactId>log4j-api</artifactId>
-            </exclude>
-          </excludes>
-        </configuration>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.zipkin.layout</groupId>
-            <artifactId>zipkin-layout-factory</artifactId>
-            <version>${zipkin-layout-factory.version}</version>
-          </dependency>
-        </dependencies>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java
deleted file mode 100644
index cc86eea..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/KafkaZooKeeperSetCondition.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
-import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
-import org.springframework.context.annotation.ConditionContext;
-import org.springframework.core.type.AnnotatedTypeMetadata;
-
-/**
- * This condition passes when {@link ZipkinKafkaCollectorProperties#getZookeeper()} is set to
- * non-empty.
- *
- * <p>This is here because the yaml defaults this property to empty like this, and spring-boot
- * doesn't have an option to treat empty properties as unset.
- *
- * <pre>{@code
- * zookeeper: ${KAFKA_ZOOKEEPER:}
- * }</pre>
- */
-final class KafkaZooKeeperSetCondition extends SpringBootCondition {
-  static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";
-
-  @Override
-  public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
-    String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME);
-    return kafkaZookeeper == null || kafkaZookeeper.isEmpty()
-        ? ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set")
-        : ConditionOutcome.match();
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java
deleted file mode 100644
index 54b8f8a..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfiguration.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Conditional;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.collector.kafka08.KafkaCollector;
-import zipkin2.storage.StorageComponent;
-
-/**
- * This collector consumes a topic, decodes spans from thrift messages and stores them subject to
- * sampling policy.
- */
-@Configuration
-@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
-@Conditional(KafkaZooKeeperSetCondition.class)
-class ZipkinKafka08CollectorAutoConfiguration {
-
-  /**
-   * This launches a thread to run start(). This prevents a several-second hang, or worse a
-   * crash, if zookeeper isn't running yet.
-   */
-  @Bean
-  KafkaCollector kafka(
-      ZipkinKafkaCollectorProperties kafka,
-      CollectorSampler sampler,
-      CollectorMetrics metrics,
-      StorageComponent storage) {
-    final KafkaCollector result =
-        kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();
-
-    // don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down
-    Thread start =
-        new Thread("start " + result.getClass().getSimpleName()) {
-          @Override
-          public void run() {
-            result.start();
-          }
-        };
-    start.setDaemon(true);
-    start.start();
-
-    return result;
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java b/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java
deleted file mode 100644
index 28e9663..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafkaCollectorProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import zipkin2.collector.kafka08.KafkaCollector;
-
-@ConfigurationProperties("zipkin.collector.kafka")
-class ZipkinKafkaCollectorProperties {
-  private String topic = "zipkin";
-  private String zookeeper;
-  private String groupId = "zipkin";
-  private int streams = 1;
-  private int maxMessageSize = 1024 * 1024;
-  private Map<String, String> overrides = new LinkedHashMap<>();
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public void setTopic(String topic) {
-    this.topic = topic;
-  }
-
-  public String getZookeeper() {
-    return zookeeper;
-  }
-
-  public void setZookeeper(String zookeeper) {
-    this.zookeeper = "".equals(zookeeper) ? null : zookeeper;
-  }
-
-  public String getGroupId() {
-    return groupId;
-  }
-
-  public void setGroupId(String groupId) {
-    this.groupId = groupId;
-  }
-
-  public int getStreams() {
-    return streams;
-  }
-
-  public void setStreams(int streams) {
-    this.streams = streams;
-  }
-
-  public int getMaxMessageSize() {
-    return maxMessageSize;
-  }
-
-  public void setMaxMessageSize(int maxMessageSize) {
-    this.maxMessageSize = maxMessageSize;
-  }
-
-  public Map<String, String> getOverrides() {
-    return overrides;
-  }
-
-  public void setOverrides(Map<String, String> overrides) {
-    this.overrides = overrides;
-  }
-
-  public KafkaCollector.Builder toBuilder() {
-    return KafkaCollector.builder()
-        .topic(topic)
-        .zookeeper(zookeeper)
-        .groupId(groupId)
-        .streams(streams)
-        .maxMessageSize(maxMessageSize)
-        .overrides(overrides);
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories b/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index 9daca53..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,2 +0,0 @@
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-zipkin2.autoconfigure.collector.kafka08.ZipkinKafka08CollectorAutoConfiguration
diff --git a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml b/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml
deleted file mode 100644
index 04ccf25..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/main/resources/zipkin-server-kafka08.yml
+++ /dev/null
@@ -1,7 +0,0 @@
-zipkin:
-  collector:
-    kafka:
-      # ZooKeeper host string, comma-separated host:port value.
-      zookeeper: ${KAFKA_ZOOKEEPER:}
-      # Maximum size of a message containing spans in bytes
-      max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java
deleted file mode 100644
index 1ceeb0d..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/Access.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.kafka08.KafkaCollector;
-
-/** opens package access for testing */
-public final class Access {
-
-  /** Just registering properties to avoid automatically connecting to a Kafka server */
-  public static void registerKafkaProperties(AnnotationConfigApplicationContext context) {
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class, EnableKafkaCollectorProperties.class);
-  }
-
-  @Configuration
-  @EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
-  static class EnableKafkaCollectorProperties {}
-
-  public static KafkaCollector.Builder collectorBuilder(
-      AnnotationConfigApplicationContext context) {
-    return context.getBean(ZipkinKafkaCollectorProperties.class).toBuilder();
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java
deleted file mode 100644
index b8c2b81..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/autoconfigure/collector/kafka08/ZipkinKafka08CollectorAutoConfigurationTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.autoconfigure.collector.kafka08;
-
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
-import org.springframework.boot.test.util.TestPropertyValues;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.collector.kafka08.KafkaCollector;
-import zipkin2.storage.InMemoryStorage;
-import zipkin2.storage.StorageComponent;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class ZipkinKafka08CollectorAutoConfigurationTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  AnnotationConfigApplicationContext context;
-
-  @After
-  public void close() {
-    if (context != null) {
-      context.close();
-    }
-  }
-
-  @Test
-  public void doesntProvidesCollectorComponent_whenKafkaZooKeeperUnset() {
-    context = new AnnotationConfigApplicationContext();
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class,
-        ZipkinKafka08CollectorAutoConfiguration.class,
-        InMemoryConfiguration.class);
-    context.refresh();
-
-    thrown.expect(NoSuchBeanDefinitionException.class);
-    context.getBean(Collector.class);
-  }
-
-  @Test
-  public void providesCollectorComponent_whenZooKeeperSet() {
-    context = new AnnotationConfigApplicationContext();
-    TestPropertyValues.of("zipkin.collector.kafka.zookeeper:localhost").applyTo(context);
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class,
-        ZipkinKafka08CollectorAutoConfiguration.class,
-        InMemoryConfiguration.class);
-    context.refresh();
-
-    assertThat(context.getBean(KafkaCollector.class)).isNotNull();
-  }
-
-  @Test
-  public void canOverrideProperty_topic() {
-    context = new AnnotationConfigApplicationContext();
-    TestPropertyValues.of(
-        "zipkin.collector.kafka.zookeeper:localhost",
-        "zipkin.collector.kafka.topic:zapkin")
-    .applyTo(context);
-    context.register(
-        PropertyPlaceholderAutoConfiguration.class,
-        ZipkinKafka08CollectorAutoConfiguration.class,
-        InMemoryConfiguration.class);
-    context.refresh();
-
-    assertThat(context.getBean(ZipkinKafkaCollectorProperties.class).getTopic())
-        .isEqualTo("zapkin");
-  }
-
-  @Configuration
-  static class InMemoryConfiguration {
-    @Bean
-    CollectorSampler sampler() {
-      return CollectorSampler.ALWAYS_SAMPLE;
-    }
-
-    @Bean
-    CollectorMetrics metrics() {
-      return CollectorMetrics.NOOP_METRICS;
-    }
-
-    @Bean
-    StorageComponent storage() {
-      return InMemoryStorage.newBuilder().build();
-    }
-  }
-}
diff --git a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java b/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java
deleted file mode 100644
index 88a490c..0000000
--- a/zipkin-autoconfigure/collector-kafka08/src/test/java/zipkin2/collector/kafka/v1/NestedPropertyOverrideTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import org.junit.Test;
-import org.springframework.boot.test.util.TestPropertyValues;
-import org.springframework.context.annotation.AnnotationConfigApplicationContext;
-import zipkin2.autoconfigure.collector.kafka08.Access;
-import zipkin2.storage.InMemoryStorage;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class NestedPropertyOverrideTest {
-  @Test
-  public void overrideWithNestedProperties() {
-    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
-    TestPropertyValues.of(
-        "zipkin.collector.kafka.zookeeper:localhost",
-        "zipkin.collector.kafka.overrides.auto.offset.reset:largest").applyTo(context);
-    Access.registerKafkaProperties(context);
-    context.refresh();
-
-    assertThat(
-            Access.collectorBuilder(context)
-                .storage(InMemoryStorage.newBuilder().build())
-                .build()
-                .connector
-                .config
-                .autoOffsetReset())
-        .isEqualTo("largest");
-  }
-}
diff --git a/zipkin-autoconfigure/pom.xml b/zipkin-autoconfigure/pom.xml
index cc1c81e..5cc3810 100644
--- a/zipkin-autoconfigure/pom.xml
+++ b/zipkin-autoconfigure/pom.xml
@@ -37,7 +37,6 @@
   </properties>
 
   <modules>
-    <module>collector-kafka08</module>
     <module>collector-scribe</module>
   </modules>
 
diff --git a/zipkin-collector/kafka/README.md b/zipkin-collector/kafka/README.md
index 9b459a0..49eed4d 100644
--- a/zipkin-collector/kafka/README.md
+++ b/zipkin-collector/kafka/README.md
@@ -1,4 +1,4 @@
-# collector-kafka10
+# collector-kafka
 
 ## KafkaCollector
 This collector is implemented as a Kafka consumer supporting Kafka brokers running
diff --git a/zipkin-collector/kafka08/README.md b/zipkin-collector/kafka08/README.md
deleted file mode 100644
index 142f53f..0000000
--- a/zipkin-collector/kafka08/README.md
+++ /dev/null
@@ -1,41 +0,0 @@
-# collector-kafka
-
-## KafkaCollector
-This collector polls a Kafka 8.2.2+ topic for messages that contain
-a list of spans in json or TBinaryProtocol big-endian encoding. These
-spans are pushed to a span consumer.
-
-`zipkin2.collector.kafka08.KafkaCollector.Builder` includes defaults that will
-operate against a Kafka topic advertised in Zookeeper.
-
-## Encoding spans into Kafka messages
-The message's binary data includes a list of spans. Supported encodings
-are the same as the http [POST /spans](http://zipkin.io/zipkin-api/#/paths/%252Fspans) body.
-
-### Json
-The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).
-
-`Codec.JSON.writeSpans(spans)` performs the correct json encoding.
-
-Here's an example, sending a list of a single span to the zipkin topic:
-
-```bash
-$ kafka-console-producer.sh --broker-list $ADVERTISED_HOST:9092 --topic zipkin
-[{"traceId":"1","name":"bang","id":"2","timestamp":1234,"binaryAnnotations":[{"key":"lc","value":"bamm-bamm","endpoint":{"serviceName":"flintstones","ipv4":"127.0.0.1"}}]}]
-```
-
-### Thrift
-The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol
-
-`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:
-```
-write_byte(12) // type of the list elements: 12 == struct
-write_i32(count) // count of spans that will follow
-for (int i = 0; i < count; i++) {
-  writeTBinaryProtocol(spans(i))
-}
-```
-
-### Legacy encoding
-Older versions of zipkin accepted a single span per message, as opposed
-to a list per message. This practice is deprecated, but still supported.
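The TBinaryProtocol list framing described in the removed README (a one-byte element type, 12 for struct, followed by a big-endian i32 span count, then each serialized span) can be sketched as follows. This is an illustrative helper, not the original API; `encodedSpans` stands in for spans already serialized with the real thrift codec:

```java
import java.io.ByteArrayOutputStream;

// Illustrative sketch of the list header the removed README describes:
// element type byte (12 == struct), big-endian i32 count, then the spans.
public class ThriftListFraming {
  static byte[] frame(byte[][] encodedSpans) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(12); // element type: 12 == struct
    int n = encodedSpans.length;
    // big-endian i32 count of spans that follow
    out.write(n >>> 24);
    out.write(n >>> 16);
    out.write(n >>> 8);
    out.write(n);
    for (byte[] span : encodedSpans) out.write(span, 0, span.length);
    return out.toByteArray();
  }
}
```

With this framing, a decoder can read the first five bytes to learn the element type and count before iterating the span structs.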
diff --git a/zipkin-collector/kafka08/pom.xml b/zipkin-collector/kafka08/pom.xml
deleted file mode 100644
index 10d80a0..0000000
--- a/zipkin-collector/kafka08/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-        http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.zipkin.zipkin2</groupId>
-    <artifactId>zipkin-collector-parent</artifactId>
-    <version>2.13.1-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>zipkin-collector-kafka08</artifactId>
-  <name>Collector: Kafka (Legacy)</name>
-
-  <properties>
-    <main.basedir>${project.basedir}/../..</main.basedir>
-    <!-- This is pinned to Kafka 0.8.x client as 0.9.x brokers work with them, but not vice versa
-         http://docs.confluent.io/2.0.0/upgrade.html -->
-    <kafka.version>0.8.2.2</kafka.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>zipkin-collector</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.11</artifactId>
-      <version>${kafka.version}</version>
-      <exclusions>
-        <!-- don't eagerly bind slf4j -->
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-log4j12</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-</project>
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java
deleted file mode 100644
index b68cc03..0000000
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaCollector.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ZookeeperConsumerConnector;
-import zipkin2.CheckResult;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorComponent;
-import zipkin2.collector.CollectorMetrics;
-import zipkin2.collector.CollectorSampler;
-import zipkin2.storage.SpanConsumer;
-import zipkin2.storage.StorageComponent;
-
-import static kafka.consumer.Consumer.createJavaConsumerConnector;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-
-/**
- * This collector polls a Kafka topic for messages that contain TBinaryProtocol big-endian encoded
- * lists of spans. These spans are pushed to a {@link SpanConsumer#accept span consumer}.
- *
- * <p>This collector remains a Kafka 0.8.x consumer, while Zipkin systems update to 0.9+.
- */
-public final class KafkaCollector extends CollectorComponent {
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  /** Configuration including defaults needed to consume spans from a Kafka topic. */
-  public static final class Builder extends CollectorComponent.Builder {
-    final Properties properties = new Properties();
-    Collector.Builder delegate = Collector.newBuilder(KafkaCollector.class);
-    CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
-    String topic = "zipkin";
-    int streams = 1;
-
-    @Override
-    public Builder storage(StorageComponent storage) {
-      delegate.storage(storage);
-      return this;
-    }
-
-    @Override
-    public Builder sampler(CollectorSampler sampler) {
-      delegate.sampler(sampler);
-      return this;
-    }
-
-    @Override
-    public Builder metrics(CollectorMetrics metrics) {
-      if (metrics == null) throw new NullPointerException("metrics == null");
-      this.metrics = metrics.forTransport("kafka");
-      delegate.metrics(this.metrics);
-      return this;
-    }
-
-    /** Topic zipkin spans will be consumed from. Defaults to "zipkin" */
-    public Builder topic(String topic) {
-      if (topic == null) throw new NullPointerException("topic == null");
-      this.topic = topic;
-      return this;
-    }
-
-    /** The zookeeper connect string, ex. 127.0.0.1:2181. No default */
-    public Builder zookeeper(String zookeeper) {
-      if (zookeeper == null) throw new NullPointerException("zookeeper == null");
-      properties.put("zookeeper.connect", zookeeper);
-      return this;
-    }
-
-    /** The consumer group this process is consuming on behalf of. Defaults to "zipkin" */
-    public Builder groupId(String groupId) {
-      if (groupId == null) throw new NullPointerException("groupId == null");
-      properties.put(GROUP_ID_CONFIG, groupId);
-      return this;
-    }
-
-    /** Count of threads/streams consuming the topic. Defaults to 1 */
-    public Builder streams(int streams) {
-      this.streams = streams;
-      return this;
-    }
-
-    /** Maximum size of a message containing spans in bytes. Defaults to 1 MiB */
-    public Builder maxMessageSize(int bytes) {
-      properties.put("fetch.message.max.bytes", String.valueOf(bytes));
-      return this;
-    }
-
-    /**
-     * By default, a consumer will be built from properties derived from builder defaults, as well
-     * "auto.offset.reset" -> "smallest". Any properties set here will override the consumer config.
-     *
-     * <p>For example: Only consume spans since you connected by setting the below.
-     *
-     * <pre>{@code
-     * Map<String, String> overrides = new LinkedHashMap<>();
-     * overrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
-     * builder.overrides(overrides);
-     * }</pre>
-     *
-     * @see org.apache.kafka.clients.consumer.ConsumerConfig
-     */
-    public final Builder overrides(Map<String, ?> overrides) {
-      if (overrides == null) throw new NullPointerException("overrides == null");
-      properties.putAll(overrides);
-      return this;
-    }
-
-    @Override
-    public KafkaCollector build() {
-      return new KafkaCollector(this);
-    }
-
-    Builder() {
-      // Settings below correspond to "Old Consumer Configs"
-      // http://kafka.apache.org/documentation.html
-      properties.put(GROUP_ID_CONFIG, "zipkin");
-      properties.put("fetch.message.max.bytes", String.valueOf(1024 * 1024));
-      // Same default as zipkin-scala, and keeps tests from hanging
-      properties.put(AUTO_OFFSET_RESET_CONFIG, "smallest");
-    }
-  }
-
-  final LazyConnector connector;
-  final LazyStreams streams;
-
-  KafkaCollector(Builder builder) {
-    connector = new LazyConnector(builder);
-    streams = new LazyStreams(builder, connector);
-  }
-
-  @Override
-  public KafkaCollector start() {
-    connector.get();
-    streams.get();
-    return this;
-  }
-
-  @Override
-  public CheckResult check() {
-    try {
-      connector.get(); // make sure the connector didn't throw
-      CheckResult failure = streams.failure.get(); // check the streams didn't quit
-      if (failure != null) return failure;
-      return CheckResult.OK;
-    } catch (RuntimeException e) {
-      return CheckResult.failed(e);
-    }
-  }
-
-  static final class LazyConnector {
-
-    final ConsumerConfig config;
-    volatile ZookeeperConsumerConnector connector;
-
-    LazyConnector(Builder builder) {
-      this.config = new ConsumerConfig(builder.properties);
-    }
-
-    ZookeeperConsumerConnector get() {
-      if (connector == null) {
-        synchronized (this) {
-          if (connector == null) {
-            connector = (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
-          }
-        }
-      }
-      return connector;
-    }
-
-    void close() {
-      ZookeeperConsumerConnector maybeConnector = connector;
-      if (maybeConnector == null) return;
-      maybeConnector.shutdown();
-    }
-  }
-
-  @Override
-  public void close() {
-    streams.close();
-    connector.close();
-  }
-
-  static final class LazyStreams {
-    final int streams;
-    final String topic;
-    final Collector collector;
-    final CollectorMetrics metrics;
-    final LazyConnector connector;
-    final AtomicReference<CheckResult> failure = new AtomicReference<>();
-    volatile ExecutorService pool;
-
-    LazyStreams(Builder builder, LazyConnector connector) {
-      this.streams = builder.streams;
-      this.topic = builder.topic;
-      this.collector = builder.delegate.build();
-      this.metrics = builder.metrics;
-      this.connector = connector;
-    }
-
-    ExecutorService get() {
-      if (pool == null) {
-        synchronized (this) {
-          if (pool == null) {
-            pool = compute();
-          }
-        }
-      }
-      return pool;
-    }
-
-    void close() {
-      ExecutorService maybePool = pool;
-      if (maybePool == null) return;
-      maybePool.shutdownNow();
-      try {
-        maybePool.awaitTermination(1, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        // at least we tried
-      }
-    }
-
-    ExecutorService compute() {
-      ExecutorService pool =
-          streams == 1
-              ? Executors.newSingleThreadExecutor()
-              : Executors.newFixedThreadPool(streams);
-
-      Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
-      topicCountMap.put(topic, streams);
-
-      for (KafkaStream<byte[], byte[]> stream :
-          connector.get().createMessageStreams(topicCountMap).get(topic)) {
-        pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
-      }
-      return pool;
-    }
-
-    Runnable guardFailures(final Runnable delegate) {
-      return new Runnable() {
-        @Override
-        public void run() {
-          try {
-            delegate.run();
-          } catch (RuntimeException e) {
-            failure.set(CheckResult.failed(e));
-          }
-        }
-      };
-    }
-  }
-}
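The `LazyConnector` and `LazyStreams` classes deleted above both rely on the double-checked locking idiom over a `volatile` field, so the connector and thread pool are built on first use rather than at construction. A minimal generic sketch of that idiom (names here are illustrative, not part of the removed code):

```java
import java.util.function.Supplier;

// Double-checked locking: the volatile field makes the unsynchronized
// first null check safe, and the supplier runs at most once even under
// concurrent calls to get().
public class Lazy<T> {
  private final Supplier<T> supplier;
  private volatile T value;

  public Lazy(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  public T get() {
    if (value == null) {                 // fast path, no lock
      synchronized (this) {
        if (value == null) {             // re-check under the lock
          value = supplier.get();
        }
      }
    }
    return value;
  }
}
```

The `volatile` modifier is what makes this correct on the Java memory model; without it, a thread could observe a partially constructed value through the unsynchronized read.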
diff --git a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java b/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
deleted file mode 100644
index 4232c2e..0000000
--- a/zipkin-collector/kafka08/src/main/java/zipkin2/collector/kafka08/KafkaStreamProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Collections;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import zipkin2.Callback;
-import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
-import zipkin2.collector.Collector;
-import zipkin2.collector.CollectorMetrics;
-
-/** Consumes spans from Kafka messages, ignoring malformed input */
-final class KafkaStreamProcessor implements Runnable {
-  static final Callback<Void> NOOP =
-      new Callback<Void>() {
-        @Override
-        public void onSuccess(Void value) {}
-
-        @Override
-        public void onError(Throwable t) {}
-      };
-
-  final KafkaStream<byte[], byte[]> stream;
-  final Collector collector;
-  final CollectorMetrics metrics;
-
-  KafkaStreamProcessor(
-      KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
-    this.stream = stream;
-    this.collector = collector;
-    this.metrics = metrics;
-  }
-
-  @Override
-  public void run() {
-    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
-    while (messages.hasNext()) {
-      byte[] bytes = messages.next().message();
-      metrics.incrementMessages();
-      metrics.incrementBytes(bytes.length);
-      if (bytes.length == 0) continue; // lenient on empty messages
-
-      if (bytes.length < 2) { // need two bytes to check if protobuf
-        metrics.incrementMessagesDropped();
-        continue;
-      }
-
-      // If we received legacy single-span encoding, decode it into a singleton list
-      if (!protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
-        Span span;
-        try {
-          span = SpanBytesDecoder.THRIFT.decodeOne(bytes);
-        } catch (RuntimeException e) {
-          metrics.incrementMessagesDropped();
-          continue;
-        }
-        collector.accept(Collections.singletonList(span), NOOP);
-      } else {
-        collector.acceptSpans(bytes, NOOP);
-      }
-    }
-  }
-
-  /* span key or trace ID key */
-  static boolean protobuf3(byte[] bytes) {
-    return bytes[0] == 10 && bytes[1] != 0; // varint follows and won't be zero
-  }
-}
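The byte-sniffing heuristic in the removed `KafkaStreamProcessor` decides whether a message is a proto3 `ListOfSpans` (tag byte 10 followed by a nonzero varint) or a legacy single thrift span (a field-header type byte at most 16 but not 12, which would indicate a thrift list). A self-contained sketch of that routing logic, with illustrative names and an added length guard matching the processor's earlier short-message check:

```java
// Sketch of the removed format-sniffing heuristic; method names are
// illustrative, the byte comparisons mirror the deleted code.
public class MessageSniffer {
  /** proto3 ListOfSpans: field 1, wire type 2 => tag byte 10, then a nonzero varint. */
  static boolean protobuf3(byte[] bytes) {
    return bytes.length >= 2 && bytes[0] == 10 && bytes[1] != 0;
  }

  /**
   * Legacy single TBinaryProtocol span: first byte is a thrift field-type
   * code (<= 16), but not 12, which marks a thrift list of structs.
   */
  static boolean legacySingleThriftSpan(byte[] bytes) {
    return bytes.length >= 2 && !protobuf3(bytes) && bytes[0] <= 16 && bytes[0] != 12;
  }
}
```

For example, a single thrift span whose first field is the i64 trace id starts with type byte 10 and field id 0x0001, so the second byte is 0 and the proto3 branch does not match, while a JSON list starting with `[` (decimal 91) falls through to the general list decoder.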
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
deleted file mode 100644
index 290a71e..0000000
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/ITKafkaCollector.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.Timeout;
-import zipkin2.Call;
-import zipkin2.Callback;
-import zipkin2.CheckResult;
-import zipkin2.Span;
-import zipkin2.TestObjects;
-import zipkin2.codec.SpanBytesEncoder;
-import zipkin2.collector.InMemoryCollectorMetrics;
-import zipkin2.collector.kafka08.KafkaCollector.Builder;
-import zipkin2.storage.SpanConsumer;
-import zipkin2.storage.SpanStore;
-import zipkin2.storage.StorageComponent;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static zipkin2.TestObjects.CLIENT_SPAN;
-import static zipkin2.TestObjects.UTF_8;
-import static zipkin2.codec.SpanBytesEncoder.THRIFT;
-
-public class ITKafkaCollector {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @ClassRule public static Timeout globalTimeout = Timeout.seconds(20);
-
-  List<Span> spans = Arrays.asList(TestObjects.LOTS_OF_SPANS[0], TestObjects.LOTS_OF_SPANS[1]);
-
-  Producer<String, byte[]> producer = KafkaTestGraph.INSTANCE.producer();
-  InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
-  InMemoryCollectorMetrics kafkaMetrics = metrics.forTransport("kafka");
-
-  LinkedBlockingQueue<List<Span>> recvdSpans = new LinkedBlockingQueue<>();
-  SpanConsumer consumer = (spans) -> {
-    recvdSpans.add(spans);
-    return Call.create(null);
-  };
-
-  @Test
-  public void checkPasses() {
-    try (KafkaCollector collector = newKafkaTransport(builder("check_passes"), consumer)) {
-      assertThat(collector.check().ok()).isTrue();
-    }
-  }
-
-  @Test
-  public void start_failsOnInvalidZooKeeper() {
-    thrown.expect(ZkTimeoutException.class);
-    thrown.expectMessage("Unable to connect to zookeeper server within timeout: 6000");
-
-    Builder builder = builder("fail_invalid_zk").zookeeper("1.1.1.1");
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {}
-  }
-
-  @Test
-  public void canSetMaxMessageSize() {
-    Builder builder = builder("max_message").maxMessageSize(1);
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(collector.connector.get().config().fetchMessageMaxBytes()).isEqualTo(1);
-    }
-  }
-
-  /** Ensures legacy encoding works: a single TBinaryProtocol encoded span */
-  @Test
-  public void messageWithSingleThriftSpan() throws Exception {
-    Builder builder = builder("single_span");
-
-    byte[] bytes = THRIFT.encode(CLIENT_SPAN);
-    producer.send(new KeyedMessage<>(builder.topic, bytes));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactly(CLIENT_SPAN);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.messagesDropped()).isZero();
-    assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(1);
-    assertThat(kafkaMetrics.spansDropped()).isZero();
-  }
-
-  /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
-  @Test
-  public void messageWithMultipleSpans_thrift() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_thrift"), THRIFT);
-  }
-
-  /** Ensures list encoding works: a json encoded list of spans */
-  @Test
-  public void messageWithMultipleSpans_json() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_json"), SpanBytesEncoder.JSON_V1);
-  }
-
-  /** Ensures list encoding works: a version 2 json list of spans */
-  @Test
-  public void messageWithMultipleSpans_json2() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_json2"), SpanBytesEncoder.JSON_V2);
-  }
-
-  /** Ensures list encoding works: proto3 ListOfSpans */
-  @Test
-  public void messageWithMultipleSpans_proto3() throws Exception {
-    messageWithMultipleSpans(builder("multiple_spans_proto3"), SpanBytesEncoder.PROTO3);
-  }
-
-  void messageWithMultipleSpans(Builder builder, SpanBytesEncoder encoder) throws Exception {
-    byte[] message = encoder.encodeList(spans);
-
-    producer.send(new KeyedMessage<>(builder.topic, message));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsAll(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(1);
-    assertThat(kafkaMetrics.messagesDropped()).isZero();
-    assertThat(kafkaMetrics.bytes()).isEqualTo(message.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
-    assertThat(kafkaMetrics.spansDropped()).isZero();
-  }
-
-  /** Ensures malformed spans don't hang the collector */
-  @Test
-  public void skipsMalformedData() throws Exception {
-    Builder builder = builder("decoder_exception");
-
-    byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
-    byte[] malformed2 = "malformed".getBytes(UTF_8);
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-    producer.send(new KeyedMessage<>(builder.topic, new byte[0]));
-    producer.send(new KeyedMessage<>(builder.topic, malformed1));
-    producer.send(new KeyedMessage<>(builder.topic, malformed2));
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-      // the only way we could read this, is if the malformed spans were skipped.
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(5);
-    assertThat(kafkaMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
-    assertThat(kafkaMetrics.bytes())
-      .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 2);
-    assertThat(kafkaMetrics.spansDropped()).isZero();
-  }
-
-  /** Guards against errors that leak from storage, such as InvalidQueryException */
-  @Test
-  public void skipsOnStorageException() throws Exception {
-    Builder builder = builder("storage_exception");
-
-    AtomicInteger counter = new AtomicInteger();
-    consumer = (input) -> new Call.Base<Void>() {
-
-      @Override protected Void doExecute() {
-        throw new AssertionError();
-      }
-
-      @Override protected void doEnqueue(Callback<Void> callback) {
-        if (counter.getAndIncrement() == 1) {
-          callback.onError(new RuntimeException("storage fell over"));
-        } else {
-          recvdSpans.add(spans);
-          callback.onSuccess(null);
-        }
-      }
-
-      @Override public Call<Void> clone() {
-        throw new AssertionError();
-      }
-    };
-
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans))); // tossed on error
-    producer.send(new KeyedMessage<>(builder.topic, THRIFT.encodeList(spans)));
-
-    try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-      // the only way we could read this, is if the malformed span was skipped.
-      assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
-    }
-
-    assertThat(kafkaMetrics.messages()).isEqualTo(3);
-    assertThat(kafkaMetrics.messagesDropped()).isZero(); // storage failure isn't a message failure
-    assertThat(kafkaMetrics.bytes()).isEqualTo(THRIFT.encodeList(spans).length * 3);
-    assertThat(kafkaMetrics.spans()).isEqualTo(spans.size() * 3);
-    assertThat(kafkaMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped
-  }
-
-  Builder builder(String topic) {
-    return new Builder().metrics(metrics).zookeeper("127.0.0.1:2181").topic(topic);
-  }
-
-  KafkaCollector newKafkaTransport(Builder builder, SpanConsumer consumer) {
-    return new KafkaCollector(builder.storage(buildStorage(consumer))).start();
-  }
-
-  StorageComponent buildStorage(final SpanConsumer spanConsumer) {
-    return new StorageComponent() {
-      @Override
-      public SpanStore spanStore() {
-        throw new AssertionError();
-      }
-
-      @Override
-      public SpanConsumer spanConsumer() {
-        return spanConsumer;
-      }
-
-      @Override
-      public CheckResult check() {
-        return CheckResult.OK;
-      }
-
-      @Override
-      public void close() {
-        throw new AssertionError();
-      }
-    };
-  }
-}
diff --git a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java b/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java
deleted file mode 100644
index 3ff9beb..0000000
--- a/zipkin-collector/kafka08/src/test/java/zipkin2/collector/kafka08/KafkaTestGraph.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.kafka08;
-
-import java.util.Properties;
-import kafka.common.FailedToSendMessageException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkTimeoutException;
-import org.junit.AssumptionViolatedException;
-
-/** Tests only execute when ZK and Kafka are listening on 127.0.0.1 on default ports. */
-enum KafkaTestGraph {
-  INSTANCE;
-
-  private AssumptionViolatedException ex;
-  private Producer<String, byte[]> producer;
-
-  synchronized Producer<String, byte[]> producer() {
-    if (ex != null) throw ex;
-    if (this.producer == null) {
-      Properties producerProps = new Properties();
-      producerProps.put("metadata.broker.list", "127.0.0.1:9092");
-      producerProps.put("producer.type", "sync");
-      producer = new Producer<>(new ProducerConfig(producerProps));
-      try {
-        new ZkClient("127.0.0.1:2181", 1000);
-        producer.send(new KeyedMessage<>("test", new byte[0]));
-      } catch (FailedToSendMessageException | ZkTimeoutException e) {
-        throw ex = new AssumptionViolatedException(e.getMessage(), e);
-      }
-    }
-    return producer;
-  }
-}
diff --git a/zipkin-collector/kafka08/src/test/resources/log4j.properties b/zipkin-collector/kafka08/src/test/resources/log4j.properties
deleted file mode 100644
index 15345f3..0000000
--- a/zipkin-collector/kafka08/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,7 +0,0 @@
-# By default, everything goes to console and file
-log4j.rootLogger=WARN, A1
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
-log4j.appender.A1.ImmediateFlush=true
diff --git a/zipkin-collector/kafka08/src/test/resources/log4j2.properties b/zipkin-collector/kafka08/src/test/resources/log4j2.properties
deleted file mode 100755
index c437666..0000000
--- a/zipkin-collector/kafka08/src/test/resources/log4j2.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-appenders=console
-appender.console.type=Console
-appender.console.name=STDOUT
-appender.console.layout.type=PatternLayout
-appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n
-rootLogger.level=warn
-rootLogger.appenderRefs=stdout
-rootLogger.appenderRef.stdout.ref=STDOUT
-# don't waste logs when ZK check fails
-logger.zk.name=org.apache.zookeeper.ClientCnxn
-logger.zk.level=off
diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml
index 153398d..246f176 100644
--- a/zipkin-collector/pom.xml
+++ b/zipkin-collector/pom.xml
@@ -42,7 +42,6 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>scribe</module>
-    <module>kafka08</module>
   </modules>
 
   <dependencies>
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 7832c84..7e64055 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -352,14 +352,6 @@ Specifying bootstrap servers as a system property, instead of an environment var
 $ java -Dzipkin.collector.kafka.bootstrap-servers=127.0.0.1:9092 -jar zipkin.jar
 ```
 
-#### Migration from Kafka < 0.8.1
-
-As explained [on kafka wiki](https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka), offsets were stored in ZooKeeper. This has changed and offsets are now stored directly in Kafka. You need to update offsets in Kafka 0.10 by following the instructions.
-
-#### Kafka (Legacy) Collector
-The default collector is for Kafka 0.10.x+ brokers. You can use Kafka
-0.8 brokers via an external module. See [zipkin-autoconfigure/collector-kafka08](../zipkin-autoconfigure/collector-kafka08/).
-
 ### RabbitMQ collector
 The [RabbitMQ collector](../zipkin-collector/rabbitmq) will be enabled when the `addresses` or `uri` for the RabbitMQ server(s) is set.
 


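For anyone affected by this removal, the `KAFKA_ZOOKEEPER` → `KAFKA_BOOTSTRAP_SERVERS` switch described in the commit message amounts to a one-line change in how zipkin-server is launched. A hedged sketch (host names and ports are placeholders; the new variable points at the Kafka brokers themselves, not at ZooKeeper):

```shell
# Before this commit (Kafka 0.8 add-on, now removed):
#   KAFKA_ZOOKEEPER=zookeeper.example.com:2181 java -jar zipkin.jar

# After this commit (Kafka 0.10+ collector, bundled in zipkin-server):
KAFKA_BOOTSTRAP_SERVERS=kafka.example.com:9092 java -jar zipkin.jar
```

As the removed README section notes, the equivalent system property form is `-Dzipkin.collector.kafka.bootstrap-servers=…`.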