Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1D674200BFA for ; Thu, 12 Jan 2017 10:52:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1C1C2160B4C; Thu, 12 Jan 2017 09:52:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6B2DC160B2D for ; Thu, 12 Jan 2017 10:52:00 +0100 (CET) Received: (qmail 58703 invoked by uid 500); 12 Jan 2017 09:51:59 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 58692 invoked by uid 99); 12 Jan 2017 09:51:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 12 Jan 2017 09:51:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5D8A5DFB79; Thu, 12 Jan 2017 09:51:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Thu, 12 Jan 2017 09:51:59 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] camel git commit: CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method archived-at: Thu, 12 Jan 2017 09:52:01 -0000 Repository: camel Updated Branches: refs/heads/camel-2.18.x 70d4750fb -> 5a4f641b6 refs/heads/master 312b57d14 -> d22d0ca06 CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d22d0ca0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d22d0ca0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d22d0ca0 Branch: refs/heads/master Commit: d22d0ca0607e0013d4be1dd3385959ac208ae1b8 Parents: 312b57d Author: Antoine DESSAIGNE Authored: Wed Jan 11 17:25:56 2017 +0100 Committer: Claus Ibsen Committed: Thu Jan 12 10:47:54 2017 +0100 ---------------------------------------------------------------------- .../camel/component/kafka/KafkaConsumer.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d22d0ca0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 31a4180..4362390 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.kafka; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -45,6 +46,8 @@ public class KafkaConsumer extends DefaultConsumer { private final KafkaEndpoint endpoint; private final Processor processor; private final Long pollTimeoutMs; + // This list helps working around the infinite loop of KAFKA-1894 + private final List tasks = new ArrayList<>(); public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) { super(endpoint, processor); @@ -75,7 +78,9 @@ public class KafkaConsumer extends DefaultConsumer { executor = endpoint.createExecutor(); for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) { - executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps())); + KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps()); + executor.submit(task); + tasks.add(task); } } @@ -89,7 +94,12 @@ public class KafkaConsumer extends DefaultConsumer { } else { executor.shutdownNow(); } + if (!executor.isTerminated()) { + tasks.forEach(KafkaFetchRecords::shutdown); + executor.shutdownNow(); + } } + tasks.clear(); executor = null; super.doStop(); @@ -195,6 +205,11 @@ public class KafkaConsumer extends DefaultConsumer { IOHelper.close(consumer); } } + + private void shutdown() { + // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the infinite loop + consumer.wakeup(); + } } protected String serializeOffsetKey(TopicPartition topicPartition) {