Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 69396200C38 for ; Wed, 15 Mar 2017 09:34:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 67BE6160B70; Wed, 15 Mar 2017 08:34:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4E2B6160B78 for ; Wed, 15 Mar 2017 09:34:21 +0100 (CET) Received: (qmail 77062 invoked by uid 500); 15 Mar 2017 08:34:20 -0000 Mailing-List: contact commits-help@eagle.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.apache.org Delivered-To: mailing list commits@eagle.apache.org Received: (qmail 77051 invoked by uid 99); 15 Mar 2017 08:34:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Mar 2017 08:34:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2FB74DFFD7; Wed, 15 Mar 2017 08:34:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: qingwzhao@apache.org To: commits@eagle.apache.org Date: Wed, 15 Mar 2017 08:34:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] eagle git commit: [EAGLE-946] Refactor MRRunningJobApp & HadoopQueueApp archived-at: Wed, 15 Mar 2017 08:34:23 -0000 [EAGLE-946] Refactor MRRunningJobApp & HadoopQueueApp https://issues.apache.org/jira/browse/EAGLE-946 Since upgrading to HDP 2.7, the API for fetching the running app list in one request has become very heavy. 
To tackle this issue, a new algorithm is introduced: `if requests# <=1, fetch all running job list in one request; otherwise, divide last 6 hours into requests# slots, and fetch running jobs in each slot one by one` Author: Zhao, Qingwen Closes #869 from qingwen220/EAGLE-946. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3fe637eb Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3fe637eb Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3fe637eb Branch: refs/heads/master Commit: 3fe637eb573c0d8aa85420c5d6101efd6aef13e4 Parents: 93f83f4 Author: Zhao, Qingwen Authored: Wed Mar 15 16:34:10 2017 +0800 Committer: Zhao, Qingwen Committed: Wed Mar 15 16:34:10 2017 +0800 ---------------------------------------------------------------------- .../apache/eagle/log/entity/RowkeyBuilder.java | 2 +- .../hadoop/queue/HadoopQueueRunningApp.java | 19 +- .../queue/common/HadoopClusterConstants.java | 1 - .../queue/common/YarnURLSelectorImpl.java | 34 --- .../queue/crawler/RunningAppParseListener.java | 1 + .../queue/crawler/RunningAppsCrawler.java | 1 + .../model/HadoopQueueEntityRepository.java | 3 +- .../hadoop/queue/model/applications/App.java | 1 + .../queue/model/applications/AppStreamInfo.java | 1 + .../hadoop/queue/model/applications/Apps.java | 1 + .../queue/model/applications/AppsWrapper.java | 1 + .../model/applications/YarnAppAPIEntity.java | 6 +- .../storm/HadoopQueueMetricPersistBolt.java | 24 +- .../storm/HadoopQueueRunningExtractor.java | 29 +-- .../queue/storm/HadoopQueueRunningSpout.java | 3 - ...doop.queue.HadoopQueueRunningAppProvider.xml | 59 +---- .../jpm/mr/runningentity/AppStreamInfo.java | 53 +++++ .../mr/runningentity/JPMEntityRepository.java | 1 + .../jpm/mr/runningentity/YarnAppAPIEntity.java | 112 +++++++++ .../jpm/mr/running/MRRunningJobApplication.java | 14 +- .../jpm/mr/running/MRRunningJobConfig.java | 25 ++ .../jpm/mr/running/parser/MRJobParser.java | 4 +- 
.../running/storm/MRRunningAppMetricBolt.java | 230 +++++++++++++++++++ .../running/storm/MRRunningJobFetchSpout.java | 23 +- .../mr/running/storm/MRRunningJobParseBolt.java | 2 +- ....running.MRRunningJobApplicationProvider.xml | 127 +++++++++- .../mr/running/MRRunningJobApplicationTest.java | 18 +- .../org/apache/eagle/jpm/util/Constants.java | 11 +- .../jpm/util/jobrecover/RunningJobManager.java | 2 +- .../util/resourcefetch/RMResourceFetcher.java | 173 +++++++++----- .../SparkHistoryServerResourceFetcher.java | 1 - .../resourcefetch/ha/AbstractURLSelector.java | 110 --------- .../util/resourcefetch/ha/HAURLSelector.java | 4 +- .../resourcefetch/ha/HAURLSelectorImpl.java | 31 ++- .../jpm/util/resourcefetch/model/AppInfo.java | 25 +- .../url/JobListServiceURLBuilderImpl.java | 14 +- .../url/RmActiveTestURLBuilderImpl.java | 28 +++ .../resourcefetch/url/ServiceURLBuilder.java | 2 +- .../SparkCompleteJobServiceURLBuilderImpl.java | 8 +- .../url/SparkJobServiceURLBuilderImpl.java | 6 +- .../resourcefetch/ha/HAURLSelectorImplTest.java | 18 +- .../url/JobListServiceURLBuilderImplTest.java | 21 +- ....eagle.topology.TopologyCheckAppProvider.xml | 12 +- 43 files changed, 877 insertions(+), 384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java index 1978d43..5154cc4 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java @@ -108,7 
+108,7 @@ public class RowkeyBuilder { * hash code values. */ private static byte[] buildRowkey(int prefixHash, List partitionHashValues, long timestamp, SortedMap tags){ - // alloacate byte array for rowkey + // allocate byte array for rowkey final int len = 4 + 8 + tags.size() * (4 + 4) + (partitionHashValues.size() * 4); final byte[] rowkey = new byte[len]; int offset = 0; http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java index 4708baa..eb8326e 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -39,18 +39,17 @@ public class HadoopQueueRunningApp extends StormApplication { String persistBoltName = "persistBolt"; IRichSpout spout = new HadoopQueueRunningSpout(appConfig); - Map streamMaps = new HashMap<>(); - String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString(); - String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString(); - streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId); - streamMaps.put(DataSource.SCHEDULER, schedulerStreamId); + //String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString(); + //String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString(); + //streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId); + //streamMaps.put(DataSource.SCHEDULER, schedulerStreamId); int numOfPersistTasks = appConfig.topology.numPersistTasks; int numOfSinkTasks = 
appConfig.topology.numSinkTasks; int numOfSpoutTasks = 1; - HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps); + HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); @@ -58,11 +57,11 @@ public class HadoopQueueRunningApp extends StormApplication { StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_QUEUE_STREAM", config); builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks) - .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, schedulerStreamId); + .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName); - StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config); - builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks) - .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId); + //StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config); + //builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks) + // .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 159da21..b480480 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java 
@@ -67,7 +67,6 @@ public class HadoopClusterConstants { public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService"; - public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService"; // tag constants public static final String TAG_PARENT_QUEUE = "parentQueue"; http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java deleted file mode 100644 index 02f67d4..0000000 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.eagle.hadoop.queue.common; - -import org.apache.eagle.jpm.util.Constants; -import org.apache.eagle.jpm.util.resourcefetch.ha.AbstractURLSelector; - -public class YarnURLSelectorImpl extends AbstractURLSelector { - - public YarnURLSelectorImpl(String[] urls, Constants.CompressionType compressionType) { - super(urls, compressionType); - } - - @Override - protected String buildTestURL(String urlToCheck) { - return YarnClusterResourceURLBuilder.buildRunningAppsURL(urlToCheck); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index ff54ca3..54df5e0 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +@Deprecated public class RunningAppParseListener { private static final Logger logger = LoggerFactory.getLogger(RunningAppParseListener.class); http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java index 39eec80..6629408 100755 --- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +@Deprecated public class RunningAppsCrawler implements Runnable { private static final Logger logger = LoggerFactory.getLogger(RunningAppsCrawler.class); http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java index 800bd03..531f886 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java @@ -17,7 +17,6 @@ */ package org.apache.eagle.hadoop.queue.model; -import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepository; @@ -26,6 +25,6 @@ public class HadoopQueueEntityRepository extends EntityRepository { public HadoopQueueEntityRepository() { this.registerEntity(RunningQueueAPIEntity.class); this.registerEntity(QueueStructureAPIEntity.class); - this.registerEntity(YarnAppAPIEntity.class); + //this.registerEntity(YarnAppAPIEntity.class); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java index 393ede3..c3d5cf0 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; /** * App model for Yarn Resource http://[rm http address:port]/ws/v1/cluster/apps. */ +@Deprecated @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class App { http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java index 7e72023..09e0ca9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java @@ -20,6 +20,7 @@ package org.apache.eagle.hadoop.queue.model.applications; import java.util.HashMap; import java.util.Map; +@Deprecated public class AppStreamInfo { public static final String SITE = "site"; public static final String ID = "id"; 
http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java index e5ea6cd..8709a7a 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.List; +@Deprecated @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public class Apps { http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java index 8247383..3cf7975 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java @@ -21,6 +21,7 @@ package org.apache.eagle.hadoop.queue.model.applications; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +@Deprecated @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown = true) public 
class AppsWrapper { http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java index 7b36523..32449b1 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java @@ -18,15 +18,17 @@ package org.apache.eagle.hadoop.queue.model.applications; import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; +import static org.apache.eagle.jpm.util.Constants.ACCEPTED_APP_SERVICE_NAME; + +@Deprecated @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("yarn_app") @ColumnFamily("f") @Prefix("accepted") -@Service(HadoopClusterConstants.ACCEPTED_APP_SERVICE_NAME) +@Service(ACCEPTED_APP_SERVICE_NAME) @TimeSeries(true) @Partition( {"site"}) @Tags({"site","id","user","queue"}) http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 43a62b7..91bf8e2 100644 --- 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -48,15 +48,12 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class); - private Map streamMap; private HadoopQueueRunningAppConfig config; private IEagleServiceClient client; private OutputCollector collector; - public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config, - Map streamMap) { + public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) { this.config = config; - this.streamMap = streamMap; } @Override @@ -88,31 +85,18 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE); - collector.emit(streamMap.get(dataSource), - new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); + collector.emit(new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); } - } else if (entity instanceof YarnAppAPIEntity) { - YarnAppAPIEntity appAPIEntity = (YarnAppAPIEntity) entity; - collector.emit(streamMap.get(dataSource), - new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity))); } } - if (!dataSource.equals(DataSource.RUNNING_APPS)) { - writeEntities(entities, dataType, dataSource); - } + writeEntities(entities, dataType, dataSource); } this.collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (streamMap != null) { - for (String stormStreamId : streamMap.values()) { - declarer.declareStream(stormStreamId, new Fields("f1", "message")); - } - } else { - 
declarer.declare(new Fields("f1", "message")); - } + declarer.declare(new Fields("f1", "message")); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java index c5e0654..15e399e 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java @@ -20,13 +20,13 @@ package org.apache.eagle.hadoop.queue.storm; import backtype.storm.spout.SpoutOutputCollector; import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; -import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder; -import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl; import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler; import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler; import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler; import org.apache.eagle.jpm.util.Constants; import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector; +import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelectorImpl; +import org.apache.eagle.jpm.util.resourcefetch.url.RmActiveTestURLBuilderImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,41 +52,34 @@ public class HadoopQueueRunningExtractor { site = eagleConf.eagleProps.site; urlBases = eagleConf.dataSourceConfig.rMEndPoints; if (urlBases == null) { - throw new IllegalArgumentException(site + ".baseurl is null"); + throw new IllegalArgumentException(site + ".baseUrl is null"); 
} String[] urls = urlBases.split(","); - urlSelector = new YarnURLSelectorImpl(urls, Constants.CompressionType.GZIP); + urlSelector = new HAURLSelectorImpl(urls, new RmActiveTestURLBuilderImpl(), Constants.CompressionType.NONE, null); executorService = Executors.newFixedThreadPool(MAX_NUM_THREADS); this.collector = collector; } - private void checkUrl() throws IOException { - if (!urlSelector.checkUrl(YarnClusterResourceURLBuilder.buildRunningAppsURL(urlSelector.getSelectedUrl()))) { - urlSelector.reSelectUrl(); - } - } - public void crawl() { try { - checkUrl(); + urlSelector.checkUrl(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("{}", e.getMessage(), e); } String selectedUrl = urlSelector.getSelectedUrl(); LOGGER.info("Current RM base url is " + selectedUrl); List> futures = new ArrayList<>(); futures.add(executorService.submit(new ClusterMetricsCrawler(site, selectedUrl, collector))); - futures.add(executorService.submit(new RunningAppsCrawler(site, selectedUrl, collector))); + // move RunningAppCrawler into MRRunningJobApp + //futures.add(executorService.submit(new RunningAppsCrawler(site, selectedUrl, collector))); futures.add(executorService.submit(new SchedulerInfoCrawler(site, selectedUrl, collector))); futures.forEach(future -> { try { future.get(MAX_WAIT_TIME * 1000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - LOGGER.info("Caught an overtime exception with message" + e.getMessage()); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); + LOGGER.error("Caught an overtime exception with message" + e.getMessage()); + } catch (InterruptedException | ExecutionException e) { + LOGGER.error("{}", e.getMessage(), e); } }); } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java ---------------------------------------------------------------------- diff 
--git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java index 681f25e..495a63a 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java @@ -18,17 +18,14 @@ package org.apache.eagle.hadoop.queue.storm; -import org.apache.eagle.hadoop.queue.HadoopQueueRunningApp; import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; -import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; -import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index da22836..2d68da8 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -50,7 +50,7 
@@ dataSourceConfig.fetchIntervalSec Fetching Metric Interval in Seconds interval seconds of fetching metric from resource manager - 10 + 30 @@ -61,12 +61,6 @@ topic for kafka data sink - dataSinkConfig.ACCEPTED_APP_STREAM.topic - Destination(Kafka Topic) Of App Stream Data - yarn_accepted_app - topic for kafka data sink - - dataSinkConfig.brokerList dataSinkConfig.brokerList localhost:6667 @@ -178,57 +172,6 @@ - - ACCEPTED_APP_STREAM - Accepted App Info Stream - true - - - id - string - - - site - string - - - appName - string - - - queue - string - - - state - string - - - user - string - - - trackingUrl - string - - - elapsedTime - long - - - startedTime - long - - - queueUsagePercentage - double - - - clusterUsagePercentage - double - - - http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java new file mode 100644 index 0000000..8ff1694 --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.mr.runningentity; + +import java.util.HashMap; +import java.util.Map; + +public class AppStreamInfo { + public static final String SITE = "site"; + public static final String ID = "id"; + public static final String USER = "user"; + public static final String QUEUE = "queue"; + private static final String NAME = "appName"; + private static final String STATE = "state"; + private static final String STARTEDTIME = "startTime"; + private static final String ELAPSEDTIME = "elapsedTime"; + private static final String QUEUE_USAGE_PERCENTAGE = "queueUsagePercentage"; + private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage"; + private static final String TRACKING_URL = "trackingUrl"; + + public static Map convertAppToStream(YarnAppAPIEntity appAPIEntity) { + Map queueStreamInfo = new HashMap<>(); + queueStreamInfo.put(SITE, appAPIEntity.getTags().get(SITE)); + queueStreamInfo.put(ID, appAPIEntity.getTags().get(ID)); + queueStreamInfo.put(USER, appAPIEntity.getTags().get(USER)); + queueStreamInfo.put(QUEUE, appAPIEntity.getTags().get(QUEUE)); + queueStreamInfo.put(NAME, appAPIEntity.getAppName()); + queueStreamInfo.put(STATE, appAPIEntity.getState()); + queueStreamInfo.put(ELAPSEDTIME, appAPIEntity.getElapsedTime()); + queueStreamInfo.put(STARTEDTIME, appAPIEntity.getStartedTime()); + queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, appAPIEntity.getQueueUsagePercentage()); + queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, appAPIEntity.getClusterUsagePercentage()); + queueStreamInfo.put(TRACKING_URL, 
appAPIEntity.getTrackingUrl()); + + return queueStreamInfo; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java index 8af853a..727c207 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java @@ -27,6 +27,7 @@ public class JPMEntityRepository extends EntityRepository { entitySet.add(JobExecutionAPIEntity.class); entitySet.add(TaskExecutionAPIEntity.class); entitySet.add(TaskAttemptExecutionAPIEntity.class); + entitySet.add(YarnAppAPIEntity.class); serDeserMap.put(JobConfig.class, new JobConfigSerDeser()); serDeserMap.put(JobCounters.class, new JobCountersSerDeser()); } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java new file mode 100644 index 0000000..1e61c2b --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.mr.runningentity; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +import static org.apache.eagle.jpm.util.Constants.ACCEPTED_APP_SERVICE_NAME; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("yarn_app") +@ColumnFamily("f") +@Prefix("accepted") +@Service(ACCEPTED_APP_SERVICE_NAME) +@TimeSeries(true) +@Partition( {"site"}) +@Tags({"site","id","user","queue"}) +public class YarnAppAPIEntity extends TaggedLogAPIEntity { + @Column("a") + private String appName; + @Column("b") + private String state; + @Column("c") + private long startedTime; + @Column("d") + private long elapsedTime; + @Column("e") + private String trackingUrl; + @Column("f") + private double queueUsagePercentage; + @Column("g") + private double clusterUsagePercentage; + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + valueChanged("appName"); + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + valueChanged("state"); + } + + public long getStartedTime() { + return startedTime; + } + + public 
void setStartedTime(long startedTime) { + this.startedTime = startedTime; + valueChanged("startedTime"); + } + + public long getElapsedTime() { + return elapsedTime; + } + + public void setElapsedTime(long elapsedTime) { + this.elapsedTime = elapsedTime; + valueChanged("elapsedTime"); + } + + public String getTrackingUrl() { + return trackingUrl; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + valueChanged("trackingUrl"); + } + + public double getQueueUsagePercentage() { + return queueUsagePercentage; + } + + public void setQueueUsagePercentage(double queueUsagePercentage) { + this.queueUsagePercentage = queueUsagePercentage; + valueChanged("queueUsagePercentage"); + } + + public double getClusterUsagePercentage() { + return clusterUsagePercentage; + } + + public void setClusterUsagePercentage(double clusterUsagePercentage) { + this.clusterUsagePercentage = clusterUsagePercentage; + valueChanged("clusterUsagePercentage"); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java index 7b1e2fb..ed0d103 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java @@ -19,6 +19,7 @@ package org.apache.eagle.jpm.mr.running; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; +import 
org.apache.eagle.jpm.mr.running.storm.MRRunningAppMetricBolt; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt; import org.apache.eagle.jpm.util.Constants; @@ -26,10 +27,14 @@ import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import com.typesafe.config.Config; +import storm.trident.planner.SpoutNode; import java.util.ArrayList; import java.util.List; +import static org.apache.eagle.jpm.mr.running.MRRunningJobConfig.APP_TO_JOB_STREAM; +import static org.apache.eagle.jpm.mr.running.MRRunningJobConfig.APP_TO_METRIC_STREAM; + public class MRRunningJobApplication extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { @@ -69,7 +74,14 @@ public class MRRunningJobApplication extends StormApplication { mrRunningJobConfig.getZkStateConfig(), confKeyKeys, config), - tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId")); + tasks).setNumTasks(tasks).fieldsGrouping(spoutName, APP_TO_JOB_STREAM, new Fields("appId")); + + // parse running/accepted app metrics + topologyBuilder.setBolt("mrRunningJobMetricBolt", new MRRunningAppMetricBolt(mrRunningJobConfig), 1) + .setNumTasks(1).shuffleGrouping(spoutName, APP_TO_METRIC_STREAM); + topologyBuilder.setBolt("acceptedAppSink", environment.getStreamSink("ACCEPTED_APP_STREAM", config), 1) + .setNumTasks(1).shuffleGrouping("mrRunningJobMetricBolt"); + return topologyBuilder.createTopology(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java index f733b95..ad40f03 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java @@ -34,6 +34,9 @@ public class MRRunningJobConfig implements Serializable { private static final String JOB_SYMBOL = "/jobs"; + public static final String APP_TO_JOB_STREAM = "appStream"; + public static final String APP_TO_METRIC_STREAM = "appMetricStream"; + public ZKStateConfig getZkStateConfig() { return zkStateConfig; } @@ -73,6 +76,9 @@ public class MRRunningJobConfig implements Serializable { public String site; public String[] rmUrls; public int fetchRunningJobInterval; + public int requestsNum; + public String limitPerRequest; + public int timeRangePerRequestInMin; public int parseJobThreadPoolSize; } @@ -127,10 +133,29 @@ public class MRRunningJobConfig implements Serializable { this.endpointConfig.site = config.getString("siteId"); this.endpointConfig.fetchRunningJobInterval = config.getInt("endpointConfig.fetchRunningJobInterval"); this.endpointConfig.parseJobThreadPoolSize = config.getInt("endpointConfig.parseJobThreadPoolSize"); + this.endpointConfig.requestsNum = getConfigValue(config, "endpointConfig.requestsNum", 1); + this.endpointConfig.limitPerRequest = getConfigValue(config, "endpointConfig.limitPerRequest", ""); + this.endpointConfig.timeRangePerRequestInMin = getConfigValue(config, "endpointConfig.timeRangePerRequestInMin", 60); LOG.info("Successfully initialized MRRunningJobConfig"); LOG.info("site: " + this.endpointConfig.site); LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost); LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort); } + + private int getConfigValue(Config config, String key, int defaultValue) { + if (config.hasPath(key)) { + return 
config.getInt(key); + } else { + return defaultValue; + } + } + + private String getConfigValue(Config config, String key, String defaultValue) { + if (config.hasPath(key)) { + return config.getString(key); + } else { + return defaultValue; + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 0f2ede6..1330e3f 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -182,7 +182,7 @@ public class MRJobParser implements Runnable { LOG.info("fetch mr job from {}", jobURL); mrJobs = OBJ_MAPPER.readValue(is, MRJobsWrapper.class).getJobs().getJob(); } catch (Exception e) { - LOG.warn("fetch mr job from {} failed, {}", jobURL, e); + LOG.warn("fetch mr job from {} failed, {}", jobURL, e.getMessage()); return false; } finally { Utils.closeInputStream(is); @@ -251,7 +251,7 @@ public class MRJobParser implements Runnable { LOG.info("fetch mr job counter from {}", jobCounterURL); jobCounters = OBJ_MAPPER.readValue(is, JobCountersWrapper.class).getJobCounters(); } catch (Exception e) { - LOG.warn("fetch mr job counter from {} failed, {}", jobCounterURL, e); + LOG.warn("fail to fetch mr job counter from {}, {}", jobCounterURL, e.getMessage()); return false; } finally { Utils.closeInputStream(is); http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java 
---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java new file mode 100644 index 0000000..28a4e7a --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.mr.running.storm; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; +import org.apache.eagle.jpm.mr.runningentity.AppStreamInfo; +import org.apache.eagle.jpm.mr.runningentity.YarnAppAPIEntity; +import org.apache.eagle.jpm.util.Constants; +import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher; +import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo; +import org.apache.eagle.jpm.util.resourcefetch.url.URLUtil; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.GenericMetricEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.eagle.jpm.mr.runningentity.AppStreamInfo.convertAppToStream; + +public class MRRunningAppMetricBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(MRRunningAppMetricBolt.class); + + private MRRunningJobConfig config; + private IEagleServiceClient client; + private RMResourceFetcher fetcher; + private OutputCollector collector; + private String site; + + private static final long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE; + + private static final String USER_TAG = "user"; + private static final String QUEUE_TAG = "queue"; + private static final String SITE_TAG = "site"; + + @SuppressWarnings("serial") + public static 
HashMap metrics = new HashMap() { + { + put(Constants.MetricName.HADOOP_APPS_ALLOCATED_MB, "getAllocatedMB"); + put(Constants.MetricName.HADOOP_APPS_ALLOCATED_VCORES, "getAllocatedVCores"); + put(Constants.MetricName.HADOOP_APPS_RUNNING_CONTAINERS, "getRunningContainers"); + } + }; + + public MRRunningAppMetricBolt(MRRunningJobConfig config) { + this.config = config; + this.site = config.getConfig().getString("siteId"); + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + this.client = new EagleServiceClientImpl(config.getConfig()); + this.fetcher = new RMResourceFetcher(config.getEndpointConfig().rmUrls); + } + + @Override + public void execute(Tuple input) { + List runningApps = (List) input.getValue(0); + if (runningApps == null || runningApps.isEmpty()) { + LOG.warn("App list is empty"); + } + try { + Map appMetrics = parseRunningAppMetrics(runningApps); + List acceptedApps = parseAcceptedApp(); + flush(appMetrics, acceptedApps); + } catch (Exception e) { + LOG.error("Fetal error is caught {}", e.getMessage(), e); + } + } + + private void createMetric(Map appMetricEntities, + long timestamp, Map tags, String metricName, int value) { + String key = metricName + tags.toString() + " " + timestamp; + GenericMetricEntity entity = appMetricEntities.get(key); + if (entity == null) { + entity = new GenericMetricEntity(); + entity.setTags(tags); + entity.setTimestamp(timestamp); + entity.setPrefix(metricName); + entity.setValue(new double[] {0.0}); + appMetricEntities.put(key, entity); + } + double lastValue = entity.getValue()[0]; + entity.setValue(new double[] {lastValue + value}); + } + + private Map generateMetricTags(AggLevel level, AppInfo app) { + Map tags = new HashMap<>(); + tags.put(SITE_TAG, site); + switch (level) { + case CLUSTER : break; + case QUEUE : + tags.put(QUEUE_TAG, app.getQueue()); + break; + case USER : + tags.put(USER_TAG, app.getUser()); + break; + default : 
+ LOG.warn("Unsupported Aggregation Level {}", level); + } + return tags; + } + + public Map parseRunningAppMetrics(List runningApps) throws Exception { + long timestamp = System.currentTimeMillis() / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL; + Map appMetricEntities = new HashMap<>(); + for (AppInfo app : runningApps) { + for (AggLevel level : AggLevel.values()) { + Map tags = generateMetricTags(level, app); + for (java.util.Map.Entry entry : metrics.entrySet()) { + Method method = AppInfo.class.getMethod(entry.getValue()); + Integer value = (Integer) method.invoke(app); + String metricName = String.format(entry.getKey(), level.name); + createMetric(appMetricEntities, timestamp, tags, metricName, value); + } + } + } + return appMetricEntities; + } + + public List parseAcceptedApp() { + List acceptedApps = new ArrayList<>(); + try { + List apps = fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB); + for (AppInfo app : apps) { + Map tags = new HashMap<>(); + tags.put(AppStreamInfo.SITE, config.getConfig().getString("siteId")); + tags.put(AppStreamInfo.ID, app.getId()); + tags.put(AppStreamInfo.QUEUE, app.getQueue()); + tags.put(AppStreamInfo.USER, app.getUser()); + + YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity(); + appAPIEntity.setTags(tags); + appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId())); + appAPIEntity.setAppName(app.getName()); + appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage()); + appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage()); + appAPIEntity.setElapsedTime(app.getElapsedTime()); + appAPIEntity.setStartedTime(app.getStartedTime()); + appAPIEntity.setState(app.getState()); + appAPIEntity.setTimestamp(app.getStartedTime()); + acceptedApps.add(appAPIEntity); + collector.emit(new Values("", convertAppToStream(appAPIEntity))); + } + } catch (Exception e) { + LOG.error("fetch accepted apps failed {}", e.getMessage(), e); + } + return acceptedApps; + } + + private String 
buildAcceptedAppTrackingURL(String appId) { + String url = URLUtil.removeTrailingSlash(fetcher.getSelector().getSelectedUrl()); + return String.format("%s/%s/%s?%s", url, Constants.V2_APPS_URL, appId, Constants.ANONYMOUS_PARAMETER); + } + + private void flush(Map appMetrics, List acceptedApps) { + List entities = new ArrayList<>(); + if (appMetrics != null && !appMetrics.isEmpty()) { + LOG.info("crawled {} running app metrics", appMetrics.size()); + entities.addAll(appMetrics.values()); + } + if (acceptedApps != null && !acceptedApps.isEmpty()) { + LOG.info("crawled {} accepted apps", acceptedApps.size()); + //entities.addAll(acceptedApps); + } + try { + client.create(entities); + LOG.info("Successfully create {} metrics", entities.size()); + } catch (Exception e) { + LOG.error("Fail to create {} metrics due to {}", entities.size(), e.getMessage(), e); + } + } + + private enum AggLevel { + CLUSTER("cluster"), QUEUE("queue"), USER("user"); + + private String name; + + AggLevel(String name) { + this.name = name; + } + } + + @Override + public void cleanup() { + if (client != null) { + try { + client.close(); + } catch (IOException e) { + LOG.error(e.getMessage()); + } + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("f1", "message")); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java index cc5df84..c07e971 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java +++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java @@ -86,8 +86,14 @@ public class MRRunningJobFetchSpout extends BaseRichSpout { LOG.info("recover {} mr yarn apps from zookeeper", apps.size()); this.init = true; } else { - apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB); - LOG.info("get {} apps from resource manager", apps.size()); + LOG.info("going to fetch all mapReduce running applications"); + apps = resourceFetcher.getResource( + Constants.ResourceType.RUNNING_MR_JOB, + endpointConfig.limitPerRequest, + endpointConfig.requestsNum, + endpointConfig.timeRangePerRequestInMin); + LOG.info("get {} running apps from resource manager", apps.size()); + collector.emit(MRRunningJobConfig.APP_TO_METRIC_STREAM, new Values(apps)); Set runningAppIdsAtThisTime = runningAppIdsAtThisTime(apps); Set runningAppIdsAtPreviousTime = this.runningYarnApps; @@ -117,17 +123,18 @@ public class MRRunningJobFetchSpout extends BaseRichSpout { } for (int i = 0; i < apps.size(); i++) { - LOG.info("emit mr yarn application " + apps.get(i).getId()); + LOG.debug("emit mr yarn application " + apps.get(i).getId()); AppInfo appInfo = apps.get(i); if (mrApps != null && mrApps.containsKey(appInfo.getId())) { //emit (AppInfo, Map) - collector.emit(new Values(appInfo.getId(), appInfo, mrApps.get(appInfo.getId()))); + collector.emit(MRRunningJobConfig.APP_TO_JOB_STREAM, + new Values(appInfo.getId(), appInfo, mrApps.get(appInfo.getId()))); } else { - collector.emit(new Values(appInfo.getId(), appInfo, null)); + collector.emit(MRRunningJobConfig.APP_TO_JOB_STREAM, new Values(appInfo.getId(), appInfo, null)); } } } catch (Exception e) { - e.printStackTrace(); + LOG.error("An fetal exception is caught: {}", e.getMessage(), e); } finally { Utils.sleep(endpointConfig.fetchRunningJobInterval); } @@ -159,7 +166,9 @@ public class MRRunningJobFetchSpout extends BaseRichSpout { @Override public void 
declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { - outputFieldsDeclarer.declare(new Fields("appId", "appInfo", "mrJobEntity")); + outputFieldsDeclarer.declareStream(MRRunningJobConfig.APP_TO_JOB_STREAM, + new Fields("appId", "appInfo", "mrJobEntity")); + outputFieldsDeclarer.declareStream(MRRunningJobConfig.APP_TO_METRIC_STREAM, new Fields("apps")); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index a8db603..1cb8cd8 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -77,7 +77,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt { AppInfo appInfo = (AppInfo) tuple.getValue(1); Map mrJobs = (Map) tuple.getValue(2); - LOG.info("get mr yarn application " + appInfo.getId()); + LOG.debug("get mr yarn application " + appInfo.getId()); MRJobParser applicationParser; if (!runningMRParsers.containsKey(appInfo.getId())) { http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml index 99a6613..fe3ab83 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml @@ -40,10 +40,29 @@ endpointConfig.fetchRunningJobInterval - Interval of Fetch Running Job From Resource Manager + Interval of Fetching Running Jobs in Seconds interval of fetch map reduce running jobs from resource manager + 300 + + + endpointConfig.requestsNum + Request Number of Fetching Running Jobs + number of requests separated by time ranges + 1 + + + endpointConfig.limitPerRequest + Application Number Limit of Each Request + limit of applications in each request + + + + endpointConfig.timeRangePerRequestInMin + StartedTime Range for Each Request in Minutes + time range for each request in minutes if request number > 1 60 + endpointConfig.parseJobThreadPoolSize Parse Job ThreadPool Size in Each Parse Task @@ -80,7 +99,113 @@ User use -Dkey=value to specify name of a job when submit. 
use this to extract job name from job configuration eagle.job.name + + + + dataSinkConfig.ACCEPTED_APP_STREAM.topic + Destination(Kafka Topic) Of App Stream Data + yarn_accepted_app_{site} + topic for kafka data sink + true + + + dataSinkConfig.brokerList + dataSinkConfig.brokerList + localhost:6667 + kafka broker list + true + true + + + dataSinkConfig.serializerClass + dataSinkConfig.serializerClass + kafka.serializer.StringEncoder + serializer class Kafka message value + + + dataSinkConfig.keySerializerClass + dataSinkConfig.keySerializerClass + kafka.serializer.StringEncoder + serializer class Kafka message key + + + dataSinkConfig.producerType + dataSinkConfig.producerType + async + whether the messages are sent asynchronously in a background thread + + + dataSinkConfig.numBatchMessages + dataSinkConfig.numBatchMessages + 4096 + number of messages to send in one batch when using async mode + + + dataSinkConfig.maxQueueBufferMs + dataSinkConfig.maxQueueBufferMs + 5000 + maximum time to buffer data when using async mode + + + dataSinkConfig.requestRequiredAcks + dataSinkConfig.requestRequiredAcks + 0 + value controls when a produce request is considered completed + + + + ACCEPTED_APP_STREAM + Accepted App Info Stream + true + + + id + string + + + site + string + + + appName + string + + + queue + string + + + state + string + + + user + string + + + trackingUrl + string + + + elapsedTime + long + + + startedTime + long + + + queueUsagePercentage + double + + + clusterUsagePercentage + double + + + + # Step 1: Create source kafka topic named "${site}_example_source_topic" http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java 
b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index 5ebd9c5..a43c956 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -194,9 +194,9 @@ public class MRRunningJobApplicationTest { init = (boolean) initField.get(mrRunningJobFetchSpout); Assert.assertTrue(init); - Assert.assertEquals(2, tuples.size()); - Assert.assertEquals(TUPLE_1, tuples.get(0).toString()); - Assert.assertEquals(TUPLE_2, tuples.get(1).toString()); + Assert.assertEquals(3, tuples.size()); + Assert.assertEquals(TUPLE_1, tuples.get(1).toString()); + Assert.assertEquals(TUPLE_2, tuples.get(2).toString()); runningYarnApps = (Set) runningYarnAppsField.get(mrRunningJobFetchSpout); Assert.assertEquals(2, runningYarnApps.size()); Assert.assertEquals(RUNNING_YARNAPPS, runningYarnApps.toString()); @@ -208,9 +208,9 @@ public class MRRunningJobApplicationTest { mrRunningJobFetchSpout.nextTuple(); Assert.assertTrue(init); - Assert.assertEquals(2, tuples.size()); - Assert.assertEquals(TUPLE_1, tuples.get(0).toString()); - Assert.assertEquals(TUPLE_2, tuples.get(1).toString()); + Assert.assertEquals(3, tuples.size()); + Assert.assertEquals(TUPLE_1, tuples.get(1).toString()); + Assert.assertEquals(TUPLE_2, tuples.get(2).toString()); runningYarnApps = (Set) runningYarnAppsField.get(mrRunningJobFetchSpout); Assert.assertEquals(2, runningYarnApps.size()); Assert.assertEquals(RUNNING_YARNAPPS, runningYarnApps.toString()); @@ -222,9 +222,9 @@ public class MRRunningJobApplicationTest { mrRunningJobFetchSpout.nextTuple(); Assert.assertTrue(init); - Assert.assertEquals(2, tuples.size()); - Assert.assertEquals(TUPLE_1, tuples.get(0).toString()); - Assert.assertEquals("[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert 
overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='FINISHED', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, {jobId=prefix:null, timestamp:0, humanReadableDate:1970-01-01 00:00:00,000, tags: , encodedRowkey:null}]", tuples.get(1).toString()); + Assert.assertEquals(3, tuples.size()); + Assert.assertEquals(TUPLE_1, tuples.get(1).toString()); + Assert.assertEquals("[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='FINISHED', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, {jobId=prefix:null, timestamp:0, humanReadableDate:1970-01-01 00:00:00,000, tags: , encodedRowkey:null}]", tuples.get(2).toString()); runningYarnApps = (Set) runningYarnAppsField.get(mrRunningJobFetchSpout); Assert.assertEquals(1, runningYarnApps.size()); http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index b87c41d..d7f9f3f 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -67,6 +67,13 @@ public class Constants { public static final String YARN_API_CLUSTER_INFO = "ws/v1/cluster/info"; + public static class MetricName { + // Metrics from running apps + public static final String HADOOP_APPS_ALLOCATED_MB = "hadoop.%s.allocatedmb"; + public static final String HADOOP_APPS_ALLOCATED_VCORES = "hadoop.%s.allocatedvcores"; + public static final String HADOOP_APPS_RUNNING_CONTAINERS = "hadoop.%s.runningcontainers"; + } + public enum CompressionType { GZIP, NONE } @@ -93,7 +100,7 @@ public class Constants { public enum ResourceType { COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO, JOB_CONFIGURATION, - COMPLETE_MR_JOB + COMPLETE_MR_JOB, ACCEPTED_JOB } public static enum SuggestionType { @@ -119,6 +126,7 @@ public class Constants { public static final String MR_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService"; public static final String MR_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService"; public static final String MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService"; + public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService"; public static final String JOB_TASK_TYPE_TAG = "taskType"; @@ -141,6 +149,7 @@ public class Constants { public enum JobType { CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"), + SPARK("SPARK"), MAPREDUCE("MAPREDUCE"), NOTAVALIABLE("N/A") ; private String value; http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java 
---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java index 95c531c..0f56b17 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java @@ -117,7 +117,7 @@ public class RunningJobManager implements Serializable { appInfo.setElapsedTime(Long.parseLong(appInfoMap.get("elapsedTime"))); appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs") == null ? "" : appInfoMap.get("amContainerLogs")); appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress") == null ? "" : appInfoMap.get("amHostHttpAddress")); - appInfo.setAllocatedMB(Long.parseLong(appInfoMap.get("allocatedMB"))); + appInfo.setAllocatedMB(Integer.parseInt(appInfoMap.get("allocatedMB"))); appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores"))); appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));