eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinh...@apache.org
Subject [2/2] incubator-eagle git commit: [EAGLE-790] Add unit test for eagle-jpm-mr-running's MRRunningJobParseBolt
Date Thu, 24 Nov 2016 07:48:16 GMT
[EAGLE-790] Add unit test for eagle-jpm-mr-running's MRRunningJobParseBolt

 - Add unit test for eagle-jpm-mr-running's MRRunningJobParseBolt

 https://issues.apache.org/jira/browse/EAGLE-790

Author: r7raul1984 <tangjijun@yhd.com>

Closes #676 from r7raul1984/EAGLE-790.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/65de7b0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/65de7b0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/65de7b0a

Branch: refs/heads/master
Commit: 65de7b0acaeff274b07b2e9318fcb6c41d77deed
Parents: 3becca6
Author: r7raul1984 <tangjijun@yhd.com>
Authored: Thu Nov 24 15:48:06 2016 +0800
Committer: wujinhu <wujinhu920@126.com>
Committed: Thu Nov 24 15:48:06 2016 +0800

----------------------------------------------------------------------
 .../jpm/mr/running/parser/MRJobParser.java      |  49 +-
 .../mr/running/MRRunningJobApplicationTest.java |  98 +++-
 .../parser/MRJobEntityCreationHandlerTest.java  | 235 +++++++++
 .../jpm/mr/running/parser/MRJobParserTest.java  | 490 +++++++++++++++++++
 .../src/test/resources/jobcounts_30784.json     | 390 +++++++++++++++
 .../src/test/resources/mrconf_30784.xml         |   1 +
 .../src/test/resources/mrjob_30784.json         |  37 ++
 .../src/test/resources/mrtasks_30784.json       |  61 +++
 8 files changed, 1332 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index edf4bbb..d866c1c 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -43,6 +43,7 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
+
 import java.io.InputStream;
 import java.net.URLConnection;
 import java.util.*;
