Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3A14C200C6D for ; Sun, 7 May 2017 20:12:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 38FA9160B97; Sun, 7 May 2017 18:12:53 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 43547160BD3 for ; Sun, 7 May 2017 20:12:52 +0200 (CEST) Received: (qmail 40622 invoked by uid 500); 7 May 2017 18:12:51 -0000 Mailing-List: contact commits-help@eagle.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.apache.org Delivered-To: mailing list commits@eagle.apache.org Received: (qmail 40314 invoked by uid 99); 7 May 2017 18:12:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 07 May 2017 18:12:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13424EF9A9; Sun, 7 May 2017 18:12:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jaysen@apache.org To: commits@eagle.apache.org Date: Sun, 07 May 2017 18:13:03 -0000 Message-Id: <2207274f35574a8e9ddb042c2a317ba6@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/17] eagle git commit: [EAGLE-1014] add exception handling in CorrelationSpout.java archived-at: Sun, 07 May 2017 18:12:53 -0000 [EAGLE-1014] add exception handling in CorrelationSpout.java https://issues.apache.org/jira/browse/EAGLE-1014 Author: Zhao, Qingwen Closes #927 from qingwen220/EAGLE-1014. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eaad6cf7 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eaad6cf7 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eaad6cf7 Branch: refs/heads/branch-0.5 Commit: eaad6cf74c896a97a061a3f600a0ec64a95c0963 Parents: 84d40ae Author: Zhao, Qingwen Authored: Tue May 2 11:29:46 2017 -0700 Committer: Jay Committed: Tue May 2 11:29:46 2017 -0700 ---------------------------------------------------------------------- .../eagle/alert/engine/spout/CorrelationSpout.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/eaad6cf7/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index e9ee892..4338964 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -172,6 +172,7 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener wrapper.nextTuple(); } catch (Exception e) { LOG.error("unexpected exception is caught: {}", e.getMessage(), e); + collector.reportError(e); } } @@ -256,9 +257,14 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic)); continue; } - KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config), - conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); - newKafkaSpoutList.put(topic, newWrapper); + try { + KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config), + conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); + newKafkaSpoutList.put(topic, newWrapper); + } catch (Exception e) { + LOG.error("fail to create KafkaSpoutWrapper for topic {} due to {}", topic, e.getMessage(), e); + collector.reportError(e); + } } // iterate remove topics and then close KafkaSpout for (String topic : removeTopics) { @@ -285,6 +291,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener this.cachedSpoutSpec = newMeta; this.kafkaSpoutList = newKafkaSpoutList; this.sds = sds; + + LOG.info("after CorrelationSpout reloads, {} kafkaSpouts are generated for {} topics", kafkaSpoutList.size(), topics.size()); } /**