Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 81D3B200BEC for ; Wed, 14 Dec 2016 13:08:18 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 808E5160B13; Wed, 14 Dec 2016 12:08:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A23EF160B19 for ; Wed, 14 Dec 2016 13:08:17 +0100 (CET) Received: (qmail 61638 invoked by uid 500); 14 Dec 2016 12:08:16 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 61538 invoked by uid 99); 14 Dec 2016 12:08:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Dec 2016 12:08:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AF6C5E0772; Wed, 14 Dec 2016 12:08:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 14 Dec 2016 12:08:17 -0000 Message-Id: <08e587e4a5414512afcb4e81db8f0f55@git.apache.org> In-Reply-To: <2e1eb31eebd3467b9204acdfa3e32e60@git.apache.org> References: <2e1eb31eebd3467b9204acdfa3e32e60@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] camel git commit: CAMEL-10594: Improve shutdown of kafka consumer to be graceful and break out the while loop during stopping. archived-at: Wed, 14 Dec 2016 12:08:18 -0000 CAMEL-10594: Improve shutdown of kafka consumer to be graceful and break out the while loop during stopping. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f91d100b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f91d100b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f91d100b Branch: refs/heads/camel-2.18.x Commit: f91d100bd03cadeefeab27664e750f6505505ff5 Parents: 09b072f Author: Claus Ibsen Authored: Wed Dec 14 13:07:34 2016 +0100 Committer: Claus Ibsen Committed: Wed Dec 14 13:07:59 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/component/kafka/KafkaConsumer.java | 15 +++++++++------ .../camel/component/kafka/KafkaConsumerFullTest.java | 4 ++-- .../kafka/clients/consumer/KafkaConsumerTest.java | 1 + 3 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 6cd5108..66c4335 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -68,8 +68,9 @@ public class KafkaConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { - super.doStart(); LOG.info("Starting Kafka consumer"); + super.doStart(); + executor = endpoint.createExecutor(); for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) { executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps())); @@ -78,17 +79,18 @@ public class KafkaConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { - super.doStop(); LOG.info("Stopping Kafka consumer"); if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { - getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor); } else { executor.shutdownNow(); } } executor = null; + + super.doStop(); } class KafkaFetchRecords implements Runnable { @@ -117,7 +119,7 @@ public class KafkaConsumer extends DefaultConsumer { @SuppressWarnings("unchecked") public void run() { try { - LOG.debug("Subscribing {} to topic {}", threadId, topicName); + LOG.info("Subscribing {} to topic {}", threadId, topicName); consumer.subscribe(Arrays.asList(topicName.split(","))); if (endpoint.getConfiguration().isSeekToBeginning()) { @@ -126,7 +128,7 @@ public class KafkaConsumer extends DefaultConsumer { consumer.poll(100); consumer.seekToBeginning(consumer.assignment()); } - while (isRunAllowed() && !isSuspendingOrSuspended()) { + while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) { ConsumerRecords allRecords = consumer.poll(pollTimeoutMs); for (TopicPartition partition : allRecords.partitions()) { List> partitionRecords = allRecords @@ -151,10 +153,11 @@ public class KafkaConsumer extends DefaultConsumer { } } } - LOG.debug("Unsubscribing {} from topic {}", threadId, topicName); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); consumer.unsubscribe(); } catch (InterruptException e) { getExceptionHandler().handleException("Interrupted while consuming " + threadId + " from kafka topic", e); + LOG.info("Unsubscribing {} from topic {}", threadId, topicName); consumer.unsubscribe(); Thread.currentThread().interrupt(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java index 01c0bd1..e8e6c9e 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java @@ -76,7 +76,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { } @Test - public void kaftMessageIsConsumedByCamel() throws InterruptedException, IOException { + public void kafkaMessageIsConsumedByCamel() throws InterruptedException, IOException { to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { @@ -89,7 +89,7 @@ public class KafkaConsumerFullTest extends BaseEmbeddedKafkaTest { @Test @Ignore("Currently there is a bug in kafka which leads to an uninterruptable thread so a resub take too long (works manually)") - public void kaftMessageIsConsumedByCamelSeekedToBeginning() throws Exception { + public void kafkaMessageIsConsumedByCamelSeekedToBeginning() throws Exception { to.expectedMessageCount(5); to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4"); for (int k = 0; k < 5; k++) { http://git-wip-us.apache.org/repos/asf/camel/blob/f91d100b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 1c4f0ee..d74d5e0 100644 --- a/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -37,6 +37,7 @@ public class KafkaConsumerTest { public void init() { when(kafkaConsumer.poll(1000)).thenReturn(ConsumerRecords.empty()); } + @Test public void testPollGivenReturnsEmptyConsumerRecordShouldNotBeNull() { ConsumerRecords consumerRecords = kafkaConsumer.poll(1000);