Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BD530200CD8 for ; Wed, 2 Aug 2017 22:21:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BB49316A246; Wed, 2 Aug 2017 20:21:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DA34316A20B for ; Wed, 2 Aug 2017 22:21:07 +0200 (CEST) Received: (qmail 92605 invoked by uid 500); 2 Aug 2017 20:21:07 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 92596 invoked by uid 99); 2 Aug 2017 20:21:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Aug 2017 20:21:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EFB7EE00C5; Wed, 2 Aug 2017 20:21:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@atlas.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: atlas git commit: ATLAS-1944: updated handling of shutdown in KafkaConsumer Date: Wed, 2 Aug 2017 20:21:06 +0000 (UTC) archived-at: Wed, 02 Aug 2017 20:21:08 -0000 Repository: atlas Updated Branches: refs/heads/0.8-incubating 7963525a4 -> 49c62b854 ATLAS-1944: updated handling of shutdown in KafkaConsumer Change-Id: I07cbe1955cd08005660f5189f30f0690809ce1b1 Signed-off-by: Madhan Neethiraj (cherry picked from commit 0267eecd831c4927cac7aa460a548f8628fb31f8) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/49c62b85 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/49c62b85 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/49c62b85 Branch: refs/heads/0.8-incubating Commit: 49c62b8544e3cf73d19f5cd3252bf304daf604b5 Parents: 7963525 Author: nixonrodrigues Authored: Tue Aug 1 11:51:21 2017 +0530 Committer: Madhan Neethiraj Committed: Wed Aug 2 13:20:39 2017 -0700 ---------------------------------------------------------------------- .../apache/atlas/kafka/AtlasKafkaConsumer.java | 7 +++ .../notification/NotificationConsumer.java | 3 ++ .../AbstractNotificationConsumerTest.java | 5 +++ .../notification/NotificationHookConsumer.java | 45 +++++++++++++++----- 4 files changed, 50 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/49c62b85/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index d431176..d3b4e49 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -96,4 +96,11 @@ public class AtlasKafkaConsumer extends AbstractNotificationConsumer { kafkaConsumer.close(); } } + + @Override + public void wakeup() { + if (kafkaConsumer != null) { + kafkaConsumer.wakeup(); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/49c62b85/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 0bd75e1..f3e81ec 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -39,6 +39,8 @@ public interface NotificationConsumer { void close(); + void wakeup(); + /** * Fetch data for the topics from Kafka * @return List containing kafka message and partionId and offset. @@ -53,4 +55,5 @@ public interface NotificationConsumer { List> receive(long timeoutMilliSeconds); + } http://git-wip-us.apache.org/repos/asf/atlas/blob/49c62b85/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index bcee00c..3b2a093 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -203,6 +203,11 @@ public class AbstractNotificationConsumerTest { } @Override + public void wakeup() { + + } + + @Override public List> receive() { return receive(1000L); } http://git-wip-us.apache.org/repos/asf/atlas/blob/49c62b85/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index b8255b3..a74b841 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -168,12 +168,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void stopConsumerThreads() { + LOG.info("==> stopConsumerThreads()"); + if (consumers != null) { for (HookConsumer consumer : consumers) { - consumer.stop(); + consumer.shutdown(); } consumers.clear(); } + + LOG.info("<== stopConsumerThreads()"); } /** @@ -218,21 +222,35 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void doWork() { + LOG.info("==> HookConsumer doWork()"); + shouldRun.set(true); if (!serverAvailable(new NotificationHookConsumer.Timer())) { return; } - while (shouldRun.get()) { - try { - List> messages = consumer.receive(); - for (AtlasKafkaMessage msg : messages) { - handleMessage(msg); + try { + while (shouldRun.get()) { + try { + List> messages = consumer.receive(); + for (AtlasKafkaMessage msg : messages) { + handleMessage(msg); + } + } catch (Exception e) { + if (shouldRun.get()) { + LOG.warn("Exception in NotificationHookConsumer", e); + } } - } catch (Throwable t) { - LOG.warn("Failure in NotificationHookConsumer", t); } + } finally { + if (consumer != null) { + LOG.info("closing NotificationConsumer"); + + consumer.close(); + } + + LOG.info("<== HookConsumer doWork()"); } } @@ -369,7 +387,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void commit(AtlasKafkaMessage kafkaMessage) { recordFailedMessages(); TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); - consumer.commit(partition, kafkaMessage.getOffset()); + consumer.commit(partition, kafkaMessage.getOffset() + 1); } boolean serverAvailable(Timer timer) { @@ -397,11 +415,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void shutdown() { + LOG.info("==> HookConsumer shutdown()"); + super.initiateShutdown(); shouldRun.set(false); - consumer.close(); + if (consumer != null) { + consumer.wakeup(); + } super.awaitShutdown(); + + LOG.info("<== HookConsumer shutdown()"); } + } private void audit(String messageUser, String method, String path) {