Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D4229200BA5 for ; Wed, 19 Oct 2016 09:48:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D2A69160AEA; Wed, 19 Oct 2016 07:48:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2FE65160ADE for ; Wed, 19 Oct 2016 09:48:29 +0200 (CEST) Received: (qmail 60227 invoked by uid 500); 19 Oct 2016 07:48:28 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 60218 invoked by uid 99); 19 Oct 2016 07:48:28 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 07:48:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id E28FB180993 for ; Wed, 19 Oct 2016 07:48:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.218 X-Spam-Level: X-Spam-Status: No, score=-6.218 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999, WEIRD_PORT=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id RgfzmmDTTLSx for ; Wed, 19 Oct 2016 07:48:22 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 188895FC1C for ; Wed, 19 Oct 2016 07:48:19 +0000 (UTC) Received: (qmail 60153 invoked by uid 99); 19 Oct 2016 07:48:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Oct 2016 07:48:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 077B2DFBA8; Wed, 19 Oct 2016 07:48:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mw@apache.org To: commits@eagle.incubator.apache.org Message-Id: <30776bf5601e43eaab0af3136a1a5f54@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-563] migrate eagle-hadoop-queue to use application framework Date: Wed, 19 Oct 2016 07:48:18 +0000 (UTC) archived-at: Wed, 19 Oct 2016 07:48:31 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master 855b86ef3 -> 453c3a5fa [EAGLE-563] migrate eagle-hadoop-queue to use application framework Migrate eagle-hadoop-queue to use application framework. Author: anyway1021 Closes #528 from anyway1021/EAGLE-563. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/453c3a5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/453c3a5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/453c3a5f Branch: refs/heads/master Commit: 453c3a5fa209c6c6e40dc25be6951adada61f72f Parents: 855b86e Author: anyway1021 Authored: Wed Oct 19 15:48:01 2016 +0800 Committer: anyway1021 Committed: Wed Oct 19 15:48:01 2016 +0800 ---------------------------------------------------------------------- eagle-jpm/eagle-hadoop-queue/pom.xml | 5 + .../hadoop/queue/HadoopQueueRunningApp.java | 47 ++++++++++ .../queue/HadoopQueueRunningAppConfig.java | 98 ++++++++++++++++++++ .../queue/HadoopQueueRunningAppProvider.java | 25 +++++ .../hadoop/queue/HadoopQueueRunningMain.java | 80 ++-------------- .../storm/HadoopQueueMetricPersistBolt.java | 30 +++--- .../storm/HadoopQueueRunningExtractor.java | 13 +-- .../queue/storm/HadoopQueueRunningSpout.java | 10 +- ...doop.queue.HadoopQueueRunningAppProvider.xml | 77 +++++++++++++++ ...org.apache.eagle.app.spi.ApplicationProvider | 16 ++++ .../src/main/resources/application.conf | 11 ++- .../HadoopQueueRunningAppProviderTest.java | 32 +++++++ .../hadoop/queue/HadoopQueueRunningAppTest.java | 27 ++++++ .../src/test/resources/application.conf | 11 ++- eagle-server/pom.xml | 7 ++ eagle-topology-assembly/pom.xml | 5 + ...org.apache.eagle.app.spi.ApplicationProvider | 3 +- 17 files changed, 387 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/pom.xml b/eagle-jpm/eagle-hadoop-queue/pom.xml index 91568ff..95929a9 100644 --- a/eagle-jpm/eagle-hadoop-queue/pom.xml +++ b/eagle-jpm/eagle-hadoop-queue/pom.xml @@ -57,6 +57,11 @@ + + org.apache.eagle + eagle-app-base + ${project.version} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java new file mode 100644 index 0000000..7a853a1 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.queue; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.TopologyBuilder; +import com.typesafe.config.Config; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt; +import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; + +public class HadoopQueueRunningApp extends StormApplication { + public StormTopology execute(Config config, StormEnvironment environment) { + HadoopQueueRunningAppConfig appConfig = HadoopQueueRunningAppConfig.getInstance(config); + + IRichSpout spout = new HadoopQueueRunningSpout(appConfig); + HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig); + TopologyBuilder builder = new TopologyBuilder(); + + int numOfParserTasks = appConfig.topology.numOfParserTasks; + int numOfSpoutTasks = 1; + + String spoutName = "runningQueueSpout"; + String boltName = "parserBolt"; + + builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); + builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName); + + return builder.createTopology(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java new file mode 100644 index 0000000..5d5d736 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.queue; + +import com.typesafe.config.Config; + +import java.io.Serializable; + +public class HadoopQueueRunningAppConfig implements Serializable { + public static final HadoopQueueRunningAppConfig instance = new HadoopQueueRunningAppConfig(); + + public Topology topology; + public DataSourceConfig dataSourceConfig; + public EagleProps eagleProps; + + private Config config = null; + + private HadoopQueueRunningAppConfig() { + this.topology = new Topology(); + this.dataSourceConfig = new DataSourceConfig(); + this.eagleProps = new EagleProps(); + this.config = null; + } + + public static class Topology implements Serializable { + public boolean localMode; + public int numOfParserTasks; + public String name; + } + + public static class DataSourceConfig implements Serializable { + public String rMEndPoints; + public String fetchIntervalSec; + } + + public static class EagleProps implements Serializable { + public String site; + public EagleService eagleService; + + public EagleProps() { + eagleService = new EagleService(); + } + + public static class EagleService implements Serializable { + public String host; + public int port; + public String username; + public String password; + } + } + + public static HadoopQueueRunningAppConfig getInstance(Config config) { + if (config != null && instance.config == null) { + synchronized (instance) { + if (instance.config == null) { + instance.init(config); + } + return instance; + } + } + return instance; + } + + public Config getConfig() { + return config; + } + + private void init(Config config) { + this.config = config; + + this.topology.localMode = config.getBoolean("topology.localMode"); + this.topology.numOfParserTasks = config.getInt("topology.numOfParserTasks"); + this.topology.name = config.getString("topology.name"); + + this.dataSourceConfig.rMEndPoints = config.getString("dataSourceConfig.rMEndPoints"); + this.dataSourceConfig.fetchIntervalSec = config.getString("dataSourceConfig.fetchIntervalSec"); + + this.eagleProps.site = config.getString("eagleProps.site"); + this.eagleProps.eagleService.host = config.getString("eagleProps.eagleService.host"); + this.eagleProps.eagleService.port = config.getInt("eagleProps.eagleService.port"); + this.eagleProps.eagleService.username = config.getString("eagleProps.eagleService.username"); + this.eagleProps.eagleService.password = config.getString("eagleProps.eagleService.password"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java new file mode 100644 index 0000000..916dd5b --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.queue; + +import org.apache.eagle.app.spi.AbstractApplicationProvider; + +public class HadoopQueueRunningAppProvider extends AbstractApplicationProvider { + public HadoopQueueRunningApp getApplication() { + return new HadoopQueueRunningApp(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java index 37fd17b..d6e90fa 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningMain.java @@ -8,84 +8,18 @@ * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package org.apache.eagle.hadoop.queue; -import org.apache.eagle.common.config.ConfigOptionParser; -import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt; -import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.TopologyBuilder; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class HadoopQueueRunningMain { - public static final String PARSER_TASK_NUM = "topology.numOfParserTasks"; - public static final String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers"; - public static final String TOPOLOGY_NAME = "topology.name"; - public static final String LOCAL_MODE = "topology.localMode"; - - private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningMain.class); - public static void main(String[] args) { - //System.setProperty("config.resource", "/application.conf"); - //Config config = ConfigFactory.load(); - Config config = null; - try { - LOG.info("Loading from configuration file"); - config = new ConfigOptionParser().load(args); - } catch (Exception e) { - LOG.error("failed to load config"); - } - IRichSpout spout = new HadoopQueueRunningSpout(config); - HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(config); - TopologyBuilder builder = new TopologyBuilder(); - - int numOfParserTasks = config.getInt(PARSER_TASK_NUM); - int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM); - int numOfSpoutTasks = 1; - - String spoutName = "runningQueueSpout"; - String boltName = "parserBolt"; - - builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); - builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName); - - StormTopology topology = builder.createTopology(); - - backtype.storm.Config stormConf = new backtype.storm.Config(); - stormConf.setNumWorkers(numOfTotalWorkers); - stormConf.put(stormConf.TOPOLOGY_DEBUG, true); - - String topoName = config.getString(TOPOLOGY_NAME); - Boolean local = config.getBoolean(LOCAL_MODE); - try { - if (!local) { - StormSubmitter.submitTopology(topoName, stormConf, topology); - } else { - //local mode - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topoName, stormConf, topology); - } - } catch (InvalidTopologyException e) { - e.printStackTrace(); - } catch (AlreadyAliveException e) { - e.printStackTrace(); - } - + new HadoopQueueRunningApp().run(args); } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index c6c204a..4edf27d 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -8,16 +8,22 @@ * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ package org.apache.eagle.hadoop.queue.storm; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Tuple; +import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.GenericMetricEntity; @@ -25,15 +31,9 @@ import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.EagleServiceConnector; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; -import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.List; import java.util.Map; @@ -41,17 +41,17 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class); - private Config config; + private HadoopQueueRunningAppConfig config; private IEagleServiceClient client; private OutputCollector collector; - public HadoopQueueMetricPersistBolt(Config config) { + public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) { this.config = config; } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config)); + this.client = new EagleServiceClientImpl(new EagleServiceConnector(this.config.getConfig())); this.collector = collector; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java index ef0c762..c5e0654 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java @@ -18,7 +18,8 @@ package org.apache.eagle.hadoop.queue.storm; -import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils; +import backtype.storm.spout.SpoutOutputCollector; +import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder; import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl; import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler; @@ -26,9 +27,6 @@ import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler; import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector; - -import backtype.storm.spout.SpoutOutputCollector; -import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +40,6 @@ public class HadoopQueueRunningExtractor { private static final Logger LOGGER = LoggerFactory.getLogger(HadoopQueueRunningExtractor.class); private static final int MAX_NUM_THREADS = 10; private static final int MAX_WAIT_TIME = 10; - private static final String DEFAULT_SITE = "sandbox"; private String site; private String urlBases; @@ -51,9 +48,9 @@ public class HadoopQueueRunningExtractor { private ExecutorService executorService; private SpoutOutputCollector collector; - public HadoopQueueRunningExtractor(Config eagleConf, SpoutOutputCollector collector) { - site = HadoopYarnResourceUtils.getConfigValue(eagleConf, "eagleProps.site", DEFAULT_SITE); - urlBases = HadoopYarnResourceUtils.getConfigValue(eagleConf, "dataSourceConfig.RMEndPoints", ""); + public HadoopQueueRunningExtractor(HadoopQueueRunningAppConfig eagleConf, SpoutOutputCollector collector) { + site = eagleConf.eagleProps.site; + urlBases = eagleConf.dataSourceConfig.rMEndPoints; if (urlBases == null) { throw new IllegalArgumentException(site + ".baseurl is null"); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java index 7053a09..530be9a 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java @@ -18,6 +18,8 @@ package org.apache.eagle.hadoop.queue.storm; +import org.apache.eagle.hadoop.queue.HadoopQueueRunningApp; +import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils; @@ -35,18 +37,16 @@ import java.util.Map; public class HadoopQueueRunningSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueRunningSpout.class); - private static final String FETCH_INTERVAL_CONF = "dataSourceConfig.FetchIntervalSec"; - private static final String DEFAULT_FETCH_INTERVAL_SECONDS = "10"; private long fetchIntervalSec; private long lastFetchTime = 0; private HadoopQueueRunningExtractor extractor; - private Config config; + private HadoopQueueRunningAppConfig config; - public HadoopQueueRunningSpout(Config config) { + public HadoopQueueRunningSpout(HadoopQueueRunningAppConfig config) { this.config = config; - fetchIntervalSec = Long.parseLong(HadoopYarnResourceUtils.getConfigValue(config, FETCH_INTERVAL_CONF, DEFAULT_FETCH_INTERVAL_SECONDS)); + fetchIntervalSec = Long.parseLong(config.dataSourceConfig.fetchIntervalSec); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml new file mode 100644 index 0000000..02b60ef --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -0,0 +1,77 @@ + + + + + HADOOP_QUEUE_RUNNING_APP + Hadoop Queue Running Monitoring + 0.5.0-incubating + + + + workers + storm worker number + 4 + + + topology.localMode + true + + + topology.numOfParserTasks + 2 + + + topology.name + sandbox-running-queue-topology + + + dataSourceConfig.rMEndPoints + http://sandbox.hortonworks.com:8088/ + + + dataSourceConfig.fetchIntervalSec + 10 + + + eagleProps.site + sandbox + + + eagleProps.eagleService.host + localhost + + + eagleProps.eagleService.port + 9099 + + + eagleProps.eagleService.username + admin + + + eagleProps.eagleService.password + secret + + + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider new file mode 100644 index 0000000..a35bb7d --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf index 77ae8be..807bd5b 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf @@ -16,13 +16,12 @@ { "topology" : { "localMode" : true, - "numOfTotalWorkers" : 2, "numOfParserTasks" : 2, "name" : "sandbox-running-queue-topology", }, "dataSourceConfig": { - "RMEndPoints" : "http://sandbox.hortonworks.com:8088/", - "FetchIntervalSec": "10" + "rMEndPoints" : "http://sandbox.hortonworks.com:8088/", + "fetchIntervalSec": "10" }, "eagleProps" : { "site": "sandbox", @@ -33,4 +32,8 @@ "password": "secret" } } -} \ No newline at end of file + "appId":"hadoopQueueMonitorJob", + "mode":"LOCAL", + application.storm.nimbusHost=localhost, + "workers":4, +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java new file mode 100644 index 0000000..633e802 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppProviderTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.queue; + +import com.google.inject.Inject; +import org.apache.eagle.app.test.ApplicationSimulator; +import org.apache.eagle.app.test.ApplicationTestBase; +import org.junit.Test; + +public class HadoopQueueRunningAppProviderTest extends ApplicationTestBase { + @Inject + private ApplicationSimulator simulator; + + @Test + public void testRunAsManagedApplicationWithSimulator() { + simulator.start(HadoopQueueRunningAppProvider.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java new file mode 100644 index 0000000..32ed320 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.queue; + +import com.typesafe.config.ConfigFactory; +import org.junit.Test; + +public class HadoopQueueRunningAppTest { + @Test + public void testRun(){ + new HadoopQueueRunningApp().run(ConfigFactory.load()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf index 77ae8be..807bd5b 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf @@ -16,13 +16,12 @@ { "topology" : { "localMode" : true, - "numOfTotalWorkers" : 2, "numOfParserTasks" : 2, "name" : "sandbox-running-queue-topology", }, "dataSourceConfig": { - "RMEndPoints" : "http://sandbox.hortonworks.com:8088/", - "FetchIntervalSec": "10" + "rMEndPoints" : "http://sandbox.hortonworks.com:8088/", + "fetchIntervalSec": "10" }, "eagleProps" : { "site": "sandbox", @@ -33,4 +32,8 @@ "password": "secret" } } -} \ No newline at end of file + "appId":"hadoopQueueMonitorJob", + "mode":"LOCAL", + application.storm.nimbusHost=localhost, + "workers":4, +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-server/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml index 07f02ec..a07cc89 100644 --- a/eagle-server/pom.xml +++ b/eagle-server/pom.xml @@ -309,6 +309,13 @@ eagle-jpm-service ${project.version} + + + + org.apache.eagle + eagle-hadoop-queue + ${project.version} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-topology-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml index 7af6f96..4ea2b0a 100644 --- a/eagle-topology-assembly/pom.xml +++ b/eagle-topology-assembly/pom.xml @@ -67,6 +67,11 @@ eagle-jpm-aggregation ${project.version} + + org.apache.eagle + eagle-hadoop-queue + ${project.version} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/453c3a5f/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider ---------------------------------------------------------------------- diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider index 56292d2..8d35f31 100644 --- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider +++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider @@ -18,4 +18,5 @@ org.apache.eagle.app.example.ExampleApplicationProvider org.apache.eagle.app.jpm.JPMWebApplicationProvider org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider -org.apache.eagle.jpm.aggregation.AggregationApplicationProvider \ No newline at end of file +org.apache.eagle.jpm.aggregation.AggregationApplicationProvider +org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider