flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2499. Include Kafka Message Key in Event Headers.
Date Thu, 16 Oct 2014 05:21:54 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk f979b2683 -> 622faa6f8


FLUME-2499. Include Kafka Message Key in Event Headers.

(Ricky Saltzer via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/622faa6f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/622faa6f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/622faa6f

Branch: refs/heads/trunk
Commit: 622faa6f8812cd751dfad22deaa21bf0aa613111
Parents: f979b26
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Wed Oct 15 22:20:34 2014 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Wed Oct 15 22:21:49 2014 -0700

----------------------------------------------------------------------
 .../apache/flume/source/kafka/KafkaSource.java  | 67 ++++++++++++--------
 .../source/kafka/KafkaSourceConstants.java      |  1 +
 2 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/622faa6f/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 9d77b47..7bc03da 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -27,6 +27,7 @@ import kafka.consumer.ConsumerTimeoutException;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 
+import kafka.message.MessageAndMetadata;
 import org.apache.flume.*;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
@@ -38,23 +39,30 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * A Source for Kafka which reads messages from kafka.
- * I use this in company production environment and its performance is good.
- * Over 100k messages per second can be read from kafka in one source.<p>
- * <tt>kafka.zookeeper.connect: </tt> the zookeeper ip kafka use.<p>
- * <tt>kafka.group.id: </tt> the groupid of consumer group.<p>
- * <tt>topic: </tt> the topic to read from kafka.<p>
- * maxBatchSize - maximum number of messages written to Channel in one batch
- * maxBatchDurationMillis - maximum time before a batch (of any size)
- *                          will be written to Channel
- * kafka.auto.commit.enable - if true, commit automatically every time period.
- *                      if false, commit on each batch.
- * kafka.consumer.timeout.ms -  polling interval for new data for batch.
- *                        Low value means more CPU usage.
- *                        High value means the time.upper.limit may be missed.
+ * A Source for Kafka which reads messages from a kafka topic.
  *
- * Any property starting with "kafka" will be passed to the kafka consumer
- * So you can use any configuration supported by Kafka 0.8.1.1
+ * <tt>zookeeperConnect: </tt> Kafka's zookeeper connection string.
+ * <b>Required</b>
+ * <p>
+ * <tt>groupId: </tt> the group ID of consumer group. <b>Required</b>
+ * <p>
+ * <tt>topic: </tt> the topic to consume messages from. <b>Required</b>
+ * <p>
+ * <tt>maxBatchSize: </tt> Maximum number of messages written to Channel in one
+ * batch. Default: 1000
+ * <p>
+ * <tt>maxBatchDurationMillis: </tt> Maximum number of milliseconds before a
+ * batch (of any size) will be written to a channel. Default: 1000
+ * <p>
+ * <tt>kafka.auto.commit.enable: </tt> If true, commit automatically every time
+ * period. If false, commit on each batch. Default: false
+ * <p>
+ * <tt>kafka.consumer.timeout.ms: </tt> Polling interval for new data for batch.
+ * Low value means more CPU usage. High value means the time.upper.limit may be
+ * missed. Default: 10
+ *
+ * Any property starting with "kafka" will be passed to the kafka consumer, so
+ * you can use any configuration supported by Kafka 0.8.1.1.
  */
 public class KafkaSource extends AbstractSource
         implements Configurable, PollableSource {
@@ -72,7 +80,8 @@ public class KafkaSource extends AbstractSource
 
   public Status process() throws EventDeliveryException {
 
-    byte[] bytes;
+    byte[] kafkaMessage;
+    byte[] kafkaKey;
     Event event;
     Map<String, String> headers;
     long batchStartTime = System.currentTimeMillis();
@@ -84,16 +93,20 @@ public class KafkaSource extends AbstractSource
         iterStatus = hasNext();
         if (iterStatus) {
           // get next message
-          bytes = it.next().message();
+          MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
+          kafkaMessage = messageAndMetadata.message();
+          kafkaKey = messageAndMetadata.key();
 
+          // Add headers to event (topic, timestamp, and key)
           headers = new HashMap<String, String>();
           headers.put(KafkaSourceConstants.TIMESTAMP,
                   String.valueOf(System.currentTimeMillis()));
-          headers.put(KafkaSourceConstants.TOPIC,topic);
+          headers.put(KafkaSourceConstants.TOPIC, topic);
+          headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
           if (log.isDebugEnabled()) {
-            log.debug("Message: {}", new String(bytes));
+            log.debug("Message: {}", new String(kafkaMessage));
           }
-          event = EventBuilder.withBody(bytes, headers);
+          event = EventBuilder.withBody(kafkaMessage, headers);
           eventList.add(event);
         }
         if (log.isDebugEnabled()) {
@@ -132,12 +145,12 @@ public class KafkaSource extends AbstractSource
    * We configure the source and generate properties for the Kafka Consumer
    *
    * Kafka Consumer properties are generated as follows:
-   * 1. Generate a properties object with some static defaults that
-   * can be overridden by Source configuration
-   * 2. We add the configuration users added for Kafka (parameters starting
-   * with kafka. and must be valid Kafka Consumer properties
-   * 3. We add the source documented parameters which can override other
-   * properties
+   *
+   * 1. Generate a properties object with some static defaults that can be
+   * overridden by Source configuration.
+   * 2. Add the configuration users added for Kafka (parameters starting with
+   * "kafka."; these must be valid Kafka Consumer properties).
+   * 3. Add the source documented parameters, which can override other properties.
    *
    * @param context
    */

http://git-wip-us.apache.org/repos/asf/flume/blob/622faa6f/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
index 7390618..911012c 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java
@@ -18,6 +18,7 @@ package org.apache.flume.source.kafka;
 
 public class KafkaSourceConstants {
   public static final String TOPIC = "topic";
+  public static final String KEY = "key";
   public static final String TIMESTAMP = "timestamp";
   public static final String BATCH_SIZE = "batchSize";
   public static final String BATCH_DURATION_MS = "batchDurationMillis";


Mime
View raw message