eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject [1/2] eagle git commit: [EAGLE-859] MapReduce job performance suggestion
Date Fri, 20 Jan 2017 03:16:57 GMT
Repository: eagle
Updated Branches:
  refs/heads/master 015d57788 -> eae6e8f11


http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
index a12f589..a9f5132 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
 import java.util.ArrayList;
@@ -27,14 +27,17 @@ import java.util.Map;
 
 public class Result {
     //for EagleStorePublisher
-    private TaggedLogAPIEntity alertEntity = null;//TODO
+    private Map<String, List<TaggedLogAPIEntity>> alertEntities = new HashMap<>();
     //for EmailPublisher
-    private Map<String, List<Pair<ResultLevel, String>>> alertMessages
= new HashMap<>();
+    private Map<String, List<ProcessorResult>> alertMessages = new HashMap<>();
 
     public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
         Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+        Map<Class<?>, TaggedLogAPIEntity> processorEntities = result.getProcessorEntities();
+
         for (Class<?> processorType : processorResults.keySet()) {
             ProcessorResult processorResult = processorResults.get(processorType);
+
             if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
                 continue;
             }
@@ -42,17 +45,27 @@ public class Result {
             String typeName = type.getName();
             if (!alertMessages.containsKey(typeName)) {
                 alertMessages.put(typeName, new ArrayList<>());
+                alertEntities.put(typeName, new ArrayList<>());
             }
-            alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+            normalizeResult(processorResult);
+            alertMessages.get(typeName).add(processorResult);
+            alertEntities.get(typeName).add(processorEntities.get(processorType));
+
         }
     }
 
