eagle-commits mailing list archives

From qingwz...@apache.org
Subject [2/2] eagle git commit: [EAGLE-946] Refactor MRRunningJobApp & HadoopQueueApp
Date Wed, 15 Mar 2017 08:34:20 GMT
[EAGLE-946] Refactor MRRunningJobApp & HadoopQueueApp

https://issues.apache.org/jira/browse/EAGLE-946

Since upgrading to HDP 2.7, the API call that fetches the full running-app list in a single request has become very heavy. To tackle this issue, a new algorithm is introduced:

`if requests# <= 1, fetch the whole running-job list in one request; otherwise, divide the last 6 hours into requests# slots and fetch the running jobs of each slot one by one`
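
For clarity, here is a minimal sketch of the slotted-fetch idea. It is not the committed RMResourceFetcher code; the class and method names are hypothetical, it assumes the YARN RM REST API's startedTimeBegin/startedTimeEnd query parameters, and it only illustrates how the endpointConfig.requestsNum and endpointConfig.timeRangePerRequestInMin options introduced in this patch could drive the time slicing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SlottedRunningAppFetchSketch {

    /** Build one apps-list URL per time slot; a single URL when requestsNum <= 1. */
    static List<String> buildFetchUrls(String rmBaseUrl, int requestsNum, int timeRangePerRequestInMin) {
        List<String> urls = new ArrayList<>();
        String appsUrl = rmBaseUrl + "/ws/v1/cluster/apps?states=RUNNING";
        if (requestsNum <= 1) {
            urls.add(appsUrl);                       // old behaviour: one heavy request
            return urls;
        }
        long now = System.currentTimeMillis();
        long slotMillis = TimeUnit.MINUTES.toMillis(timeRangePerRequestInMin);
        // requestsNum - 1 bounded slots covering the recent window, plus one open-ended
        // slot for apps started before the window (long-running jobs).
        for (int i = 0; i < requestsNum - 1; i++) {
            long end = now - i * slotMillis;
            long begin = end - slotMillis;
            urls.add(appsUrl + "&startedTimeBegin=" + begin + "&startedTimeEnd=" + end);
        }
        urls.add(appsUrl + "&startedTimeEnd=" + (now - (requestsNum - 1) * slotMillis));
        return urls;
    }

    public static void main(String[] args) {
        // e.g. requestsNum = 6 with 60 minutes per slot covers the last 6 hours
        buildFetchUrls("http://rm-host:8088", 6, 60).forEach(System.out::println);
    }
}
```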

Author: Zhao, Qingwen <qingwzhao@apache.org>

Closes #869 from qingwen220/EAGLE-946.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3fe637eb
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3fe637eb
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3fe637eb

Branch: refs/heads/master
Commit: 3fe637eb573c0d8aa85420c5d6101efd6aef13e4
Parents: 93f83f4
Author: Zhao, Qingwen <qingwzhao@apache.org>
Authored: Wed Mar 15 16:34:10 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Wed Mar 15 16:34:10 2017 +0800

----------------------------------------------------------------------
 .../apache/eagle/log/entity/RowkeyBuilder.java  |   2 +-
 .../hadoop/queue/HadoopQueueRunningApp.java     |  19 +-
 .../queue/common/HadoopClusterConstants.java    |   1 -
 .../queue/common/YarnURLSelectorImpl.java       |  34 ---
 .../queue/crawler/RunningAppParseListener.java  |   1 +
 .../queue/crawler/RunningAppsCrawler.java       |   1 +
 .../model/HadoopQueueEntityRepository.java      |   3 +-
 .../hadoop/queue/model/applications/App.java    |   1 +
 .../queue/model/applications/AppStreamInfo.java |   1 +
 .../hadoop/queue/model/applications/Apps.java   |   1 +
 .../queue/model/applications/AppsWrapper.java   |   1 +
 .../model/applications/YarnAppAPIEntity.java    |   6 +-
 .../storm/HadoopQueueMetricPersistBolt.java     |  24 +-
 .../storm/HadoopQueueRunningExtractor.java      |  29 +--
 .../queue/storm/HadoopQueueRunningSpout.java    |   3 -
 ...doop.queue.HadoopQueueRunningAppProvider.xml |  59 +----
 .../jpm/mr/runningentity/AppStreamInfo.java     |  53 +++++
 .../mr/runningentity/JPMEntityRepository.java   |   1 +
 .../jpm/mr/runningentity/YarnAppAPIEntity.java  | 112 +++++++++
 .../jpm/mr/running/MRRunningJobApplication.java |  14 +-
 .../jpm/mr/running/MRRunningJobConfig.java      |  25 ++
 .../jpm/mr/running/parser/MRJobParser.java      |   4 +-
 .../running/storm/MRRunningAppMetricBolt.java   | 230 +++++++++++++++++++
 .../running/storm/MRRunningJobFetchSpout.java   |  23 +-
 .../mr/running/storm/MRRunningJobParseBolt.java |   2 +-
 ....running.MRRunningJobApplicationProvider.xml | 127 +++++++++-
 .../mr/running/MRRunningJobApplicationTest.java |  18 +-
 .../org/apache/eagle/jpm/util/Constants.java    |  11 +-
 .../jpm/util/jobrecover/RunningJobManager.java  |   2 +-
 .../util/resourcefetch/RMResourceFetcher.java   | 173 +++++++++-----
 .../SparkHistoryServerResourceFetcher.java      |   1 -
 .../resourcefetch/ha/AbstractURLSelector.java   | 110 ---------
 .../util/resourcefetch/ha/HAURLSelector.java    |   4 +-
 .../resourcefetch/ha/HAURLSelectorImpl.java     |  31 ++-
 .../jpm/util/resourcefetch/model/AppInfo.java   |  25 +-
 .../url/JobListServiceURLBuilderImpl.java       |  14 +-
 .../url/RmActiveTestURLBuilderImpl.java         |  28 +++
 .../resourcefetch/url/ServiceURLBuilder.java    |   2 +-
 .../SparkCompleteJobServiceURLBuilderImpl.java  |   8 +-
 .../url/SparkJobServiceURLBuilderImpl.java      |   6 +-
 .../resourcefetch/ha/HAURLSelectorImplTest.java |  18 +-
 .../url/JobListServiceURLBuilderImplTest.java   |  21 +-
 ....eagle.topology.TopologyCheckAppProvider.xml |  12 +-
 43 files changed, 877 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
index 1978d43..5154cc4 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/RowkeyBuilder.java
@@ -108,7 +108,7 @@ public class RowkeyBuilder {
 	 * hash code values. 
 	 */
 	private static byte[] buildRowkey(int prefixHash, List<Integer> partitionHashValues, long timestamp, SortedMap<Integer, Integer> tags){
-		// alloacate byte array for rowkey
+		// allocate byte array for rowkey
 		final int len = 4 + 8 + tags.size() * (4 + 4) + (partitionHashValues.size() * 4);
 		final byte[] rowkey = new byte[len];
 		int offset = 0;

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
index 4708baa..eb8326e 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java
@@ -39,18 +39,17 @@ public class HadoopQueueRunningApp extends StormApplication {
         String persistBoltName = "persistBolt";
 
         IRichSpout spout = new HadoopQueueRunningSpout(appConfig);
-        Map<HadoopClusterConstants.DataSource, String> streamMaps = new HashMap<>();
 
-        String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString();
-        String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString();
-        streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId);
-        streamMaps.put(DataSource.SCHEDULER, schedulerStreamId);
+        //String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString();
+        //String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString();
+        //streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId);
+        //streamMaps.put(DataSource.SCHEDULER, schedulerStreamId);
 
         int numOfPersistTasks = appConfig.topology.numPersistTasks;
         int numOfSinkTasks = appConfig.topology.numSinkTasks;
         int numOfSpoutTasks = 1;
 
-        HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps);
+        HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig);
         TopologyBuilder builder = new TopologyBuilder();
 
         builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
@@ -58,11 +57,11 @@ public class HadoopQueueRunningApp extends StormApplication {
 
         StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_QUEUE_STREAM", config);
         builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks)
-                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, schedulerStreamId);
+                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName);
 
-        StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config);
-        builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks)
-                .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId);
+        //StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config);
+        //builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks)
+        //        .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId);
 
         return builder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
index 159da21..b480480 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java
@@ -67,7 +67,6 @@ public class HadoopClusterConstants {
 
     public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService";
     public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService";
-    public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService";
 
     // tag constants
     public static final String TAG_PARENT_QUEUE = "parentQueue";

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
deleted file mode 100644
index 02f67d4..0000000
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnURLSelectorImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.hadoop.queue.common;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourcefetch.ha.AbstractURLSelector;
-
-public class YarnURLSelectorImpl extends AbstractURLSelector {
-
-    public YarnURLSelectorImpl(String[] urls, Constants.CompressionType compressionType) {
-        super(urls, compressionType);
-    }
-
-    @Override
-    protected String buildTestURL(String urlToCheck) {
-        return YarnClusterResourceURLBuilder.buildRunningAppsURL(urlToCheck);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
index ff54ca3..54df5e0 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java
@@ -44,6 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@Deprecated
 public class RunningAppParseListener {
 
     private static final Logger logger = LoggerFactory.getLogger(RunningAppParseListener.class);

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
index 39eec80..6629408 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+@Deprecated
 public class RunningAppsCrawler implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(RunningAppsCrawler.class);

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
index 800bd03..531f886 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java
@@ -17,7 +17,6 @@
  */
 package org.apache.eagle.hadoop.queue.model;
 
-import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity;
 import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity;
 import org.apache.eagle.log.entity.repo.EntityRepository;
@@ -26,6 +25,6 @@ public class HadoopQueueEntityRepository extends EntityRepository {
     public HadoopQueueEntityRepository() {
         this.registerEntity(RunningQueueAPIEntity.class);
         this.registerEntity(QueueStructureAPIEntity.class);
-        this.registerEntity(YarnAppAPIEntity.class);
+        //this.registerEntity(YarnAppAPIEntity.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
index 393ede3..c3d5cf0 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 /**
  * App model for Yarn Resource http://[rm http address:port]/ws/v1/cluster/apps.
  */
+@Deprecated
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class App {

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
index 7e72023..09e0ca9 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java
@@ -20,6 +20,7 @@ package org.apache.eagle.hadoop.queue.model.applications;
 import java.util.HashMap;
 import java.util.Map;
 
+@Deprecated
 public class AppStreamInfo {
     public static final String SITE = "site";
     public static final String ID = "id";

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
index e5ea6cd..8709a7a 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/Apps.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.List;
 
+@Deprecated
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Apps {

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
index 8247383..3cf7975 100755
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppsWrapper.java
@@ -21,6 +21,7 @@ package org.apache.eagle.hadoop.queue.model.applications;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+@Deprecated
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AppsWrapper {

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
index 7b36523..32449b1 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java
@@ -18,15 +18,17 @@
 package org.apache.eagle.hadoop.queue.model.applications;
 
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 
+import static org.apache.eagle.jpm.util.Constants.ACCEPTED_APP_SERVICE_NAME;
+
+@Deprecated
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 @Table("yarn_app")
 @ColumnFamily("f")
 @Prefix("accepted")
-@Service(HadoopClusterConstants.ACCEPTED_APP_SERVICE_NAME)
+@Service(ACCEPTED_APP_SERVICE_NAME)
 @TimeSeries(true)
 @Partition( {"site"})
 @Tags({"site","id","user","queue"})

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
index 43a62b7..91bf8e2 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java
@@ -48,15 +48,12 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
 
     private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class);
 
-    private Map<HadoopClusterConstants.DataSource, String> streamMap;
     private HadoopQueueRunningAppConfig config;
     private IEagleServiceClient client;
     private OutputCollector collector;
 
-    public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config,
-                                        Map<HadoopClusterConstants.DataSource, String> streamMap) {
+    public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) {
         this.config = config;
-        this.streamMap = streamMap;
     }
 
     @Override
@@ -88,31 +85,18 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt {
                     RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity;
                     if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) {
                         String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE);
-                        collector.emit(streamMap.get(dataSource),
-                                new Values(queueName, QueueStreamInfo.convertEntityToStream(queue)));
+                        collector.emit(new Values(queueName, QueueStreamInfo.convertEntityToStream(queue)));
                     }
-                } else if (entity instanceof YarnAppAPIEntity) {
-                    YarnAppAPIEntity appAPIEntity = (YarnAppAPIEntity) entity;
-                    collector.emit(streamMap.get(dataSource),
-                            new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity)));
                 }
             }
-            if (!dataSource.equals(DataSource.RUNNING_APPS)) {
-                writeEntities(entities, dataType, dataSource);
-            }
+            writeEntities(entities, dataType, dataSource);
         }
         this.collector.ack(input);
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if (streamMap != null) {
-            for (String stormStreamId : streamMap.values()) {
-                declarer.declareStream(stormStreamId, new Fields("f1", "message"));
-            }
-        } else {
-            declarer.declare(new Fields("f1", "message"));
-        }
+        declarer.declare(new Fields("f1", "message"));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
index c5e0654..15e399e 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningExtractor.java
@@ -20,13 +20,13 @@ package org.apache.eagle.hadoop.queue.storm;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
-import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder;
-import org.apache.eagle.hadoop.queue.common.YarnURLSelectorImpl;
 import org.apache.eagle.hadoop.queue.crawler.ClusterMetricsCrawler;
 import org.apache.eagle.hadoop.queue.crawler.RunningAppsCrawler;
 import org.apache.eagle.hadoop.queue.crawler.SchedulerInfoCrawler;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelector;
+import org.apache.eagle.jpm.util.resourcefetch.ha.HAURLSelectorImpl;
+import org.apache.eagle.jpm.util.resourcefetch.url.RmActiveTestURLBuilderImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,41 +52,34 @@ public class HadoopQueueRunningExtractor {
         site = eagleConf.eagleProps.site;
         urlBases = eagleConf.dataSourceConfig.rMEndPoints;
         if (urlBases == null) {
-            throw new IllegalArgumentException(site + ".baseurl is null");
+            throw new IllegalArgumentException(site + ".baseUrl is null");
         }
         String[] urls = urlBases.split(",");
-        urlSelector = new YarnURLSelectorImpl(urls, Constants.CompressionType.GZIP);
+        urlSelector = new HAURLSelectorImpl(urls, new RmActiveTestURLBuilderImpl(), Constants.CompressionType.NONE, null);
         executorService = Executors.newFixedThreadPool(MAX_NUM_THREADS);
         this.collector = collector;
     }
 
-    private void checkUrl() throws IOException {
-        if (!urlSelector.checkUrl(YarnClusterResourceURLBuilder.buildRunningAppsURL(urlSelector.getSelectedUrl()))) {
-            urlSelector.reSelectUrl();
-        }
-    }
-
     public void crawl() {
         try {
-            checkUrl();
+            urlSelector.checkUrl();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("{}", e.getMessage(), e);
         }
         String selectedUrl = urlSelector.getSelectedUrl();
         LOGGER.info("Current RM base url is " + selectedUrl);
         List<Future<?>> futures = new ArrayList<>();
         futures.add(executorService.submit(new ClusterMetricsCrawler(site, selectedUrl, collector)));
-        futures.add(executorService.submit(new RunningAppsCrawler(site, selectedUrl, collector)));
+        // move RunningAppCrawler into MRRunningJobApp
+        //futures.add(executorService.submit(new RunningAppsCrawler(site, selectedUrl, collector)));
         futures.add(executorService.submit(new SchedulerInfoCrawler(site, selectedUrl, collector)));
         futures.forEach(future -> {
             try {
                 future.get(MAX_WAIT_TIME * 1000, TimeUnit.MILLISECONDS);
             } catch (TimeoutException e) {
-                LOGGER.info("Caught an overtime exception with message" + e.getMessage());
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            } catch (ExecutionException e) {
-                e.printStackTrace();
+                LOGGER.error("Caught an overtime exception with message" + e.getMessage());
+            } catch (InterruptedException | ExecutionException e) {
+                LOGGER.error("{}", e.getMessage(), e);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
index 681f25e..495a63a 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java
@@ -18,17 +18,14 @@
 
 package org.apache.eagle.hadoop.queue.storm;
 
-import org.apache.eagle.hadoop.queue.HadoopQueueRunningApp;
 import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig;
 import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants;
-import org.apache.eagle.hadoop.queue.common.HadoopYarnResourceUtils;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
index da22836..2d68da8 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml
@@ -50,7 +50,7 @@
             <name>dataSourceConfig.fetchIntervalSec</name>
             <displayName>Fetching Metric Interval in Seconds</displayName>
             <description>interval seconds of fetching metric from resource manager</description>
-            <value>10</value>
+            <value>30</value>
         </property>
 
         <!-- sink to kafka -->
@@ -61,12 +61,6 @@
             <description>topic for kafka data sink</description>
         </property>
         <property>
-            <name>dataSinkConfig.ACCEPTED_APP_STREAM.topic</name>
-            <displayName>Destination(Kafka Topic) Of App Stream Data</displayName>
-            <value>yarn_accepted_app</value>
-            <description>topic for kafka data sink</description>
-        </property>
-        <property>
             <name>dataSinkConfig.brokerList</name>
             <displayName>dataSinkConfig.brokerList</displayName>
             <value>localhost:6667</value>
@@ -178,57 +172,6 @@
                 </column>
             </columns>
         </stream>
-        <stream>
-            <streamId>ACCEPTED_APP_STREAM</streamId>
-            <description>Accepted App Info Stream</description>
-            <validate>true</validate>
-            <columns>
-                <column>
-                    <name>id</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>site</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>appName</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>queue</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>state</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>user</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>trackingUrl</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>elapsedTime</name>
-                    <type>long</type>
-                </column>
-                <column>
-                    <name>startedTime</name>
-                    <type>long</type>
-                </column>
-                <column>
-                    <name>queueUsagePercentage</name>
-                    <type>double</type>
-                </column>
-                <column>
-                    <name>clusterUsagePercentage</name>
-                    <type>double</type>
-                </column>
-            </columns>
-        </stream>
     </streams>
   <docs>
     <install>

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java
new file mode 100644
index 0000000..8ff1694
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/AppStreamInfo.java
@@ -0,0 +1,53 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AppStreamInfo {
+    public static final String SITE = "site";
+    public static final String ID = "id";
+    public static final String USER = "user";
+    public static final String QUEUE = "queue";
+    private static final String NAME = "appName";
+    private static final String STATE = "state";
+    private static final String STARTEDTIME = "startTime";
+    private static final String ELAPSEDTIME = "elapsedTime";
+    private static final String QUEUE_USAGE_PERCENTAGE = "queueUsagePercentage";
+    private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage";
+    private static final String TRACKING_URL = "trackingUrl";
+
+    public static Map<String, Object> convertAppToStream(YarnAppAPIEntity appAPIEntity) {
+        Map<String, Object> queueStreamInfo = new HashMap<>();
+        queueStreamInfo.put(SITE, appAPIEntity.getTags().get(SITE));
+        queueStreamInfo.put(ID, appAPIEntity.getTags().get(ID));
+        queueStreamInfo.put(USER, appAPIEntity.getTags().get(USER));
+        queueStreamInfo.put(QUEUE, appAPIEntity.getTags().get(QUEUE));
+        queueStreamInfo.put(NAME, appAPIEntity.getAppName());
+        queueStreamInfo.put(STATE, appAPIEntity.getState());
+        queueStreamInfo.put(ELAPSEDTIME, appAPIEntity.getElapsedTime());
+        queueStreamInfo.put(STARTEDTIME, appAPIEntity.getStartedTime());
+        queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, appAPIEntity.getQueueUsagePercentage());
+        queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, appAPIEntity.getClusterUsagePercentage());
+        queueStreamInfo.put(TRACKING_URL, appAPIEntity.getTrackingUrl());
+
+        return queueStreamInfo;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
index 8af853a..727c207 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JPMEntityRepository.java
@@ -27,6 +27,7 @@ public class JPMEntityRepository extends EntityRepository {
         entitySet.add(JobExecutionAPIEntity.class);
         entitySet.add(TaskExecutionAPIEntity.class);
         entitySet.add(TaskAttemptExecutionAPIEntity.class);
+        entitySet.add(YarnAppAPIEntity.class);
         serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
         serDeserMap.put(JobCounters.class, new JobCountersSerDeser());
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java
new file mode 100644
index 0000000..1e61c2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/YarnAppAPIEntity.java
@@ -0,0 +1,112 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.runningentity;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import static org.apache.eagle.jpm.util.Constants.ACCEPTED_APP_SERVICE_NAME;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("yarn_app")
+@ColumnFamily("f")
+@Prefix("accepted")
+@Service(ACCEPTED_APP_SERVICE_NAME)
+@TimeSeries(true)
+@Partition( {"site"})
+@Tags({"site","id","user","queue"})
+public class YarnAppAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String appName;
+    @Column("b")
+    private String state;
+    @Column("c")
+    private long startedTime;
+    @Column("d")
+    private long elapsedTime;
+    @Column("e")
+    private String trackingUrl;
+    @Column("f")
+    private double queueUsagePercentage;
+    @Column("g")
+    private double clusterUsagePercentage;
+
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+        valueChanged("appName");
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+        valueChanged("state");
+    }
+
+    public long getStartedTime() {
+        return startedTime;
+    }
+
+    public void setStartedTime(long startedTime) {
+        this.startedTime = startedTime;
+        valueChanged("startedTime");
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+        valueChanged("elapsedTime");
+    }
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
+
+    public double getQueueUsagePercentage() {
+        return queueUsagePercentage;
+    }
+
+    public void setQueueUsagePercentage(double queueUsagePercentage) {
+        this.queueUsagePercentage = queueUsagePercentage;
+        valueChanged("queueUsagePercentage");
+    }
+
+    public double getClusterUsagePercentage() {
+        return clusterUsagePercentage;
+    }
+
+    public void setClusterUsagePercentage(double clusterUsagePercentage) {
+        this.clusterUsagePercentage = clusterUsagePercentage;
+        valueChanged("clusterUsagePercentage");
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index 7b1e2fb..ed0d103 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -19,6 +19,7 @@ package org.apache.eagle.jpm.mr.running;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningAppMetricBolt;
 import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
 import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
 import org.apache.eagle.jpm.util.Constants;
@@ -26,10 +27,14 @@ import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
+import storm.trident.planner.SpoutNode;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.eagle.jpm.mr.running.MRRunningJobConfig.APP_TO_JOB_STREAM;
+import static org.apache.eagle.jpm.mr.running.MRRunningJobConfig.APP_TO_METRIC_STREAM;
+
 public class MRRunningJobApplication extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
@@ -69,7 +74,14 @@ public class MRRunningJobApplication extends StormApplication {
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys,
                 config),
-                tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
+                tasks).setNumTasks(tasks).fieldsGrouping(spoutName, APP_TO_JOB_STREAM, new Fields("appId"));
+
+        // parse running/accepted app metrics
+        topologyBuilder.setBolt("mrRunningJobMetricBolt", new MRRunningAppMetricBolt(mrRunningJobConfig), 1)
+                .setNumTasks(1).shuffleGrouping(spoutName, APP_TO_METRIC_STREAM);
+        topologyBuilder.setBolt("acceptedAppSink", environment.getStreamSink("ACCEPTED_APP_STREAM", config), 1)
+                .setNumTasks(1).shuffleGrouping("mrRunningJobMetricBolt");
+
         return topologyBuilder.createTopology();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index f733b95..ad40f03 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -34,6 +34,9 @@ public class MRRunningJobConfig implements Serializable {
 
     private static final String JOB_SYMBOL = "/jobs";
 
+    public static final String APP_TO_JOB_STREAM = "appStream";
+    public static final String APP_TO_METRIC_STREAM = "appMetricStream";
+
     public ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -73,6 +76,9 @@ public class MRRunningJobConfig implements Serializable {
         public String site;
         public String[] rmUrls;
         public int fetchRunningJobInterval;
+        public int requestsNum;
+        public String limitPerRequest;
+        public int timeRangePerRequestInMin;
         public int parseJobThreadPoolSize;
     }
 
@@ -127,10 +133,29 @@ public class MRRunningJobConfig implements Serializable {
         this.endpointConfig.site = config.getString("siteId");
         this.endpointConfig.fetchRunningJobInterval = config.getInt("endpointConfig.fetchRunningJobInterval");
         this.endpointConfig.parseJobThreadPoolSize = config.getInt("endpointConfig.parseJobThreadPoolSize");
+        this.endpointConfig.requestsNum = getConfigValue(config, "endpointConfig.requestsNum", 1);
+        this.endpointConfig.limitPerRequest = getConfigValue(config, "endpointConfig.limitPerRequest", "");
+        this.endpointConfig.timeRangePerRequestInMin = getConfigValue(config, "endpointConfig.timeRangePerRequestInMin", 60);
 
         LOG.info("Successfully initialized MRRunningJobConfig");
         LOG.info("site: " + this.endpointConfig.site);
         LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
         LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
     }
+
+    private int getConfigValue(Config config, String key, int defaultValue) {
+        if (config.hasPath(key)) {
+            return config.getInt(key);
+        } else {
+            return defaultValue;
+        }
+    }
+
+    private String getConfigValue(Config config, String key, String defaultValue) {
+        if (config.hasPath(key)) {
+            return config.getString(key);
+        } else {
+            return defaultValue;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 0f2ede6..1330e3f 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -182,7 +182,7 @@ public class MRJobParser implements Runnable {
             LOG.info("fetch mr job from {}", jobURL);
             mrJobs = OBJ_MAPPER.readValue(is, MRJobsWrapper.class).getJobs().getJob();
         } catch (Exception e) {
-            LOG.warn("fetch mr job from {} failed, {}", jobURL, e);
+            LOG.warn("fetch mr job from {} failed, {}", jobURL, e.getMessage());
             return false;
         } finally {
             Utils.closeInputStream(is);
@@ -251,7 +251,7 @@ public class MRJobParser implements Runnable {
             LOG.info("fetch mr job counter from {}", jobCounterURL);
             jobCounters = OBJ_MAPPER.readValue(is, JobCountersWrapper.class).getJobCounters();
         } catch (Exception e) {
-            LOG.warn("fetch mr job counter from {} failed, {}", jobCounterURL, e);
+            LOG.warn("fail to fetch mr job counter from {}, {}", jobCounterURL, e.getMessage());
             return false;
         } finally {
             Utils.closeInputStream(is);

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
new file mode 100644
index 0000000..28a4e7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
@@ -0,0 +1,230 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
+import org.apache.eagle.jpm.mr.runningentity.AppStreamInfo;
+import org.apache.eagle.jpm.mr.runningentity.YarnAppAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.url.URLUtil;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.eagle.jpm.mr.runningentity.AppStreamInfo.convertAppToStream;
+
+public class MRRunningAppMetricBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(MRRunningAppMetricBolt.class);
+
+    private MRRunningJobConfig config;
+    private IEagleServiceClient client;
+    private RMResourceFetcher fetcher;
+    private OutputCollector collector;
+    private String site;
+
+    private static final long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE;
+
+    private static final String USER_TAG = "user";
+    private static final String QUEUE_TAG = "queue";
+    private static final String SITE_TAG = "site";
+
+    @SuppressWarnings("serial")
+    public static HashMap<String, String> metrics = new HashMap<String, String>() {
+        {
+            put(Constants.MetricName.HADOOP_APPS_ALLOCATED_MB, "getAllocatedMB");
+            put(Constants.MetricName.HADOOP_APPS_ALLOCATED_VCORES, "getAllocatedVCores");
+            put(Constants.MetricName.HADOOP_APPS_RUNNING_CONTAINERS, "getRunningContainers");
+        }
+    };
+
+    public MRRunningAppMetricBolt(MRRunningJobConfig config) {
+        this.config = config;
+        this.site = config.getConfig().getString("siteId");
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.client = new EagleServiceClientImpl(config.getConfig());
+        this.fetcher = new RMResourceFetcher(config.getEndpointConfig().rmUrls);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        List<AppInfo> runningApps = (List<AppInfo>) input.getValue(0);
+        if (runningApps == null || runningApps.isEmpty()) {
+            LOG.warn("App list is empty");
+        }
+        try {
+            Map<String, GenericMetricEntity> appMetrics = parseRunningAppMetrics(runningApps);
+            List<YarnAppAPIEntity> acceptedApps = parseAcceptedApp();
+            flush(appMetrics, acceptedApps);
+        } catch (Exception e) {
+            LOG.error("Fetal error is caught {}", e.getMessage(), e);
+        }
+    }
+
+    private void createMetric(Map<String, GenericMetricEntity> appMetricEntities,
+                              long timestamp, Map<String, String> tags, String metricName, int value) {
+        String key = metricName + tags.toString() + " " + timestamp;
+        GenericMetricEntity entity = appMetricEntities.get(key);
+        if (entity == null) {
+            entity = new GenericMetricEntity();
+            entity.setTags(tags);
+            entity.setTimestamp(timestamp);
+            entity.setPrefix(metricName);
+            entity.setValue(new double[] {0.0});
+            appMetricEntities.put(key, entity);
+        }
+        double lastValue = entity.getValue()[0];
+        entity.setValue(new double[] {lastValue + value});
+    }
+
+    private Map<String, String> generateMetricTags(AggLevel level, AppInfo app) {
+        Map<String, String> tags = new HashMap<>();
+        tags.put(SITE_TAG, site);
+        switch (level) {
+            case CLUSTER : break;
+            case QUEUE :
+                tags.put(QUEUE_TAG, app.getQueue());
+                break;
+            case USER :
+                tags.put(USER_TAG, app.getUser());
+                break;
+            default :
+                LOG.warn("Unsupported Aggregation Level {}", level);
+        }
+        return tags;
+    }
+
+    public Map<String, GenericMetricEntity> parseRunningAppMetrics(List<AppInfo> runningApps) throws Exception {
+        long timestamp = System.currentTimeMillis() / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL;
+        Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>();
+        for (AppInfo app : runningApps) {
+            for (AggLevel level : AggLevel.values()) {
+                Map<String, String> tags = generateMetricTags(level, app);
+                for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) {
+                    Method method = AppInfo.class.getMethod(entry.getValue());
+                    Integer value = (Integer) method.invoke(app);
+                    String metricName = String.format(entry.getKey(), level.name);
+                    createMetric(appMetricEntities, timestamp, tags, metricName, value);
+                }
+            }
+        }
+        return appMetricEntities;
+    }
+
+    public List<YarnAppAPIEntity> parseAcceptedApp() {
+        List<YarnAppAPIEntity> acceptedApps = new ArrayList<>();
+        try {
+            List<AppInfo> apps = fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB);
+            for (AppInfo app : apps) {
+                Map<String, String> tags = new HashMap<>();
+                tags.put(AppStreamInfo.SITE, config.getConfig().getString("siteId"));
+                tags.put(AppStreamInfo.ID, app.getId());
+                tags.put(AppStreamInfo.QUEUE, app.getQueue());
+                tags.put(AppStreamInfo.USER, app.getUser());
+
+                YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity();
+                appAPIEntity.setTags(tags);
+                appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId()));
+                appAPIEntity.setAppName(app.getName());
+                appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage());
+                appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage());
+                appAPIEntity.setElapsedTime(app.getElapsedTime());
+                appAPIEntity.setStartedTime(app.getStartedTime());
+                appAPIEntity.setState(app.getState());
+                appAPIEntity.setTimestamp(app.getStartedTime());
+                acceptedApps.add(appAPIEntity);
+                collector.emit(new Values("", convertAppToStream(appAPIEntity)));
+            }
+        } catch (Exception e) {
+            LOG.error("fetch accepted apps failed {}", e.getMessage(), e);
+        }
+        return acceptedApps;
+    }
+
+    private String buildAcceptedAppTrackingURL(String appId) {
+        String url = URLUtil.removeTrailingSlash(fetcher.getSelector().getSelectedUrl());
+        return String.format("%s/%s/%s?%s", url, Constants.V2_APPS_URL, appId, Constants.ANONYMOUS_PARAMETER);
+    }
+
+    private void flush(Map<String, GenericMetricEntity> appMetrics, List<YarnAppAPIEntity> acceptedApps) {
+        List<TaggedLogAPIEntity> entities = new ArrayList<>();
+        if (appMetrics != null && !appMetrics.isEmpty()) {
+            LOG.info("crawled {} running app metrics", appMetrics.size());
+            entities.addAll(appMetrics.values());
+        }
+        if (acceptedApps != null && !acceptedApps.isEmpty()) {
+            LOG.info("crawled {} accepted apps", acceptedApps.size());
+            //entities.addAll(acceptedApps);
+        }
+        try {
+            client.create(entities);
+            LOG.info("Successfully create {} metrics", entities.size());
+        } catch (Exception e) {
+            LOG.error("Fail to create {} metrics due to {}", entities.size(), e.getMessage(), e);
+        }
+    }
+
+    private enum AggLevel {
+        CLUSTER("cluster"), QUEUE("queue"), USER("user");
+
+        private String name;
+
+        AggLevel(String name) {
+            this.name = name;
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        if (client != null) {
+            try {
+                client.close();
+            } catch (IOException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1", "message"));
+    }
+}
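
For illustration only (not part of this patch): the rollup done by parseRunningAppMetrics/createMetric boils down to formatting a metric-name pattern once per aggregation level and summing one counter per resulting name. A minimal standalone sketch, assuming some allocated-MB values per running app and ignoring the tag map and timestamp that the real key also carries:

import java.util.HashMap;
import java.util.Map;

public class MetricRollupSketch {
    public static void main(String[] args) {
        int[] allocatedMbPerApp = {2048, 4096, 1024};   // pretend AppInfo.getAllocatedMB() values
        String[] levels = {"cluster", "queue", "user"}; // mirrors the AggLevel enum above
        Map<String, Double> rollup = new HashMap<>();
        for (int mb : allocatedMbPerApp) {
            for (String level : levels) {
                // same pattern as Constants.MetricName.HADOOP_APPS_ALLOCATED_MB
                String metricName = String.format("hadoop.%s.allocatedmb", level);
                rollup.merge(metricName, (double) mb, Double::sum);
            }
        }
        // prints e.g. hadoop.cluster.allocatedmb = 7168.0
        rollup.forEach((name, sum) -> System.out.println(name + " = " + sum));
    }
}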

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index cc5df84..c07e971 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -86,8 +86,14 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
                 LOG.info("recover {} mr yarn apps from zookeeper", apps.size());
                 this.init = true;
             } else {
-                apps = resourceFetcher.getResource(Constants.ResourceType.RUNNING_MR_JOB);
-                LOG.info("get {} apps from resource manager", apps.size());
+                LOG.info("going to fetch all mapReduce running applications");
+                apps = resourceFetcher.getResource(
+                        Constants.ResourceType.RUNNING_MR_JOB,
+                        endpointConfig.limitPerRequest,
+                        endpointConfig.requestsNum,
+                        endpointConfig.timeRangePerRequestInMin);
+                LOG.info("get {} running apps from resource manager", apps.size());
+                collector.emit(MRRunningJobConfig.APP_TO_METRIC_STREAM, new Values(apps));
 
                 Set<String> runningAppIdsAtThisTime = runningAppIdsAtThisTime(apps);
                 Set<String> runningAppIdsAtPreviousTime = this.runningYarnApps;
@@ -117,17 +123,18 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
             }
 
             for (int i = 0; i < apps.size(); i++) {
-                LOG.info("emit mr yarn application " + apps.get(i).getId());
+                LOG.debug("emit mr yarn application " + apps.get(i).getId());
                 AppInfo appInfo = apps.get(i);
                 if (mrApps != null && mrApps.containsKey(appInfo.getId())) {
                     //emit (AppInfo, Map<String, JobExecutionAPIEntity>)
-                    collector.emit(new Values(appInfo.getId(), appInfo, mrApps.get(appInfo.getId())));
+                    collector.emit(MRRunningJobConfig.APP_TO_JOB_STREAM,
+                            new Values(appInfo.getId(), appInfo, mrApps.get(appInfo.getId())));
                 } else {
-                    collector.emit(new Values(appInfo.getId(), appInfo, null));
+                    collector.emit(MRRunningJobConfig.APP_TO_JOB_STREAM, new Values(appInfo.getId(), appInfo, null));
                 }
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            LOG.error("An fetal exception is caught: {}", e.getMessage(), e);
         } finally {
             Utils.sleep(endpointConfig.fetchRunningJobInterval);
         }
@@ -159,7 +166,9 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-        outputFieldsDeclarer.declare(new Fields("appId", "appInfo", "mrJobEntity"));
+        outputFieldsDeclarer.declareStream(MRRunningJobConfig.APP_TO_JOB_STREAM,
+                new Fields("appId", "appInfo", "mrJobEntity"));
+        outputFieldsDeclarer.declareStream(MRRunningJobConfig.APP_TO_METRIC_STREAM, new Fields("apps"));
     }
 
     @Override
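
For illustration only (not part of this patch): a rough sketch of how the new getResource(RUNNING_MR_JOB, limitPerRequest, requestsNum, timeRangePerRequestInMin) call above could be served by the YARN ResourceManager REST API: one request when requestsNum <= 1, otherwise one request per started-time slot. The base URL and the exact slot layout are assumptions; states, applicationTypes, limit, startedTimeBegin and startedTimeEnd are standard RM query parameters.

import java.util.ArrayList;
import java.util.List;

public class SlicedFetchSketch {
    // Builds the RM REST URLs a fetcher might issue; purely illustrative.
    public static List<String> buildUrls(String rmBase, int requestsNum,
                                         int limitPerRequest, int timeRangePerRequestInMin) {
        String base = rmBase + "/ws/v1/cluster/apps?states=RUNNING&applicationTypes=MAPREDUCE";
        List<String> urls = new ArrayList<>();
        if (requestsNum <= 1) {
            urls.add(base);                       // single heavy request for the full list
            return urls;
        }
        long now = System.currentTimeMillis();
        long slotMs = timeRangePerRequestInMin * 60_000L;
        for (int i = 0; i < requestsNum; i++) {   // one lighter request per started-time slot
            long end = now - i * slotMs;
            long begin = end - slotMs;
            urls.add(base + "&limit=" + limitPerRequest
                    + "&startedTimeBegin=" + begin + "&startedTimeEnd=" + end);
        }
        return urls;
    }

    public static void main(String[] args) {
        buildUrls("http://rm-host:8088", 3, 500, 60).forEach(System.out::println);
    }
}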

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index a8db603..1cb8cd8 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -77,7 +77,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
         AppInfo appInfo = (AppInfo) tuple.getValue(1);
         Map<String, JobExecutionAPIEntity> mrJobs = (Map<String, JobExecutionAPIEntity>) tuple.getValue(2);
 
-        LOG.info("get mr yarn application " + appInfo.getId());
+        LOG.debug("get mr yarn application " + appInfo.getId());
 
         MRJobParser applicationParser;
         if (!runningMRParsers.containsKey(appInfo.getId())) {
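
For illustration only (not part of this patch): the fetch spout now emits on two named streams, APP_TO_JOB_STREAM (consumed by this parse bolt) and APP_TO_METRIC_STREAM (consumed by the metric bolt shown earlier in this commit). A minimal wiring sketch; the Storm package (backtype.storm here), the MRRunningJobConfig import path, and the component ids, parallelism and groupings are all assumptions, not the patch's actual topology code.

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; // assumed import path

public class StreamWiringSketch {
    public static TopologyBuilder wire(IRichSpout fetchSpout, IRichBolt parseBolt, IRichBolt metricBolt) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mrRunningJobFetchSpout", fetchSpout, 1);
        // per-application tuples, keyed by appId, go to the job parser
        builder.setBolt("mrRunningJobParseBolt", parseBolt, 4)
                .fieldsGrouping("mrRunningJobFetchSpout", MRRunningJobConfig.APP_TO_JOB_STREAM, new Fields("appId"));
        // the whole running-app list goes to the metric bolt on the second stream
        builder.setBolt("mrRunningAppMetricBolt", metricBolt, 1)
                .shuffleGrouping("mrRunningJobFetchSpout", MRRunningJobConfig.APP_TO_METRIC_STREAM);
        return builder;
    }
}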

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
index 99a6613..fe3ab83 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -40,10 +40,29 @@
         </property>
         <property>
             <name>endpointConfig.fetchRunningJobInterval</name>
-            <displayName>Interval of Fetch Running Job From Resource Manager</displayName>
+            <displayName>Interval of Fetching Running Jobs in Seconds</displayName>
             <description>interval of fetch map reduce running jobs from resource manager</description>
+            <value>300</value>
+        </property>
+        <property>
+            <name>endpointConfig.requestsNum</name>
+            <displayName>Number of Requests for Fetching Running Jobs</displayName>
+            <description>number of requests, each covering a separate started-time range</description>
+            <value>1</value>
+        </property>
+        <property>
+            <name>endpointConfig.limitPerRequest</name>
+            <displayName>Application Count Limit per Request</displayName>
+            <description>maximum number of applications returned by each request</description>
+            <value></value>
+        </property>
+        <property>
+            <name>endpointConfig.timeRangePerRequestInMin</name>
+            <displayName>StartedTime Range for Each Request in Minutes</displayName>
+            <description>time range for each request in minutes if request number > 1</description>
             <value>60</value>
         </property>
+
         <property>
             <name>endpointConfig.parseJobThreadPoolSize</name>
             <displayName>Parse Job ThreadPool Size in Each Parse Task</displayName>
@@ -80,7 +99,113 @@
             <description>User use -Dkey=value to specify name of a job when submit. use this to extract job name from job configuration</description>
             <value>eagle.job.name</value>
         </property>
+
+        <!-- Accepted App Stream -->
+        <property>
+            <name>dataSinkConfig.ACCEPTED_APP_STREAM.topic</name>
+            <displayName>Destination (Kafka Topic) of Accepted App Stream Data</displayName>
+            <value>yarn_accepted_app_{site}</value>
+            <description>topic for kafka data sink</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>localhost:6667</value>
+            <description>kafka broker list</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for Kafka message values</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for Kafka message keys</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.producerType</name>
+            <displayName>dataSinkConfig.producerType</displayName>
+            <value>async</value>
+            <description>whether the messages are sent asynchronously in a background thread</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.numBatchMessages</name>
+            <displayName>dataSinkConfig.numBatchMessages</displayName>
+            <value>4096</value>
+            <description>number of messages to send in one batch when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.maxQueueBufferMs</name>
+            <displayName>dataSinkConfig.maxQueueBufferMs</displayName>
+            <value>5000</value>
+            <description>maximum time to buffer data when using async mode</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.requestRequiredAcks</name>
+            <displayName>dataSinkConfig.requestRequiredAcks</displayName>
+            <value>0</value>
+            <description>controls when a produce request is considered complete</description>
+        </property>
     </configuration>
+    <streams>
+        <stream>
+            <streamId>ACCEPTED_APP_STREAM</streamId>
+            <description>Accepted App Info Stream</description>
+            <validate>true</validate>
+            <columns>
+                <column>
+                    <name>id</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>appName</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>queue</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>state</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>user</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>trackingUrl</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>elapsedTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>startedTime</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>queueUsagePercentage</name>
+                    <type>double</type>
+                </column>
+                <column>
+                    <name>clusterUsagePercentage</name>
+                    <type>double</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
     <docs>
         <install>
             # Step 1: Create source kafka topic named "${site}_example_source_topic"
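
For illustration only (not part of this patch): the dataSinkConfig.* properties above correspond to settings of the classic Kafka 0.8 producer. A minimal sketch of the Properties a sink might build from them; the property-name mapping is the standard one for that producer, but the sink implementation itself is not shown in this commit.

import java.util.Properties;

public class KafkaSinkPropsSketch {
    // Translates the dataSinkConfig values shown above into old-producer settings.
    public static Properties fromDataSinkConfig() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:6667");                 // dataSinkConfig.brokerList
        props.put("serializer.class", "kafka.serializer.StringEncoder");     // dataSinkConfig.serializerClass
        props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // dataSinkConfig.keySerializerClass
        props.put("producer.type", "async");                                 // dataSinkConfig.producerType
        props.put("batch.num.messages", "4096");                             // dataSinkConfig.numBatchMessages
        props.put("queue.buffering.max.ms", "5000");                         // dataSinkConfig.maxQueueBufferMs
        props.put("request.required.acks", "0");                             // dataSinkConfig.requestRequiredAcks
        return props;
    }
}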

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index 5ebd9c5..a43c956 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -194,9 +194,9 @@ public class MRRunningJobApplicationTest {
 
         init = (boolean) initField.get(mrRunningJobFetchSpout);
         Assert.assertTrue(init);
-        Assert.assertEquals(2, tuples.size());
-        Assert.assertEquals(TUPLE_1, tuples.get(0).toString());
-        Assert.assertEquals(TUPLE_2, tuples.get(1).toString());
+        Assert.assertEquals(3, tuples.size());
+        Assert.assertEquals(TUPLE_1, tuples.get(1).toString());
+        Assert.assertEquals(TUPLE_2, tuples.get(2).toString());
         runningYarnApps = (Set<String>) runningYarnAppsField.get(mrRunningJobFetchSpout);
         Assert.assertEquals(2, runningYarnApps.size());
         Assert.assertEquals(RUNNING_YARNAPPS, runningYarnApps.toString());
@@ -208,9 +208,9 @@ public class MRRunningJobApplicationTest {
         mrRunningJobFetchSpout.nextTuple();
 
         Assert.assertTrue(init);
-        Assert.assertEquals(2, tuples.size());
-        Assert.assertEquals(TUPLE_1, tuples.get(0).toString());
-        Assert.assertEquals(TUPLE_2, tuples.get(1).toString());
+        Assert.assertEquals(3, tuples.size());
+        Assert.assertEquals(TUPLE_1, tuples.get(1).toString());
+        Assert.assertEquals(TUPLE_2, tuples.get(2).toString());
         runningYarnApps = (Set<String>) runningYarnAppsField.get(mrRunningJobFetchSpout);
         Assert.assertEquals(2, runningYarnApps.size());
         Assert.assertEquals(RUNNING_YARNAPPS, runningYarnApps.toString());
@@ -222,9 +222,9 @@ public class MRRunningJobApplicationTest {
         mrRunningJobFetchSpout.nextTuple();
 
         Assert.assertTrue(init);
-        Assert.assertEquals(2, tuples.size());
-        Assert.assertEquals(TUPLE_1, tuples.get(0).toString());
-        Assert.assertEquals("[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='FINISHED', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, {jobId=prefix:null, timestamp:0, humanReadableDate:1970-01-01 00:00:00,000, tags: , encodedRowkey:null}]", tuples.get(1).toString());
+        Assert.assertEquals(3, tuples.size());
+        Assert.assertEquals(TUPLE_1, tuples.get(1).toString());
+        Assert.assertEquals("[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='FINISHED', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, {jobId=prefix:null, timestamp:0, humanReadableDate:1970-01-01 00:00:00,000, tags: , encodedRowkey:null}]", tuples.get(2).toString());
 
         runningYarnApps = (Set<String>) runningYarnAppsField.get(mrRunningJobFetchSpout);
         Assert.assertEquals(1, runningYarnApps.size());

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index b87c41d..d7f9f3f 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -67,6 +67,13 @@ public class Constants {
 
     public static final String YARN_API_CLUSTER_INFO = "ws/v1/cluster/info";
 
+    public static class MetricName {
+        // Metrics from running apps
+        public static final String HADOOP_APPS_ALLOCATED_MB = "hadoop.%s.allocatedmb";
+        public static final String HADOOP_APPS_ALLOCATED_VCORES = "hadoop.%s.allocatedvcores";
+        public static final String HADOOP_APPS_RUNNING_CONTAINERS = "hadoop.%s.runningcontainers";
+    }
+
     public enum CompressionType {
         GZIP, NONE
     }
@@ -93,7 +100,7 @@ public class Constants {
 
     public enum ResourceType {
         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO, JOB_CONFIGURATION,
-        COMPLETE_MR_JOB
+        COMPLETE_MR_JOB, ACCEPTED_JOB
     }
 
     public static enum SuggestionType {
@@ -119,6 +126,7 @@ public class Constants {
     public static final String MR_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService";
     public static final String MR_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
     public static final String MR_JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService";
+    public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService";
 
     public static final String JOB_TASK_TYPE_TAG = "taskType";
 
@@ -141,6 +149,7 @@ public class Constants {
 
     public enum JobType {
         CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"),
+        SPARK("SPARK"), MAPREDUCE("MAPREDUCE"),
         NOTAVALIABLE("N/A")
         ;
         private String value;

http://git-wip-us.apache.org/repos/asf/eagle/blob/3fe637eb/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
index 95c531c..0f56b17 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobrecover/RunningJobManager.java
@@ -117,7 +117,7 @@ public class RunningJobManager implements Serializable {
                 appInfo.setElapsedTime(Long.parseLong(appInfoMap.get("elapsedTime")));
                 appInfo.setAmContainerLogs(appInfoMap.get("amContainerLogs") == null ? "" : appInfoMap.get("amContainerLogs"));
                 appInfo.setAmHostHttpAddress(appInfoMap.get("amHostHttpAddress") == null ? "" : appInfoMap.get("amHostHttpAddress"));
-                appInfo.setAllocatedMB(Long.parseLong(appInfoMap.get("allocatedMB")));
+                appInfo.setAllocatedMB(Integer.parseInt(appInfoMap.get("allocatedMB")));
                 appInfo.setAllocatedVCores(Integer.parseInt(appInfoMap.get("allocatedVCores")));
                 appInfo.setRunningContainers(Integer.parseInt(appInfoMap.get("runningContainers")));
 

