Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 11B4F200BB9 for ; Mon, 24 Oct 2016 08:09:57 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0564E160AFD; Mon, 24 Oct 2016 06:09:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 24918160AFC for ; Mon, 24 Oct 2016 08:09:55 +0200 (CEST) Received: (qmail 38691 invoked by uid 500); 24 Oct 2016 06:09:55 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 38681 invoked by uid 99); 24 Oct 2016 06:09:55 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Oct 2016 06:09:55 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id D72E11806F7 for ; Mon, 24 Oct 2016 06:09:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 9XAw5YahQ22l for ; Mon, 24 Oct 2016 06:09:52 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id AFDB75FBBD for ; Mon, 24 Oct 2016 06:09:51 +0000 (UTC) Received: (qmail 38662 invoked by uid 99); 24 Oct 2016 06:09:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Oct 2016 06:09:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CA43EDFE80; Mon, 24 Oct 2016 06:09:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jinhuwu@apache.org To: commits@eagle.incubator.apache.org Message-Id: <5271692999384b43ba58d000875557e3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-673] add numOfPublishExecutors to alert engine topology Date: Mon, 24 Oct 2016 06:09:50 +0000 (UTC) archived-at: Mon, 24 Oct 2016 06:09:57 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master 56e7048ff -> e520e4011 [EAGLE-673] add numOfPublishExecutors to alert engine topology Author: wujinhu Closes #555 from wujinhu/EAGLE-673. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e520e401 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e520e401 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e520e401 Branch: refs/heads/master Commit: e520e4011796ebdea52f80ca43b9bffddf3aa50a Parents: 56e7048 Author: wujinhu Authored: Mon Oct 24 14:09:43 2016 +0800 Committer: wujinhu Committed: Mon Oct 24 14:09:43 2016 +0800 ---------------------------------------------------------------------- ...agle.alert.app.AlertUnitTopologyAppProvider.xml | 7 +++++++ .../src/main/resources/application.conf | 1 + .../alert/engine/runner/UnitTopologyRunner.java | 17 ++++++++++++----- 3 files changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml index 6ef96c7..28f7db4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml @@ -51,6 +51,13 @@ false + topology.numOfPublishExecutors + Publisher Executor Number + 1 + Number of publish executors + false + + topology.numOfPublishTasks Publisher Tasks Number 1 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf index 1a25cfa..46f5b08 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf @@ -22,6 +22,7 @@ "numOfSpoutTasks" : 1, "numOfRouterBolts" : 4, "numOfAlertBolts" : 10, + "numOfPublishExecutors" : 1, "numOfPublishTasks" : 1, "messageTimeoutSecs": 3600, "localMode" : "true" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java index 88cfb9b..287d5db 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -63,6 +63,7 @@ public class UnitTopologyRunner { public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts"; public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts"; public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks"; + public static final String PUBLISH_EXECUTOR_NUM = "topology.numOfPublishExecutors"; public static final String LOCAL_MODE = "topology.localMode"; public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs"; public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600; @@ -88,6 +89,7 @@ public class UnitTopologyRunner { int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, + int numOfPublishExecutors, int numOfPublishTasks, Config config, boolean localMode) { @@ -104,7 +106,7 @@ public class UnitTopologyRunner { } stormConfig.setNumWorkers(numOfTotalWorkers); - StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); if (localMode) { LOG.info("Submitting as local mode"); @@ -126,10 +128,11 @@ public class UnitTopologyRunner { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); boolean localMode = config.getBoolean(LOCAL_MODE); int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM); - run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode); + run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config, localMode); } public IMetadataChangeNotifyService getMetadataChangeNotifyService() { @@ -144,6 +147,7 @@ public class UnitTopologyRunner { int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, + int numOfPublishExecutors, int numOfPublishTasks, Config config) { StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts]; @@ -199,7 +203,7 @@ public class UnitTopologyRunner { } // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline - BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks); + BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks); for (int i = 0; i < numOfAlertBolts; i++) { boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0)); } @@ -211,9 +215,10 @@ public class UnitTopologyRunner { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); } // --------------------------- @@ -224,15 +229,17 @@ public class UnitTopologyRunner { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); + int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config); + return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); } public static Topology buildTopologyMetadata(String topologyId, int numOfSpoutTasks, int numOfRouterBolts, int numOfAlertBolts, + int numOfPublishExecutors, int numOfPublishTasks, Config config) { Topology topology = new Topology();