flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject flume git commit: FLUME-2734: Kafka Channel timeout property is overridden by default value
Date Wed, 30 Sep 2015 16:34:57 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk a2f55e180 -> d8d97db4b


FLUME-2734: Kafka Channel timeout property is overridden by default value

(Johny Rufus via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d8d97db4
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d8d97db4
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d8d97db4

Branch: refs/heads/trunk
Commit: d8d97db4be954fc15e4632d6c9ae5dd8f46c189c
Parents: a2f55e1
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Sep 30 09:34:31 2015 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Wed Sep 30 09:34:31 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/kafka/KafkaChannel.java  |  5 +++--
 .../flume/channel/kafka/TestKafkaChannel.java     | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d8d97db4/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 80a122d..c83d4f6 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -177,13 +177,14 @@ public class KafkaChannel extends BasicChannelSemantics {
       throw new ConfigurationException(
         "Zookeeper Connection must be specified");
     }
-    Long timeout = ctx.getLong(TIMEOUT, Long.valueOf(DEFAULT_TIMEOUT));
     kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX));
     kafkaConf.put(GROUP_ID, groupId);
     kafkaConf.put(BROKER_LIST_KEY, brokerList);
     kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect);
     kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false));
-    kafkaConf.put(CONSUMER_TIMEOUT, String.valueOf(timeout));
+    if(kafkaConf.get(CONSUMER_TIMEOUT) == null) {
+      kafkaConf.put(CONSUMER_TIMEOUT, DEFAULT_TIMEOUT);
+    }
     kafkaConf.put(REQUIRED_ACKS_KEY, "-1");
     LOGGER.info(kafkaConf.toString());
     parseAsFlumeEvent =

http://git-wip-us.apache.org/repos/asf/flume/blob/d8d97db4/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index e665431..25b9e40 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -35,6 +35,8 @@ import org.apache.flume.event.EventBuilder;
 import org.apache.flume.sink.kafka.util.TestUtil;
 import org.junit.*;
 
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -149,6 +151,22 @@ public class TestKafkaChannel {
     channel.stop();
   }
 
+  @Test
+  public void testTimeoutConfig() throws Exception {
+    Context context = prepareDefaultContext(true);
+    KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+    Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
+      .equals(DEFAULT_TIMEOUT));
+
+    String timeout = "1000";
+    context.put("kafka."+CONSUMER_TIMEOUT, timeout);
+    channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+    Assert.assertTrue(channel.getKafkaConf().get(CONSUMER_TIMEOUT)
+            .equals(timeout));
+  }
+
   /**
    * This method starts a channel, puts events into it. The channel is then
    * stopped and restarted. Then we check to make sure if all events we put


Mime
View raw message