flume-commits mailing list archives

From jar...@apache.org
Subject [1/3] flume git commit: FLUME-2821: Flume-Kafka Source with new Consumer
Date Tue, 29 Mar 2016 17:44:35 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 5293eba9a -> e8c4a7bff


FLUME-2821: Flume-Kafka Source with new Consumer

(Grigoriy Rozhkov via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f8abaf78
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f8abaf78
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f8abaf78

Branch: refs/heads/trunk
Commit: f8abaf78fb98e91b7a228aaa231f4164d8dcfc97
Parents: 5293eba
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Tue Mar 29 09:42:24 2016 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Tue Mar 29 09:42:24 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  58 ++-
 flume-ng-sources/flume-kafka-source/pom.xml     |   5 +
 .../apache/flume/source/kafka/KafkaSource.java  | 398 +++++++++++++------
 .../source/kafka/KafkaSourceConstants.java      |  36 +-
 .../flume/source/kafka/KafkaSourceUtil.java     | 112 ------
 .../source/kafka/KafkaSourceEmbeddedKafka.java  |  96 +++--
 .../kafka/KafkaSourceEmbeddedZookeeper.java     |  17 +-
 .../flume/source/kafka/TestKafkaSource.java     | 281 ++++++++++---
 .../flume/source/kafka/TestKafkaSourceUtil.java |  92 -----
 pom.xml                                         |   5 +-
 10 files changed, 649 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 423e0cf..341ae42 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1188,9 +1188,9 @@ Example for agent named a1:
 Kafka Source
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
+Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics.
 If you have multiple Kafka sources running, you can configure them with the same Consumer Group
-so each will read a unique set of partitions for the topic.
+so each will read a unique set of partitions for the topics.
 
 
 
@@ -1198,11 +1198,13 @@ so each will read a unique set of partitions for the topic.
 Property Name                    Default      Description
 ===============================  ===========  ===================================================
 **channels**                     --
-**type**                         --           The component type name, needs to be ``org.apache.flume.source.kafka,KafkaSource``
-**zookeeperConnect**             --           URI of ZooKeeper used by Kafka cluster
-**groupId**                      flume        Unique identified of consumer group. Setting the same id in multiple sources or agents
+**type**                         --           The component type name, needs to be ``org.apache.flume.source.kafka.KafkaSource``
+**kafka.bootstrap.servers**      --           List of brokers in the Kafka cluster used by the source
+kafka.consumer.group.id          flume        Unique identifier of the consumer group. Setting the same id in multiple sources or agents
                                               indicates that they are part of the same consumer group
-**topic**                        --           Kafka topic we'll read messages from. At the time, this is a single topic only.
+**kafka.topics**                 --           Comma-separated list of topics the Kafka consumer will read messages from.
+**kafka.topics.regex**           --           Regex that defines the set of topics the source is subscribed to. This property has higher priority
+                                              than ``kafka.topics`` and overrides ``kafka.topics`` if it exists.
 batchSize                        1000         Maximum number of messages written to Channel in one batch
 batchDurationMillis              1000         Maximum time (in ms) before a batch will be written to Channel
                                               The batch will be written whenever the first of size and time will be reached.
@@ -1214,31 +1216,49 @@ maxBackoffSleep                  5000         Maximum wait time that is triggere
                                               ideal for ingestion use cases but a lower value may be required for low latency operations
                                               with interceptors.
 Other Kafka Consumer Properties  --           These properties are used to configure the Kafka Consumer. Any producer property supported
-                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.``.
-                                              For example: kafka.consumer.timeout.ms
-                                              Check `Kafka documentation <https://kafka.apache.org/08/configuration.html#consumerconfigs>` for details
+                                              by Kafka can be used. The only requirement is to prepend the property name with the prefix ``kafka.consumer``.
+                                              For example: kafka.consumer.auto.offset.reset
+                                              Check `Kafka documentation <http://kafka.apache.org/documentation.html#newconsumerconfigs>` for details
 ===============================  ===========  ===================================================
 
 .. note:: The Kafka Source overrides two Kafka consumer parameters:
-          auto.commit.enable is set to "false" by the source and we commit every batch. For improved performance
-          this can be set to "true", however, this can lead to loss of data
-          consumer.timeout.ms is set to 10ms, so when we check Kafka for new data we wait at most 10ms for the data to arrive
-          setting this to a higher value can reduce CPU utilization (we'll poll Kafka in less of a tight loop), but also means
-          higher latency in writing batches to channel (since we'll wait longer for data to arrive).
+          auto.commit.enable is set to "false" by the source and every batch is committed. The Kafka Source guarantees an
+          at-least-once strategy of message retrieval; duplicates can be present when the source starts.
+          The Kafka Source also provides defaults for the key.deserializer (org.apache.kafka.common.serialization.StringDeserializer)
+          and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modification of these parameters is not recommended.
 
+Deprecated Properties
 
-Example for agent named tier1:
+===============================  ===================  =============================================================================================
+Property Name                    Default              Description
+===============================  ===================  =============================================================================================
+topic                            --                   Use kafka.topics
+groupId                          flume                Use kafka.consumer.group.id
+zookeeperConnect                 --                   No longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers
+                                                      to establish a connection with the Kafka cluster
+===============================  ===================  =============================================================================================
+
+Example of subscribing to a comma-separated list of topics:
 
 .. code-block:: properties
 
     tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
     tier1.sources.source1.channels = channel1
-    tier1.sources.source1.zookeeperConnect = localhost:2181
-    tier1.sources.source1.topic = test1
-    tier1.sources.source1.groupId = flume
-    tier1.sources.source1.kafka.consumer.timeout.ms = 100
+    tier1.sources.source1.batchSize = 5000
+    tier1.sources.source1.batchDurationMillis = 2000
+    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
+    tier1.sources.source1.kafka.topics = test1, test2
+    tier1.sources.source1.kafka.consumer.group.id = custom.g.id
 
+Example of subscribing to topics matching a regex:
 
+.. code-block:: properties
+
+    tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
+    tier1.sources.source1.channels = channel1
+    tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
+    tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
+    # the default kafka.consumer.group.id=flume is used
 
 
 NetCat Source

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/pom.xml b/flume-ng-sources/flume-kafka-source/pom.xml
index 0f93476..5f5c2a8 100644
--- a/flume-ng-sources/flume-kafka-source/pom.xml
+++ b/flume-ng-sources/flume-kafka-source/pom.xml
@@ -62,6 +62,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <classifier>test</classifier>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index fd1dd3c..db806cc 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -17,40 +17,49 @@
 package org.apache.flume.source.kafka;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
-
-import org.apache.flume.*;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
-import org.apache.flume.source.AbstractSource;
-import org.apache.flume.source.BasicSourceSemantics;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * A Source for Kafka which reads messages from a kafka topic.
+ * A Source for Kafka which reads messages from kafka topics.
  *
- * <tt>zookeeperConnect: </tt> Kafka's zookeeper connection string.
- * <b>Required</b>
+ * <tt>kafka.bootstrap.servers: </tt> A comma separated list of host:port pairs
+ * to use for establishing the initial connection to the Kafka cluster.
+ * For example host1:port1,host2:port2,...
+ * <b>Required</b> for kafka.
  * <p>
- * <tt>groupId: </tt> the group ID of consumer group. <b>Required</b>
+ * <tt>kafka.consumer.group.id: </tt> the group ID of consumer group. <b>Required</b>
  * <p>
- * <tt>topic: </tt> the topic to consume messages from. <b>Required</b>
+ * <tt>kafka.topics: </tt> the topic list separated by commas to consume messages from.
+ * <b>Required</b>
  * <p>
  * <tt>maxBatchSize: </tt> Maximum number of messages written to Channel in one
  * batch. Default: 1000
@@ -58,99 +67,167 @@ import org.slf4j.LoggerFactory;
  * <tt>maxBatchDurationMillis: </tt> Maximum number of milliseconds before a
  * batch (of any size) will be written to a channel. Default: 1000
  * <p>
- * <tt>kafka.auto.commit.enable: </tt> If true, commit automatically every time
- * period. if false, commit on each batch. Default: false
+ * <tt>kafka.consumer.*: </tt> Any property starting with "kafka.consumer" will be
+ * passed to the kafka consumer, so you can use any configuration supported by Kafka 0.9.0.X
  * <p>
- * <tt>kafka.consumer.timeout.ms: </tt> Polling interval for new data for batch.
- * Low value means more CPU usage. High value means the time.upper.limit may be
- * missed. Default: 10
- *
- * Any property starting with "kafka" will be passed to the kafka consumer So
- * you can use any configuration supported by Kafka 0.8.1.1
  */
 public class KafkaSource extends AbstractPollableSource
         implements Configurable {
   private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
-  private ConsumerConnector consumer;
-  private ConsumerIterator<byte[],byte[]> it;
-  private String topic;
-  private int batchUpperLimit;
-  private int timeUpperLimit;
-  private int consumerTimeout;
-  private boolean kafkaAutoCommitEnabled;
+
   private Context context;
   private Properties kafkaProps;
-  private final List<Event> eventList = new ArrayList<Event>();
   private KafkaSourceCounter counter;
+  private KafkaConsumer<String, byte[]> consumer;
+  private Iterator<ConsumerRecord<String, byte[]>> it;
+
+  private final List<Event> eventList = new ArrayList<Event>();
+  private Map<TopicPartition, OffsetAndMetadata> tpAndOffsetMetadata;
+  private AtomicBoolean rebalanceFlag;
+
+  private Map<String, String> headers;
+
+  private int batchUpperLimit;
+  private int maxBatchDurationMillis;
+
+  private Subscriber subscriber;
+
+
+  /**
+   * This class is a helper to subscribe for topics by using
+   * different strategies
+   */
+  public abstract class Subscriber<T> {
+    public abstract void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener);
+    public T get() {return null;}
+  }
+
+  private class TopicListSubscriber extends Subscriber<List<String>> {
+    private List<String> topicList;
+    public TopicListSubscriber(String commaSeparatedTopics) {
+      this.topicList = Arrays.asList(commaSeparatedTopics.split("^\\s+|\\s*,\\s*|\\s+$"));
+    }
+    @Override
+    public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
+      consumer.subscribe(topicList, listener);
+    }
+    @Override
+    public List<String> get() {
+      return topicList;
+    }
+  }
+
+  private class PatternSubscriber extends Subscriber<Pattern> {
+    private Pattern pattern;
+    public PatternSubscriber(String regex) {
+      this.pattern = Pattern.compile(regex);
+    }
+    @Override
+    public void subscribe(KafkaConsumer<?, ?> consumer, SourceRebalanceListener listener) {
+      consumer.subscribe(pattern, listener);
+    }
+    @Override
+    public Pattern get() {
+      return pattern;
+    }
+  }
+
 
   @Override
   protected Status doProcess() throws EventDeliveryException {
+    final String batchUUID = UUID.randomUUID().toString();
     byte[] kafkaMessage;
-    byte[] kafkaKey;
+    String kafkaKey;
     Event event;
-    Map<String, String> headers;
-    long batchStartTime = System.currentTimeMillis();
-    long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
+
     try {
-      boolean iterStatus = false;
-      long startTime = System.nanoTime();
+      // prepare time variables for new batch
+      final long nanoBatchStartTime = System.nanoTime();
+      final long batchStartTime = System.currentTimeMillis();
+      final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;
+
       while (eventList.size() < batchUpperLimit &&
-              System.currentTimeMillis() < batchEndTime) {
-        iterStatus = hasNext();
-        if (iterStatus) {
-          // get next message
-          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
-          kafkaMessage = messageAndMetadata.message();
-          kafkaKey = messageAndMetadata.key();
-
-          // Add headers to event (topic, timestamp, and key)
-          headers = new HashMap<String, String>();
-          headers.put(KafkaSourceConstants.TIMESTAMP,
-                  String.valueOf(System.currentTimeMillis()));
-          headers.put(KafkaSourceConstants.TOPIC, topic);
-          if (kafkaKey != null) {
-            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+              System.currentTimeMillis() < maxBatchEndTime) {
+
+        if (it == null || !it.hasNext()) {
+          // Obtaining new records
+          // Poll time is remainder time for current batch.
+          ConsumerRecords<String, byte[]> records = consumer.poll(
+                  Math.max(0, maxBatchEndTime - System.currentTimeMillis()));
+          it = records.iterator();
+
+          // this flag is set to true in a callback when some partitions are revoked.
+          // If there are any records we commit them.
+          if (rebalanceFlag.get()) {
+            rebalanceFlag.set(false);
+            break;
           }
-          if (log.isDebugEnabled()) {
-            log.debug("Message: {}", new String(kafkaMessage));
+          // check records after poll
+          if (!it.hasNext()) {
+            if (log.isDebugEnabled()) {
+              counter.incrementKafkaEmptyCount();
+              log.debug("Returning with backoff. No more data to read");
+            }
+            // batch time exceeded
+            break;
           }
-          event = EventBuilder.withBody(kafkaMessage, headers);
-          eventList.add(event);
         }
+
+        // get next message
+        ConsumerRecord<String, byte[]> message = it.next();
+        kafkaKey = message.key();
+        kafkaMessage = message.value();
+
+        headers.clear();
+        // Add headers to event (timestamp, topic, partition, key)
+        headers.put(KafkaSourceConstants.TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
+        headers.put(KafkaSourceConstants.TOPIC_HEADER, message.topic());
+        headers.put(KafkaSourceConstants.PARTITION_HEADER, String.valueOf(message.partition()));
+        if (kafkaKey != null) {
+          headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
+        }
+
+        if (log.isDebugEnabled()) {
+          log.debug("Topic: {} Partition: {} Message: {}", new String[]{
+                  message.topic(),
+                  String.valueOf(message.partition()),
+                  new String(kafkaMessage)});
+        }
+
+        event = EventBuilder.withBody(kafkaMessage, headers);
+        eventList.add(event);
+
         if (log.isDebugEnabled()) {
           log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
           log.debug("Event #: {}", eventList.size());
         }
+
+        // For each partition store next offset that is going to be read.
+        tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
+                new OffsetAndMetadata(message.offset() + 1, batchUUID));
       }
-      long endTime = System.nanoTime();
-      counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
-      counter.addToEventReceivedCount(Long.valueOf(eventList.size()));
-      // If we have events, send events to channel
-      // clear the event list
-      // and commit if Kafka doesn't auto-commit
+
       if (eventList.size() > 0) {
+        counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
+        counter.addToEventReceivedCount((long) eventList.size());
         getChannelProcessor().processEventBatch(eventList);
         counter.addToEventAcceptedCount(eventList.size());
-        eventList.clear();
         if (log.isDebugEnabled()) {
           log.debug("Wrote {} events to channel", eventList.size());
         }
-        if (!kafkaAutoCommitEnabled) {
-          // commit the read transactions to Kafka to avoid duplicates
+        eventList.clear();
+
+        if (!tpAndOffsetMetadata.isEmpty()) {
           long commitStartTime = System.nanoTime();
-          consumer.commitOffsets();
+          consumer.commitSync(tpAndOffsetMetadata);
           long commitEndTime = System.nanoTime();
-          counter.addToKafkaCommitTimer((commitEndTime-commitStartTime)/(1000*1000));
-        }
-      }
-      if (!iterStatus) {
-        if (log.isDebugEnabled()) {
-          counter.incrementKafkaEmptyCount();
-          log.debug("Returning with backoff. No more data to read");
+          counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
+          tpAndOffsetMetadata.clear();
         }
-        return Status.BACKOFF;
+        return Status.READY;
       }
-      return Status.READY;
+
+      return Status.BACKOFF;
     } catch (Exception e) {
       log.error("KafkaSource EXCEPTION, {}", e);
       return Status.BACKOFF;
@@ -161,96 +238,153 @@ public class KafkaSource extends AbstractPollableSource
    * We configure the source and generate properties for the Kafka Consumer
    *
    * Kafka Consumer properties are generated as follows:
-   *
    * 1. Generate a properties object with some static defaults that can be
-   * overridden by Source configuration 2. We add the configuration users added
-   * for Kafka (parameters starting with kafka. and must be valid Kafka Consumer
-   * properties 3. We add the source documented parameters which can override
-   * other properties
-   *
+   * overridden if corresponding properties are specified
+   * 2. We add the configuration users added for Kafka (parameters starting
+   * with kafka.consumer and must be valid Kafka Consumer properties
+   * 3. Add source level properties (with no prefix)
    * @param context
    */
   @Override
   protected void doConfigure(Context context) throws FlumeException {
     this.context = context;
+    headers = new HashMap<String, String>(4);
+    tpAndOffsetMetadata = new HashMap<TopicPartition, OffsetAndMetadata>();
+    rebalanceFlag = new AtomicBoolean(false);
+    kafkaProps = new Properties();
+
+    // can be removed in the next release
+    // See https://issues.apache.org/jira/browse/FLUME-2896
+    translateOldProperties(context);
+
+    String topicProperty = context.getString(KafkaSourceConstants.TOPICS_REGEX);
+    if (topicProperty != null && !topicProperty.isEmpty()) {
+      // create subscriber that uses pattern-based subscription
+      subscriber = new PatternSubscriber(topicProperty);
+    } else
+    if((topicProperty = context.getString(KafkaSourceConstants.TOPICS)) != null && !topicProperty.isEmpty()) {
+      // create subscriber that uses topic list subscription
+      subscriber = new TopicListSubscriber(topicProperty);
+    } else
+    if (subscriber == null) {
+      throw new ConfigurationException("At least one Kafka topic must be specified.");
+    }
+
     batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
             KafkaSourceConstants.DEFAULT_BATCH_SIZE);
-    timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
+    maxBatchDurationMillis = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
             KafkaSourceConstants.DEFAULT_BATCH_DURATION);
-    topic = context.getString(KafkaSourceConstants.TOPIC);
 
-    if(topic == null) {
-      throw new ConfigurationException("Kafka topic must be specified.");
+    String bootstrapServers = context.getString(KafkaSourceConstants.BOOTSTRAP_SERVERS);
+    if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+      throw new ConfigurationException("Bootstrap Servers must be specified");
     }
 
-    kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
-    consumerTimeout = Integer.parseInt(kafkaProps.getProperty(
-            KafkaSourceConstants.CONSUMER_TIMEOUT));
-    kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
-            KafkaSourceConstants.AUTO_COMMIT_ENABLED));
+    setConsumerProps(context, bootstrapServers);
 
     if (counter == null) {
       counter = new KafkaSourceCounter(getName());
     }
   }
 
+
+  // We can remove this once the properties are officially deprecated
+  private void translateOldProperties(Context ctx) {
+    // topic
+    String topic = context.getString(KafkaSourceConstants.TOPIC);
+    if (topic != null && !topic.isEmpty()) {
+      subscriber = new TopicListSubscriber(topic);
+      log.warn("{} is deprecated. Please use the parameter {}",
+              KafkaSourceConstants.TOPIC, KafkaSourceConstants.TOPICS);
+    }
+
+    // old groupId
+    String groupId = ctx.getString(KafkaSourceConstants.OLD_GROUP_ID);
+    if (groupId != null && !groupId.isEmpty()) {
+      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+      log.warn("{} is deprecated. Please use the parameter {}",
+              KafkaSourceConstants.OLD_GROUP_ID,
+              KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    }
+  }
+
+
+  private void setConsumerProps(Context ctx, String bootStrapServers) {
+    String groupId = ctx.getString(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG);
+    if ((groupId == null || groupId.isEmpty()) &&
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+        groupId = KafkaSourceConstants.DEFAULT_GROUP_ID;
+        log.info("Group ID was not specified. Using " + groupId + " as the group id.");
+    }
+    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER);
+    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER);
+    //Defaults overridden based on config
+    kafkaProps.putAll(ctx.getSubProperties(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX));
+    //These always take precedence over config
+    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+    if (groupId != null) {
+      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    }
+    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
+
+    log.info(kafkaProps.toString());
+  }
+
+  Properties getConsumerProps() {
+    return kafkaProps;
+  }
+
+  <T> Subscriber<T> getSubscriber() {
+    return subscriber;
+  }
+
   @Override
   protected void doStart() throws FlumeException {
     log.info("Starting {}...", this);
 
-    try {
-      //initialize a consumer. This creates the connection to ZooKeeper
-      consumer = KafkaSourceUtil.getConsumer(kafkaProps);
-    } catch (Exception e) {
-      throw new FlumeException("Unable to create consumer. " +
-              "Check whether the ZooKeeper server is up and that the " +
-              "Flume agent can connect to it.", e);
-    }
+    //initialize a consumer.
+    consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
 
-    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-    // We always have just one topic being read by one thread
-    topicCountMap.put(topic, 1);
+    // Subscribe to topics using the strategy chosen during configuration
+    subscriber.subscribe(consumer, new SourceRebalanceListener(rebalanceFlag));
 
-    // Get the message iterator for our topic
-    // Note that this succeeds even if the topic doesn't exist
-    // in that case we simply get no messages for the topic
-    // Also note that currently we only support a single topic
-    try {
-      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
-              consumer.createMessageStreams(topicCountMap);
-      List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
-      KafkaStream<byte[], byte[]> stream = topicList.get(0);
-      it = stream.iterator();
-    } catch (Exception e) {
-      throw new FlumeException("Unable to get message iterator from Kafka", e);
-    }
-    log.info("Kafka source {} do started.", getName());
+    // Connect to kafka. An initial 1 second poll establishes the connection.
+    it = consumer.poll(1000).iterator();
+    log.info("Kafka source {} started.", getName());
     counter.start();
   }
 
   @Override
   protected void doStop() throws FlumeException {
     if (consumer != null) {
-      // exit cleanly. This syncs offsets of messages read to ZooKeeper
-      // to avoid reading the same messages again
-      consumer.shutdown();
+      consumer.wakeup();
+      consumer.close();
     }
     counter.stop();
-    log.info("Kafka Source {} do stopped. Metrics: {}", getName(), counter);
+    log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
   }
+}
 
-  /**
-   * Check if there are messages waiting in Kafka,
-   * waiting until timeout (10ms by default) for messages to arrive.
-   * and catching the timeout exception to return a boolean
-   */
-  boolean hasNext() {
-    try {
-      it.hasNext();
-      return true;
-    } catch (ConsumerTimeoutException e) {
-      return false;
+
+class SourceRebalanceListener implements ConsumerRebalanceListener {
+  private static final Logger log = LoggerFactory.getLogger(SourceRebalanceListener.class);
+  private AtomicBoolean rebalanceFlag;
+
+  public SourceRebalanceListener(AtomicBoolean rebalanceFlag) {
+    this.rebalanceFlag = rebalanceFlag;
+  }
+
+  // Set a flag that a rebalance has occurred. Then commit already read events to kafka.
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} revoked.", partition.topic(), partition.partition());
+      rebalanceFlag.set(true);
     }
   }
 
-}
\ No newline at end of file
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    for (TopicPartition partition : partitions) {
+      log.info("topic {} - partition {} assigned.", partition.topic(), partition.partition());
+    }
+  }
+}
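
The rewritten doProcess() above polls with the time remaining in the current batch, records for every TopicPartition the next offset to read (message offset + 1), and calls commitSync() with that map only after the batch has been handed to the channel, which is what backs the at-least-once note in the user guide. A minimal standalone sketch of the same pattern follows; the broker address, group id and topic regex are assumed values and the channel hand-off is elided, so this is an illustration rather than the Flume source itself.

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ManualCommitSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "flume");                      // default group id used by the source
        props.put("enable.auto.commit", "false");            // the source commits per batch instead
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

        // Pattern subscription requires a rebalance listener, as in PatternSubscriber above.
        consumer.subscribe(Pattern.compile("^topic[0-9]$"), new ConsumerRebalanceListener() {
          public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
          public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });

        Map<TopicPartition, OffsetAndMetadata> toCommit =
            new HashMap<TopicPartition, OffsetAndMetadata>();

        ConsumerRecords<String, byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<String, byte[]> record : records) {
          // ... hand the record body and headers to the channel here ...
          // Remember the *next* offset to read for this partition.
          toCommit.put(new TopicPartition(record.topic(), record.partition()),
              new OffsetAndMetadata(record.offset() + 1));
        }
        if (!toCommit.isEmpty()) {
          // Commit only after the batch has been delivered: at-least-once semantics.
          consumer.commitSync(toCommit);
        }
        consumer.close();
      }
    }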

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 911012c..2999cf2 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -16,25 +16,33 @@
  */
 package org.apache.flume.source.kafka;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+
 public class KafkaSourceConstants {
-  public static final String TOPIC = "topic";
-  public static final String KEY = "key";
-  public static final String TIMESTAMP = "timestamp";
+
+  public static final String KAFKA_PREFIX = "kafka.";
+  public static final String KAFKA_CONSUMER_PREFIX = KAFKA_PREFIX + "consumer.";
+  public static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  public static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+  public static final String BOOTSTRAP_SERVERS = KAFKA_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+  public static final String TOPICS = KAFKA_PREFIX + "topics";
+  public static final String TOPICS_REGEX = TOPICS + "." + "regex";
+  public static final String DEFAULT_AUTO_COMMIT =  "false";
   public static final String BATCH_SIZE = "batchSize";
   public static final String BATCH_DURATION_MS = "batchDurationMillis";
-  public static final String CONSUMER_TIMEOUT = "consumer.timeout.ms";
-  public static final String AUTO_COMMIT_ENABLED = "auto.commit.enable";
-  public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
-  public static final String ZOOKEEPER_CONNECT_FLUME = "zookeeperConnect";
-  public static final String GROUP_ID = "group.id";
-  public static final String GROUP_ID_FLUME = "groupId";
-  public static final String PROPERTY_PREFIX = "kafka.";
-
-
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int DEFAULT_BATCH_DURATION = 1000;
-  public static final String DEFAULT_CONSUMER_TIMEOUT = "10";
-  public static final String DEFAULT_AUTO_COMMIT =  "false";
   public static final String DEFAULT_GROUP_ID = "flume";
 
+  /* Old Properties */
+
+  public static final String TOPIC = "topic";
+  public static final String OLD_GROUP_ID = "groupId";
+
+  // flume event headers
+  public static final String TOPIC_HEADER = "topic";
+  public static final String KEY_HEADER = "key";
+  public static final String TIMESTAMP_HEADER = "timestamp";
+  public static final String PARTITION_HEADER = "partition";
+
 }
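
KAFKA_CONSUMER_PREFIX above is what lets users pass arbitrary consumer settings: setConsumerProps() in KafkaSource strips the kafka.consumer. prefix via Context.getSubProperties() and layers those values between the source's defaults and the settings it always forces (bootstrap servers, auto-commit off). A small standalone sketch of that precedence, with illustrative property values:

    import java.util.Properties;

    import org.apache.flume.Context;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class ConsumerPropsSketch {
      public static void main(String[] args) {
        Context ctx = new Context();
        // Agent configuration as it would appear under the source; values are illustrative.
        ctx.put("kafka.bootstrap.servers", "localhost:9092");
        ctx.put("kafka.consumer.group.id", "custom.g.id");
        ctx.put("kafka.consumer.auto.offset.reset", "earliest");

        Properties kafkaProps = new Properties();
        // 1. Defaults that the user may override.
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // 2. Everything under the kafka.consumer. prefix, with the prefix stripped.
        kafkaProps.putAll(ctx.getSubProperties("kafka.consumer."));
        // 3. Values the source always takes from its own configuration.
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            ctx.getString("kafka.bootstrap.servers"));
        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        System.out.println(kafkaProps);
      }
    }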

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
deleted file mode 100644
index 4a4034b..0000000
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceUtil.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.source.kafka;
-
-import java.util.Map;
-import java.util.Properties;
-
-import kafka.common.KafkaException;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaSourceUtil {
-  private static final Logger log =
-          LoggerFactory.getLogger(KafkaSourceUtil.class);
-
-  public static Properties getKafkaProperties(Context context) {
-    log.info("context={}",context.toString());
-    Properties props =  generateDefaultKafkaProps();
-    setKafkaProps(context,props);
-    addDocumentedKafkaProps(context,props);
-    return props;
-  }
-
-  public static ConsumerConnector getConsumer(Properties kafkaProps) {
-    ConsumerConfig consumerConfig =
-            new ConsumerConfig(kafkaProps);
-    ConsumerConnector consumer =
-            Consumer.createJavaConsumerConnector(consumerConfig);
-    return consumer;
-  }
-
-  /**
-   * Generate consumer properties object with some defaults
-   * @return
-   */
-  private static Properties generateDefaultKafkaProps() {
-    Properties props = new Properties();
-    props.put(KafkaSourceConstants.AUTO_COMMIT_ENABLED,
-            KafkaSourceConstants.DEFAULT_AUTO_COMMIT);
-    props.put(KafkaSourceConstants.CONSUMER_TIMEOUT,
-            KafkaSourceConstants.DEFAULT_CONSUMER_TIMEOUT);
-    props.put(KafkaSourceConstants.GROUP_ID,
-            KafkaSourceConstants.DEFAULT_GROUP_ID);
-    return props;
-  }
-
-  /**
-   * Add all configuration parameters starting with "kafka"
-   * to consumer properties
-   */
-  private static void setKafkaProps(Context context,Properties kafkaProps) {
-
-    Map<String,String> kafkaProperties =
-            context.getSubProperties(KafkaSourceConstants.PROPERTY_PREFIX);
-
-    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
-
-      kafkaProps.put(prop.getKey(), prop.getValue());
-      if (log.isDebugEnabled()) {
-        log.debug("Reading a Kafka Producer Property: key: "
-                + prop.getKey() + ", value: " + prop.getValue());
-      }
-    }
-  }
-
-  /**
-   * Some of the producer properties are especially important
-   * We documented them and gave them a camel-case name to match Flume config
-   * If user set these, we will override any existing parameters with these
-   * settings.
-   * Knowledge of which properties are documented is maintained here for now.
-   * If this will become a maintenance issue we'll set a proper data structure.
-   */
-  private static void addDocumentedKafkaProps(Context context,
-                                              Properties kafkaProps)
-          throws ConfigurationException {
-    String zookeeperConnect = context.getString(
-            KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME);
-    if (zookeeperConnect == null) {
-      throw new ConfigurationException("ZookeeperConnect must contain " +
-              "at least one ZooKeeper server");
-    }
-    kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect);
-
-    String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME);
-
-    if (groupID != null ) {
-      kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID);
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
index 26c5c9d..46d545f 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java
@@ -18,27 +18,59 @@ package org.apache.flume.source.kafka;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServerStartable;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import kafka.admin.AdminUtils;
+import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
-import kafka.utils.ZKStringSerializer$;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 
 public class KafkaSourceEmbeddedKafka {
+
+  public static String HOST;
+
+  static {
+    try {
+      HOST = InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException e) {
+      throw new RuntimeException("Host address can not be obtained", e);
+    }
+  }
+
   KafkaServerStartable kafkaServer;
   KafkaSourceEmbeddedZookeeper zookeeper;
+
   int zkPort = 21818; // none-standard
-  Producer<String,String> producer;
+  int serverPort = 18922;
+
+  KafkaProducer<String, String> producer;
+  File dir;
 
-  public KafkaSourceEmbeddedKafka() {
+  public KafkaSourceEmbeddedKafka(Properties properties) {
     zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
+    dir = new File(System.getProperty("java.io.tmpdir"), "kafka_log-" + UUID.randomUUID());
+    try {
+      FileUtils.deleteDirectory(dir);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
     Properties props = new Properties();
     props.put("zookeeper.connect",zookeeper.getConnectString());
     props.put("broker.id","1");
+    props.put("host.name", "localhost");
+    props.put("port", String.valueOf(serverPort));
+    props.put("log.dir", dir.getAbsolutePath());
+    if (properties != null)
+      props.putAll(properties);
     KafkaConfig config = new KafkaConfig(props);
     kafkaServer = new KafkaServerStartable(config);
     kafkaServer.startup();
@@ -55,37 +87,49 @@ public class KafkaSourceEmbeddedKafka {
     return zookeeper.getConnectString();
   }
 
-  private void initProducer()
-  {
-    Properties props = new Properties();
-    props.put("metadata.broker.list","127.0.0.1:" +
-            kafkaServer.serverConfig().port());
-    props.put("serializer.class","kafka.serializer.StringEncoder");
-    props.put("request.required.acks", "1");
-
-    ProducerConfig config = new ProducerConfig(props);
-
-    producer = new Producer<String,String>(config);
+  public String getBrockers() {
+    return HOST + ":" + serverPort;
+  }
 
+  private void initProducer() {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", HOST + ":" + serverPort);
+    props.put("acks", "1");
+    producer = new KafkaProducer<String,String>(props,
+            new StringSerializer(), new StringSerializer());
   }
 
   public void produce(String topic, String k, String v) {
-    KeyedMessage<String,String> message = new KeyedMessage<String,String>(topic,k,v);
-    producer.send(message);
+    ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, k, v);
+    try {
+      producer.send(rec).get();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
   }
 
-  public void createTopic(String topicName) {
+  public void produce(String topic, int partition, String k, String v) {
+    ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, partition, k, v);
+    try {
+      producer.send(rec).get();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void createTopic(String topicName, int numPartitions) {
     // Create a ZooKeeper client
     int sessionTimeoutMs = 10000;
     int connectionTimeoutMs = 10000;
-    ZkClient zkClient = new ZkClient(zookeeper.getConnectString(),
-            sessionTimeoutMs, connectionTimeoutMs,
-            ZKStringSerializer$.MODULE$);
-
-    int numPartitions = 1;
+    ZkClient zkClient = ZkUtils.createZkClient(HOST + ":" + zkPort, sessionTimeoutMs, connectionTimeoutMs);
+    ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
     int replicationFactor = 1;
     Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkClient, topicName, numPartitions,
+    AdminUtils.createTopic(zkUtils, topicName, numPartitions,
             replicationFactor, topicConfig);
   }
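
The produce() helpers above block on send(...).get() so that a record is on the broker before the test lets the source poll. A standalone sketch of the same synchronous produce, also reading back the RecordMetadata that the helper discards; the broker address and topic name are assumed values:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SyncProduceSketch {
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("acks", "1");                              // same ack level as the test helper

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
            props, new StringSerializer(), new StringSerializer());

        // Blocking on get() makes the send synchronous; the record is on the broker when it returns.
        RecordMetadata md = producer.send(
            new ProducerRecord<String, String>("test1", "key", "value")).get();
        System.out.println("written to " + md.topic() + "/" + md.partition() + "@" + md.offset());

        producer.close();
      }
    }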
 

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
index 1b8a271..db144c2 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedZookeeper.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 
 public class KafkaSourceEmbeddedZookeeper {
   private int zkPort;
@@ -31,19 +32,25 @@ public class KafkaSourceEmbeddedZookeeper {
   File dir;
 
 
-  public KafkaSourceEmbeddedZookeeper(int zkPort){
-    int numConnections = 5000;
+  public KafkaSourceEmbeddedZookeeper(int zkPort) {
     int tickTime = 2000;
 
     this.zkPort = zkPort;
 
     String dataDirectory = System.getProperty("java.io.tmpdir");
-    dir = new File(dataDirectory, "zookeeper").getAbsoluteFile();
+    dir = new File(dataDirectory, "zookeeper" + UUID.randomUUID()).getAbsoluteFile();
+
+    try {
+      FileUtils.deleteDirectory(dir);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.exit(1);
+    }
 
     try {
       this.zookeeper = new ZooKeeperServer(dir,dir,tickTime);
       this.factory = new NIOServerCnxnFactory();
-      factory.configure(new InetSocketAddress("127.0.0.1",zkPort),0);
+      factory.configure(new InetSocketAddress(KafkaSourceEmbeddedKafka.HOST, zkPort),0);
       factory.startup(zookeeper);
     } catch (IOException e) {
       e.printStackTrace();
@@ -59,6 +66,6 @@ public class KafkaSourceEmbeddedZookeeper {
   }
 
   public String getConnectString() {
-    return "127.0.0.1:"+zkPort;
+    return KafkaSourceEmbeddedKafka.HOST + ":" + zkPort;
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 8ec14cc..8e04da8 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -23,17 +23,18 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import kafka.common.TopicExistsException;
-import kafka.consumer.ConsumerIterator;
-import kafka.message.Message;
 
 import org.apache.flume.*;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,42 +43,42 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flume.source.kafka.KafkaSourceConstants.*;
+
 public class TestKafkaSource {
   private static final Logger log =
           LoggerFactory.getLogger(TestKafkaSource.class);
 
   private KafkaSource kafkaSource;
   private KafkaSourceEmbeddedKafka kafkaServer;
-  private ConsumerIterator<byte[], byte[]> mockIt;
-  private Message message;
   private Context context;
   private List<Event> events;
-  private String topicName = "test1";
-
+  private String topic0 = "test1";
+  private String topic1 = "topic1";
 
   @SuppressWarnings("unchecked")
   @Before
   public void setup() throws Exception {
-
     kafkaSource = new KafkaSource();
-    kafkaServer = new KafkaSourceEmbeddedKafka();
+    kafkaServer = new KafkaSourceEmbeddedKafka(null);
     try {
-      kafkaServer.createTopic(topicName);
+      kafkaServer.createTopic(topic0, 1);
+      kafkaServer.createTopic(topic1, 3);
     } catch (TopicExistsException e) {
       //do nothing
+      e.printStackTrace();
     }
-
-
-    context = new Context();
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,
-            kafkaServer.getZkConnectString());
-    context.put(KafkaSourceConstants.GROUP_ID_FLUME,"flume");
-    context.put(KafkaSourceConstants.TOPIC,topicName);
-    context.put("kafka.consumer.timeout.ms","100");
-
+    context = prepareDefaultContext();
     kafkaSource.setChannelProcessor(createGoodChannel());
   }
 
+  private Context prepareDefaultContext() {
+    Context context = new Context();
+    context.put(BOOTSTRAP_SERVERS, kafkaServer.getBrockers());
+    context.put(KAFKA_CONSUMER_PREFIX + "group.id", "flume-group");
+    return context;
+  }
+
   @After
   public void tearDown() throws Exception {
     kafkaSource.stop();
@@ -86,19 +87,89 @@ public class TestKafkaSource {
 
   @SuppressWarnings("unchecked")
   @Test
+  public void testOffsets() throws InterruptedException, EventDeliveryException {
+    long batchDuration = 2000;
+    context.put(TOPICS, topic1);
+    context.put(BATCH_DURATION_MS,
+            String.valueOf(batchDuration));
+    context.put(BATCH_SIZE, "3");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    Thread.sleep(500L);
+    Status status = kafkaSource.process();
+    assertEquals(Status.BACKOFF, status);
+    assertEquals(0, events.size());
+    kafkaServer.produce(topic1, "", "record1");
+    kafkaServer.produce(topic1, "", "record2");
+    Thread.sleep(500L);
+    status = kafkaSource.process();
+    assertEquals(Status.READY, status);
+    assertEquals(2, events.size());
+    events.clear();
+    kafkaServer.produce(topic1, "", "record3");
+    kafkaServer.produce(topic1, "", "record4");
+    kafkaServer.produce(topic1, "", "record5");
+    Thread.sleep(500L);
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record3", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record4", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record5", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+    kafkaServer.produce(topic1, "", "record6");
+    kafkaServer.produce(topic1, "", "record7");
+    kafkaServer.produce(topic1, "", "record8");
+    kafkaServer.produce(topic1, "", "record9");
+    kafkaServer.produce(topic1, "", "record10");
+    Thread.sleep(500L);
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record6", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record7", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record8", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+    kafkaServer.produce(topic1, "", "record11");
+    // status must be READY because the batch timeout was exceeded.
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record9", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record10", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record11", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+    kafkaServer.produce(topic1, "", "record12");
+    kafkaServer.produce(topic1, "", "record13");
+    // stop kafka source
+    kafkaSource.stop();
+    // start again
+    kafkaSource = new KafkaSource();
+    kafkaSource.setChannelProcessor(createGoodChannel());
+    kafkaSource.configure(context);
+    kafkaSource.start();
+    kafkaServer.produce(topic1, "", "record14");
+    Thread.sleep(1000L);
+    assertEquals(Status.READY, kafkaSource.process());
+    assertEquals(3, events.size());
+    assertEquals("record12", new String(events.get(0).getBody(), Charsets.UTF_8));
+    assertEquals("record13", new String(events.get(1).getBody(), Charsets.UTF_8));
+    assertEquals("record14", new String(events.get(2).getBody(), Charsets.UTF_8));
+    events.clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
   public void testProcessItNotEmpty() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topic0, "", "hello, world");
 
     Thread.sleep(500L);
-
     Assert.assertEquals(Status.READY, kafkaSource.process());
     Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
     Assert.assertEquals(1, events.size());
@@ -112,14 +183,15 @@ public class TestKafkaSource {
   public void testProcessItNotEmptyBatch() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"2");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"2");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
-    kafkaServer.produce(topicName, "", "foo, bar");
+    kafkaServer.produce(topic0, "", "hello, world");
+    kafkaServer.produce(topic0, "", "foo, bar");
 
     Thread.sleep(500L);
 
@@ -138,6 +210,7 @@ public class TestKafkaSource {
   public void testProcessItEmpty() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
+    context.put(TOPICS, topic0);
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -151,7 +224,7 @@ public class TestKafkaSource {
   public void testNonExistingTopic() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.TOPIC,"faketopic");
+    context.put(TOPICS,"faketopic");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -162,10 +235,11 @@ public class TestKafkaSource {
 
   @SuppressWarnings("unchecked")
   @Test(expected= FlumeException.class)
-  public void testNonExistingZk() throws EventDeliveryException,
+  public void testNonExistingKafkaServer() throws EventDeliveryException,
           SecurityException, NoSuchFieldException, IllegalArgumentException,
           IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME,"blabla:666");
+    context.put(TOPICS, topic0);
+    context.put(BOOTSTRAP_SERVERS,"blabla:666");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
@@ -177,37 +251,39 @@ public class TestKafkaSource {
   @Test
   public void testBatchTime() throws InterruptedException,
           EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"250");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_DURATION_MS, "250");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
     for (int i=1; i<5000; i++) {
-      kafkaServer.produce(topicName, "", "hello, world " + i);
+      kafkaServer.produce(topic0, "", "hello, world " + i);
     }
     Thread.sleep(500L);
 
+    long error = 50;
     long startTime = System.currentTimeMillis();
     Status status = kafkaSource.process();
     long endTime = System.currentTimeMillis();
     assertEquals(Status.READY, status);
     assertTrue(endTime - startTime <
-            ( context.getLong(KafkaSourceConstants.BATCH_DURATION_MS) +
-            context.getLong("kafka.consumer.timeout.ms")) );
+            (context.getLong(BATCH_DURATION_MS) + error));
   }
 
   // Consume event, stop source, start again and make sure we are not
   // consuming same event again
   @Test
   public void testCommit() throws InterruptedException, EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topic0, "", "hello, world");
 
     Thread.sleep(500L);
 
@@ -224,14 +300,14 @@ public class TestKafkaSource {
   @Test
   public void testNonCommit() throws EventDeliveryException,
           InterruptedException {
-
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"1");
+    context.put(BATCH_DURATION_MS,"30000");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "hello, world");
+    kafkaServer.produce(topic0, "", "hello, world");
     Thread.sleep(500L);
 
     kafkaSource.setChannelProcessor(createBadChannel());
@@ -252,13 +328,14 @@ public class TestKafkaSource {
   @Test
   public void testTwoBatches() throws InterruptedException,
           EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"1");
+    context.put(BATCH_DURATION_MS, "30000");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "event 1");
+    kafkaServer.produce(topic0, "", "event 1");
     Thread.sleep(500L);
 
     kafkaSource.process();
@@ -266,7 +343,7 @@ public class TestKafkaSource {
             Charsets.UTF_8));
     events.clear();
 
-    kafkaServer.produce(topicName, "", "event 2");
+    kafkaServer.produce(topic0, "", "event 2");
     Thread.sleep(500L);
     kafkaSource.process();
     Assert.assertEquals("event 2", new String(events.get(0).getBody(),
@@ -276,14 +353,15 @@ public class TestKafkaSource {
   @Test
   public void testTwoBatchesWithAutocommit() throws InterruptedException,
           EventDeliveryException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
-    context.put(KafkaSourceConstants.BATCH_DURATION_MS,"30000");
-    context.put("kafka.auto.commit.enable","true");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE,"1");
+    context.put(BATCH_DURATION_MS,"30000");
+    context.put(KAFKA_CONSUMER_PREFIX + "enable.auto.commit", "true");
     kafkaSource.configure(context);
     kafkaSource.start();
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, "", "event 1");
+    kafkaServer.produce(topic0, "", "event 1");
     Thread.sleep(500L);
 
     kafkaSource.process();
@@ -291,7 +369,7 @@ public class TestKafkaSource {
             Charsets.UTF_8));
     events.clear();
 
-    kafkaServer.produce(topicName, "", "event 2");
+    kafkaServer.produce(topic0, "", "event 2");
     Thread.sleep(500L);
     kafkaSource.process();
     Assert.assertEquals("event 2", new String(events.get(0).getBody(),
@@ -304,13 +382,14 @@ public class TestKafkaSource {
   public void testNullKey() throws EventDeliveryException,
       SecurityException, NoSuchFieldException, IllegalArgumentException,
       IllegalAccessException, InterruptedException {
-    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
     kafkaSource.configure(context);
     kafkaSource.start();
 
     Thread.sleep(500L);
 
-    kafkaServer.produce(topicName, null , "hello, world");
+    kafkaServer.produce(topic0, null, "hello, world");
 
     Thread.sleep(500L);
 
@@ -322,6 +401,110 @@ public class TestKafkaSource {
         Charsets.UTF_8));
   }
 
+  @Test
+  public void testSourceProperties() {
+    Context context = new Context();
+    context.put(TOPICS, "test1, test2");
+    context.put(TOPICS_REGEX, "^stream[0-9]$");
+    context.put(BOOTSTRAP_SERVERS, "bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+
+    //check that kafka.topics.regex has higher priority than topics
+    //type of subscriber should be PatternSubscriber
+    KafkaSource.Subscriber<Pattern> subscriber = source.getSubscriber();
+    Pattern pattern = subscriber.get();
+    Assert.assertTrue(pattern.matcher("stream1").find());
+  }
+
+  @Test
+  public void testKafkaProperties() {
+    Context context = new Context();
+    context.put(TOPICS, "test1, test2");
+    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.default.group.id");
+    context.put(KAFKA_CONSUMER_PREFIX + "fake.property", "kafka.property.value");
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    context.put(KAFKA_CONSUMER_PREFIX + "bootstrap.servers", "bad-bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    Properties kafkaProps = source.getConsumerProps();
+
+    //check that we have defaults set
+    assertEquals(
+            String.valueOf(DEFAULT_AUTO_COMMIT),
+            kafkaProps.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    //check that kafka properties override the default and get correct name
+    assertEquals(
+            "override.default.group.id",
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+    //check that any kafka property gets in
+    assertEquals(
+            "kafka.property.value",
+            kafkaProps.getProperty("fake.property"));
+    //check that documented property overrides defaults
+    assertEquals(
+            "real-bootstrap-servers-list",
+            kafkaProps.getProperty("bootstrap.servers"));
+  }
+
+  @Test
+  public void testOldProperties() {
+    Context context = new Context();
+
+    context.put(TOPIC, "old.topic");
+    context.put(OLD_GROUP_ID, "old.groupId");
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    Properties kafkaProps = source.getConsumerProps();
+
+    KafkaSource.Subscriber<List<String>> subscriber = source.getSubscriber();
+    //check topic was set
+    assertEquals(
+            "old.topic",
+            subscriber.get().get(0));
+    //check that kafka old properties override the default and get correct name
+    assertEquals(
+            "old.groupId",
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+
+    source = new KafkaSource();
+    context.put(KAFKA_CONSUMER_PREFIX + ConsumerConfig.GROUP_ID_CONFIG, "override.old.group.id");
+    source.doConfigure(context);
+    kafkaProps = source.getConsumerProps();
+    //check that kafka new properties override old
+    assertEquals(
+            "override.old.group.id",
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+
+    context.clear();
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    context.put(TOPIC, "old.topic");
+    source = new KafkaSource();
+    source.doConfigure(context);
+    kafkaProps = source.getConsumerProps();
+    //check defaults set
+    assertEquals(
+            KafkaSourceConstants.DEFAULT_GROUP_ID,
+            kafkaProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
+  }
+
+  @Test
+  public void testPatternBasedSubscription() {
+    Context context = new Context();
+
+    context.put(TOPICS_REGEX, "^topic[0-9]$");
+    context.put(OLD_GROUP_ID, "old.groupId");
+    context.put(BOOTSTRAP_SERVERS, "real-bootstrap-servers-list");
+    KafkaSource source = new KafkaSource();
+    source.doConfigure(context);
+    KafkaSource.Subscriber<Pattern> subscriber = source.getSubscriber();
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(subscriber.get().matcher("topic" + i).find());
+    }
+    Assert.assertFalse(subscriber.get().matcher("topic").find());
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);
@@ -352,4 +535,4 @@ public class TestKafkaSource {
 
     return channelProcessor;
   }
-}
\ No newline at end of file
+}
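
The new tests above exercise how KafkaSource layers consumer properties: keys under the kafka.consumer. prefix are forwarded to the Kafka consumer with the prefix stripped, while the documented kafka.bootstrap.servers setting wins over a prefixed duplicate. Below is a minimal standalone sketch of that precedence, assuming the package-visible doConfigure()/getConsumerProps() accessors and the KafkaSourceConstants fields used by the tests; the class name itself is hypothetical and not part of the commit.

    package org.apache.flume.source.kafka;

    import java.util.Properties;

    import org.apache.flume.Context;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Hypothetical illustration, not part of the commit; placed in the same package
    // so it can call the package-visible doConfigure()/getConsumerProps().
    public class KafkaSourcePropertiesSketch {
      public static void main(String[] args) {
        Context context = new Context();
        context.put(KafkaSourceConstants.TOPICS, "stream1, stream2");
        context.put(KafkaSourceConstants.BOOTSTRAP_SERVERS, "broker1:9092,broker2:9092");
        // Any kafka.consumer.* key is passed to the consumer with the prefix stripped.
        context.put(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX
            + ConsumerConfig.GROUP_ID_CONFIG, "my-flume-group");
        // The documented bootstrap key takes precedence over a prefixed duplicate.
        context.put(KafkaSourceConstants.KAFKA_CONSUMER_PREFIX
            + "bootstrap.servers", "ignored:9092");

        KafkaSource source = new KafkaSource();
        source.doConfigure(context);
        Properties consumerProps = source.getConsumerProps();

        // Expected, per testKafkaProperties above: the documented servers list and the
        // overridden group id end up in the consumer properties.
        System.out.println(consumerProps.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        System.out.println(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
      }
    }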

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
deleted file mode 100644
index 0cbb4b6..0000000
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSourceUtil.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.source.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.flume.Context;
-import org.apache.zookeeper.server.*;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestKafkaSourceUtil {
-  private Properties props = new Properties();
-  private Context context = new Context();
-  private int zkPort = 21818; // none-standard
-  private KafkaSourceEmbeddedZookeeper zookeeper;
-
-  @Before
-  public void setUp() throws Exception {
-    context.put("kafka.consumer.timeout", "10");
-    context.put("type", "KafkaSource");
-    context.put("topic", "test");
-    context.put("zookeeperConnect", "127.0.0.1:"+zkPort);
-    context.put("groupId","test");
-    props = KafkaSourceUtil.getKafkaProperties(context);
-    zookeeper = new KafkaSourceEmbeddedZookeeper(zkPort);
-
-
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    zookeeper.stopZookeeper();
-  }
-
-
-  @Test
-  public void testGetConsumer() {
-    ConsumerConnector cc = KafkaSourceUtil.getConsumer(props);
-    assertNotNull(cc);
-
-  }
-
-  @Test
-  public void testKafkaConsumerProperties() {
-    Context context = new Context();
-    context.put("kafka.auto.commit.enable", "override.default.autocommit");
-    context.put("kafka.fake.property", "kafka.property.value");
-    context.put("kafka.zookeeper.connect","bad-zookeeper-list");
-    context.put("zookeeperConnect","real-zookeeper-list");
-    Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
-
-    //check that we have defaults set
-    assertEquals(
-            kafkaProps.getProperty(KafkaSourceConstants.GROUP_ID),
-            KafkaSourceConstants.DEFAULT_GROUP_ID);
-    //check that kafka properties override the default and get correct name
-    assertEquals(
-            kafkaProps.getProperty(KafkaSourceConstants.AUTO_COMMIT_ENABLED),
-            "override.default.autocommit");
-    //check that any kafka property gets in
-    assertEquals(kafkaProps.getProperty("fake.property"),
-            "kafka.property.value");
-    //check that documented property overrides defaults
-    assertEquals(kafkaProps.getProperty("zookeeper.connect")
-            ,"real-zookeeper-list");
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/f8abaf78/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 15c086b..3b2c97c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@ limitations under the License.
     <elasticsearch.version>0.90.1</elasticsearch.version>
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
+    <kafka.version>0.9.0.1</kafka.version>
     <kite.version>1.0.0</kite.version>
     <hive.version>1.0.0</hive.version>
     <xalan.verion>2.7.1</xalan.verion>
@@ -1337,12 +1338,12 @@ limitations under the License.
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.10</artifactId>
-        <version>0.8.1.1</version>
+        <version>${kafka.version}</version>
       </dependency>
       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.10</artifactId>
-        <version>0.8.1.1</version>
+        <version>${kafka.version}</version>
         <classifier>test</classifier>
         <scope>test</scope>
       </dependency>

