Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C7EB7200B30 for ; Mon, 4 Jul 2016 14:21:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C671E160A65; Mon, 4 Jul 2016 12:21:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C1F1A160A55 for ; Mon, 4 Jul 2016 14:21:43 +0200 (CEST) Received: (qmail 75413 invoked by uid 500); 4 Jul 2016 12:21:41 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 75404 invoked by uid 99); 4 Jul 2016 12:21:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jul 2016 12:21:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6EF00E03C0; Mon, 4 Jul 2016 12:21:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anoordover@apache.org To: commits@camel.apache.org Message-Id: <43e0de0a547944d1ae6b0cec3e6faec1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: camel git commit: CAMEL-10115: introduced pollTimeoutMs with default 5000 Date: Mon, 4 Jul 2016 12:21:40 +0000 (UTC) archived-at: Mon, 04 Jul 2016 12:21:45 -0000 Repository: camel Updated Branches: refs/heads/CAMEL-10115 [created] e27af6d81 CAMEL-10115: introduced pollTimeoutMs with default 5000 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e27af6d8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e27af6d8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e27af6d8 Branch: refs/heads/CAMEL-10115 Commit: e27af6d815e683b7b2d046a4b7c54bd625e9bcf8 Parents: 16cd1a7 Author: Arno Noordover Authored: Sun Jul 3 20:30:03 2016 +0200 Committer: Arno Noordover Committed: Mon Jul 4 13:54:49 2016 +0200 ---------------------------------------------------------------------- components/camel-kafka/src/main/docs/kafka.adoc | 7 ++- .../component/kafka/KafkaConfiguration.java | 15 +++++- .../camel/component/kafka/KafkaConsumer.java | 4 +- .../component/kafka/KafkaConsumerTest.java | 12 +++-- .../clients/consumer/KafkaConsumerTest.java | 52 ++++++++++++++++++++ 5 files changed, 83 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/main/docs/kafka.adoc ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc index 557eec2..575c6a2 100644 --- a/components/camel-kafka/src/main/docs/kafka.adoc +++ b/components/camel-kafka/src/main/docs/kafka.adoc @@ -87,8 +87,10 @@ The Kafka component supports 1 options which are listed below. + + // endpoint options: START -The Kafka component supports 73 endpoint options which are listed below: +The Kafka component supports 74 endpoint options which are listed below: {% raw %} [width="100%",cols="2s,1,1m,1m,5",options="header"] @@ -115,6 +117,7 @@ The Kafka component supports 73 endpoint options which are listed below: | keyDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for key that implements the Deserializer interface. | maxPartitionFetchBytes | consumer | 1048576 | Integer | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be partitions max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens the consumer can get stuck trying to fetch a large message on a certain partition. | partitionAssignor | consumer | org.apache.kafka.clients.consumer.RangeAssignor | String | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used +| pollTimeoutMs | consumer | 5000 | Long | The timeout used when polling the KafkaConsumer. | seekToBeginning | consumer | false | boolean | If the option is true then KafkaConsumer will read from beginning on startup. | sessionTimeoutMs | consumer | 30000 | Integer | The timeout used to detect failures when using Kafka's group management facilities. | valueDeserializer | consumer | org.apache.kafka.common.serialization.StringDeserializer | String | Deserializer class for value that implements the Deserializer interface. @@ -180,6 +183,8 @@ The Kafka component supports 73 endpoint options which are listed below: + + For more information about Producer/Consumer configuration: http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs] http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 35ebd36..c69f32f 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -71,7 +71,9 @@ public class KafkaConfiguration { //session.timeout.ms @UriParam(label = "consumer", defaultValue = "30000") private Integer sessionTimeoutMs = 30000; - //auto.offset.reset + @UriParam(label = "consumer", defaultValue = "5000") + private Long pollTimeoutMs = 5000L; + //auto.offset.reset1 @UriParam(label = "consumer", defaultValue = "latest", enums = "latest,earliest,none") private String autoOffsetReset = "latest"; //partition.assignment.strategy @@ -1074,6 +1076,17 @@ public class KafkaConfiguration { this.sessionTimeoutMs = sessionTimeoutMs; } + public Long getPollTimeoutMs() { + return pollTimeoutMs; + } + + /** + * The timeout used when polling the KafkaConsumer. + */ + public void setPollTimeoutMs(Long pollTimeoutMs) { + this.pollTimeoutMs = pollTimeoutMs; + } + public String getPartitionAssignor() { return partitionAssignor; } http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 4780dfe..65ec6e3 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -41,11 +41,13 @@ public class KafkaConsumer extends DefaultConsumer { protected ExecutorService executor; private final KafkaEndpoint endpoint; private final Processor processor; + private final Long pollTimeoutMs; public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; + this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs(); if (endpoint.getBrokers() == null) { throw new IllegalArgumentException("BootStrap servers must be specified"); @@ -125,7 +127,7 @@ public class KafkaConsumer extends DefaultConsumer { consumer.seekToBeginning(consumer.assignment()); } while (isRunAllowed() && !isSuspendingOrSuspended()) { - ConsumerRecords allRecords = consumer.poll(Long.MAX_VALUE); + ConsumerRecords allRecords = consumer.poll(pollTimeoutMs); for (TopicPartition partition : allRecords.partitions()) { List> partitionRecords = allRecords .records(partition); http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java index 86bc163..eaf880f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class KafkaConsumerTest { @@ -29,20 +30,23 @@ public class KafkaConsumerTest { @Test(expected = IllegalArgumentException.class) public void consumerRequiresBootstrapServers() throws Exception { - Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); + when(endpoint.getGroupId()).thenReturn("groupOne"); + when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration()); new KafkaConsumer(endpoint, processor); } @Test(expected = IllegalArgumentException.class) public void consumerRequiresGroupId() throws Exception { - Mockito.when(endpoint.getBrokers()).thenReturn("localhost:1234"); + when(endpoint.getBrokers()).thenReturn("localhost:1234"); + when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration()); new KafkaConsumer(endpoint, processor); } @Test public void consumerOnlyRequiresBootstrapServersAndGroupId() throws Exception { - Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); - Mockito.when(endpoint.getBrokers()).thenReturn("localhost:2181"); + when(endpoint.getGroupId()).thenReturn("groupOne"); + when(endpoint.getBrokers()).thenReturn("localhost:2181"); + when(endpoint.getConfiguration()).thenReturn(new KafkaConfiguration()); new KafkaConsumer(endpoint, processor); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e27af6d8/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java new file mode 100644 index 0000000..809a267 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.hamcrest.core.IsNot; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class KafkaConsumerTest { + + @Mock + private KafkaConsumer kafkaConsumer; + + @Before + public void init() { + when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty()); + } + @Test + public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() { + ConsumerRecords consumerRecords = kafkaConsumer.poll(1000); + assertThat(consumerRecords, IsNot.not(nullValue())); + } + + @Test + public void testPollGivenReturnsEmptyPartitionsShouldNotBeNull() { + ConsumerRecords consumerRecords = kafkaConsumer.poll(1000); + assertThat(consumerRecords.partitions(), IsNot.not(nullValue())); + } +}