camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoordo...@apache.org
Subject camel git commit: CAMEL-10115: introduced pollTimeoutMs with default 5000
Date Mon, 04 Jul 2016 12:21:40 GMT
Repository: camel
Updated Branches:
  refs/heads/CAMEL-10115 [created] e27af6d81


CAMEL-10115: introduced pollTimeoutMs with default 5000


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e27af6d8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e27af6d8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e27af6d8

Branch: refs/heads/CAMEL-10115
Commit: e27af6d815e683b7b2d046a4b7c54bd625e9bcf8
Parents: 16cd1a7
Author: Arno Noordover <anoordover@users.noreply.github.com>
Authored: Sun Jul 3 20:30:03 2016 +0200
Committer: Arno Noordover <anoordover@users.noreply.github.com>
Committed: Mon Jul 4 13:54:49 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc |  7 ++-
 .../component/kafka/KafkaConfiguration.java     | 15 +++++-
 .../camel/component/kafka/KafkaConsumer.java    |  4 +-
 .../component/kafka/KafkaConsumerTest.java      | 12 +++--
 .../clients/consumer/KafkaConsumerTest.java     | 52 ++++++++++++++++++++
 5 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 557eec2..575c6a2 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -87,8 +87,10 @@ The Kafka component supports 1 options which are listed below.
 
 
 
+
+
 // endpoint options: START
-The Kafka component supports 73 endpoint options which are listed below:
+The Kafka component supports 74 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -115,6 +117,7 @@ The Kafka component supports 73 endpoint options which are listed below:
 | keyDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for key that implements the Deserializer interface.
 | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens the consumer can get stuck trying to fetch a large message on a certain partition.
 | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used
+| pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer.
 | seekToBeginning | consumer | false | boolean | If the option is true then KafkaConsumer will read from beginning on startup.
 | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities.
 | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface.
@@ -180,6 +183,8 @@ The Kafka component supports 73 endpoint options which are listed below:
 
 
 
+
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 35ebd36..c69f32f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -71,7 +71,9 @@ public class KafkaConfiguration {
     //session.timeout.ms
     @UriParam(label = "consumer", defaultValue = "30000")
     private Integer sessionTimeoutMs = 30000;
-    //auto.offset.reset
+    @UriParam(label = "consumer", defaultValue = "5000")
+    private Long pollTimeoutMs = 5000L;
+    //auto.offset.reset
     @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none")
     private String autoOffsetReset = "latest";
     //partition.assignment.strategy
@@ -1074,6 +1076,17 @@ public class KafkaConfiguration {
         this.sessionTimeoutMs = sessionTimeoutMs;
     }
 
+    public Long getPollTimeoutMs() {
+        return pollTimeoutMs;
+    }
+
+    /**
+     * The timeout used when polling the KafkaConsumer.
+     */
+    public void setPollTimeoutMs(Long pollTimeoutMs) {
+        this.pollTimeoutMs = pollTimeoutMs;
+    }
+
     public String getPartitionAssignor() {
         return partitionAssignor;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 4780dfe..65ec6e3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -41,11 +41,13 @@ public class KafkaConsumer extends DefaultConsumer {
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
+    private final Long pollTimeoutMs;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
+        this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
 
         if (endpoint.getBrokers() == null) {
             throw new IllegalArgumentException("BootStrap servers must be specified");
@@ -125,7 +127,7 @@ public class KafkaConsumer extends DefaultConsumer {
                     consumer.seekToBeginning(consumer.assignment());
                 }
                 while (isRunAllowed() && !isSuspendingOrSuspended()) {
-                    ConsumerRecords<Object, Object> allRecords = consumer.poll(Long.MAX_VALUE);
+                    ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
                     for (TopicPartition partition : allRecords.partitions()) {
                         List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
                             .records(partition);

http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 86bc163..eaf880f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -21,6 +21,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class KafkaConsumerTest {
 
@@ -29,20 +30,23 @@ public class KafkaConsumerTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresBootstrapServers() throws Exception {
-        Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
+        when(endpoint.getGroupId()).thenReturn("groupOne");
+        when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration());
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void consumerRequiresGroupId() throws Exception {
-        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234");
+        when(endpoint.getBrokers()).thenReturn("localhost:1234");
+        when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration());
         new KafkaConsumer(endpoint, processor);
     }
 
     @Test
     public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception {
-        Mockito.when(endpoint.getGroupId()).thenReturn("groupOne");
-        Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181");
+        when(endpoint.getGroupId()).thenReturn("groupOne");
+        when(endpoint.getBrokers()).thenReturn("localhost:2181");
+        when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration());
         new KafkaConsumer(endpoint, processor);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
new file mode 100644
index 0000000..809a267
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.hamcrest.core.IsNot;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaConsumerTest {
+
+    @Mock
+    private KafkaConsumer<Object, Object> kafkaConsumer;
+
+    @Before
+    public void init() {
+        when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty());
+    }
+    @Test
+    public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() {
+        ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(1000);
+        assertThat(consumerRecords, IsNot.not(nullValue()));
+    }
+
+    @Test
+    public void testPollGivenReturnsEmptyPartitionsShouldNotBeNull() {
+        ConsumerRecords<Object, Object> consumerRecords = kafkaConsumer.poll(1000);
+        assertThat(consumerRecords.partitions(), IsNot.not(nullValue()));
+    }
+}


Mime
View raw message