eagle-commits mailing list archives

From qingwz...@apache.org
Subject [2/2] eagle git commit: [EAGLE-859] MapReduce job performance suggestion
Date Fri, 20 Jan 2017 03:16:58 GMT
[EAGLE-859] MapReduce job performance suggestion

https://issues.apache.org/jira/browse/EAGLE-859

Author: Zhao, Qingwen <qingwzhao@apache.org>
Author: Qingwen Zhao <qingwen220@gmail.com>

Closes #784 from qingwen220/EAGLE-859.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eae6e8f1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eae6e8f1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eae6e8f1

Branch: refs/heads/master
Commit: eae6e8f11367c397c3a9ce39f2c59cf341f33847
Parents: 015d577
Author: Zhao, Qingwen <qingwzhao@apache.org>
Authored: Fri Jan 20 11:14:56 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Fri Jan 20 11:14:56 2017 +0800

----------------------------------------------------------------------
 eagle-jpm/eagle-jpm-analyzer/pom.xml            |   5 +
 .../eagle/jpm/analyzer/AnalyzerEntity.java      | 130 ------------
 .../apache/eagle/jpm/analyzer/Evaluator.java    |   5 +-
 .../apache/eagle/jpm/analyzer/JobAnalyzer.java  |  10 +-
 .../apache/eagle/jpm/analyzer/Processor.java    |   5 +-
 .../jpm/analyzer/meta/model/AnalyzerEntity.java | 130 ++++++++++++
 .../meta/model/MapReduceAnalyzerEntity.java     | 174 +++++++++++++++
 .../analyzer/mr/MRJobPerformanceAnalyzer.java   |  10 +-
 .../jpm/analyzer/mr/sla/SLAJobEvaluator.java    |   7 +-
 .../sla/processors/LongStuckJobProcessor.java   |   4 +-
 .../UnExpectedLongDurationJobProcessor.java     |   8 +-
 .../mr/suggestion/JobSuggestionEvaluator.java   |  74 ++++++-
 .../MapReduceCompressionSettingProcessor.java   |  82 ++++++++
 .../suggestion/MapReduceDataSkewProcessor.java  |  63 ++++++
 .../mr/suggestion/MapReduceGCTimeProcessor.java |  74 +++++++
 .../MapReduceJobSuggestionContext.java          | 209 +++++++++++++++++++
 .../MapReduceQueueResourceProcessor.java        |  85 ++++++++
 .../mr/suggestion/MapReduceSpillProcessor.java  | 125 +++++++++++
 .../MapReduceSplitSettingProcessor.java         |  47 +++++
 .../suggestion/MapReduceTaskNumProcessor.java   | 194 +++++++++++++++++
 .../analyzer/publisher/EagleStorePublisher.java |  40 +++-
 .../jpm/analyzer/publisher/EmailPublisher.java  |  21 +-
 .../eagle/jpm/analyzer/publisher/Publisher.java |   2 +-
 .../eagle/jpm/analyzer/publisher/Result.java    |  90 +++++++-
 .../publisher/dedup/AlertDeduplicator.java      |   2 +-
 .../dedup/impl/SimpleDeduplicator.java          |   2 +-
 .../eagle/jpm/analyzer/util/Constants.java      |   1 +
 .../main/resources/AnalyzerReportTemplate.vm    |  12 +-
 .../mr/historyentity/JPAEntityRepository.java   |   1 +
 .../historyentity/JobSuggestionAPIEntity.java   |  63 ++++++
 .../TaskAttemptExecutionAPIEntity.java          |  46 +++-
 eagle-jpm/eagle-jpm-mr-history/pom.xml          |   5 +
 .../crawler/DefaultJHFInputStreamCallback.java  |   3 +-
 .../mr/history/parser/JHFEventReaderBase.java   |  28 ++-
 .../eagle/jpm/mr/history/parser/JHFFormat.java  |   1 +
 .../mr/history/parser/JHFMRVer2EventReader.java |  11 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |   2 +
 .../parser/JobEntityCreationPublisher.java      |   2 +-
 .../history/parser/JobSuggestionListener.java   |  94 +++++++++
 .../parser/TaskAttemptCounterListener.java      |   1 -
 .../src/main/resources/application.conf         |   2 +-
 .../jpm/mr/running/parser/MRJobParser.java      |   4 +-
 .../org/apache/eagle/jpm/util/Constants.java    |   1 +
 .../eagle/jpm/util/jobcounter/JobCounters.java  |   6 +-
 44 files changed, 1674 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
index 50fb803..07f5766 100644
--- a/eagle-jpm/eagle-jpm-analyzer/pom.xml
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -47,6 +47,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-entity</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
deleted file mode 100644
index f9b7af0..0000000
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.analyzer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * will refactor later if other types of job needs this.
- * AnalyzerEntity for each job needed to be analysised
- */
-public class AnalyzerEntity {
-    private String jobDefId;
-    private String jobId;
-    private String siteId;
-    private String userId;
-
-    private long startTime;
-    private long endTime;
-    private long durationTime;
-    private String currentState;
-    private double progress;
-
-    private Map<String, Object> jobConfig = new HashMap<>();
-
-    private Map<String, Object> jobMeta = new HashMap<>();
-
-    public String getJobDefId() {
-        return jobDefId;
-    }
-
-    public void setJobDefId(String jobDefId) {
-        this.jobDefId = jobDefId;
-    }
-
-    public String getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(String jobId) {
-        this.jobId = jobId;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-    public String getUserId() {
-        return userId;
-    }
-
-    public void setUserId(String userId) {
-        this.userId = userId;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
-    public long getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(long endTime) {
-        this.endTime = endTime;
-    }
-
-    public long getDurationTime() {
-        return durationTime;
-    }
-
-    public void setDurationTime(long durationTime) {
-        this.durationTime = durationTime;
-    }
-
-    public String getCurrentState() {
-        return currentState;
-    }
-
-    public void setCurrentState(String currentState) {
-        this.currentState = currentState;
-    }
-
-    public Map<String, Object> getJobConfig() {
-        return jobConfig;
-    }
-
-    public void setJobConfig(Map<String, Object> jobConfig) {
-        this.jobConfig = jobConfig;
-    }
-
-    public Map<String, Object> getJobMeta() {
-        return jobMeta;
-    }
-
-    public void setJobMeta(Map<String, Object> jobMeta) {
-        this.jobMeta = jobMeta;
-    }
-
-    public double getProgress() {
-        return progress;
-    }
-
-    public void setProgress(double progress) {
-        this.progress = progress;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
index 6617916..60ee8d6 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -17,8 +17,9 @@
 
 package org.apache.eagle.jpm.analyzer;
 
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 
-public interface Evaluator {
-    Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+public interface Evaluator<T extends AnalyzerEntity> {
+    Result.EvaluatorResult evaluate(T analyzerEntity);
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
index 6cda1cd..1e9c00e 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -17,12 +17,14 @@
 
 package org.apache.eagle.jpm.analyzer;
 
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+
 /**
- * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each JobAnalyzer contains one or more Evaluators to analyze each job.
  * Each Evaluator is a group of Processors
- * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ * Each Processor implements an algorithm or a model to analyze one dimension of a job
  *
  */
-public interface JobAnalyzer {
-    void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+public interface JobAnalyzer<T extends AnalyzerEntity> {
+    void analyze(T analyzerEntity) throws Exception;
 }
\ No newline at end of file
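
To make the refactor above concrete, here is a minimal sketch (not part of this commit) of a processor written against the new generic Processor<T extends AnalyzerEntity> contract; the class name and the one-hour threshold are hypothetical. Such a processor would be registered by an Evaluator, in the same way JobSuggestionEvaluator.loadProcessors() registers the new suggestion processors further down in this diff.

import org.apache.eagle.jpm.analyzer.Processor;
import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;

import java.io.Serializable;

// Hypothetical example: a Processor bound to the base AnalyzerEntity type.
public class ExampleLongJobProcessor implements Processor<AnalyzerEntity>, Serializable {
    @Override
    public Result.ProcessorResult process(AnalyzerEntity entity) {
        // Illustrative threshold: flag jobs that ran longer than one hour.
        if (entity.getDurationTime() > 60 * 60 * 1000L) {
            return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB,
                    Result.ResultLevel.INFO, "Job ran longer than one hour");
        }
        return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB,
                Result.ResultLevel.NONE, "");
    }
}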

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
index d5a8a74..419e402 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -17,8 +17,9 @@
 
 package org.apache.eagle.jpm.analyzer;
 
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 
-public interface Processor {
-    Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+public interface Processor<T extends AnalyzerEntity> {
+    Result.ProcessorResult process(T jobAnalysisEntity);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
new file mode 100644
index 0000000..189d85d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Will refactor later if other types of jobs need this.
+ * AnalyzerEntity for each job that needs to be analyzed.
+ */
+public class AnalyzerEntity {
+    private String jobDefId;
+    private String jobId;
+    private String siteId;
+    private String userId;
+
+    private long startTime;
+    private long endTime;
+    private long durationTime;
+    private String currentState;
+    private double progress;
+
+    private Map<String, Object> jobConfig = new HashMap<>();
+
+    private Map<String, Object> jobMeta = new HashMap<>();
+
+    public String getJobDefId() {
+        return jobDefId;
+    }
+
+    public void setJobDefId(String jobDefId) {
+        this.jobDefId = jobDefId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public long getDurationTime() {
+        return durationTime;
+    }
+
+    public void setDurationTime(long durationTime) {
+        this.durationTime = durationTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public Map<String, Object> getJobConfig() {
+        return jobConfig;
+    }
+
+    public void setJobConfig(Map<String, Object> jobConfig) {
+        this.jobConfig = jobConfig;
+    }
+
+    public Map<String, Object> getJobMeta() {
+        return jobMeta;
+    }
+
+    public void setJobMeta(Map<String, Object> jobMeta) {
+        this.jobMeta = jobMeta;
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
new file mode 100644
index 0000000..cd6249d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MapReduceAnalyzerEntity extends AnalyzerEntity {
+    private String jobName;
+    private String jobQueueName;
+    private String jobType;
+    private int totalMaps;
+    private int totalReduces;
+    private int failedMaps;
+    private int failedReduces;
+    private int finishedMaps;
+    private int finishedReduces;
+    private JobCounters totalCounters;
+    private JobCounters mapCounters;
+    private JobCounters reduceCounters;
+    private Map<String, TaskExecutionAPIEntity> tasksMap;
+    private Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap;
+    private Configuration jobConf;
+
+    public MapReduceAnalyzerEntity() {
+        this.setEndTime(-1);
+        this.setStartTime(-1);
+        finishedMaps = finishedReduces = 0;
+        jobName = jobQueueName = "";
+        tasksMap = new HashMap<>();
+        completedTaskAttemptsMap = new HashMap<>();
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public String getJobQueueName() {
+        return jobQueueName;
+    }
+
+    public String getJobType() {
+        return jobType;
+    }
+
+    public int getTotalMaps() {
+        return totalMaps;
+    }
+
+    public int getTotalReduces() {
+        return totalReduces;
+    }
+
+    public int getFailedMaps() {
+        return failedMaps;
+    }
+
+    public int getFailedReduces() {
+        return failedReduces;
+    }
+
+    public int getFinishedMaps() {
+        return finishedMaps;
+    }
+
+    public int getFinishedReduces() {
+        return finishedReduces;
+    }
+
+    public JobCounters getTotalCounters() {
+        return totalCounters;
+    }
+
+    public JobCounters getMapCounters() {
+        return mapCounters;
+    }
+
+    public JobCounters getReduceCounters() {
+        return reduceCounters;
+    }
+
+    public Map<String, TaskExecutionAPIEntity> getTasksMap() {
+        return tasksMap;
+    }
+
+    public Map<String, TaskAttemptExecutionAPIEntity> getCompletedTaskAttemptsMap() {
+        return completedTaskAttemptsMap;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public void setJobQueueName(String jobQueueName) {
+        this.jobQueueName = jobQueueName;
+    }
+
+    public void setJobType(String jobType) {
+        this.jobType = jobType;
+    }
+
+    public void setTotalMaps(int totalMaps) {
+        this.totalMaps = totalMaps;
+    }
+
+    public void setTotalReduces(int totalReduces) {
+        this.totalReduces = totalReduces;
+    }
+
+    public void setFailedMaps(int failedMaps) {
+        this.failedMaps = failedMaps;
+    }
+
+    public void setFailedReduces(int failedReduces) {
+        this.failedReduces = failedReduces;
+    }
+
+    public void setFinishedMaps(int finishedMaps) {
+        this.finishedMaps = finishedMaps;
+    }
+
+    public void setFinishedReduces(int finishedReduces) {
+        this.finishedReduces = finishedReduces;
+    }
+
+    public void setTotalCounters(JobCounters totalCounters) {
+        this.totalCounters = totalCounters;
+    }
+
+    public void setMapCounters(JobCounters mapCounters) {
+        this.mapCounters = mapCounters;
+    }
+
+    public void setReduceCounters(JobCounters reduceCounters) {
+        this.reduceCounters = reduceCounters;
+    }
+
+    public void setTasksMap(Map<String, TaskExecutionAPIEntity> tasksMap) {
+        this.tasksMap = tasksMap;
+    }
+
+    public void setCompletedTaskAttemptsMap(Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap) {
+        this.completedTaskAttemptsMap = completedTaskAttemptsMap;
+    }
+
+    public Configuration getJobConf() {
+        return jobConf;
+    }
+
+    public void setJobConf(Configuration jobConf) {
+        this.jobConf = jobConf;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
index e0e579a..e32a37c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.eagle.jpm.analyzer.mr;
 import com.typesafe.config.Config;
 import org.apache.eagle.jpm.analyzer.*;
 import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
 import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
 import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
@@ -33,7 +34,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAnalyzer<T>, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
 
     private List<Evaluator> evaluators = new ArrayList<>();
@@ -51,11 +52,14 @@ public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
     }
 
     @Override
-    public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+    public void analyze(T analyzerJobEntity) throws Exception {
         Result result = new Result();
 
         for (Evaluator evaluator : evaluators) {
-            result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+            Result.EvaluatorResult evaluatorResult = evaluator.evaluate(analyzerJobEntity);
+            if (evaluatorResult != null) {
+                result.addEvaluatorResult(evaluator.getClass(), evaluatorResult);
+            }
         }
 
         for (Publisher publisher : publishers) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
index f10b68d..a77e55d 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.jpm.analyzer.mr.sla;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.Evaluator;
 import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
 import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
@@ -26,6 +26,7 @@ import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJob
 import org.apache.eagle.jpm.analyzer.Processor;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.jpm.util.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,10 @@ public class SLAJobEvaluator implements Evaluator, Serializable {
 
     @Override
     public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+        if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) {
+            return null;
+        }
+
         Result.EvaluatorResult result = new Result.EvaluatorResult();
 
         List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
index 35f3b27..b3322ed 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.jpm.analyzer.mr.sla.processors;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.Processor;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.slf4j.Logger;
@@ -38,6 +38,6 @@ public class LongStuckJobProcessor implements Processor, Serializable {
     @Override
     public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
         LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
-        return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+        return new Result.ProcessorResult(Result.RuleType.LONG_STUCK_JOB, Result.ResultLevel.NONE, "");
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
index 9d4ce2b..8f655ba 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.jpm.analyzer.mr.sla.processors;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.eagle.jpm.analyzer.Processor;
 import org.apache.eagle.jpm.analyzer.util.Constants;
@@ -50,7 +50,7 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
         Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
         long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
         if (avgDurationTime == 0L) {
-            return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+            return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
         }
 
         Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
@@ -62,12 +62,12 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
         double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
         for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
             if (expirePercent >= entry.getValue()) {
-                return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+                return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
                         (int)(expirePercent * 100), avgDurationTime / 1000));
             }
         }
 
-        return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+        return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
     }
 
     private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
index 79f5318..ea60ff9 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -18,15 +18,24 @@
 package org.apache.eagle.jpm.analyzer.mr.suggestion;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.mr.historyentity.JobSuggestionAPIEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-public class JobSuggestionEvaluator implements Evaluator, Serializable {
+import static org.apache.eagle.jpm.util.MRJobTagName.*;
+
+public class JobSuggestionEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
 
     private Config config;
@@ -35,10 +44,63 @@ public class JobSuggestionEvaluator implements Evaluator, Serializable {
         this.config = config;
     }
 
+    private List<Processor> loadProcessors(MapReduceJobSuggestionContext context) {
+        List<Processor> processors = new ArrayList<>();
+        processors.add(new MapReduceCompressionSettingProcessor(context));
+        processors.add(new MapReduceSplitSettingProcessor(context));
+        processors.add(new MapReduceDataSkewProcessor(context));
+        processors.add(new MapReduceGCTimeProcessor(context));
+        processors.add(new MapReduceSpillProcessor(context));
+        processors.add(new MapReduceTaskNumProcessor(context));
+        //processors.add(new MapReduceQueueResourceProcessor(context));
+
+        return processors;
+    }
+
     @Override
-    public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
-        Result.EvaluatorResult result = new Result.EvaluatorResult();
-        //TODO
-        return result;
+    public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity analyzerEntity) {
+        if (analyzerEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) {
+            return null;
+        }
+
+        MapReduceJobSuggestionContext jobContext = new MapReduceJobSuggestionContext(analyzerEntity);
+        if (jobContext.getNumMaps() == 0) {
+            return null;
+        }
+
+        try {
+            Result.EvaluatorResult result = new Result.EvaluatorResult();
+            for (Processor processor : loadProcessors(jobContext)) {
+                Result.ProcessorResult processorResult = processor.process(analyzerEntity);
+                if (processorResult != null) {
+                    result.addProcessorResult(processor.getClass(), processorResult);
+                    result.addProcessorEntity(processor.getClass(), createJobSuggestionEntity(processorResult, analyzerEntity));
+                }
+            }
+            return result;
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            return null;
+        }
+
     }
+
+    private static JobSuggestionAPIEntity createJobSuggestionEntity(Result.ProcessorResult processorResult, MapReduceAnalyzerEntity entity) {
+        Map<String, String> tags = new HashMap<>();
+        tags.put(JOB_ID.toString(), entity.getJobId());
+        tags.put(JOD_DEF_ID.toString(), entity.getJobDefId());
+        tags.put(SITE.toString(), entity.getSiteId());
+        tags.put(USER.toString(), entity.getUserId());
+        tags.put(RULE_TYPE.toString(), processorResult.getRuleType().toString());
+        tags.put(JOB_QUEUE.toString(), entity.getJobQueueName());
+        tags.put(JOB_TYPE.toString(), entity.getJobType());
+        JobSuggestionAPIEntity jobSuggestionAPIEntity = new JobSuggestionAPIEntity();
+        jobSuggestionAPIEntity.setTags(tags);
+        jobSuggestionAPIEntity.setTimestamp(entity.getStartTime());  // startTime as the job timestamp
+        jobSuggestionAPIEntity.setOptimizerSuggestion(processorResult.getMessage());
+        jobSuggestionAPIEntity.setOptimizerSettings(processorResult.getSettings());
+
+        return jobSuggestionAPIEntity;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
new file mode 100644
index 0000000..62c5c2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+import static org.apache.hadoop.mapreduce.MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
+
+public class MapReduceCompressionSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceCompressionSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+
+        JobConf jobconf = new JobConf(context.getJobconf());
+        if (jobconf.getLong(NUM_REDUCES, 0) > 0) {
+            if (!jobconf.getCompressMapOutput()) {
+                optSettings.add(String.format("%s=true", MAP_OUTPUT_COMPRESS));
+                sb.append("Please set " + MAP_OUTPUT_COMPRESS + " to true to reduce network IO.\n");
+            } else {
+                String codecClassName = jobconf.get(MAP_OUTPUT_COMPRESS_CODEC);
+                if (!(codecClassName.endsWith("LzoCodec") || codecClassName.endsWith("SnappyCodec"))) {
+                    optSettings.add(String.format("%s=LzoCodec or SnappyCodec", MAP_OUTPUT_COMPRESS_CODEC));
+                    sb.append("Best practice: use LzoCodec or SnappyCodec for " + MAP_OUTPUT_COMPRESS_CODEC).append("\n");
+                }
+            }
+        }
+
+        if (!jobconf.getBoolean(FileOutputFormat.COMPRESS, false)) {
+            optSettings.add(String.format("%s=true", FileOutputFormat.COMPRESS));
+            sb.append("Please set " + FileOutputFormat.COMPRESS + " to true to reduce disk usage and network IO.\n");
+        } else {
+            String codecName = jobconf.get(FileOutputFormat.COMPRESS_CODEC, "");
+            String outputFileFormat = jobconf.get(OUTPUT_FORMAT_CLASS_ATTR, "");
+
+            if ((codecName.endsWith("GzipCodec") || codecName.endsWith("SnappyCodec") || codecName.endsWith("DefaultCodec"))
+                && outputFileFormat.endsWith("TextOutputFormat")) {
+                sb.append("Best practice: don't use Gzip/Snappy/DefaultCodec with TextOutputFormat");
+                sb.append(" as this will cause the output files to be unsplittable. ");
+                sb.append("Please use LZO instead or ");
+                sb.append("use a container file format such as SequenceFileOutputFormat.\n");
+            }
+        }
+
+        if (sb.length() > 0) {
+            return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, sb.toString(), optSettings);
+        }
+        return null;
+    }
+}
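
For reference, a hedged sketch (not part of this commit) of how a client job could apply the settings this processor recommends; it uses the same MRJobConfig and FileOutputFormat constants imported above, and the Snappy codec choice is illustrative, depending on what is installed on the cluster.

import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative fragment: enable map-output and job-output compression.
JobConf conf = new JobConf();
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
conf.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class.getName());
conf.setBoolean(FileOutputFormat.COMPRESS, true);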

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
new file mode 100644
index 0000000..b21a927
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+public class MapReduceDataSkewProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceDataSkewProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+    
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        TaskAttemptExecutionAPIEntity worstReduce = context.getWorstReduce();
+        if (context.getNumReduces() == 0 || worstReduce == null) {
+            return null;
+        }
+        StringBuilder sb = new StringBuilder();
+        try {
+            long worstTimeInSec = (worstReduce.getEndTime() - worstReduce.getShuffleFinishTime()) / DateTimeUtil.ONESECOND;
+            if (worstTimeInSec - context.getAvgReduceTimeInSec() > 30 * 60 ) {
+                long avgInputs = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS)
+                    / context.getNumReduces();
+                long worstInputs = worstReduce.getJobCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+
+                if (worstInputs > avgInputs * 5) {
+                    sb.append("Data skew detected in reducers. The average reduce time is ").append(context.getAvgReduceTimeInSec());
+                    sb.append(" seconds, the worst reduce time is ").append(worstTimeInSec);
+                    sb.append(" seconds. Please investigate this problem to improve your job performance.\n");
+                }
+            }
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.INFO, sb.toString());
+            }
+        } catch (NullPointerException e) {
+            // When the job failed, its counters may be missing, so just ignore it
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
new file mode 100644
index 0000000..103de7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_JAVA_OPTS;
+
+public class MapReduceGCTimeProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceGCTimeProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        String setting;
+
+        try {
+            long mapGCTime = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+            long mapCPUTime = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+
+            if (mapGCTime > mapCPUTime * 0.1) {
+                setting = String.format("-D%s", MAP_JAVA_OPTS);
+                optSettings.add(setting);
+                sb.append("Map GC_TIME_MILLIS took too long. Please increase mapper memory via ").append(setting);
+                sb.append(", or optimize your mapper class.\n");
+            }
+
+            if (context.getNumReduces() > 0) {
+                long reduceGCTime = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+                long reduceCPUTime = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+                if (reduceGCTime > reduceCPUTime * 0.1) {
+                    setting = String.format("-D%s", REDUCE_JAVA_OPTS);
+                    optSettings.add(setting);
+                    sb.append("Reduce GC_TIME_MILLIS took too long. Please increase memory for reduce via ").append(setting);
+                    sb.append(", or optimize your reducer class.\n");
+                }
+            }
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException e) {
+            // When the job failed, its counters may be missing, so just ignore it
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
new file mode 100644
index 0000000..1f4e548
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_BYTES;
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_MAPS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+public class MapReduceJobSuggestionContext {
+
+    private JobConf jobconf;
+    private MapReduceAnalyzerEntity job;
+
+    private long numMaps;
+    private long numReduces;
+
+    private long avgMapTimeInSec;
+    private long avgReduceTimeInSec;
+    private long avgShuffleTimeInSec;
+    private TaskAttemptExecutionAPIEntity worstMap;
+    private TaskAttemptExecutionAPIEntity worstReduce;
+    private TaskAttemptExecutionAPIEntity worstShuffle;
+    private TaskAttemptExecutionAPIEntity lastMap;
+    private TaskAttemptExecutionAPIEntity lastReduce;
+    private TaskAttemptExecutionAPIEntity lastShuffle;
+    private TaskAttemptExecutionAPIEntity firstMap;
+    private TaskAttemptExecutionAPIEntity firstReduce;
+    private TaskAttemptExecutionAPIEntity firstShuffle;
+
+    private long minMapSpillMemBytes;
+
+    public static final Pattern MAX_HEAP_PATTERN = Pattern.compile("-Xmx([0-9]+)([kKmMgG]?)");
+
+    public MapReduceJobSuggestionContext(MapReduceAnalyzerEntity job) {
+        this.job = job;
+        this.jobconf = new JobConf(job.getJobConf());
+        buildContext();
+    }
+
+    private MapReduceJobSuggestionContext buildContext() {
+        avgMapTimeInSec = avgReduceTimeInSec = avgShuffleTimeInSec = 0;
+        numMaps = jobconf.getLong(NUM_MAPS, 0);
+        numReduces = jobconf.getLong(NUM_REDUCES, 0);
+
+        for (TaskAttemptExecutionAPIEntity attempt : job.getCompletedTaskAttemptsMap().values()) {
+            String taskType = getTaskType(attempt);
+            if (Constants.TaskType.MAP.toString().equalsIgnoreCase(taskType)) {
+                long mapTime = attempt.getEndTime() - attempt.getStartTime();
+                avgMapTimeInSec += mapTime;
+                if (firstMap == null || firstMap.getStartTime() > attempt.getStartTime()) {
+                    firstMap = attempt;
+                }
+                if (lastMap == null || lastMap.getEndTime() < attempt.getEndTime()) {
+                    lastMap = attempt;
+                }
+                if (worstMap == null || (worstMap.getEndTime() - worstMap.getStartTime()) < mapTime) {
+                    worstMap = attempt;
+                }
+                long tmpMem = getMinimumIOSortMemory(attempt);
+                if (tmpMem > minMapSpillMemBytes) {
+                    minMapSpillMemBytes = tmpMem;
+                }
+            } else if (TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) {
+                long shuffleTime = attempt.getShuffleFinishTime() - attempt.getStartTime();
+                avgShuffleTimeInSec += shuffleTime;
+                if (firstShuffle == null || firstShuffle.getStartTime() > attempt.getStartTime()) {
+                    firstShuffle = attempt;
+                }
+                if (lastShuffle == null || lastShuffle.getShuffleFinishTime() < attempt.getShuffleFinishTime()) {
+                    lastShuffle = attempt;
+                }
+                if (worstShuffle == null || (worstShuffle.getShuffleFinishTime() - worstShuffle.getStartTime()) < shuffleTime) {
+                    worstShuffle = attempt;
+                }
+
+                long reduceTime = attempt.getEndTime() - attempt.getShuffleFinishTime();
+                avgReduceTimeInSec += reduceTime;
+                if (firstReduce == null || firstReduce.getStartTime() > attempt.getStartTime()) {
+                    firstReduce = attempt;
+                }
+                if (lastReduce == null || lastReduce.getEndTime() < attempt.getEndTime()) {
+                    lastReduce = attempt;
+                }
+                if (worstReduce == null || (worstReduce.getEndTime() - worstReduce.getShuffleFinishTime()) < reduceTime) {
+                    worstReduce = attempt;
+                }
+            }
+        }
+        if (numMaps > 0) {
+            avgMapTimeInSec = avgMapTimeInSec / numMaps / DateTimeUtil.ONESECOND;
+        }
+        if (numReduces > 0) {
+            avgReduceTimeInSec = avgReduceTimeInSec / numReduces / DateTimeUtil.ONESECOND;
+            avgShuffleTimeInSec = avgShuffleTimeInSec / numReduces / DateTimeUtil.ONESECOND;
+        }
+        return this;
+    }
+
+    private String getTaskType(TaskAttemptExecutionAPIEntity taskAttemptInfo) {
+        return taskAttemptInfo.getTags().get(MRJobTagName.TASK_TYPE.toString());
+    }
+
+    /**
+     * The default index size is 16.
+     *
+     * @param attempt
+     * @return minimal sort memory
+     */
+    private long getMinimumIOSortMemory(TaskAttemptExecutionAPIEntity attempt) {
+        long records = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_RECORDS);
+        long outputBytes = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_BYTES);
+        return outputBytes + records * 16;
+    }
+
+    public JobConf getJobconf() {
+        return jobconf;
+    }
+
+    public MapReduceAnalyzerEntity getJob() {
+        return job;
+    }
+
+    public long getNumMaps() {
+        return numMaps;
+    }
+
+    public long getNumReduces() {
+        return numReduces;
+    }
+
+    public long getAvgMapTimeInSec() {
+        return avgMapTimeInSec;
+    }
+
+    public long getAvgReduceTimeInSec() {
+        return avgReduceTimeInSec;
+    }
+
+    public long getAvgShuffleTimeInSec() {
+        return avgShuffleTimeInSec;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstMap() {
+        return worstMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstReduce() {
+        return worstReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstShuffle() {
+        return worstShuffle;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastMap() {
+        return lastMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastReduce() {
+        return lastReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastShuffle() {
+        return lastShuffle;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstMap() {
+        return firstMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstReduce() {
+        return firstReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstShuffle() {
+        return firstShuffle;
+    }
+
+    public long getMinMapSpillMemBytes() {
+        return minMapSpillMemBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
new file mode 100644
index 0000000..a1b57bf
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+/*
+ * Criterion: (TimeElapsed / (numTasks / 500 * avgTaskTime)) > 20
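+ * For example, with 1,000 map tasks averaging 60 seconds each and roughly 500 tasks running at once,
+ * the map phase should take about 2 * 60s; an elapsed time beyond 20x that suggests resource contention.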
+ */
+public class MapReduceQueueResourceProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private static final Logger LOG = LoggerFactory.getLogger(MapReduceQueueResourceProcessor.class);
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceQueueResourceProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        try {
+            String userName = context.getJob().getUserId();
+            TaskAttemptExecutionAPIEntity lastMap = context.getLastMap();
+            TaskAttemptExecutionAPIEntity firstMap = context.getFirstMap();
+            TaskAttemptExecutionAPIEntity lastReduce = context.getLastReduce();
+            TaskAttemptExecutionAPIEntity firstShuffle = context.getFirstShuffle();
+
+            if (checkBatchUser(userName) && lastMap != null && firstMap != null) {
+                StringBuilder sb = new StringBuilder();
+
+                long tasksPerTime = 500; // assumed concurrent task capacity; ideally fetched from the ResourceManager
+                long mapPhaseTimeInSec = (lastMap.getEndTime() - firstMap.getStartTime()) / DateTimeUtil.ONESECOND;
+                if (mapPhaseTimeInSec > context.getAvgMapTimeInSec()
+                    * ((context.getNumMaps() + tasksPerTime - 1) / tasksPerTime) * 20) {
+                    sb.append("There appears to have been resource contention during the map phase of your job. Please ask for more resources if your job is SLA-bound,");
+                    sb.append(" or submit your job when the cluster is less busy.\n");
+                }
+
+                if (context.getNumReduces() > 0 && lastReduce != null && firstShuffle != null) {
+                    long reducePhaseTimeInSec = (lastReduce.getEndTime() - firstShuffle.getStartTime()) / DateTimeUtil.ONESECOND;
+                    if (reducePhaseTimeInSec > context.getAvgReduceTimeInSec()
+                        * ((context.getNumReduces() + tasksPerTime - 1) / tasksPerTime) * 20) {
+                        sb.append("Seems there was resource contention when your job in reduce phase, please ask for more resource if your job is SLA enabled,");
+                        sb.append(" or submit your job when the cluster is less busy.\n");
+                    }
+                }
+
+                if (sb.length() > 0) {
+                    return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.INFO, sb.toString());
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
+        return null;
+    }
+
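+    /**
+     * Queue-resource suggestions are only generated for batch accounts, which by this convention
+     * are user names prefixed with "b_".
+     */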
+    protected boolean checkBatchUser(String userName) {
+        return userName.startsWith("b_");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
new file mode 100644
index 0000000..835b382
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceJobSuggestionContext.MAX_HEAP_PATTERN;
+import static org.apache.hadoop.mapreduce.MRJobConfig.IO_SORT_MB;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_SORT_SPILL_PERCENT;
+
+/**
+ * Checks whether map output was spilled more than once. If so, estimates the minimum amount of memory
+ * needed to hold all the map output, and from that derives how large the task heap should be.
+ */
+public class MapReduceSpillProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceSpillProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        String setting;
+
+        long outputRecords = 0L; // Map output records
+        long spillRecords = 0L; //  Spilled Records
+        try {
+            outputRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.MAP_OUTPUT_RECORDS);
+            spillRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+
+            if (outputRecords < spillRecords) {
+                sb.append("Total map output records: ").append(outputRecords);
+                sb.append(" Total map spilled records: ").append(spillRecords).append(". Please set");
+
+                long minMapSpillMemBytes = context.getMinMapSpillMemBytes();
+                double spillPercent = context.getJobconf().getDouble(MAP_SORT_SPILL_PERCENT, 0.8);
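+                // If the buffer needed to hold all map output exceeds spillPercent of a 512MB sort buffer,
+                // suggest raising the spill threshold to 1.0; otherwise divide the requirement by the
+                // threshold so that the data still fits below it in the suggested buffer.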
+                if (minMapSpillMemBytes > 512 * FileUtils.ONE_MB * spillPercent) {
+                    if (Math.abs(1.0 - spillPercent) > 0.001) {
+                        setting = String.format("-D%s=1", MAP_SORT_SPILL_PERCENT);
+                        sb.append(" ").append(setting);
+                        optSettings.add(setting);
+                    }
+                } else {
+                    minMapSpillMemBytes /= spillPercent;
+                }
+
+                long minMapSpillMemMB = (minMapSpillMemBytes / FileUtils.ONE_MB + 10) / 10 * 10;
+                if (minMapSpillMemMB >= 2047) {
+                    sb.append("\nPlease reduce the block size of the input files and make sure they are splittable.");
+                } else {
+                    setting = String.format("-D%s=%s", IO_SORT_MB, minMapSpillMemMB);
+                    sb.append(" ").append(setting);
+                    optSettings.add(setting);
+                    long heapSize = getMaxHeapSize(context.getJobconf().get(MAP_JAVA_OPTS));
+                    if (heapSize < 3 * minMapSpillMemMB) {
+                        long expectedHeapSizeMB = (minMapSpillMemMB * 3 + 1024) / 1024 * 1024;
+                        setting = String.format(" -D%s=-Xmx%sM", MAP_JAVA_OPTS, expectedHeapSizeMB);
+                        sb.append(" ").append(setting);
+                        optSettings.add(setting);
+                    }
+                }
+                sb.append(" to avoid spilled records.\n");
+            }
+
+
+            long reduceInputRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+            spillRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+            if (reduceInputRecords < spillRecords) {
+                sb.append("Please add more memory (mapreduce.reduce.java.opts) to avoid spilled records.");
+                sb.append(" Total Reduce input records: ").append(reduceInputRecords);
+                sb.append(" Total Spilled Records: ").append(spillRecords);
+                sb.append("\n");
+            }
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException e) {
+            // If the job failed, some counters may be missing; ignore it in that case.
+        }
+        return null;
+    }
+
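+    /**
+     * Extracts the -Xmx value from a java.opts string and normalizes it to MB
+     * (values with a k/K or g/G suffix are converted).
+     */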
+    private static long getMaxHeapSize(String s) {
+        Matcher m = MAX_HEAP_PATTERN.matcher(s);
+        long val = 0;
+        if (m.find()) {
+            val = Long.parseLong(m.group(1));
+            if ("k".equalsIgnoreCase(m.group(2))) {
+                val /= 1024;
+            } else if ("g".equalsIgnoreCase(m.group(2))) {
+                val *= 1024;
+            }
+        }
+        return val;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
new file mode 100644
index 0000000..8eba468
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
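+/**
+ * Flags jobs that explicitly raise the minimum split size (FileInputFormat.SPLIT_MINSIZE),
+ * since oversized splits can reduce data locality and slow down map tasks.
+ */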
+public class MapReduceSplitSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceSplitSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+
+        if (context.getJobconf().getLong(FileInputFormat.SPLIT_MINSIZE, 0) > 1) {
+            sb.append("Best practice: don't set " + FileInputFormat.SPLIT_MINSIZE);
+            sb.append(", because it may lower data locality, hence maps will run slower.\n");
+            return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.INFO, sb.toString());
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
new file mode 100644
index 0000000..00d5cc9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+public class MapReduceTaskNumProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private static final String[] SIZE_UNITS = {"B", "K", "M", "G", "T", "P"};
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceTaskNumProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        try {
+            sb.append(analyzeReduceTaskNum(optSettings));
+            sb.append(analyzeMapTaskNum(optSettings));
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException e) {
+            // If the job failed, some counters may be missing; ignore it in that case.
+        }
+        return null;
+    }
+
+
+    private String analyzeReduceTaskNum(List<String> optSettings) {
+        StringBuilder sb = new StringBuilder();
+
+        long numReduces = context.getNumReduces();
+        if (numReduces > 0) {
+            long avgReduceTime = context.getAvgReduceTimeInSec();
+            long avgShuffleTime = context.getAvgShuffleTimeInSec();
+            long avgShuffleBytes = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES)
+                    / numReduces;
+            long avgReduceOutput = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_WRITTEN)
+                    / numReduces;
+            long avgReduceTotalTime = avgShuffleTime + avgReduceTime;
+
+            long suggestReduces = 0;
+            StringBuilder tmpsb = new StringBuilder();
+
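+            // Heuristics: flag reducers that are too small (little shuffle input/output, short run time)
+            // or too large (10GB+ shuffle input, 30+ minutes), and compute a suggested reducer count.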
+            String avgShuffleDisplaySize = bytesToHumanReadable(avgShuffleBytes);
+            if (avgShuffleBytes < 256 * FileUtils.ONE_MB && avgReduceTotalTime < 300
+                    && avgReduceOutput < 256 * FileUtils.ONE_MB && numReduces > 1) {
+                tmpsb.append("average reduce input bytes is: ").append(avgShuffleDisplaySize).append(", ");
+                suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+            } else if (avgShuffleBytes > 10 * FileUtils.ONE_GB && avgReduceTotalTime > 1800) {
+                tmpsb.append("average reduce input bytes is: ").append(avgShuffleDisplaySize).append(", ");
+                suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+            }
+
+            if (avgReduceTotalTime < 60 && numReduces > 1) {
+                tmpsb.append("average reduce time is only ").append(avgReduceTotalTime).append(" seconds, ");
+                if (suggestReduces == 0) {
+                    suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+                }
+            } else if (avgReduceTotalTime > 3600 && avgReduceTime > 1800) {
+                tmpsb.append("average reduce time is ").append(avgReduceTotalTime).append(" seconds, ");
+                if (suggestReduces == 0) {
+                    suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+                }
+            }
+
+            String avgReduceOutputDisplaySize = bytesToHumanReadable(avgReduceOutput);
+            if (avgReduceOutput < 10 * FileUtils.ONE_MB && avgReduceTime < 300
+                    && avgShuffleBytes < 2 * FileUtils.ONE_GB && numReduces > 1) {
+                tmpsb.append(" average reduce output is only ").append(avgReduceOutputDisplaySize).append(", ");
+                if (suggestReduces == 0) {
+                    suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+                }
+            } else if (avgReduceOutput > 10 * FileUtils.ONE_GB && avgReduceTime > 1800) {
+                tmpsb.append(" average reduce output is ").append(avgReduceOutputDisplaySize).append(", ");
+                if (suggestReduces == 0) {
+                    suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+                }
+            }
+
+            if (suggestReduces > 0) {
+                sb.append("Best practice: ").append(tmpsb.toString()).append("please consider ");
+                if (suggestReduces > numReduces) {
+                    sb.append("increasing the ");
+                } else {
+                    sb.append("decreasing the ");
+                }
+                String setting = String.format("-D%s=%s", NUM_REDUCES, suggestReduces);
+                sb.append("reducer number. You could try ").append(setting).append("\n");
+                optSettings.add(setting);
+            }
+        }
+        return sb.toString();
+    }
+
+    private String analyzeMapTaskNum(List<String> optSettings) {
+        StringBuilder sb = new StringBuilder();
+
+        long numMaps = context.getNumMaps();
+        long avgMapTime = context.getAvgMapTimeInSec();
+        long avgMapInput = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_READ)
+                / numMaps;
+        String avgMapInputDisplaySize = bytesToHumanReadable(avgMapInput);
+
+        if (avgMapInput < 5 * FileUtils.ONE_MB && avgMapTime < 30 && numMaps > 1) {
+            sb.append("Best practice: average map input bytes only have ").append(avgMapInputDisplaySize);
+            sb.append(". Please reduce the number of mappers by merging input files.\n");
+        } else if (avgMapInput > FileUtils.ONE_GB) {
+            sb.append("Best practice: average map input bytes have ").append(avgMapInputDisplaySize);
+            sb.append(". Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        if (avgMapTime < 10 && numMaps > 1) {
+            sb.append("Best practice: average map time only have ").append(avgMapTime);
+            sb.append(" seconds. Please reduce the number of mappers by merging input files or by using a larger block size.\n");
+        } else if (avgMapTime > 600 && avgMapInput < FileUtils.ONE_GB) {
+            sb.append("Best practice: average map time is ").append(avgMapInput);
+            sb.append(" seconds. Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        return sb.toString();
+    }
+
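+    /**
+     * Suggests a reducer count (at least 1) such that each reducer shuffles at most ~3GB of input,
+     * writes at most ~2GB of output, and runs for at most ~10 minutes, taking the largest of the three estimates.
+     */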
+    private long getReduceNum(long avgInputBytes, long avgOutputBytes, long avgTime) {
+        long newReduceNum = 1;
+        long tmpReduceNum;
+
+        long numReduces = context.getNumReduces();
+        tmpReduceNum = avgInputBytes * numReduces / (3 * FileUtils.ONE_GB);
+        if (tmpReduceNum > newReduceNum) {
+            newReduceNum = tmpReduceNum;
+        }
+
+        tmpReduceNum = avgOutputBytes * numReduces / (2 * FileUtils.ONE_GB);
+        if (tmpReduceNum > newReduceNum) {
+            newReduceNum = tmpReduceNum;
+        }
+
+        tmpReduceNum = avgTime * numReduces / (10 * 60);
+        if (tmpReduceNum > newReduceNum) {
+            newReduceNum = tmpReduceNum;
+        }
+
+        return newReduceNum;
+    }
+
+
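+    /**
+     * Renders a byte count using binary units, carrying the fractional part in the next smaller unit,
+     * e.g. 1.5GB -> "1G512M".
+     */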
+    private static String bytesToHumanReadable(long bytes) {
+        double val = bytes;
+        int idx = 0;
+        while (val >= 1024) {
+            val /= 1024.0;
+            idx += 1;
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append((int) Math.floor(val));
+        sb.append(SIZE_UNITS[idx]);
+        // express the fractional part in the next smaller 1024-based unit, e.g. 1.5G -> "1G512M"
+        int remainder = (int) ((val - Math.floor(val)) * 1024);
+        if (idx >= 1 && remainder > 0) {
+            sb.append(remainder);
+            sb.append(SIZE_UNITS[idx - 1]);
+        }
+        return sb.toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
index 6109704..0d7d2d7 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -18,23 +18,61 @@
 package org.apache.eagle.jpm.analyzer.publisher;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
 
 public class EagleStorePublisher implements Publisher, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
 
     private Config config;
+    private IEagleServiceClient client;
+    private AlertDeduplicator alertDeduplicator;
 
     public EagleStorePublisher(Config config) {
         this.config = config;
+        this.alertDeduplicator = new SimpleDeduplicator();
     }
 
     @Override
     public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+        if (result.getAlertMessages().size() == 0) {
+            return;
+        }
+
+        LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId());
+        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+            return;
+        }
+
+        try {
+            this.client = new EagleServiceClientImpl(config);
+            for (Map.Entry<String, List<TaggedLogAPIEntity>> entry : result.getAlertEntities().entrySet()) {
+                client.create(entry.getValue());
+                LOG.info("successfully persist {} entities for evaluator {}", entry.getValue().size(), entry.getKey());
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+            }
+        }
 
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
index 4e49094..842e0ac 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -18,12 +18,10 @@
 package org.apache.eagle.jpm.analyzer.publisher;
 
 import com.typesafe.config.Config;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.app.service.ApplicationEmailService;
 import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.mail.AlertEmailConstants;
 import org.apache.eagle.common.mail.AlertEmailContext;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
 import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
 import org.apache.eagle.jpm.analyzer.util.Constants;
@@ -31,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.net.URLEncoder;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,19 +66,15 @@ public class EmailPublisher implements Publisher, Serializable {
         basic.put("end", analyzerJobEntity.getEndTime() == 0
                 ? "0"
                 : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
-        basic.put("progress", analyzerJobEntity.getProgress() + "%");
+        double progress = analyzerJobEntity.getCurrentState().equalsIgnoreCase(org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString()) ? analyzerJobEntity.getProgress() : 100;
+        basic.put("progress", progress + "%");
         basic.put("detail", getJobLink(analyzerJobEntity));
 
-
-        Map<String, Map<String, String>> extend = new HashMap<>();
-        Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
-        for (String evaluator : alertMessages.keySet()) {
-            List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
-            extend.put(evaluator, new HashMap<>());
-            for (Pair<Result.ResultLevel, String> message : messages) {
+        Map<String, List<Result.ProcessorResult>> extend = result.getAlertMessages();
+        for (String evaluator : extend.keySet()) {
+            for (Result.ProcessorResult message : extend.get(evaluator)) {
                 LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
-                        analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
-                extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+                        analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
index 2f42bf9..1f42ef9 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher;
 
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 
 public interface Publisher {
     void publish(AnalyzerEntity analyzerJobEntity, Result result);