-    public TaggedLogAPIEntity getAlertEntity() {
-        return alertEntity;
+    public Map<String, List<ProcessorResult>> getAlertMessages() {
+        return alertMessages;
+    }
+
+    public Map<String, List<TaggedLogAPIEntity>> getAlertEntities() {
+        return alertEntities;
     }
 
-    public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages()
{
-        return alertMessages;
+    private void normalizeResult(ProcessorResult processorResult) {
+        if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty())
{
+            processorResult.setSettingList(StringUtils.join(processorResult.getSettings(),
"\n"));
+        }
     }
 
     /**
@@ -61,18 +74,52 @@ public class Result {
 
     public enum ResultLevel {
         NONE,
+        INFO,
         NOTICE,
         WARNING,
         CRITICAL
     }
 
+    public enum RuleType {
+        COMPRESS,
+        SPLIT,
+        SPILL,
+        TASK_NUMBER,
+        GC_TIME,
+        RESOURCE_CONTENTION,
+        DATA_SKEW,
+
+        LONG_STUCK_JOB,
+        LONG_DURATION_JOB
+    }
+
     public static class ProcessorResult {
+        private RuleType ruleType;
         private ResultLevel resultLevel;
         private String message;
+        private List<String> settings;
+        private String settingList;
 
-        public ProcessorResult(ResultLevel resultLevel, String message) {
+        public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message,
List<String> settings) {
+            this.ruleType = ruleType;
             this.resultLevel = resultLevel;
             this.message = message;
+            this.settings = settings;
+        }
+
+        public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message)
{
+            this.ruleType = ruleType;
+            this.resultLevel = resultLevel;
+            this.message = message;
+            this.settings = new ArrayList<>();
+        }
+
+        public RuleType getRuleType() {
+            return ruleType;
+        }
+
+        public void setRuleType(RuleType ruleType) {
+            this.ruleType = ruleType;
         }
 
         public ResultLevel getResultLevel() {
@@ -90,6 +137,22 @@ public class Result {
         public void setMessage(String message) {
             this.message = message;
         }
+
+        public List<String> getSettings() {
+            return settings;
+        }
+
+        public void setSettings(List<String> settings) {
+            this.settings = settings;
+        }
+
+        public String getSettingList() {
+            return settingList;
+        }
+
+        public void setSettingList(String settingList) {
+            this.settingList = settingList;
+        }
     }
 
     /**
@@ -97,13 +160,22 @@ public class Result {
      */
     public static class EvaluatorResult {
         private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+        private Map<Class<?>, TaggedLogAPIEntity> processorEntities = new HashMap<>();
 
         public void addProcessorResult(Class<?> type, ProcessorResult result) {
             this.processorResults.put(type, result);
         }
 
+        public void addProcessorEntity(Class<?> type, TaggedLogAPIEntity entity) {
+            this.processorEntities.put(type, entity);
+        }
+
         public Map<Class<?>, ProcessorResult> getProcessorResults() {
             return this.processorResults;
         }
+
+        public Map<Class<?>, TaggedLogAPIEntity> getProcessorEntities() {
+            return processorEntities;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
index 4b18f7c..6a51a76 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher.dedup;
 
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 
 public interface AlertDeduplicator {

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
index 09f1af6..b139b3c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
 
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
 import org.apache.eagle.jpm.analyzer.util.Constants;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
index 774e6d2..4c6661a 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -62,4 +62,5 @@ public class Constants {
 
     public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
     public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
index 39cec68..996adba 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
@@ -115,13 +115,17 @@
 <table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif;
box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;"
bgcolor="#f6f6f6" border="1">
     <caption><b>Analysis By $evaluator</b></caption>
     <tr>
-        <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing:
border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td>
+        <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing:
border-box; font-size: 14px; margin: 0;" width="100"><b>type</b></th>
         <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing:
border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th>
+		<th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box;
font-size: 14px; margin: 0;" width="250"><b>optimizer setting</b></th>
+		<th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box;
font-size: 14px; margin: 0;" width="100"><b>level</b></th>
     </tr>
-    #foreach($message in ${elem["extend"].get($evaluator).keySet()})
+    #foreach($result in ${elem["extend"].get($evaluator)})
         <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing:
border-box; font-size: 14px; margin: 0;">
-            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing:
border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td>
-            <th style="...">$message</th>
+			<td style="...">${result.ruleType}</td>
+            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing:
border-box; font-size: 14px; margin: 0;">${result.message}</td>
+			<td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box;
font-size: 14px; margin: 0;">${result.settingList}</td>
+			<td style="...">${result.resultLevel}</td>
         </tr>
     #end
 </table>

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
index cbbdad3..8c65adf 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
@@ -38,5 +38,6 @@ public class JPAEntityRepository extends EntityRepository {
         entitySet.add(JobProcessTimeStampEntity.class);
         entitySet.add(JobCountEntity.class);
         entitySet.add(TaskAttemptErrorCategoryEntity.class);
+        entitySet.add(JobSuggestionAPIEntity.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
new file mode 100644
index 0000000..3863a5d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.List;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import static org.apache.eagle.jpm.util.Constants.JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa")
+@ColumnFamily("f")
+@Prefix("jsuggestion")
+@Service(JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+        @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+        })
+public class JobSuggestionAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String optimizerSuggestion;
+    @Column("b")
+    private List<String> optimizerSettings;
+
+    public String getOptimizerSuggestion() {
+        return optimizerSuggestion;
+    }
+
+    public void setOptimizerSuggestion(String optimizerSuggestion) {
+        this.optimizerSuggestion = optimizerSuggestion;
+        valueChanged("optimizerSuggestion");
+    }
+
+    public List<String> getOptimizerSettings() {
+        return optimizerSettings;
+    }
+
+    public void setOptimizerSettings(List<String> optimizerSettings) {
+        this.optimizerSettings = optimizerSettings;
+        valueChanged("optimizerSettings");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index 8db7f5c..46fcf5e 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -46,6 +46,40 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
     private String error;
     @Column("f")
     private JobCounters jobCounters;
+    // new added
+    @Column("g")
+    private long shuffleFinishTime;
+    @Column("h")
+    private long sortFinishTime;
+    @Column("i")
+    private long mapFinishTime;
+
+    public long getShuffleFinishTime() {
+        return shuffleFinishTime;
+    }
+
+    public void setShuffleFinishTime(long shuffleFinishTime) {
+        this.shuffleFinishTime = shuffleFinishTime;
+        valueChanged("shuffleFinishTime");
+    }
+
+    public long getSortFinishTime() {
+        return sortFinishTime;
+    }
+
+    public void setSortFinishTime(long sortFinishTime) {
+        this.sortFinishTime = sortFinishTime;
+        valueChanged("sortFinishTime");
+    }
+
+    public long getMapFinishTime() {
+        return mapFinishTime;
+    }
+
+    public void setMapFinishTime(long mapFinishTime) {
+        this.mapFinishTime = mapFinishTime;
+        valueChanged("mapFinishTime");
+    }
 
     public String getTaskStatus() {
         return taskStatus;
@@ -53,7 +87,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setTaskStatus(String taskStatus) {
         this.taskStatus = taskStatus;
-        pcs.firePropertyChange("taskStatus", null, null);
+        valueChanged("taskStatus");
     }
 
     public long getStartTime() {
@@ -62,7 +96,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setStartTime(long startTime) {
         this.startTime = startTime;
-        pcs.firePropertyChange("startTime", null, null);
+        valueChanged("startTime");
     }
 
     public long getEndTime() {
@@ -71,7 +105,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setEndTime(long endTime) {
         this.endTime = endTime;
-        pcs.firePropertyChange("endTime", null, null);
+        valueChanged("endTime");
     }
 
     public long getDuration() {
@@ -80,7 +114,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setDuration(long duration) {
         this.duration = duration;
-        pcs.firePropertyChange("duration", null, null);
+        valueChanged("duration");
     }
 
     public String getError() {
@@ -89,7 +123,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setError(String error) {
         this.error = error;
-        pcs.firePropertyChange("error", null, null);
+        valueChanged("error");
     }
 
     public JobCounters getJobCounters() {
@@ -98,6 +132,6 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setJobCounters(JobCounters jobCounters) {
         this.jobCounters = jobCounters;
-        pcs.firePropertyChange("jobCounters", null, null);
+        valueChanged("jobCounters");
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 349c489..2e3a1a8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -89,6 +89,11 @@
             <version>1.6</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 262054e..28ebf4e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback
{
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", appConfig.getJobHistoryEndpointConfig().site);
+                put(MRJobTagName.SITE.toString(), appConfig.getJobHistoryEndpointConfig().site);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index d89937e..d58eadc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -81,6 +81,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
     private long sumReduceTaskDuration;
 
     private JobCounterMetricsGenerator jobCounterMetricsGenerator;
+    private JobSuggestionListener jobSuggestionListener;
 
     private MRHistoryJobConfig appConfig;
 
@@ -127,6 +128,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
 
         this.appConfig = appConfig;
         this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(appConfig.getEagleServiceConfig());
+        this.jobSuggestionListener = new JobSuggestionListener(appConfig.getConfig());
+        this.addListener(jobSuggestionListener);
     }
 
     public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
@@ -179,7 +182,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
         this.jobType = jobType;
     }
 
-    protected void handleJob(EventType eventType, Map<Keys, String> values, Object
totalCounters) throws Exception {
+    protected void handleJob(EventType eventType, Map<Keys, String> values, Object
totalCounters, Object mapCounters, Object reduceCounters) throws Exception {
         String id = values.get(Keys.JOBID);
 
         if (jobId == null) {
@@ -300,8 +303,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
             this.jobCounterMetricsGenerator.setBaseTags(jobExecutionEntity.getTags());
 
             formatDiagnostics(values.get(Keys.DIAGNOSTICS));
-
             entityCreated(jobExecutionEntity);
+
+            if (configuration != null && totalCounters != null) {
+                JobCounters parsedTotalCounters = parseCounters(totalCounters);
+                JobCounters parsedMapCounters = parseCounters(mapCounters);
+                JobCounters parsedReduceCounters = parseCounters(reduceCounters);
+                jobSuggestionListener.jobCountersCreated(parsedTotalCounters, parsedMapCounters,
parsedReduceCounters);
+                jobSuggestionListener.jobConfigCreated(configuration);
+            }
         }
     }
 
@@ -332,7 +342,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
             }
         }
 
-        super.notifiyListeners(entity);
+        super.notifyListeners(entity);
     }
 
     protected abstract JobCounters parseCounters(Object value) throws IOException;
@@ -432,7 +442,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
             // it is very likely that an attempt ID could be both succeeded and failed due
to M/R system
             // in this case, we should ignore this attempt?
             if (taskAttemptStartTime.get(taskAttemptID) == null) {
-                LOG.warn("task attemp has consistency issue " + taskAttemptID);
+                LOG.warn("task attempt has consistency issue " + taskAttemptID);
                 return;
             }
             entity.setStartTime(taskAttemptStartTime.get(taskAttemptID));
@@ -441,6 +451,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
             entity.setDuration(entity.getEndTime() - entity.getStartTime());
             entity.setTaskStatus(values.get(Keys.TASK_STATUS));
             entity.setError(values.get(Keys.ERROR));
+            if (values.containsKey(Keys.SHUFFLE_FINISHED)) {
+                entity.setShuffleFinishTime(Long.valueOf(values.get(Keys.SHUFFLE_FINISHED)));
+            }
+            if (values.containsKey(Keys.SORT_FINISHED)) {
+                entity.setSortFinishTime(Long.valueOf(values.get(Keys.SORT_FINISHED)));
+            }
+            if (values.containsKey(Keys.MAP_FINISH_TIME)) {
+                entity.setMapFinishTime(Long.valueOf(values.get(Keys.MAP_FINISH_TIME)));
+            }
             if (values.get(Keys.COUNTERS) != null || counters != null) {  // when task is
killed, COUNTERS does not exist
                 //entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS)));
                 entity.setJobCounters(parseCounters(counters));
@@ -473,7 +492,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher
impl
                 taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
                 entityCreated(taskAttemptErrorCategoryEntity);
             }
-
             taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
index adeb41e..615e9ad 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+@Deprecated
 public enum JHFFormat {
     MRVer1,
     MRVer2

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 8184f90..1903fc9 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -185,7 +185,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getJobQueueName() != null) {
             values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
         }
-        handleJob(wrapper.getType(), values, null);
+        handleJob(wrapper.getType(), values, null, null, null);
     }
 
     private void handleJobInited(Event wrapper) throws Exception {
@@ -209,7 +209,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getUberized() != null) {
             values.put(Keys.UBERISED, js.getUberized().toString());
         }
-        handleJob(wrapper.getType(), values, null);
+        handleJob(wrapper.getType(), values, null, null, null);
     }
 
     private void handleJobFinished(Event wrapper) throws Exception {
@@ -234,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
         }
         values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name());
-        handleJob(wrapper.getType(), values, js.getTotalCounters());
+        handleJob(wrapper.getType(), values, js.getTotalCounters(), js.getMapCounters(),
js.getReduceCounters());
     }
 
     private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
@@ -258,7 +258,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getDiagnostics() != null) {
             values.put(Keys.DIAGNOSTICS, js.getDiagnostics().toString());
         }
-        handleJob(wrapper.getType(), values, null);
+        handleJob(wrapper.getType(), values, null, null, null);
     }
 
     private void handleTaskStarted(Event wrapper) throws Exception {
@@ -539,6 +539,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     }
 
     protected JobCounters parseCounters(Object value) throws IOException {
+        if (value == null) {
+            return null;
+        }
         JobCounters jc = new JobCounters();
         Map<String, Map<String, Long>> groups = new HashMap<>();
         JhCounters counters = (JhCounters) value;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index ca49d9c..1d17640 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -38,11 +38,13 @@ public class JHFParserFactory {
         MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig();
 
         JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration,
filter, appConfig);
+        // add HistoryJobEntityCreationListener
         reader2.addListener(new JobEntityCreationEagleServiceListener(appConfig));
         reader2.addListener(new TaskFailureListener(eagleServiceConfig));
         reader2.addListener(new TaskAttemptCounterListener(eagleServiceConfig));
         reader2.addListener(new JobConfigurationCreationServiceListener(eagleServiceConfig));
 
+        // add HistoryJobEntityLifecycleListener
         reader2.register(new JobEntityLifecycleAggregator());
         JHFParserBase parser = new JHFMRVer2Parser(reader2);
         return parser;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index a80462d..f2730fd 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -29,7 +29,7 @@ public class JobEntityCreationPublisher {
         listeners.add(l);
     }
 
-    public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
+    public void notifyListeners(JobBaseAPIEntity entity) throws Exception {
         for (HistoryJobEntityCreationListener l : listeners) {
             l.jobEntityCreated(entity);
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
new file mode 100644
index 0000000..e5b0d2e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
+import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.mr.history.parser.JHFEventReaderBase.Keys;
+import org.apache.hadoop.conf.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ATTEMPT_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ID;
+
+/*
+ * JobEventCounterListener provides an interface to add job/task counter analyzers
+ */
+public class JobSuggestionListener implements HistoryJobEntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionListener.class);
+
+    private MapReduceAnalyzerEntity info;
+    private MRJobPerformanceAnalyzer<MapReduceAnalyzerEntity> analyzer;
+
+    public JobSuggestionListener(Config config) {
+        this.info = new MapReduceAnalyzerEntity();
+        this.analyzer = new MRJobPerformanceAnalyzer<>(config);
+    }
+
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (entity instanceof TaskExecutionAPIEntity) {
+            info.getTasksMap().put(entity.getTags().get(TASK_ID.toString()), (TaskExecutionAPIEntity)
entity);
+        } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+            info.getCompletedTaskAttemptsMap().put(entity.getTags().get(TASK_ATTEMPT_ID.toString()),
(TaskAttemptExecutionAPIEntity) entity);
+        } else if (entity instanceof JobExecutionAPIEntity) {
+            JobExecutionAPIEntity jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
+            info.setCurrentState(jobExecutionAPIEntity.getCurrentState());
+            info.setStartTime(jobExecutionAPIEntity.getStartTime());
+            info.setEndTime(jobExecutionAPIEntity.getEndTime());
+            info.setDurationTime(jobExecutionAPIEntity.getDurationTime());
+            info.setUserId(jobExecutionAPIEntity.getTags().get(MRJobTagName.USER.toString()));
+            info.setJobId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_ID.toString()));
+            info.setJobDefId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+            info.setSiteId(jobExecutionAPIEntity.getTags().get(MRJobTagName.SITE.toString()));
+            info.setJobName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_NAME.toString()))
;
+            info.setJobQueueName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_QUEUE.toString()));
+            info.setJobType(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+            info.setFinishedMaps(jobExecutionAPIEntity.getNumFinishedMaps());
+            info.setFinishedReduces(jobExecutionAPIEntity.getNumFinishedReduces());
+            info.setFailedReduces(jobExecutionAPIEntity.getNumFailedReduces());
+            info.setFailedMaps(jobExecutionAPIEntity.getNumFailedMaps());
+            info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps());
+            info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces());
+        }
+    }
+
+    public void jobConfigCreated(Configuration configuration) {
+        info.setJobConf(configuration);
+    }
+
+    public void jobCountersCreated(JobCounters totalCounters, JobCounters mapCounters, JobCounters
reduceCounters) {
+        info.setTotalCounters(totalCounters);
+        info.setReduceCounters(reduceCounters);
+        info.setMapCounters(mapCounters);
+    }
+
+    @Override
+    public void flush() throws Exception {
+        analyzer.analyze(info);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 856f051..24c734d 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index e8d5311..3836e3a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -49,7 +49,7 @@
 
   "service": {
     "host": "sandbox.hortonworks.com",
-    "port": 9099,
+    "port": 9090,
     "username": "admin",
     "password": "secret",
     "readTimeOutSeconds" : 10,

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 120303e..6b33d31 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.jpm.mr.running.parser;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -173,7 +173,7 @@ public class MRJobParser implements Runnable {
                     break;
                 }
             }
-            mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
+            mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 0ba6521..4ee58a1 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -117,6 +117,7 @@ public class Constants {
     public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService";
     public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService";
     public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
+    public static final String JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService";
 
     public static final String JOB_TASK_TYPE_TAG = "taskType";
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
index 8def44f..bbb80cd 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
@@ -48,7 +48,11 @@ public final class JobCounters implements Serializable {
     }
 
     public Long getCounterValue(CounterName counterName) {
-        return counters.get(counterName.group.name).get(counterName.name);
+        if (counters.get(counterName.group.name).containsKey(counterName.name)) {
+            return counters.get(counterName.group.name).get(counterName.name);
+        } else {
+            return 0L;
+        }
     }
 
     public static enum GroupName {


Mime
View raw message