flume-commits mailing list archives

From mpe...@apache.org
Subject [2/2] flume git commit: FLUME-2983. Handle offset migration in the new Kafka Source
Date Thu, 01 Sep 2016 01:50:43 GMT
FLUME-2983. Handle offset migration in the new Kafka Source

Similar to FLUME-2972, Kafka consumers moved from storing their offsets
in ZooKeeper to storing them in Kafka when moving from 0.8.x to 0.9.x.

FLUME-2821 made the client change in the Kafka Source but did not
ensure that existing offsets get migrated, so an upgraded agent could
not continue consuming where it left off. Flume should have automated
logic on startup that checks whether Kafka offsets exist; if none are
found and migration is enabled (the default), it should copy the
offsets from ZooKeeper and commit them to Kafka.
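
In outline, the startup check this patch adds (condensed from the new
migrateOffsets() method in the diff below; the single-topic guard,
verification, and error handling are omitted here):

    // Sketch only, condensed from KafkaSource.doStart()/migrateOffsets()
    // in this patch; not a standalone program.
    if (migrateZookeeperOffsets && zookeeperConnect != null
        && !zookeeperConnect.isEmpty()) {
      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
          getKafkaOffsets(consumer, topicStr);
      if (kafkaOffsets.isEmpty()) {
        // No offsets stored in Kafka yet: read the 0.8.x-era offsets
        // from ZooKeeper and commit them so the new consumer resumes
        // from the same position.
        Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
            getZookeeperOffsets(zkUtils, topicStr);
        if (!zookeeperOffsets.isEmpty()) {
          consumer.commitSync(zookeeperOffsets);
        }
      }
    }

Once migrated, operators can set the new migrateZookeeperOffsets
property to false, though that should generally not be required.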

This change also fixes the backwards incompatibility caused by
removing the zookeeperConnect property: when zookeeperConnect is set,
the bootstrap servers are now looked up from ZooKeeper.
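
The lookup itself is essentially the new lookupBootstrap() method in
the diff below: it reads the broker endpoints registered in ZooKeeper
and joins them into a bootstrap server string.

    // Condensed from KafkaSource.lookupBootstrap() in this patch.
    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT,
        ZK_CONNECTION_TIMEOUT, JaasUtils.isZkSecurityEnabled());
    try {
      // Brokers register their endpoints in ZooKeeper; collect the ones
      // for the configured security protocol (PLAINTEXT by default).
      List<BrokerEndPoint> endPoints =
          asJavaListConverter(
              zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
      List<String> connections = new ArrayList<>();
      for (BrokerEndPoint endPoint : endPoints) {
        connections.add(endPoint.connectionString()); // host:port
      }
      return StringUtils.join(connections, ',');
    } finally {
      zkUtils.close();
    }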

Reviewers: Denes Arvay, Mike Percy

(Grant Henke via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/330e5728
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/330e5728
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/330e5728

Branch: refs/heads/trunk
Commit: 330e572879c9903f5b1084d84b237685aa9970c6
Parents: 53fb374
Author: Grant Henke <granthenke@gmail.com>
Authored: Wed Aug 31 18:44:00 2016 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed Aug 31 18:45:38 2016 -0700

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       |   2 +-
 .../flume/channel/kafka/TestKafkaChannel.java   |   2 +-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   7 +-
 flume-ng-sources/flume-kafka-source/pom.xml     |   1 +
 .../apache/flume/source/kafka/KafkaSource.java  | 183 +++++++++++++++++--
 .../source/kafka/KafkaSourceConstants.java      |   5 +-
 .../source/kafka/KafkaSourceEmbeddedKafka.java  |   4 +-
 .../flume/source/kafka/TestKafkaSource.java     | 163 ++++++++++++++++-
 8 files changed, 343 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index e7f1f2e..66b553a 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -193,7 +193,7 @@ public class KafkaChannel extends BasicChannelSemantics {
 
     migrateZookeeperOffsets = ctx.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
       DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
-    zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT);
+    zookeeperConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
 
     if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
       logger.debug("Kafka properties: {}", ctx);

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index e7ae68f..57c0b28 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -255,7 +255,7 @@ public class TestKafkaChannel {
     createTopic(topic, 1);
 
     Context context = prepareDefaultContext(false);
-    context.put(ZOOKEEPER_CONNECT, testUtil.getZkUrl());
+    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
     context.put(GROUP_ID_FLUME, group);
     final KafkaChannel channel = createChannel(context);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 0fd1ec9..53844e3 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1260,8 +1260,13 @@ useFlumeEventFormat              false        By default events are taken as byt
                                               true to read events as the Flume Avro binary format. Used in conjunction with the same property
                                               on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve
                                               any Flume headers sent on the producing side.
+migrateZookeeperOffsets          true         When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka.
+                                              This should be true to support seamless Kafka client migration from older versions of Flume.
+                                              Once migrated this can be set to false, though that should generally not be required.
+                                              If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset
+                                              defines how offsets are handled.
 Other Kafka Consumer Properties  --           These properties are used to configure the Kafka Consumer. Any producer property supported
-                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix 
+                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix
                                               ``kafka.consumer``.
                                               For example: ``kafka.consumer.auto.offset.reset``
                                               Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>`_ for details

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-sources/flume-kafka-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml
index 5f5c2a8..c89ea1a 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -51,6 +51,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 86782c3..195eca3 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -29,9 +29,14 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
+import kafka.cluster.BrokerEndPoint;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -43,17 +48,25 @@ import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.JaasUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import scala.Option;
+
+import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
+import static scala.collection.JavaConverters.asJavaListConverter;
 
 /**
  * A Source for Kafka which reads messages from kafka topics.
@@ -84,6 +97,10 @@ public class KafkaSource extends AbstractPollableSource
         implements Configurable {
   private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
 
+  // Constants used only for offset migration zookeeper connections
+  private static final int ZK_SESSION_TIMEOUT = 30000;
+  private static final int ZK_CONNECTION_TIMEOUT = 30000;
+
   private Context context;
   private Properties kafkaProps;
   private KafkaSourceCounter counter;
@@ -106,6 +123,10 @@ public class KafkaSource extends AbstractPollableSource
 
   private Subscriber subscriber;
 
+  private String zookeeperConnect;
+  private String bootstrapServers;
+  private String groupId = DEFAULT_GROUP_ID;
+  private boolean migrateZookeeperOffsets = DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS;
 
   /**
    * This class is a helper to subscribe for topics by using
@@ -342,12 +363,44 @@ public class KafkaSource extends AbstractPollableSource
       log.debug(KafkaSourceConstants.AVRO_EVENT + " set to: {}", useAvroEventFormat);
     }
 
-    String bootstrapServers = context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS);
+    zookeeperConnect = context.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
+    migrateZookeeperOffsets = context.getBoolean(MIGRATE_ZOOKEEPER_OFFSETS,
+        DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS);
+
+    bootstrapServers = context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS);
     if (bootstrapServers == null || bootstrapServers.isEmpty()) {
-      throw new ConfigurationException("Bootstrap Servers must be specified");
+      if (zookeeperConnect == null || zookeeperConnect.isEmpty()) {
+        throw new ConfigurationException("Bootstrap Servers must be specified");
+      } else {
+        // For backwards compatibility look up the bootstrap from zookeeper
+        log.warn("{} is deprecated. Please use the parameter {}",
+            KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY,
+            KafkaSourceConstants.BOOTSTRAP_SERVERS);
+
+        // Lookup configured security protocol, just in case its not default
+        String securityProtocolStr =
+            context.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX)
+                .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+        if (securityProtocolStr == null || securityProtocolStr.isEmpty()) {
+          securityProtocolStr = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+        }
+        bootstrapServers =
+            lookupBootstrap(zookeeperConnect, SecurityProtocol.valueOf(securityProtocolStr));
+      }
     }
 
-    setConsumerProps(context, bootstrapServers);
+    String groupIdProperty =
+        context.getString(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    if (groupIdProperty != null && !groupIdProperty.isEmpty()) {
+      groupId = groupIdProperty; // Use the new group id property
+    }
+
+    if (groupId == null || groupId.isEmpty()) {
+      groupId = DEFAULT_GROUP_ID;
+      log.info("Group ID was not specified. Using {} as the group id.", groupId);
+    }
+
+    setConsumerProps(context);
 
     if (log.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
       log.debug("Kafka consumer properties: {}", kafkaProps);
@@ -369,23 +422,15 @@ public class KafkaSource extends AbstractPollableSource
     }
 
     // old groupId
-    String groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID);
+    groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID);
     if (groupId != null && !groupId.isEmpty()) {
-      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       log.warn("{} is deprecated. Please use the parameter {}",
               KafkaSourceConstants.OLD_GROUP_ID,
               KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
     }
   }
 
-  private void setConsumerProps(Context ctx, String bootStrapServers) {
-    String groupId = ctx.getString(
-        KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
-    if ((groupId == null || groupId.isEmpty()) &&
-        kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
-      groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
-      log.info("Group ID was not specified. Using " + groupId + " as the group id.");
-    }
+  private void setConsumerProps(Context ctx) {
     kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
     kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -393,7 +438,7 @@ public class KafkaSource extends AbstractPollableSource
     //Defaults overridden based on config
     kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
     //These always take precedence over config
-    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     if (groupId != null) {
       kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
     }
@@ -401,6 +446,31 @@ public class KafkaSource extends AbstractPollableSource
                    KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
   }
 
+  /**
+   * Generates the Kafka bootstrap connection string from the metadata stored in Zookeeper.
+   * Allows for backwards compatibility of the zookeeperConnect configuration.
+   */
+  private String lookupBootstrap(String zookeeperConnect, SecurityProtocol securityProtocol) {
+    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
+        JaasUtils.isZkSecurityEnabled());
+    try {
+      List<BrokerEndPoint> endPoints =
+          asJavaListConverter(zkUtils.getAllBrokerEndPointsForChannel(securityProtocol)).asJava();
+      List<String> connections = new ArrayList<>();
+      for (BrokerEndPoint endPoint : endPoints) {
+        connections.add(endPoint.connectionString());
+      }
+      return StringUtils.join(connections, ',');
+    } finally {
+      zkUtils.close();
+    }
+  }
+
+  @VisibleForTesting
+  String getBootstrapServers() {
+    return bootstrapServers;
+  }
+
   Properties getConsumerProps() {
     return kafkaProps;
   }
@@ -424,6 +494,21 @@ public class KafkaSource extends AbstractPollableSource
   protected void doStart() throws FlumeException {
     log.info("Starting {}...", this);
 
+    // As a migration step check if there are any offsets from the group stored in kafka
+    // If not read them from Zookeeper and commit them to Kafka
+    if (migrateZookeeperOffsets && zookeeperConnect != null && !zookeeperConnect.isEmpty()) {
+      // For simplicity we only support migration of a single topic via the TopicListSubscriber.
+      // There was no way to define a list of topics or a pattern in the previous Flume version.
+      if (subscriber instanceof TopicListSubscriber &&
+          ((TopicListSubscriber) subscriber).get().size() == 1) {
+        String topicStr = ((TopicListSubscriber) subscriber).get().get(0);
+        migrateOffsets(topicStr);
+      } else {
+        log.info("Will not attempt to migrate offsets " +
+            "because multiple topics or a pattern are defined");
+      }
+    }
+
     //initialize a consumer.
     consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
 
@@ -445,6 +530,76 @@ public class KafkaSource extends AbstractPollableSource
     counter.stop();
     log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
   }
+
+  private void migrateOffsets(String topicStr) {
+    ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
+        JaasUtils.isZkSecurityEnabled());
+    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps);
+    try {
+      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
+          getKafkaOffsets(consumer, topicStr);
+      if (!kafkaOffsets.isEmpty()) {
+        log.info("Found Kafka offsets for topic " + topicStr +
+            ". Will not migrate from zookeeper");
+        log.debug("Offsets found: {}", kafkaOffsets);
+        return;
+      }
+
+      log.info("No Kafka offsets found. Migrating zookeeper offsets");
+      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
+          getZookeeperOffsets(zkUtils, topicStr);
+      if (zookeeperOffsets.isEmpty()) {
+        log.warn("No offsets to migrate found in Zookeeper");
+        return;
+      }
+
+      log.info("Committing Zookeeper offsets to Kafka");
+      log.debug("Offsets to commit: {}", zookeeperOffsets);
+      consumer.commitSync(zookeeperOffsets);
+      // Read the offsets to verify they were committed
+      Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
+          getKafkaOffsets(consumer, topicStr);
+      log.debug("Offsets committed: {}", newKafkaOffsets);
+      if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
+        throw new FlumeException("Offsets could not be committed");
+      }
+    } finally {
+      zkUtils.close();
+      consumer.close();
+    }
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
+      KafkaConsumer<String, byte[]> client, String topicStr) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+    List<PartitionInfo> partitions = client.partitionsFor(topicStr);
+    for (PartitionInfo partition : partitions) {
+      TopicPartition key = new TopicPartition(topicStr, partition.partition());
+      OffsetAndMetadata offsetAndMetadata = client.committed(key);
+      if (offsetAndMetadata != null) {
+        offsets.put(key, offsetAndMetadata);
+      }
+    }
+    return offsets;
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client,
+                                                                     String topicStr) {
+    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+    ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
+    List<String> partitions = asJavaListConverter(
+        client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();
+    for (String partition : partitions) {
+      TopicPartition key = new TopicPartition(topicStr, Integer.valueOf(partition));
+      Option<String> data = client.readDataMaybeNull(
+          topicDirs.consumerOffsetDir() + "/" + partition)._1();
+      if (data.isDefined()) {
+        Long offset = Long.valueOf(data.get());
+        offsets.put(key, new OffsetAndMetadata(offset));
+      }
+    }
+    return offsets;
+  }
 }
 
 class SourceRebalanceListener implements ConsumerRebalanceListener {

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 1f255f9..bf1a19d 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -37,11 +37,14 @@ public class KafkaSourceConstants {
   public static final int DEFAULT_BATCH_DURATION = 1000;
   public static final String DEFAULT_GROUP_ID = "flume";
 
+  public static final String MIGRATE_ZOOKEEPER_OFFSETS = "migrateZookeeperOffsets";
+  public static final boolean DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS = true;
+
   public static final String AVRO_EVENT = "useFlumeEventFormat";
   public static final boolean DEFAULT_AVRO_EVENT = false;
 
   /* Old Properties */
-
+  public static final String ZOOKEEPER_CONNECT_FLUME_KEY = "zookeeperConnect";
   public static final String TOPIC = "topic";
   public static final String OLD_GROUP_ID = "groupId";
 

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index a3a2f92..53bd65c 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -36,7 +36,7 @@ import java.util.concurrent.ExecutionException;
 
 public class KafkaSourceEmbeddedKafka {
 
-  public static String HOST = InetAddress.getLoopbackAddress().getHostAddress();
+  public static String HOST = InetAddress.getLoopbackAddress().getCanonicalHostName();
 
   KafkaServerStartable kafkaServer;
   KafkaSourceEmbeddedZookeeper zookeeper;
@@ -80,7 +80,7 @@ public class KafkaSourceEmbeddedKafka {
     return zookeeper.getConnectString();
   }
 
-  public String getBrockers() {
+  public String getBootstrapServers() {
     return HOST + ":" + serverPort;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/330e5728/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 1598741..9554201 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -21,9 +21,12 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import kafka.common.TopicExistsException;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -33,6 +36,14 @@ import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -44,10 +55,13 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT;
@@ -63,6 +77,7 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX;
 import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER;
+import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -76,6 +91,8 @@ public class TestKafkaSource {
   private KafkaSourceEmbeddedKafka kafkaServer;
   private Context context;
   private List<Event> events;
+
+  private final Set<String> usedTopics = new HashSet<String>();
   private String topic0 = "test1";
   private String topic1 = "topic1";
 
@@ -86,19 +103,21 @@ public class TestKafkaSource {
     kafkaServer = new KafkaSourceEmbeddedKafka(null);
     try {
       kafkaServer.createTopic(topic0, 1);
+      usedTopics.add(topic0);
       kafkaServer.createTopic(topic1, 3);
+      usedTopics.add(topic1);
     } catch (TopicExistsException e) {
       //do nothing
       e.printStackTrace();
     }
-    context = prepareDefaultContext();
+    context = prepareDefaultContext("flume-group");
     kafkaSource.setChannelProcessor(createGoodChannel());
   }
 
-  private Context prepareDefaultContext() {
+  private Context prepareDefaultContext(String groupId) {
     Context context = new Context();
-    context.put(BOOTSTRAP_SERVERS, kafkaServer.getBrockers());
-    context.put(KAFKA_CONSUMER_PREFIX + "group.id", "flume-group");
+    context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapServers());
+    context.put(KAFKA_CONSUMER_PREFIX + "group.id", groupId);
     return context;
   }
 
@@ -575,6 +594,118 @@ public class TestKafkaSource {
 
   }
 
+  @Test
+  public void testBootstrapLookup() {
+    Context context = new Context();
+
+    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
+    context.put(TOPIC, "old.topic");
+    context.put(OLD_GROUP_ID, "old.groupId");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    String bootstrapServers = source.getBootstrapServers();
+    Assert.assertEquals(kafkaServer.getBootstrapServers(), bootstrapServers);
+  }
+
+  @Test
+  public void testMigrateOffsetsNone() throws Exception {
+    doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
+  }
+
+  @Test
+  public void testMigrateOffsetsZookeeper() throws Exception {
+    doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
+  }
+
+  @Test
+  public void testMigrateOffsetsKafka() throws Exception {
+    doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
+  }
+
+  @Test
+  public void testMigrateOffsetsBoth() throws Exception {
+    doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
+  }
+
+  public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
+                                            String group) throws Exception {
+    // create a topic with 1 partition for simplicity
+    String topic = findUnusedTopic();
+    kafkaServer.createTopic(topic, 1);
+
+    Context context = prepareDefaultContext(group);
+    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, kafkaServer.getZkConnectString());
+    context.put(TOPIC, topic);
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+
+    // Produce some data and save an offset
+    Long fifthOffset = 0L;
+    Long tenthOffset = 0L;
+    Properties props = createProducerProps(kafkaServer.getBootstrapServers());
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+    for (int i = 1; i <= 50; i++) {
+      ProducerRecord<String, byte[]> data =
+          new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
+      RecordMetadata recordMetadata = producer.send(data).get();
+      if (i == 5) {
+        fifthOffset = recordMetadata.offset();
+      }
+      if (i == 10) {
+        tenthOffset = recordMetadata.offset();
+      }
+    }
+
+    // Commit 10th offset to zookeeper
+    if (hasZookeeperOffsets) {
+      ZkUtils zkUtils = ZkUtils.apply(kafkaServer.getZkConnectString(), 30000, 30000,
+          JaasUtils.isZkSecurityEnabled());
+      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic);
+      // we commit the tenth offset to ensure some data is missed.
+      Long offset = tenthOffset + 1;
+      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(),
+          zkUtils.updatePersistentPath$default$3());
+      zkUtils.close();
+    }
+
+    // Commit 5th offset to kafka
+    if (hasKafkaOffsets) {
+      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+      offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
+      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(source.getConsumerProps());
+      consumer.commitSync(offsets);
+      consumer.close();
+    }
+
+    // Start the source and read some data
+    source.setChannelProcessor(createGoodChannel());
+    source.start();
+    Thread.sleep(500L);
+    source.process();
+    List<Integer> finals = new ArrayList<Integer>(40);
+    for (Event event: events) {
+      finals.add(Integer.parseInt(new String(event.getBody())));
+    }
+    source.stop();
+
+    if (!hasKafkaOffsets && !hasZookeeperOffsets) {
+      // The default behavior is to start at the latest message in the log
+      org.junit.Assert.assertTrue("Source should read no messages", finals.isEmpty());
+    } else if (hasKafkaOffsets && hasZookeeperOffsets) {
+      // Respect Kafka offsets if they exist
+      org.junit.Assert.assertFalse("Source should not read the 5th message", finals.contains(5));
+      org.junit.Assert.assertTrue("Source should read the 6th message", finals.contains(6));
+    } else if (hasKafkaOffsets) {
+      // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing)
+      org.junit.Assert.assertFalse("Source should not read the 5th message", finals.contains(5));
+      org.junit.Assert.assertTrue("Source should read the 6th message", finals.contains(6));
+    } else {
+      // Otherwise migrate the ZooKeeper offsets if they exist
+      org.junit.Assert.assertFalse("Source should not read the 10th message", finals.contains(10));
+      org.junit.Assert.assertTrue("Source should read the 11th message", finals.contains(11));
+    }
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
@@ -604,4 +735,28 @@ public class TestKafkaSource {
 
     return channelProcessor;
   }
+
+  public String findUnusedTopic() {
+    String newTopic = null;
+    boolean topicFound = false;
+    while (!topicFound) {
+      newTopic = RandomStringUtils.randomAlphabetic(8);
+      if (!usedTopics.contains(newTopic)) {
+        usedTopics.add(newTopic);
+        topicFound = true;
+      }
+    }
+    return newTopic;
+  }
+
+  private Properties createProducerProps(String bootStrapServers) {
+    Properties props = new Properties();
+    props.put(ProducerConfig.ACKS_CONFIG, "-1");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.StringSerializer");
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    return props;
+  }
 }