@@ -93,10 +94,10 @@ public class MRJobParser implements Runnable {
                        List<String> configKeys,
                        Config config) {
         this.app = app;
-        this.mrJobEntityMap = new HashMap<>();
-        this.mrJobEntityMap = mrJobMap;
-        if (this.mrJobEntityMap == null) {
+        if (mrJobMap == null) {
             this.mrJobEntityMap = new HashMap<>();
+        } else {
+            this.mrJobEntityMap = mrJobMap;
         }
         this.mrJobConfigs = new HashMap<>();
 
@@ -106,7 +107,7 @@ public class MRJobParser implements Runnable {
         this.commonTags.put(MRJobTagName.USER.toString(), app.getUser());
         this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
         this.runningJobManager = runningJobManager;
-        this.parserStatus  = ParserStatus.FINISHED;
+        this.parserStatus = ParserStatus.FINISHED;
         this.rmResourceFetcher = rmResourceFetcher;
         this.finishedTaskIds = new HashSet<>();
         this.configKeys = configKeys;
@@ -152,7 +153,7 @@ public class MRJobParser implements Runnable {
         List<Function<String, Boolean>> functions = new ArrayList<>();
         functions.add(fetchJobConfig);
         functions.add(fetchJobCounters);
-        if ((int)(Math.random() * 10) % FLUSH_TASKS_EVERY_TIME == 0) {
+        if ((int) (Math.random() * 10) % FLUSH_TASKS_EVERY_TIME == 0) {
             functions.add(fetchTasks);
         }
 
@@ -281,11 +282,11 @@ public class MRJobParser implements Runnable {
                 counterValues.put(key, item.getTotalCounterValue());
                 if (counterGroupName.equals(Constants.JOB_COUNTER)) {
                     if (key.equals(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
-                        jobExecutionAPIEntity.setDataLocalMaps((int)item.getTotalCounterValue());
+                        jobExecutionAPIEntity.setDataLocalMaps((int) item.getTotalCounterValue());
                     } else if (key.equals(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
-                        jobExecutionAPIEntity.setRackLocalMaps((int)item.getTotalCounterValue());
+                        jobExecutionAPIEntity.setRackLocalMaps((int) item.getTotalCounterValue());
                     } else if (key.equals(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
-                        jobExecutionAPIEntity.setTotalLaunchedMaps((int)item.getTotalCounterValue());
+                        jobExecutionAPIEntity.setTotalLaunchedMaps((int) item.getTotalCounterValue());
                     }
                 }
             }
@@ -305,10 +306,10 @@ public class MRJobParser implements Runnable {
         String jobId = jobAndTaskId.getLeft();
         String taskId = jobAndTaskId.getRight();
         String taskCounterURL = app.getTrackingUrl()
-            + Constants.MR_JOBS_URL + "/"
-            + jobId + "/" + Constants.MR_TASKS_URL + "/"
-            + taskId + "/" + Constants.MR_JOB_COUNTERS_URL
-            + "?" + Constants.ANONYMOUS_PARAMETER;
+                + Constants.MR_JOBS_URL + "/"
+                + jobId + "/" + Constants.MR_TASKS_URL + "/"
+                + taskId + "/" + Constants.MR_JOB_COUNTERS_URL
+                + "?" + Constants.ANONYMOUS_PARAMETER;
         InputStream is = null;
         TaskCounters taskCounters = null;
         try {
@@ -351,9 +352,9 @@ public class MRJobParser implements Runnable {
         String jobId = jobAndTaskId.getLeft();
         String taskId = jobAndTaskId.getRight();
         String taskAttemptURL = app.getTrackingUrl()
-            + Constants.MR_JOBS_URL + "/"
-            + jobId + "/" + Constants.MR_TASKS_URL + "/"
-            + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+                + Constants.MR_JOBS_URL + "/"
+                + jobId + "/" + Constants.MR_TASKS_URL + "/"
+                + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
         InputStream is = null;
         List<MRTaskAttempt> taskAttempts = null;
         try {
@@ -413,20 +414,20 @@ public class MRJobParser implements Runnable {
         Comparator<MRTask> byElapsedTimeDecrease = (e1, e2) -> -1 * Long.compare(e1.getElapsedTime(), e2.getElapsedTime());
         //2, get finished bottom n
         Iterator<MRTask> taskIteratorIncrease = tasks.stream()
-            .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
-            .sorted(byElapsedTimeIncrease).iterator();
+                .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+                .sorted(byElapsedTimeIncrease).iterator();
         needFetchAttemptTasks(taskIteratorIncrease, needFetchAttemptTasks);
 
         //3, fetch finished top n
         Iterator<MRTask> taskIteratorDecrease = tasks.stream()
-            .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
-            .sorted(byElapsedTimeDecrease).iterator();
+                .filter(task -> task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
+                .sorted(byElapsedTimeDecrease).iterator();
         needFetchAttemptTasks(taskIteratorDecrease, needFetchAttemptTasks);
 
         //4, fetch running top n
         taskIteratorDecrease = tasks.stream()
-            .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
-            .sorted(byElapsedTimeDecrease).iterator();
+                .filter(task -> task.getState().equals(Constants.TaskState.RUNNING.toString()))
+                .sorted(byElapsedTimeDecrease).iterator();
         needFetchAttemptTasks(taskIteratorDecrease, needFetchAttemptTasks);
 
         return needFetchAttemptTasks;
@@ -492,9 +493,9 @@ public class MRJobParser implements Runnable {
             mrJobEntityCreationHandler.add(taskExecutionAPIEntity);
 
             if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString())
-                || task.getState().equals(Constants.TaskState.FAILED.toString())
-                || task.getState().equals(Constants.TaskState.KILLED.toString())
-                || task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
+                    || task.getState().equals(Constants.TaskState.FAILED.toString())
+                    || task.getState().equals(Constants.TaskState.KILLED.toString())
+                    || task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
                 //LOG.info("mr job {} task {} has finished", jobId, task.getId());
                 this.finishedTaskIds.add(task.getId());
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index 8707182..5d78a50 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -16,11 +16,17 @@
  */
 package org.apache.eagle.jpm.mr.running;
 
+import backtype.storm.Testing;
 import backtype.storm.spout.ISpoutOutputCollector;
 import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
@@ -29,6 +35,7 @@ import org.apache.eagle.jpm.util.resourcefetch.model.AppsWrapper;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
@@ -39,13 +46,15 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({InputStreamUtils.class, MRRunningJobFetchSpout.class})
+@PrepareForTest({InputStreamUtils.class, MRRunningJobFetchSpout.class, Executors.class, MRRunningJobParseBolt.class})
 @PowerMockIgnore({"javax.*"})
 public class MRRunningJobApplicationTest {
 
@@ -53,13 +62,90 @@ public class MRRunningJobApplicationTest {
     public static final String RUNNING_YARNAPPS = "[application_1479206441898_35341, application_1479206441898_30784]";
     public static final String TUPLE_1 = "[application_1479206441898_30784, AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}, null]";
     public static final String TUPLE_2 = "[application_1479206441898_35341, AppInfo{id='application_1479206441898_35341', user='yyy', name='insert overwrite table inter...a.xxx(Stage-3)', queue='yyy', state='RUNNING', finalStatus='UNDEFINED', progress=59.545456, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_35341/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479341511477, finishedTime=0, elapsedTime=77619, amContainerLogs='http://host.domain.com:8042/node/containerlogs/container_e11_1479206441898_35341_01_000005/yyy', amHostHttpAddress='host.domain.com:8042', allocatedMB=27648, allocatedVCores=6, runningContainers=6}, null]";
-
     private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
 
-    static {
+    @BeforeClass
+    public static void setupMapper() throws Exception {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
     }
 
+
+    @Test
+    public void testMRRunningJobParseBolt() throws Exception {
+        mockStatic(Executors.class);
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(Executors.newFixedThreadPool(anyInt())).thenReturn(executorService);
+
+
+        MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
+        PowerMockito.whenNew(MRRunningJobManager.class).withArguments(any()).thenReturn(mrRunningJobManager);
+        Config config = ConfigFactory.load();
+        MRRunningJobConfig mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+        MRRunningJobParseBolt mrRunningJobParseBolt = new MRRunningJobParseBolt(
+                mrRunningJobConfig.getEagleServiceConfig(),
+                mrRunningJobConfig.getEndpointConfig(),
+                mrRunningJobConfig.getZkStateConfig(),
+                confKeyKeys,
+                config);
+        mrRunningJobParseBolt.prepare(null, null, null);
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Tuple tuple = Testing.testTuple(new Values(app1.getId(), app1, null));
+        mrRunningJobParseBolt.execute(tuple);
+
+        Field runningMRParsers = MRRunningJobParseBolt.class.getDeclaredField("runningMRParsers");
+        runningMRParsers.setAccessible(true);
+        Map<String, MRJobParser> appIdToMRJobParser = (Map<String, MRJobParser>) runningMRParsers.get(mrRunningJobParseBolt);
+        Assert.assertEquals(1, appIdToMRJobParser.size());
+        Assert.assertTrue(appIdToMRJobParser.get("application_1479206441898_30784") != null);
+        Assert.assertTrue(appIdToMRJobParser.get("application_1479206441898_30784").status().equals(MRJobParser.ParserStatus.RUNNING));
+        verify(executorService, times(1)).execute(appIdToMRJobParser.get("application_1479206441898_30784"));
+        verify(executorService, times(1)).execute(any(MRJobParser.class));
+
+        MRJobParser mrJobParser = appIdToMRJobParser.get("application_1479206441898_30784");
+        mrJobParser.setStatus(MRJobParser.ParserStatus.APP_FINISHED);
+        AppInfo app2 = appInfos.get(1);
+        tuple = Testing.testTuple(new Values(app2.getId(), app2, null));
+        mrRunningJobParseBolt.execute(tuple);
+
+        Map<String, MRJobParser> appIdToMRJobParser1 = (Map<String, MRJobParser>) runningMRParsers.get(mrRunningJobParseBolt);
+        Assert.assertEquals(1, appIdToMRJobParser1.size());
+        Assert.assertTrue(appIdToMRJobParser1.get("application_1479206441898_30784") == null);
+        Assert.assertTrue(appIdToMRJobParser1.get("application_1479206441898_35341") != null);
+        Assert.assertTrue(appIdToMRJobParser1.get("application_1479206441898_35341").status().equals(MRJobParser.ParserStatus.RUNNING));
+        verify(executorService, times(1)).execute(appIdToMRJobParser.get("application_1479206441898_35341"));
+        verify(executorService, times(2)).execute(any(MRJobParser.class));
+
+        app2 = appInfos.get(1);
+        tuple = Testing.testTuple(new Values(app2.getId(), app2, null));
+        mrRunningJobParseBolt.execute(tuple);
+
+        Map<String, MRJobParser> appIdToMRJobParser2 = (Map<String, MRJobParser>) runningMRParsers.get(mrRunningJobParseBolt);
+        Assert.assertEquals(1, appIdToMRJobParser2.size());
+        Assert.assertTrue(appIdToMRJobParser2.get("application_1479206441898_30784") == null);
+        Assert.assertTrue(appIdToMRJobParser2.get("application_1479206441898_35341") != null);
+        Assert.assertTrue(appIdToMRJobParser2.get("application_1479206441898_35341").status().equals(MRJobParser.ParserStatus.RUNNING));
+        verify(executorService, times(2)).execute(any(MRJobParser.class));
+
+    }
+
+    private List<String> makeConfKeyKeys(MRRunningJobConfig mrRunningJobConfig) {
+        String[] confKeyPatternsSplit = mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",");
+        List<String> confKeyKeys = new ArrayList<>(confKeyPatternsSplit.length);
+        for (String confKeyPattern : confKeyPatternsSplit) {
+            confKeyKeys.add(confKeyPattern.trim());
+        }
+        confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+        confKeyKeys.add(0, mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
+        return confKeyKeys;
+    }
+
     @Test
     public void testMRRunningJobFetchSpout() throws Exception {
 
@@ -162,9 +248,11 @@ public class MRRunningJobApplicationTest {
 
         return new MRRunningJobFetchSpout(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig());
     }
+
     private void mockInputSteam(String mockDataFilePath) throws Exception {
         InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
         mockStatic(InputStreamUtils.class);
         when(InputStreamUtils.getInputStream(RM_URL, null, Constants.CompressionType.GZIP)).thenReturn(jsonstream);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
new file mode 100644
index 0000000..e840355
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandlerTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running.parser;
+
+import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
+import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.*;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({System.class, JobExecutionMetricsCreationListener.class, MRJobEntityCreationHandler.class})
+public class MRJobEntityCreationHandlerTest {
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    // NOTE(review): despite its name, this fixture does not start ZooKeeper -- it only
+    // configures the shared ObjectMapper to accept NaN/Infinity tokens that appear in
+    // the bundled JSON fixtures.
+    @BeforeClass
+    public static void startZookeeper() throws Exception {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    /**
+     * Adding one JobExecutionAPIEntity (without counters) must expand into four
+     * buffered entities: the entity itself plus three derived metrics
+     * (allocatedmb, allocatedvcores, runningcontainers), each stamped with the
+     * mocked "now".
+     */
+    @Test
+    public void testMRJobEntityCreationHandlerAdd() throws IOException, NoSuchFieldException, IllegalAccessException {
+        mockStatic(System.class);
+        // Freeze time so the generated metric timestamps are deterministic.
+        // (Fixed: long literal now uses an uppercase 'L' suffix -- lowercase 'l'
+        // is easily misread as the digit '1'.)
+        when(System.currentTimeMillis()).thenReturn(1479863033310L);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = new MRJobEntityCreationHandler(null);
+        mrJobEntityCreationHandler.add(makeJobExecutionAPIEntity());
+
+        // The handler buffers entities in a private list; inspect it via reflection.
+        Field entities = MRJobEntityCreationHandler.class.getDeclaredField("entities");
+        entities.setAccessible(true);
+        List<TaggedLogAPIEntity> entityList = (List<TaggedLogAPIEntity>) entities.get(mrJobEntityCreationHandler);
+        Assert.assertEquals(4, entityList.size());
+        Assert.assertEquals("[prefix:null, timestamp:1479328221694, humanReadableDate:2016-11-16 20:30:21,694, tags: , encodedRowkey:null, prefix:hadoop.job.allocatedmb, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.allocatedvcores, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.runningcontainers, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null]", entityList.toString());
+    }
+
+    /**
+     * When the added entity also carries job counters, every counter value is
+     * flattened into its own metric entity: the 4 base entities from the plain
+     * case plus one metric per counter, 62 buffered entities in total.
+     *
+     * NOTE(review): the expected string below is one literal that the mail
+     * archive wrapped across several physical lines; it is preserved verbatim.
+     */
+    @Test
+    public void testMRJobEntityCreationHandlerWithCounts() throws IOException, NoSuchFieldException, IllegalAccessException {
+        // Freeze time so the generated metric timestamps are deterministic.
+        // NOTE(review): lowercase 'l' long suffix below should be 'L' (easy to misread as '1').
+        mockStatic(System.class);
+        when(System.currentTimeMillis()).thenReturn(1479863033310l);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = new MRJobEntityCreationHandler(null);
+        mrJobEntityCreationHandler.add(makeJobExecutionAPIEntityWithCounts());
+
+        // The handler buffers entities in a private list; inspect it via reflection.
+        Field entities = MRJobEntityCreationHandler.class.getDeclaredField("entities");
+        entities.setAccessible(true);
+        List<TaggedLogAPIEntity> entityList = (ArrayList<TaggedLogAPIEntity>) entities.get(mrJobEntityCreationHandler);
+        Assert.assertEquals(62, entityList.size());
+        Assert.assertEquals("[prefix:null, timestamp:1479328221694, humanReadableDate:2016-11-16 20:30:21,694, tags: , encodedRowkey:null, prefix:hadoop.job.allocatedmb, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.allocatedvcores, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.runningcontainers, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.viewfs_large_read_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.file_bytes_written, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.file_large_read_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.file_write_ops, timestamp:1479863033310, hu
 manReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.viewfs_bytes_read, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.viewfs_read_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.hdfs_read_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.viewfs_write_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.hdfs_bytes_read, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.hdfs_large_read_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.file_read_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:h
 adoop.job.file_bytes_read, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.hdfs_write_ops, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.viewfs_bytes_written, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.hdfs_bytes_written, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.deserialize_errors, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.records_out_intermediate, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.records_in, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.bytes_written, timestamp:1479863033310, humanReadableDat
 e:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.bytes_read, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.total_launched_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.vcores_millis_reduces, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.mb_millis_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.total_launched_reduces, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.slots_millis_reduces, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.vcores_millis_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix
 :hadoop.job.mb_millis_reduces, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.slots_millis_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.rack_local_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.millis_reduces, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.other_local_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.millis_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.map_output_materialized_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.reduce_input_records, timestamp:1479863033310, humanRe
 adableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.spilled_records, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.merged_map_outputs, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.virtual_memory_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.map_input_records, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.split_raw_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.failed_shuffle, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.map_output_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix
 :hadoop.job.reduce_shuffle_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.physical_memory_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.gc_time_millis, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.reduce_input_groups, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.combine_output_records, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.shuffled_maps, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.reduce_output_records, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.map_output_records, timestamp:1479863033310
 , humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.combine_input_records, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.cpu_milliseconds, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.committed_heap_bytes, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.connection, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.wrong_length, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.bad_id, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.wrong_map, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.
 wrong_reduce, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null, prefix:hadoop.job.io_error, timestamp:1479863033310, humanReadableDate:2016-11-23 01:03:53,310, tags: , encodedRowkey:null]", entityList.toString());
+    }
+
+    /**
+     * Builds a JobExecutionAPIEntity fixture by copying job-level fields from
+     * mrjob_30784.json and application-level fields from previousmrrunningapp.json,
+     * mirroring the mapping MRJobParser performs for a running job.
+     *
+     * @return a fully populated entity for the first job/app entry of each fixture
+     * @throws IOException if either fixture cannot be parsed
+     */
+    private JobExecutionAPIEntity makeJobExecutionAPIEntity() throws IOException {
+        InputStream jsonstream = this.getClass().getResourceAsStream("/mrjob_30784.json");
+        List<MRJob> mrJobs = OBJ_MAPPER.readValue(jsonstream, MRJobsWrapper.class).getJobs().getJob();
+
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        // Each fixture contains a single entry; use the first of each.
+        AppInfo app = appInfos.get(0);
+        JobExecutionAPIEntity jobExecutionAPIEntity = new JobExecutionAPIEntity();
+        MRJob mrJob = mrJobs.get(0);
+        // Timestamps/tracking come from the application, progress/attempt
+        // statistics come from the MR job report.
+        jobExecutionAPIEntity.setTimestamp(app.getStartedTime());
+        jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime());
+        jobExecutionAPIEntity.setTrackingUrl(app.getTrackingUrl());
+        jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
+        jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime());
+        jobExecutionAPIEntity.setCurrentState(mrJob.getState());
+        jobExecutionAPIEntity.setInternalState(mrJob.getState());
+        jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal());
+        jobExecutionAPIEntity.setNumFinishedMaps(mrJob.getMapsCompleted());
+        jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal());
+        jobExecutionAPIEntity.setNumFinishedReduces(mrJob.getReducesCompleted());
+        jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress());
+        jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress());
+        jobExecutionAPIEntity.setMapsPending(mrJob.getMapsPending());
+        jobExecutionAPIEntity.setMapsRunning(mrJob.getMapsRunning());
+        jobExecutionAPIEntity.setReducesPending(mrJob.getReducesPending());
+        jobExecutionAPIEntity.setReducesRunning(mrJob.getReducesRunning());
+        jobExecutionAPIEntity.setNewReduceAttempts(mrJob.getNewReduceAttempts());
+        jobExecutionAPIEntity.setRunningReduceAttempts(mrJob.getRunningReduceAttempts());
+        jobExecutionAPIEntity.setFailedReduceAttempts(mrJob.getFailedReduceAttempts());
+        jobExecutionAPIEntity.setKilledReduceAttempts(mrJob.getKilledReduceAttempts());
+        jobExecutionAPIEntity.setSuccessfulReduceAttempts(mrJob.getSuccessfulReduceAttempts());
+        jobExecutionAPIEntity.setNewMapAttempts(mrJob.getNewMapAttempts());
+        jobExecutionAPIEntity.setRunningMapAttempts(mrJob.getRunningMapAttempts());
+        jobExecutionAPIEntity.setFailedMapAttempts(mrJob.getFailedMapAttempts());
+        jobExecutionAPIEntity.setKilledMapAttempts(mrJob.getKilledMapAttempts());
+        jobExecutionAPIEntity.setSuccessfulMapAttempts(mrJob.getSuccessfulMapAttempts())ADOPT;
+        jobExecutionAPIEntity.setAppInfo(app);
+        jobExecutionAPIEntity.setAllocatedMB(app.getAllocatedMB());
+        jobExecutionAPIEntity.setAllocatedVCores(app.getAllocatedVCores());
+        jobExecutionAPIEntity.setRunningContainers(app.getRunningContainers());
+        return jobExecutionAPIEntity;
+    }
+
+    /**
+     * Loads the job-counter fixture from the classpath.
+     *
+     * @return the parsed counters, or {@code null} if the fixture cannot be read
+     *     (deliberate best-effort semantics, matching the production parser)
+     */
+    private JobCounters getJobCounters() {
+        InputStream fixtureStream = null;
+        JobCounters parsedCounters = null;
+        try {
+            fixtureStream = this.getClass().getResourceAsStream("/jobcounts_30784.json");
+            parsedCounters = OBJ_MAPPER.readValue(fixtureStream, JobCountersWrapper.class).getJobCounters();
+        } catch (Exception e) {
+            // Missing or corrupt fixture: signal with null rather than failing here.
+            return null;
+        } finally {
+            if (fixtureStream != null) {
+                Utils.closeInputStream(fixtureStream);
+            }
+        }
+        return parsedCounters;
+    }
+
+    /**
+     * Builds the same base fixture as makeJobExecutionAPIEntity() and enriches it
+     * with the counter fixture.
+     *
+     * This previously duplicated all ~30 setter calls of the base builder
+     * verbatim; it now delegates to it, so the two fixtures cannot drift apart.
+     *
+     * @return the base entity with job counters and locality fields applied
+     * @throws IOException if a fixture cannot be parsed
+     */
+    private JobExecutionAPIEntity makeJobExecutionAPIEntityWithCounts() throws IOException {
+        JobExecutionAPIEntity jobExecutionAPIEntity = makeJobExecutionAPIEntity();
+        applyJobCounts(jobExecutionAPIEntity);
+        return jobExecutionAPIEntity;
+    }
+
+    /**
+     * Copies the fixture counters onto the entity the way MRJobParser does:
+     * every counter value is recorded in a group-name -> counter-name -> value map,
+     * and selected JobCounter values (data-local / rack-local / total launched maps)
+     * are additionally promoted to dedicated fields with derived percentages.
+     *
+     * NOTE(review): getJobCounters() may return null when the fixture is missing,
+     * which would NPE on the loop below -- acceptable inside a test helper.
+     *
+     * @param jobExecutionAPIEntity the entity to enrich in place
+     */
+    private void applyJobCounts(JobExecutionAPIEntity jobExecutionAPIEntity) {
+
+        JobCounters jobCounters = getJobCounters();
+        org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
+        Map<String, Map<String, Long>> groups = new HashMap<>();
+
+        for (JobCounterGroup jobCounterGroup : jobCounters.getCounterGroup()) {
+            String counterGroupName = jobCounterGroup.getCounterGroupName();
+            // computeIfAbsent replaces the original containsKey/put/get sequence.
+            Map<String, Long> counterValues = groups.computeIfAbsent(counterGroupName, name -> new HashMap<>());
+            List<JobCounterItem> items = jobCounterGroup.getCounter();
+            if (items == null) {
+                continue;
+            }
+            for (JobCounterItem item : items) {
+                String key = item.getName();
+                counterValues.put(key, item.getTotalCounterValue());
+                // Locality counters get first-class fields on the entity.
+                if (counterGroupName.equals(Constants.JOB_COUNTER)) {
+                    if (key.equals(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
+                        jobExecutionAPIEntity.setDataLocalMaps((int) item.getTotalCounterValue());
+                    } else if (key.equals(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
+                        jobExecutionAPIEntity.setRackLocalMaps((int) item.getTotalCounterValue());
+                    } else if (key.equals(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
+                        jobExecutionAPIEntity.setTotalLaunchedMaps((int) item.getTotalCounterValue());
+                    }
+                }
+            }
+        }
+
+        jobCounter.setCounters(groups);
+        jobExecutionAPIEntity.setJobCounters(jobCounter);
+        // Locality percentages are only meaningful once at least one map launched.
+        if (jobExecutionAPIEntity.getTotalLaunchedMaps() > 0) {
+            jobExecutionAPIEntity.setDataLocalMapsPercentage(jobExecutionAPIEntity.getDataLocalMaps() * 1.0 / jobExecutionAPIEntity.getTotalLaunchedMaps());
+            jobExecutionAPIEntity.setRackLocalMapsPercentage(jobExecutionAPIEntity.getRackLocalMaps() * 1.0 / jobExecutionAPIEntity.getTotalLaunchedMaps());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
new file mode 100644
index 0000000..4b00bb2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.running.parser;
+
+import com.sun.jersey.api.client.Client;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
+import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
+import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
+import org.apache.eagle.jpm.mr.runningentity.JobConfig;
+import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppsWrapper;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({InputStreamUtils.class, MRJobParser.class, URLConnectionUtils.class, Math.class, MRJobEntityCreationHandler.class})
+@PowerMockIgnore({"javax.*", "org.w3c.*", "com.sun.org.apache.xerces.*"})
+public class MRJobParserTest {
+    private static final String ZK_JOB_PATH = "/apps/mr/running/sandbox/application_1479206441898_30784/job_1479206441898_30784";
+    private static final String ZK_APP_PATH = "/apps/mr/running/sandbox/application_1479206441898_30784";
+    private static final String JOB_CONF_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/conf?anonymous=true";
+    private static final String JOB_COUNT_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/counters?anonymous=true";
+    private static final String JOB_ID = "job_1479206441898_30784";
+    private static final String JOB_URL = "http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs?anonymous=true";
+    private static final String DATA_FROM_ZK = "{\"entityTags\":\"{\\\"jobName\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"jobId\\\":\\\"job_1479206441898_30784\\\",\\\"site\\\":\\\"sandbox\\\",\\\"jobDefId\\\":\\\"eagletest\\\",\\\"jobType\\\":\\\"HIVE\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\"}\",\"appInfo\":\"{\\\"applicationType\\\":\\\"MAPREDUCE\\\",\\\"startedTime\\\":\\\"1479328221694\\\",\\\"finalStatus\\\":\\\"UNDEFINED\\\",\\\"trackingUrl\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/proxy\\\\\\/application_1479206441898_30784\\\\\\/\\\",\\\"runningContainers\\\":\\\"2\\\",\\\"trackingUI\\\":\\\"ApplicationMaster\\\",\\\"clusterId\\\":\\\"1479206441898\\\",\\\"amContainerLogs\\\":\\\"http:\\\\\\/\\\\\\/host.domain.com:8088\\\\\\/node\\\\\\/containerlogs\\\\\\/container_e11_1479206441898_30784_01_000001\\\\\\/xxx\\\",\\\"allocatedVCores\\\":\\\"2\\\",\\\"diagnostics\\\":\\\"\\\",\\\
 "name\\\":\\\"oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W\\\",\\\"progress\\\":\\\"95.0\\\",\\\"finishedTime\\\":\\\"0\\\",\\\"allocatedMB\\\":\\\"3072\\\",\\\"id\\\":\\\"application_1479206441898_30784\\\",\\\"state\\\":\\\"RUNNING\\\",\\\"amHostHttpAddress\\\":\\\"host.domain.com:8088\\\",\\\"user\\\":\\\"xxx\\\",\\\"queue\\\":\\\"xxx\\\",\\\"elapsedTime\\\":\\\"13367402\\\"}\"}";
+    private static TestingServer zk;
+    private static String ZKROOT;
+    private static MRRunningJobConfig mrRunningJobConfig;
+    private static Config config = ConfigFactory.load();
+    private static CuratorFramework curator;
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+    private EagleServiceClientImpl client;
+
+    // Boots an in-process ZooKeeper (Curator TestingServer), points the app config
+    // at it, and starts the Curator client. Order matters: the server must exist
+    // before its connect string is read, and the client is started last.
+    @BeforeClass
+    public static void startZookeeper() throws Exception {
+        zk = new TestingServer();
+        curator = CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(1));
+        mrRunningJobConfig = MRRunningJobConfig.newInstance(config);
+        // Redirect ZK state storage to the embedded test server.
+        mrRunningJobConfig.getZkStateConfig().zkQuorum = zk.getConnectString();
+        ZKROOT = mrRunningJobConfig.getZkStateConfig().zkRoot;
+        // Fixtures contain NaN/Infinity tokens, which strict JSON rejects.
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+        curator.start();
+    }
+
+    // Shut the client down before stopping the embedded server it talks to.
+    @AfterClass
+    public static void teardownZookeeper() throws IOException {
+        curator.close();
+        zk.stop();
+    }
+
+    /**
+     * Removes any ZooKeeper state left by a previous test so each test starts
+     * from an empty tree. The same checkExists/delete block was previously
+     * written out three times; it is now a single loop. Order is preserved:
+     * deepest path first, root last.
+     */
+    @Before
+    public void cleanZkPath() throws Exception {
+        for (String path : new String[] {ZK_JOB_PATH, ZK_APP_PATH, ZKROOT}) {
+            if (curator.checkExists().forPath(path) != null) {
+                curator.delete().deletingChildrenIfNeeded().forPath(path);
+            }
+        }
+    }
+
+    /**
+     * Drives a full MRJobParser.run() against canned REST responses (job list,
+     * counters and job configuration served from classpath fixtures) and checks
+     * that the parser fills its private job/config maps, persists running-job
+     * state into ZooKeeper, and flushes entities to the mocked Eagle service
+     * exactly once.
+     */
+    @Test
+    public void testMRJobParser() throws Exception {
+        //TODO fetch task attempt when(Math.random()).thenReturn(0.0); http://host.domain.com:8088/proxy/application_1479206441898_30784/ws/v1/mapreduce/jobs/job_1479206441898_30784/tasks?anonymous=true
+        setupMock();
+
+        // Serve the job list, counters and configuration from classpath fixtures.
+        mockInputJobSteam("/mrjob_30784.json", JOB_URL);
+        mockInputJobSteam("/jobcounts_30784.json", JOB_COUNT_URL);
+        mockGetConnection("/mrconf_30784.xml");
+
+
+        // Precondition: the ZK tree is empty before the parser runs.
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+
+        // The application under test comes from the recorded RM response.
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Map<String, JobExecutionAPIEntity> mrJobs = null;
+
+        MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
+        RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
+        MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+
+
+        // The parser's internal maps/handler are private; reflection helpers expose them.
+        Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
+        Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
+        List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
+        // Before run(): everything empty, parser idle.
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(entities.isEmpty());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+
+        // Single pass over the mocked endpoints.
+        mrJobParser.run();
+
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
+
+        JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
+        Assert.assertEquals("AppInfo{id='application_1479206441898_30784', user='xxx', name='oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W', queue='xxx', state='RUNNING', finalStatus='UNDEFINED', progress=95.0, trackingUI='ApplicationMaster', trackingUrl='http://host.domain.com:8088/proxy/application_1479206441898_30784/', diagnostics='', clusterId='1479206441898', applicationType='MAPREDUCE', startedTime=1479328221694, finishedTime=0, elapsedTime=13367402, amContainerLogs='http://host.domain.com:8088/node/containerlogs/container_e11_1479206441898_30784_01_000001/xxx', amHostHttpAddress='host.domain.com:8088', allocatedMB=3072, allocatedVCores=2, runningContainers=2}", jobExecutionAPIEntity.getAppInfo().toString())ATLAS;
+        Assert.assertEquals("RUNNING", jobExecutionAPIEntity.getCurrentState());
+        Assert.assertEquals("RUNNING", jobExecutionAPIEntity.getInternalState());
+        Assert.assertEquals("prefix:null, timestamp:1479328221694, humanReadableDate:2016-11-16 20:30:21,694, tags: jobName=oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W,jobId=job_1479206441898_30784,site=sandbox,jobDefId=eagletest,jobType=HIVE,user=xxx,queue=xxx,, encodedRowkey:null", jobExecutionAPIEntity.toString());
+        //Assert.assertEquals("prefix:null, timestamp:1479328221694, humanReadableDate:2016-11-16 20:30:21,694, tags: jobName=oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W,jobId=job_1479206441898_30784,site=sandbox,jobDefId=oozie:launcher-shell-wf_co_xxx_xxx_v3-extract_org_data~,jobType=HIVE,user=xxx,queue=xxx,, encodedRowkey:null", jobExecutionAPIEntity.toString());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+        Assert.assertEquals("{eagle.job.name=eagletest, hive.optimize.skewjoin.compiletime=false, hive.query.string=insert overwrite table xxxx}", jobExecutionAPIEntity.getJobConfig().toString());
+        Assert.assertTrue(jobIdToJobConfig.size() == 1);
+        Assert.assertEquals("{eagle.job.name=eagletest, hive.optimize.skewjoin.compiletime=false, hive.query.string=insert overwrite table xxxx}", jobIdToJobConfig.get(JOB_ID).toString());
+        // Running-job state must now be persisted under the expected ZK paths.
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) != null);
+        Assert.assertEquals(DATA_FROM_ZK, new String(curator.getData().forPath(ZK_JOB_PATH), "UTF-8"));
+        // Entities were flushed (buffer empty) via exactly one service call.
+        Assert.assertTrue(entities.isEmpty());
+        verify(client, times(1)).create(any());
+
+    }
+
+    /**
+     * Failure path: fetching the running-job list from the MR AM throws.
+     * The parser must complete as FINISHED without creating any job entities,
+     * without writing any ZK state, and without calling the Eagle service client.
+     */
+    @Test
+    public void testMRJobParserFetchMrJobFail() throws Exception {
+        setupMock();
+        // Job-list endpoint throws; the conf endpoint is stubbed but should never be reached.
+        mockInputJobSteamWithException(JOB_URL);
+        mockGetConnection("/mrconf_30784.xml");
+
+
+        // Precondition: no parser state in ZK before the run.
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+
+        // Seed the parser with a previously-seen running YARN app fixture.
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Map<String, JobExecutionAPIEntity> mrJobs = null;
+
+        MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
+        RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
+        MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+
+
+        // Peek at the parser's private state via reflection; everything starts empty.
+        Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
+        Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
+        List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(entities.isEmpty());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+
+        mrJobParser.run();
+
+        // The failed job-list fetch must leave no entities, no ZK state and no service writes.
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 0);
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
+        Assert.assertTrue(entities.isEmpty());
+        verify(client, never()).create(any());
+    }
+
+    /**
+     * Failure path: the job-list fetch succeeds but fetching the job
+     * configuration XML throws. The job execution entity is still created
+     * (and flushed once through the service client), but no ZK state is
+     * written for the job or the app.
+     */
+    @Test
+    public void testMRJobParserFetchJobConfFail() throws Exception {
+        setupMock();
+        mockInputJobSteam("/mrjob_30784.json", JOB_URL);
+        // Conf endpoint throws when the parser tries to read job.xml.
+        mockGetConnectionWithException("/mrconf_30784.xml");
+
+
+        // Precondition: no parser state in ZK before the run.
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+
+        // Seed the parser with a previously-seen running YARN app fixture.
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Map<String, JobExecutionAPIEntity> mrJobs = null;
+
+        MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
+        RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
+        MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+
+
+        // Peek at the parser's private state via reflection; everything starts empty.
+        Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
+        Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
+        List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(entities.isEmpty());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+
+        mrJobParser.run();
+
+        // One entity was built from the job list, but the conf failure prevents ZK persistence.
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
+        Assert.assertTrue(entities.isEmpty());
+        verify(client, times(1)).create(any());
+    }
+
+
+    /**
+     * Failure path: job list and job configuration are fetched successfully
+     * but the job-counters endpoint throws. The job entity is still created,
+     * flushed once through the service client, and the job/app state IS
+     * persisted to ZK (counters are treated as non-fatal).
+     */
+    @Test
+    public void testMRJobParserFetchJobCountFail() throws Exception {
+        setupMock();
+        mockInputJobSteam("/mrjob_30784.json", JOB_URL);
+        mockGetConnection("/mrconf_30784.xml");
+        // Only the counters endpoint fails.
+        mockInputJobSteamWithException(JOB_COUNT_URL);
+
+
+        // Precondition: no parser state in ZK before the run.
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+
+        // Seed the parser with a previously-seen running YARN app fixture.
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Map<String, JobExecutionAPIEntity> mrJobs = null;
+
+        MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
+        RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
+        MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+
+
+        // Peek at the parser's private state via reflection; everything starts empty.
+        Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
+        Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
+        List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(entities.isEmpty());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+
+        mrJobParser.run();
+
+        // Entity created and ZK state written despite the counters failure.
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) != null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) != null);
+        Assert.assertEquals(DATA_FROM_ZK, new String(curator.getData().forPath(ZK_JOB_PATH), "UTF-8"));
+        Assert.assertTrue(entities.isEmpty());
+        verify(client, times(1)).create(any());
+    }
+
+    /**
+     * Failure path: job-conf and counters fetches both throw, and the (mocked)
+     * ResourceManager reports an empty running-app list. The parser then treats
+     * the app as gone: the entity's internal state becomes FINISHED (while
+     * currentState stays RUNNING), parser status is APP_FINISHED, nothing is
+     * kept in ZK, and the entity is flushed once through the service client.
+     */
+    @Test
+    public void testMRJobParserFetchJobConfFailButRMalive() throws Exception {
+        setupMock();
+        mockInputJobSteam("/mrjob_30784.json", JOB_URL);
+        mockGetConnectionWithException("/mrconf_30784.xml");
+        mockInputJobSteamWithException(JOB_COUNT_URL);
+
+
+        // Precondition: no parser state in ZK before the run.
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+
+        // Seed the parser with a previously-seen running YARN app fixture.
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Map<String, JobExecutionAPIEntity> mrJobs = null;
+
+        MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
+        // RM is reachable but no longer lists this app as running.
+        RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
+        when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
+        MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+
+
+        // Peek at the parser's private state via reflection; everything starts empty.
+        Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
+        Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
+        List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(entities.isEmpty());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+
+        mrJobParser.run();
+
+        // No config captured; the single entity is marked finished internally.
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
+        JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
+        Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState());
+        Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
+        Assert.assertTrue(entities.isEmpty());
+        verify(client, times(1)).create(any());
+    }
+
+
+    /**
+     * Failure path: the counters fetch throws (conf fetch is fine) while the
+     * (mocked) ResourceManager reports an empty running-app list. As in the
+     * conf-failure variant, the app is treated as finished: internal state
+     * FINISHED, currentState RUNNING, parser status APP_FINISHED, no ZK state,
+     * one flush through the service client.
+     */
+    @Test
+    public void testMRJobParserFetchJobCountFailButRMalive() throws Exception {
+        setupMock();
+        mockInputJobSteam("/mrjob_30784.json", JOB_URL);
+        mockInputJobSteamWithException(JOB_COUNT_URL);
+        mockGetConnection("/mrconf_30784.xml");
+
+
+        // Precondition: no parser state in ZK before the run.
+        Assert.assertTrue(curator.checkExists().forPath(ZKROOT) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        List<String> confKeyKeys = makeConfKeyKeys(mrRunningJobConfig);
+
+        // Seed the parser with a previously-seen running YARN app fixture.
+        InputStream previousmrrunningapp = this.getClass().getResourceAsStream("/previousmrrunningapp.json");
+        AppsWrapper appsWrapper = OBJ_MAPPER.readValue(previousmrrunningapp, AppsWrapper.class);
+        List<AppInfo> appInfos = appsWrapper.getApps().getApp();
+        AppInfo app1 = appInfos.get(0);
+        Map<String, JobExecutionAPIEntity> mrJobs = null;
+
+        MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
+        // RM is reachable but no longer lists this app as running.
+        RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
+        when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
+        MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+
+
+        // Peek at the parser's private state via reflection; everything starts empty.
+        Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
+        Map<String, JobConfig> jobIdToJobConfig = getMrJobConfigs(mrJobParser);
+        MRJobEntityCreationHandler mrJobEntityCreationHandler = getMrJobEntityCreationHandler(mrJobParser);
+        List<TaggedLogAPIEntity> entities = getMrJobEntityCreationHandlerEntities(mrJobEntityCreationHandler);
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.isEmpty());
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(entities.isEmpty());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.FINISHED);
+
+        mrJobParser.run();
+
+        // No config captured; the single entity is marked finished internally.
+        Assert.assertTrue(jobIdToJobConfig.isEmpty());
+        Assert.assertTrue(jobIdToJobExecutionAPIEntity.size() == 1);
+        JobExecutionAPIEntity jobExecutionAPIEntity = jobIdToJobExecutionAPIEntity.get(JOB_ID);
+        Assert.assertEquals(Constants.AppState.FINISHED.toString(), jobExecutionAPIEntity.getInternalState());
+        Assert.assertEquals(Constants.AppState.RUNNING.toString(), jobExecutionAPIEntity.getCurrentState());
+        Assert.assertTrue(mrJobParser.status() == MRJobParser.ParserStatus.APP_FINISHED);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_JOB_PATH) == null);
+        Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
+        Assert.assertTrue(entities.isEmpty());
+        verify(client, times(1)).create(any());
+
+    }
+
+    /**
+     * Installs the static/constructor mocks shared by every test:
+     * Math.random() pinned to a fixed value — presumably to make any
+     * randomized RM-URL selection deterministic (TODO confirm against
+     * MRJobParser); InputStreamUtils mocked statically so HTTP fetches can be
+     * stubbed per-URL; and `new EagleServiceClientImpl(...)` intercepted to
+     * return a mock whose create() is a no-op, so no real service is contacted.
+     */
+    private void setupMock() throws Exception {
+        mockStatic(Math.class);
+        when(Math.random()).thenReturn(0.3689680489913364d);
+        mockStatic(InputStreamUtils.class);
+        client = mock(EagleServiceClientImpl.class);
+        MRRunningJobConfig.EagleServiceConfig eagleServiceConfig = mrRunningJobConfig.getEagleServiceConfig();
+        // Intercept the constructor call made inside the parser so it gets our mock.
+        PowerMockito.whenNew(EagleServiceClientImpl.class).withArguments(
+                eagleServiceConfig.eagleServiceHost,
+                eagleServiceConfig.eagleServicePort,
+                eagleServiceConfig.username,
+                eagleServiceConfig.password).thenReturn(client);
+        when(client.create(any())).thenReturn(null);
+        when(client.getJerseyClient()).thenReturn(new Client());
+
+    }
+
+    /** Reads the parser's private {@code mrJobConfigs} map (jobId -> config) via reflection. */
+    private Map<String, JobConfig> getMrJobConfigs(MRJobParser mrJobParser) throws NoSuchFieldException, IllegalAccessException {
+        Field configsField = MRJobParser.class.getDeclaredField("mrJobConfigs");
+        configsField.setAccessible(true);
+        return (Map<String, JobConfig>) configsField.get(mrJobParser);
+    }
+
+    /** Reads the parser's private {@code mrJobEntityMap} (jobId -> entity) via reflection. */
+    private Map<String, JobExecutionAPIEntity> getMrJobs(MRJobParser mrJobParser) throws NoSuchFieldException, IllegalAccessException {
+        Field entityMapField = MRJobParser.class.getDeclaredField("mrJobEntityMap");
+        entityMapField.setAccessible(true);
+        return (Map<String, JobExecutionAPIEntity>) entityMapField.get(mrJobParser);
+    }
+
+    /** Reads the parser's private {@code mrJobEntityCreationHandler} via reflection. */
+    private MRJobEntityCreationHandler getMrJobEntityCreationHandler(MRJobParser mrJobParser) throws NoSuchFieldException, IllegalAccessException {
+        Field handlerField = MRJobParser.class.getDeclaredField("mrJobEntityCreationHandler");
+        handlerField.setAccessible(true);
+        return (MRJobEntityCreationHandler) handlerField.get(mrJobParser);
+    }
+
+    /** Reads the handler's private buffered {@code entities} list via reflection. */
+    private List<TaggedLogAPIEntity> getMrJobEntityCreationHandlerEntities(MRJobEntityCreationHandler mrJobEntityCreationHandler) throws NoSuchFieldException, IllegalAccessException {
+        Field entitiesField = MRJobEntityCreationHandler.class.getDeclaredField("entities");
+        entitiesField.setAccessible(true);
+        return (ArrayList<TaggedLogAPIEntity>) entitiesField.get(mrJobEntityCreationHandler);
+    }
+
+    /** Resets the handler's private {@code entities} buffer to a fresh empty list. */
+    private void initMrJobEntityCreationHandlerEntities(MRJobEntityCreationHandler mrJobEntityCreationHandler) throws NoSuchFieldException, IllegalAccessException {
+        Field entitiesField = MRJobEntityCreationHandler.class.getDeclaredField("entities");
+        entitiesField.setAccessible(true);
+        entitiesField.set(mrJobEntityCreationHandler, new ArrayList<>());
+    }
+
+    /** Replaces the handler's private {@code jobMetricsListener} with a fresh listener. */
+    private void initMetricsCreationListener(MRJobEntityCreationHandler mrJobEntityCreationHandler) throws IllegalAccessException, NoSuchFieldException {
+        Field listenerField = MRJobEntityCreationHandler.class.getDeclaredField("jobMetricsListener");
+        listenerField.setAccessible(true);
+        listenerField.set(mrJobEntityCreationHandler, new JobExecutionMetricsCreationListener());
+    }
+
+
+    /**
+     * Builds the ordered list of configuration keys the parser extracts from
+     * job.xml: the job-name key first, then the comma-separated keys from
+     * {@code MRConfigureKeys.jobConfigKey} (trimmed), then the job-type marker
+     * keys and one extra key used by these tests.
+     */
+    private List<String> makeConfKeyKeys(MRRunningJobConfig mrRunningJobConfig) {
+        List<String> confKeyKeys = new ArrayList<>();
+        confKeyKeys.add(mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobNameKey"));
+        for (String confKeyPattern : mrRunningJobConfig.getConfig().getString("MRConfigureKeys.jobConfigKey").split(",")) {
+            confKeyKeys.add(confKeyPattern.trim());
+        }
+        confKeyKeys.add(Constants.JobConfiguration.CASCADING_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.HIVE_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.PIG_JOB);
+        confKeyKeys.add(Constants.JobConfiguration.SCOOBI_JOB);
+        confKeyKeys.add("hive.optimize.skewjoin.compiletime");
+        return confKeyKeys;
+    }
+
+    // NOTE(review): "Steam" is a typo for "Stream"; kept because the tests above call it by this name.
+    /** Stubs {@code InputStreamUtils.getInputStream(url, ...)} to serve the given classpath resource. */
+    private void mockInputJobSteam(String mockDataFilePath, String url) throws Exception {
+        InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
+        when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenReturn(jsonstream);
+    }
+
+    // NOTE(review): "Steam" is a typo for "Stream"; kept because the tests above call it by this name.
+    /** Stubs {@code InputStreamUtils.getInputStream(url, ...)} to throw, simulating a fetch failure. */
+    private void mockInputJobSteamWithException(String url) throws Exception {
+        when(InputStreamUtils.getInputStream(eq(url), anyObject(), anyObject())).thenThrow(new Exception());
+    }
+
+    /**
+     * Stubs {@code URLConnectionUtils.getConnection(JOB_CONF_URL)} to throw,
+     * simulating a job-configuration fetch failure.
+     *
+     * <p>The original implementation also opened the classpath resource and
+     * stubbed a {@link URLConnection} that could never be used (the connection
+     * itself throws), leaking the opened stream; that dead setup is removed.
+     *
+     * @param mockDataFilePath kept only for signature symmetry with
+     *        {@code mockGetConnection(String)}; the file is never read.
+     */
+    private void mockGetConnectionWithException(String mockDataFilePath) throws Exception {
+        mockStatic(URLConnectionUtils.class);
+        when(URLConnectionUtils.getConnection(JOB_CONF_URL)).thenThrow(new Exception());
+    }
+
+    /**
+     * Stubs {@code URLConnectionUtils.getConnection(JOB_CONF_URL)} to return a
+     * connection whose input stream serves the given classpath resource
+     * (the job-configuration XML fixture).
+     */
+    private void mockGetConnection(String mockDataFilePath) throws Exception {
+        InputStream jsonstream = this.getClass().getResourceAsStream(mockDataFilePath);
+        mockStatic(URLConnectionUtils.class);
+        URLConnection connection = mock(URLConnection.class);
+        when(connection.getInputStream()).thenReturn(jsonstream);
+        when(URLConnectionUtils.getConnection(JOB_CONF_URL)).thenReturn(connection);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobcounts_30784.json
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobcounts_30784.json b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobcounts_30784.json
new file mode 100644
index 0000000..7165a9c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/jobcounts_30784.json
@@ -0,0 +1,390 @@
+{
+  "jobCounters": {
+    "id": "job_1479206441898_30784",
+    "counterGroup": [
+      {
+        "counterGroupName": "org.apache.hadoop.mapreduce.FileSystemCounter",
+        "counter": [
+          {
+            "name": "FILE_BYTES_READ",
+            "totalCounterValue": 207411516,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 207411516
+          },
+          {
+            "name": "FILE_BYTES_WRITTEN",
+            "totalCounterValue": 478282694,
+            "mapCounterValue": 57031387,
+            "reduceCounterValue": 421251307
+          },
+          {
+            "name": "FILE_READ_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "FILE_LARGE_READ_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "FILE_WRITE_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "HDFS_BYTES_READ",
+            "totalCounterValue": 234808990,
+            "mapCounterValue": 234784786,
+            "reduceCounterValue": 24204
+          },
+          {
+            "name": "HDFS_BYTES_WRITTEN",
+            "totalCounterValue": 324,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 324
+          },
+          {
+            "name": "HDFS_READ_OPS",
+            "totalCounterValue": 22,
+            "mapCounterValue": 9,
+            "reduceCounterValue": 13
+          },
+          {
+            "name": "HDFS_LARGE_READ_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "HDFS_WRITE_OPS",
+            "totalCounterValue": 9,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 9
+          },
+          {
+            "name": "VIEWFS_BYTES_READ",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "VIEWFS_BYTES_WRITTEN",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "VIEWFS_READ_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "VIEWFS_LARGE_READ_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "VIEWFS_WRITE_OPS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          }
+        ]
+      },
+      {
+        "counterGroupName": "org.apache.hadoop.mapreduce.JobCounter",
+        "counter": [
+          {
+            "name": "TOTAL_LAUNCHED_MAPS",
+            "totalCounterValue": 3,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "TOTAL_LAUNCHED_REDUCES",
+            "totalCounterValue": 4,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "OTHER_LOCAL_MAPS",
+            "totalCounterValue": 2,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "RACK_LOCAL_MAPS",
+            "totalCounterValue": 1,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "SLOTS_MILLIS_MAPS",
+            "totalCounterValue": 85670,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "SLOTS_MILLIS_REDUCES",
+            "totalCounterValue": 60964,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MILLIS_MAPS",
+            "totalCounterValue": 42835,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MILLIS_REDUCES",
+            "totalCounterValue": 15241,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "VCORES_MILLIS_MAPS",
+            "totalCounterValue": 42835,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "VCORES_MILLIS_REDUCES",
+            "totalCounterValue": 15241,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MB_MILLIS_MAPS",
+            "totalCounterValue": 87726080,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MB_MILLIS_REDUCES",
+            "totalCounterValue": 62427136,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          }
+        ]
+      },
+      {
+        "counterGroupName": "org.apache.hadoop.mapreduce.TaskCounter",
+        "counter": [
+          {
+            "name": "MAP_INPUT_RECORDS",
+            "totalCounterValue": 6371118,
+            "mapCounterValue": 6371118,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MAP_OUTPUT_RECORDS",
+            "totalCounterValue": 6371118,
+            "mapCounterValue": 6371118,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MAP_OUTPUT_BYTES",
+            "totalCounterValue": 335501834,
+            "mapCounterValue": 335501834,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MAP_OUTPUT_MATERIALIZED_BYTES",
+            "totalCounterValue": 56297000,
+            "mapCounterValue": 56297000,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "SPLIT_RAW_BYTES",
+            "totalCounterValue": 942,
+            "mapCounterValue": 942,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "COMBINE_INPUT_RECORDS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "COMBINE_OUTPUT_RECORDS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "REDUCE_INPUT_GROUPS",
+            "totalCounterValue": 102587,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 102587
+          },
+          {
+            "name": "REDUCE_SHUFFLE_BYTES",
+            "totalCounterValue": 56297000,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 56297000
+          },
+          {
+            "name": "REDUCE_INPUT_RECORDS",
+            "totalCounterValue": 6371118,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 6371118
+          },
+          {
+            "name": "REDUCE_OUTPUT_RECORDS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "SPILLED_RECORDS",
+            "totalCounterValue": 12742236,
+            "mapCounterValue": 6371118,
+            "reduceCounterValue": 6371118
+          },
+          {
+            "name": "SHUFFLED_MAPS",
+            "totalCounterValue": 12,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 12
+          },
+          {
+            "name": "FAILED_SHUFFLE",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "MERGED_MAP_OUTPUTS",
+            "totalCounterValue": 12,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 12
+          },
+          {
+            "name": "GC_TIME_MILLIS",
+            "totalCounterValue": 490,
+            "mapCounterValue": 253,
+            "reduceCounterValue": 237
+          },
+          {
+            "name": "CPU_MILLISECONDS",
+            "totalCounterValue": 75570,
+            "mapCounterValue": 38110,
+            "reduceCounterValue": 37460
+          },
+          {
+            "name": "PHYSICAL_MEMORY_BYTES",
+            "totalCounterValue": 4851335168,
+            "mapCounterValue": 2238287872,
+            "reduceCounterValue": 2613047296
+          },
+          {
+            "name": "VIRTUAL_MEMORY_BYTES",
+            "totalCounterValue": 23634513920,
+            "mapCounterValue": 7247814656,
+            "reduceCounterValue": 16386699264
+          },
+          {
+            "name": "COMMITTED_HEAP_BYTES",
+            "totalCounterValue": 9992404992,
+            "mapCounterValue": 3588227072,
+            "reduceCounterValue": 6404177920
+          }
+        ]
+      },
+      {
+        "counterGroupName": "HIVE",
+        "counter": [
+          {
+            "name": "DESERIALIZE_ERRORS",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "RECORDS_IN",
+            "totalCounterValue": 6371118,
+            "mapCounterValue": 6371118,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "RECORDS_OUT_INTERMEDIATE",
+            "totalCounterValue": 6371118,
+            "mapCounterValue": 6371118,
+            "reduceCounterValue": 0
+          }
+        ]
+      },
+      {
+        "counterGroupName": "Shuffle Errors",
+        "counter": [
+          {
+            "name": "BAD_ID",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "CONNECTION",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "IO_ERROR",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "WRONG_LENGTH",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "WRONG_MAP",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          },
+          {
+            "name": "WRONG_REDUCE",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          }
+        ]
+      },
+      {
+        "counterGroupName": "org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter",
+        "counter": [
+          {
+            "name": "BYTES_READ",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          }
+        ]
+      },
+      {
+        "counterGroupName": "org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter",
+        "counter": [
+          {
+            "name": "BYTES_WRITTEN",
+            "totalCounterValue": 0,
+            "mapCounterValue": 0,
+            "reduceCounterValue": 0
+          }
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
new file mode 100644
index 0000000..78d61b5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrconf_30784.xml
@@ -0,0 +1 @@
+<conf><path>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</path><property><name>eagle.job.name</name><value>eagletest</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><property><name>hive.query.string</name><value>insert overwrite table xxxx</value><source>programatically</source><source>viewfs://xxx/user/xxx/.staging/job_1479206441898_124837/job.xml</source></property><property><name>hive.optimize.skewjoin.compiletime</name><value>false</value><source>programatically</source><source>org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@70a6620d</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property><!--<property><name>hadoop.security.group.mapping.ldap.search.filter.user</name><value>(&(objectClass=user)(sAMAccountName={0}))</value><source>core-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.
 xml</source></property>--><property><name>dfs.datanode.data.dir</name><value>file://${hadoop.tmp.dir}/dfs/data</value><source>hdfs-default.xml</source><source>viewfs://xxxxx/user/xxx/.staging/job_1479206441898_30784/job.xml</source></property></conf>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/65de7b0a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrjob_30784.json
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrjob_30784.json b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrjob_30784.json
new file mode 100644
index 0000000..3d5e182
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/resources/mrjob_30784.json
@@ -0,0 +1,37 @@
+{
+  "jobs": {
+    "job": [
+      {
+        "startTime": 1479716129967,
+        "finishTime": 0,
+        "elapsedTime": 27750,
+        "id": "job_1479206441898_30784",
+        "name": "oozie:launcher:T=shell:W=wf_co_xxx_xxx_v3:A=extract_org_data:ID=0002383-161115184801730-oozie-oozi-W",
+        "user": "xxx",
+        "state": "RUNNING",
+        "mapsTotal": 161,
+        "mapsCompleted": 57,
+        "reducesTotal": 72,
+        "reducesCompleted": 0,
+        "mapProgress": 38.38946,
+        "reduceProgress": 0.0,
+        "mapsPending": 0,
+        "mapsRunning": 104,
+        "reducesPending": 72,
+        "reducesRunning": 0,
+        "uberized": false,
+        "diagnostics": "",
+        "newReduceAttempts": 72,
+        "runningReduceAttempts": 0,
+        "failedReduceAttempts": 0,
+        "killedReduceAttempts": 0,
+        "successfulReduceAttempts": 0,
+        "newMapAttempts": 0,
+        "runningMapAttempts": 104,
+        "failedMapAttempts": 0,
+        "killedMapAttempts": 0,
+        "successfulMapAttempts": 57
+      }
+    ]
+  }
+}
\ No newline at end of file



Mime
View raw message