flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject flume git commit: FLUME-2578. Kafka source throws NPE if Kafka record has null key
Date Tue, 30 Dec 2014 19:43:45 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.6 159178aa6 -> 70ba4a97f


FLUME-2578. Kafka source throws NPE if Kafka record has null key

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/70ba4a97
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/70ba4a97
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/70ba4a97

Branch: refs/heads/flume-1.6
Commit: 70ba4a97f11d8afd0f1d3f1eb31f7958ca808f29
Parents: 159178a
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Tue Dec 30 11:42:47 2014 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Tue Dec 30 11:43:31 2014 -0800

----------------------------------------------------------------------
 .../apache/flume/source/kafka/KafkaSource.java  |  4 +++-
 .../flume/source/kafka/TestKafkaSource.java     | 25 ++++++++++++++++++--
 2 files changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/70ba4a97/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 7bc03da..00a81c6 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -102,7 +102,9 @@ public class KafkaSource extends AbstractSource
           headers.put(KafkaSourceConstants.TIMESTAMP,
                   String.valueOf(System.currentTimeMillis()));
           headers.put(KafkaSourceConstants.TOPIC, topic);
-          headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+          if (kafkaKey != null) {
+            headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
+          }
           if (log.isDebugEnabled()) {
             log.debug("Message: {}", new String(kafkaMessage));
           }

http://git-wip-us.apache.org/repos/asf/flume/blob/70ba4a97/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 72eec77..8ec14cc 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -105,8 +105,6 @@ public class TestKafkaSource {
 
     Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
             Charsets.UTF_8));
-
-
   }
 
   @SuppressWarnings("unchecked")
@@ -301,6 +299,29 @@ public class TestKafkaSource {
 
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNullKey() throws EventDeliveryException,
+      SecurityException, NoSuchFieldException, IllegalArgumentException,
+      IllegalAccessException, InterruptedException {
+    context.put(KafkaSourceConstants.BATCH_SIZE,"1");
+    kafkaSource.configure(context);
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topicName, null , "hello, world");
+
+    Thread.sleep(500L);
+
+    Assert.assertEquals(Status.READY, kafkaSource.process());
+    Assert.assertEquals(Status.BACKOFF, kafkaSource.process());
+    Assert.assertEquals(1, events.size());
+
+    Assert.assertEquals("hello, world", new String(events.get(0).getBody(),
+        Charsets.UTF_8));
+  }
+
   ChannelProcessor createGoodChannel() {
 
     ChannelProcessor channelProcessor = mock(ChannelProcessor.class);


Mime
View raw message