Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 80B69109C5 for ; Tue, 30 Dec 2014 19:43:45 +0000 (UTC) Received: (qmail 74094 invoked by uid 500); 30 Dec 2014 19:43:46 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 74059 invoked by uid 500); 30 Dec 2014 19:43:46 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 74050 invoked by uid 99); 30 Dec 2014 19:43:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Dec 2014 19:43:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AAAFDA0F60F; Tue, 30 Dec 2014 19:43:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flume git commit: FLUME-2578. Kafka source throws NPE if Kafka record has null key Date: Tue, 30 Dec 2014 19:43:45 +0000 (UTC) Repository: flume Updated Branches: refs/heads/flume-1.6 159178aa6 -> 70ba4a97f FLUME-2578. Kafka source throws NPE if Kafka record has null key (Gwen Shapira via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/70ba4a97 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/70ba4a97 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/70ba4a97 Branch: refs/heads/flume-1.6 Commit: 70ba4a97f11d8afd0f1d3f1eb31f7958ca808f29 Parents: 159178a Author: Hari Shreedharan Authored: Tue Dec 30 11:42:47 2014 -0800 Committer: Hari Shreedharan Committed: Tue Dec 30 11:43:31 2014 -0800 ---------------------------------------------------------------------- .../apache/flume/source/kafka/KafkaSource.java | 4 +++- .../flume/source/kafka/TestKafkaSource.java | 25 ++++++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/70ba4a97/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 7bc03da..00a81c6 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -102,7 +102,9 @@ public class KafkaSource extends AbstractSource headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); headers.put(KafkaSourceConstants.TOPIC, topic); - headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + if (kafkaKey != null) { + headers.put(KafkaSourceConstants.KEY, new String(kafkaKey)); + } if (log.isDebugEnabled()) { log.debug("Message: {}", new String(kafkaMessage)); } http://git-wip-us.apache.org/repos/asf/flume/blob/70ba4a97/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 72eec77..8ec14cc 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -105,8 +105,6 @@ public class TestKafkaSource { Assert.assertEquals("hello, world", new String(events.get(0).getBody(), Charsets.UTF_8)); - - } @SuppressWarnings("unchecked") @@ -301,6 +299,29 @@ public class TestKafkaSource { } + @SuppressWarnings("unchecked") + @Test + public void testNullKey() throws EventDeliveryException, + SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException, InterruptedException { + context.put(KafkaSourceConstants.BATCH_SIZE,"1"); + kafkaSource.configure(context); + kafkaSource.start(); + + Thread.sleep(500L); + + kafkaServer.produce(topicName, null , "hello, world"); + + Thread.sleep(500L); + + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + + Assert.assertEquals("hello, world", new String(events.get(0).getBody(), + Charsets.UTF_8)); + } + ChannelProcessor createGoodChannel() { ChannelProcessor channelProcessor = mock(ChannelProcessor.class);