eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject [2/2] eagle git commit: [EAGLE-797] add job performance analysis
Date Thu, 12 Jan 2017 02:23:30 GMT
[EAGLE-797] add job performance analysis

for continuous development

Author: wujinhu <wujinhu920@126.com>

Closes #762 from wujinhu/EAGLE-797.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/b4695801
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/b4695801
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/b4695801

Branch: refs/heads/master
Commit: b4695801f6c93ca0991fd9758c470b784aa8c781
Parents: 2adbbf5
Author: wujinhu <wujinhu920@126.com>
Authored: Thu Jan 12 10:23:02 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Thu Jan 12 10:23:02 2017 +0800

----------------------------------------------------------------------
 .../app/spi/AbstractApplicationProvider.java    |   3 +-
 ...adoopQueueRunningApplicationHealthCheck.java |   3 +-
 .../jpm/aggregation/mr/MRMetricAggregator.java  |   3 +
 .../mr/MRMetricsAggregateContainer.java         |   1 +
 eagle-jpm/eagle-jpm-analyzer/pom.xml            |  38 ++++++
 .../eagle/jpm/analyzer/AnalyzerEntity.java      | 130 ++++++++++++++++++
 .../apache/eagle/jpm/analyzer/Evaluator.java    |  24 ++++
 .../apache/eagle/jpm/analyzer/JobAnalyzer.java  |  28 ++++
 .../apache/eagle/jpm/analyzer/Processor.java    |  24 ++++
 .../analyzer/meta/MetaManagementService.java    |  39 ++++++
 .../impl/MetaManagementServiceJDBCImpl.java     |  77 +++++++++++
 .../impl/MetaManagementServiceMemoryImpl.java   | 127 ++++++++++++++++++
 .../jpm/analyzer/meta/model/JobMetaEntity.java  |  85 ++++++++++++
 .../analyzer/meta/model/PublisherEntity.java    |  77 +++++++++++
 .../analyzer/mr/MRJobPerformanceAnalyzer.java   |  65 +++++++++
 .../jpm/analyzer/mr/sla/SLAJobEvaluator.java    |  67 ++++++++++
 .../sla/processors/LongStuckJobProcessor.java   |  43 ++++++
 .../UnExpectedLongDurationJobProcessor.java     | 120 +++++++++++++++++
 .../mr/suggestion/JobSuggestionEvaluator.java   |  44 +++++++
 .../analyzer/publisher/EagleStorePublisher.java |  40 ++++++
 .../jpm/analyzer/publisher/EmailPublisher.java  | 109 +++++++++++++++
 .../eagle/jpm/analyzer/publisher/Publisher.java |  24 ++++
 .../eagle/jpm/analyzer/publisher/Result.java    | 109 +++++++++++++++
 .../publisher/dedup/AlertDeduplicator.java      |  25 ++++
 .../dedup/impl/SimpleDeduplicator.java          |  59 +++++++++
 .../jpm/analyzer/resource/AnalyzerResource.java | 131 +++++++++++++++++++
 .../eagle/jpm/analyzer/util/Constants.java      |  65 +++++++++
 .../apache/eagle/jpm/analyzer/util/Utils.java   |  74 +++++++++++
 .../main/resources/AnalyzerReportTemplate.vm    | 131 +++++++++++++++++++
 .../src/main/resources/createTable.sql          |  23 ++++
 .../MRHistoryJobApplicationHealthCheck.java     |   1 -
 ...JobConfigurationCreationServiceListener.java |   1 -
 .../JobEntityCreationEagleServiceListener.java  |   3 +-
 .../parser/TaskAttemptCounterListener.java      |   3 +-
 .../mr/history/parser/TaskFailureListener.java  |   1 -
 eagle-jpm/eagle-jpm-mr-running/pom.xml          |  10 ++
 .../jpm/mr/running/MRRunningJobApplication.java |   4 +-
 .../MRRunningJobApplicationProvider.java        |  11 ++
 .../parser/MRJobEntityCreationHandler.java      |   1 -
 .../jpm/mr/running/parser/MRJobParser.java      |  25 +++-
 .../mr/running/storm/MRRunningJobParseBolt.java |   9 +-
 .../mr/running/MRRunningJobApplicationTest.java |   4 +-
 .../jpm/mr/running/parser/MRJobParserTest.java  |  17 +--
 .../SparkHistoryJobApplicationHealthCheck.java  |   3 +-
 .../history/crawl/JHFSparkEventReader.java      |   2 +-
 .../parser/SparkAppEntityCreationHandler.java   |   2 +-
 .../java/org/apache/eagle/jpm/util/Utils.java   |   6 +-
 eagle-jpm/pom.xml                               |   1 +
 eagle-server-assembly/src/main/conf/eagle.conf  |   6 +
 .../TopologyCheckApplicationHealthCheck.java    |   3 +-
 50 files changed, 1868 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 2a8d7c0..e537643 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.spi;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.service.ApplicationListener;
@@ -130,7 +131,7 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
         currentRegistry.register(scope, new AbstractModule() {
             @Override
             protected void configure() {
-                bind(type).to(impl);
+                bind(type).to(impl).in(Singleton.class);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
index 6471dfc..5a5d0ee 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -52,7 +52,7 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
                 eagleServiceConfig.eagleService.username,
                 eagleServiceConfig.eagleService.password);
 
-        client.getJerseyClient().setReadTimeout(60000);
+        client.setReadTimeout(60000);
 
         String message = "";
         try {
@@ -91,7 +91,6 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
         } catch (Exception e) {
             return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
         } finally {
-            client.getJerseyClient().destroy();
             try {
                 client.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
index f8840b2..54bd29b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
@@ -74,6 +74,8 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
                 .endTime(endTime)
                 .pageSize(Integer.MAX_VALUE)
                 .send();
+
+            client.close();
         } catch (Exception e) {
             LOG.warn("{}", e);
             return false;
@@ -151,6 +153,7 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
             client.create(entities);
             LOG.info("finish flushing entities of total number " + entities.size());
             entities.clear();
+            client.close();
         } catch (Exception e) {
             LOG.warn("{}", e);
             return false;

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
index 45bbcef..dd1980b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
@@ -74,6 +74,7 @@ public class MRMetricsAggregateContainer implements MetricsAggregateContainer, S
 
             List<Map<List<String>, List<Double>>> results = response.getObj();
             long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            client.close();
             return currentProcessTimeStamp;
         } catch (Exception e) {
             LOG.warn("{}", e);

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
new file mode 100644
index 0000000..d6383b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-jpm-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-jpm-analyzer</artifactId>
+    <name>Eagle::App::JPM::Analyzer</name>
+    <url>http://maven.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-metadata-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
new file mode 100644
index 0000000..f9b7af0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * will refactor later if other types of job needs this.
+ * AnalyzerEntity for each job needed to be analysised
+ */
+public class AnalyzerEntity {
+    private String jobDefId;
+    private String jobId;
+    private String siteId;
+    private String userId;
+
+    private long startTime;
+    private long endTime;
+    private long durationTime;
+    private String currentState;
+    private double progress;
+
+    private Map<String, Object> jobConfig = new HashMap<>();
+
+    private Map<String, Object> jobMeta = new HashMap<>();
+
+    public String getJobDefId() {
+        return jobDefId;
+    }
+
+    public void setJobDefId(String jobDefId) {
+        this.jobDefId = jobDefId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public long getDurationTime() {
+        return durationTime;
+    }
+
+    public void setDurationTime(long durationTime) {
+        this.durationTime = durationTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public Map<String, Object> getJobConfig() {
+        return jobConfig;
+    }
+
+    public void setJobConfig(Map<String, Object> jobConfig) {
+        this.jobConfig = jobConfig;
+    }
+
+    public Map<String, Object> getJobMeta() {
+        return jobMeta;
+    }
+
+    public void setJobMeta(Map<String, Object> jobMeta) {
+        this.jobMeta = jobMeta;
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
new file mode 100644
index 0000000..6617916
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Evaluator {
+    Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
new file mode 100644
index 0000000..6cda1cd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+/**
+ * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each Evaluator is a group of Processors
+ * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ *
+ */
+public interface JobAnalyzer {
+    void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
new file mode 100644
index 0000000..d5a8a74
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Processor {
+    Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
new file mode 100644
index 0000000..0935266
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta;
+
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+
+import java.util.List;
+
+public interface MetaManagementService {
+    boolean addJobMeta(JobMetaEntity jobMetaEntity);
+
+    boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity);
+
+    List<JobMetaEntity> getJobMeta(String jobDefId);
+
+    boolean deleteJobMeta(String jobDefId);
+
+    boolean addPublisherMeta(PublisherEntity publisherEntity);
+
+    boolean deletePublisherMeta(String userId);
+
+    List<PublisherEntity> getPublisherMeta(String userId);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
new file mode 100644
index 0000000..cfb5029
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class);
+
+    @Inject
+    Config config;
+
+    @Override
+    public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+
+        return true;
+    }
+
+    @Override
+    public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+
+        return true;
+    }
+
+    @Override
+    public List<JobMetaEntity> getJobMeta(String jobDefId) {
+
+        return null;
+    }
+
+    @Override
+    public boolean deleteJobMeta(String jobDefId) {
+
+        return true;
+    }
+
+    @Override
+    public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+
+        return true;
+    }
+
+    @Override
+    public boolean deletePublisherMeta(String userId) {
+
+        return true;
+    }
+
+    @Override
+    public List<PublisherEntity> getPublisherMeta(String userId) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
new file mode 100644
index 0000000..85e8358
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class);
+
+    private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>();
+    private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>();
+
+    @Inject
+    Config config;
+
+    @Override
+    public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+        if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+            LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId());
+            return false;
+        }
+
+        jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity);
+        LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId());
+        return true;
+    }
+
+    @Override
+    public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+        if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+            LOG.warn("does not contain job {}, update job meta failed", jobDefId);
+            return false;
+        }
+
+        jobMetaEntities.put(jobDefId, jobMetaEntity);
+        LOG.info("Successfully update job {} meta", jobDefId);
+        return true;
+    }
+
+    @Override
+    public List<JobMetaEntity> getJobMeta(String jobDefId) {
+        if (!jobMetaEntities.containsKey(jobDefId)) {
+            LOG.warn("does not contain job {}, get job meta failed", jobDefId);
+            return new ArrayList<>();
+        }
+
+        return Arrays.asList(jobMetaEntities.get(jobDefId));
+    }
+
+    @Override
+    public boolean deleteJobMeta(String jobDefId) {
+        if (!jobMetaEntities.containsKey(jobDefId)) {
+            LOG.warn("does not contain job {}, delete job meta failed", jobDefId);
+            return false;
+        }
+
+        jobMetaEntities.remove(jobDefId);
+        LOG.info("Successfully delete job {} meta", jobDefId);
+        return true;
+    }
+
+    @Override
+    public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+        if (publisherEntities.containsKey(publisherEntity.getUserId())) {
+            for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) {
+                if (entity.equals(publisherEntity)) {
+                    LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress());
+                    return false;
+                }
+            }
+        }
+
+        if (!publisherEntities.containsKey(publisherEntity.getUserId())) {
+            publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>());
+        }
+
+        publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity);
+        LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress());
+        return true;
+    }
+
+    @Override
+    public boolean deletePublisherMeta(String userId) {
+        if (!publisherEntities.containsKey(userId)) {
+            LOG.warn("does not contain user {}, failed to delete publisher", userId);
+            return false;
+        }
+
+        publisherEntities.remove(userId);
+        LOG.info("Successfully delete publisher user " + userId);
+        return true;
+    }
+
+    @Override
+    public List<PublisherEntity> getPublisherMeta(String userId) {
+        if (!publisherEntities.containsKey(userId)) {
+            LOG.warn("does not contain user {}, failed to get publisher", userId);
+            return new ArrayList<>();
+        }
+
+        return publisherEntities.get(userId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
new file mode 100644
index 0000000..2e15c17
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobMetaEntity extends PersistenceEntity {
+    private String jobDefId;
+    private String siteId;
+    private Map<String, Object> configuration = new HashMap<>();
+    private Set<String> evaluators = new HashSet<>();
+
+    public JobMetaEntity() {
+
+    }
+
+    public JobMetaEntity(String jobDefId,
+                         String siteId,
+                         Map<String, Object> configuration,
+                         Set<String> evaluators) {
+        this.jobDefId = jobDefId;
+        this.siteId = siteId;
+        this.configuration = configuration;
+        this.evaluators = evaluators;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("JobMetaEntity[jobDefId=%s, siteId=%s]", jobDefId, siteId);
+    }
+
+    public String getJobDefId() {
+        return jobDefId;
+    }
+
+    public void setJobDefId(String jobDefId) {
+        this.jobDefId = jobDefId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public Map<String, Object> getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(Map<String, Object> configuration) {
+        this.configuration = configuration;
+    }
+
+    public Set<String> getEvaluators() {
+        return evaluators;
+    }
+
+    public void setEvaluators(Set<String> evaluators) {
+        this.evaluators = evaluators;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
new file mode 100644
index 0000000..bca7ab1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PublisherEntity extends PersistenceEntity {
+    private String userId;
+    private String mailAddress;
+
+    public PublisherEntity(String userId, String mailAddress) {
+        this.userId = userId;
+        this.mailAddress = mailAddress;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress);
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public String getMailAddress() {
+        return mailAddress;
+    }
+
+    public void setMailAddress(String mailAddress) {
+        this.mailAddress = mailAddress;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(userId)
+                .append(mailAddress)
+                .build();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (that == this) {
+            return true;
+        }
+
+        if (!(that instanceof PublisherEntity)) {
+            return false;
+        }
+
+        PublisherEntity another = (PublisherEntity)that;
+
+        return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
new file mode 100644
index 0000000..e0e579a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.*;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
+import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
+import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher;
+import org.apache.eagle.jpm.analyzer.publisher.Publisher;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
+
+    private List<Evaluator> evaluators = new ArrayList<>();
+    private List<Publisher> publishers = new ArrayList<>();
+
+    private Config config;
+
+    public MRJobPerformanceAnalyzer(Config config) {
+        this.config = config;
+        evaluators.add(new SLAJobEvaluator(config));
+        evaluators.add(new JobSuggestionEvaluator(config));
+
+        publishers.add(new EagleStorePublisher(config));
+        publishers.add(new EmailPublisher(config));
+    }
+
+    @Override
+    public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+        Result result = new Result();
+
+        for (Evaluator evaluator : evaluators) {
+            result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+        }
+
+        for (Publisher publisher : publishers) {
+            publisher.publish(analyzerJobEntity, result);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
new file mode 100644
index 0000000..f10b68d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJobProcessor;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SLAJobEvaluator implements Evaluator, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SLAJobEvaluator.class);
+
+    private List<Processor> processors = new ArrayList<>();
+    private Config config;
+
+    public SLAJobEvaluator(Config config) {
+        this.config = config;
+        processors.add(new UnExpectedLongDurationJobProcessor(config));
+        processors.add(new LongStuckJobProcessor(config));
+    }
+
+    @Override
+    public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+        Result.EvaluatorResult result = new Result.EvaluatorResult();
+
+        List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());
+        if (jobMetaEntities.size() == 0
+                || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) {
+            LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId());
+            return result;
+        }
+
+        analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration());
+
+        for (Processor processor : processors) {
+            result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity));
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
new file mode 100644
index 0000000..35f3b27
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class LongStuckJobProcessor implements Processor, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(LongStuckJobProcessor.class);
+
+    private Config config;
+
+    public LongStuckJobProcessor(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+        LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
+        return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
new file mode 100644
index 0000000..9d4ce2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.List;
+import java.util.Map;
+
+public class UnExpectedLongDurationJobProcessor implements Processor, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(UnExpectedLongDurationJobProcessor.class);
+
+    private Config config;
+
+    public UnExpectedLongDurationJobProcessor(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+        LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId());
+
+        Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
+        long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
+        if (avgDurationTime == 0L) {
+            return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+        }
+
+        Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
+        if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) {
+            alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY);
+        }
+        List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold);
+
+        double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
+        for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
+            if (expirePercent >= entry.getValue()) {
+                return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+                        (int)(expirePercent * 100), avgDurationTime / 1000));
+            }
+        }
+
+        return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+    }
+
+    private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                config.getString(Constants.HOST_PATH),
+                config.getInt(Constants.PORT_PATH),
+                config.getString(Constants.USERNAME_PATH),
+                config.getString(Constants.PASSWORD_PATH));
+
+        client.setReadTimeout(config.getInt(Constants.READ_TIMEOUT_PATH) * 1000);
+
+        try {
+            int timeLength = Constants.DEFAULT_EVALUATOR_TIME_LENGTH;
+            try {
+                if (jobMetaData.containsKey(Constants.EVALUATOR_TIME_LENGTH_KEY)) {
+                    timeLength = Integer.parseInt(jobMetaData.get(Constants.EVALUATOR_TIME_LENGTH_KEY).toString());
+                }
+            } catch (Exception e) {
+                LOG.warn("exception found when parse timeLength {}, use default", e);
+            }
+
+            String query = String.format("%s[@site=\"%s\" and @jobDefId=\"%s\"]<@site>{avg(durationTime)}",
+                    org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME,
+                    mrJobAnalysisEntity.getSiteId(),
+                    URLEncoder.encode(mrJobAnalysisEntity.getJobDefId()));
+
+            GenericServiceAPIResponseEntity response = client
+                    .search(query)
+                    .startTime(System.currentTimeMillis() - (timeLength + 1) * 24 * 60 * 60000L)
+                    .endTime(System.currentTimeMillis() - 24 * 60 * 60000L)
+                    .pageSize(10)
+                    .send();
+
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            if (results.size() == 0) {
+                return 0L;
+            }
+            return results.get(0).get("value").get(0).longValue();
+        } catch (Exception e) {
+            LOG.warn("{}", e);
+            return 0L;
+        } finally {
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
new file mode 100644
index 0000000..79f5318
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class JobSuggestionEvaluator implements Evaluator, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
+
+    private Config config;
+
+    public JobSuggestionEvaluator(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
+        Result.EvaluatorResult result = new Result.EvaluatorResult();
+        //TODO
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
new file mode 100644
index 0000000..6109704
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class EagleStorePublisher implements Publisher, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
+
+    private Config config;
+
+    public EagleStorePublisher(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
new file mode 100644
index 0000000..4e49094
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.app.service.ApplicationEmailService;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.mail.AlertEmailConstants;
+import org.apache.eagle.common.mail.AlertEmailContext;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EmailPublisher implements Publisher, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class);
+
+    private Config config;
+    private AlertDeduplicator alertDeduplicator;
+
+    public EmailPublisher(Config config) {
+        this.config = config;
+        this.alertDeduplicator = new SimpleDeduplicator();
+    }
+
+    @Override
+    public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+        if (result.getAlertMessages().size() == 0) {
+            return;
+        }
+
+        LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
+        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+            return;
+        }
+
+        Map<String, String> basic = new HashMap<>();
+        basic.put("site", analyzerJobEntity.getSiteId());
+        basic.put("name", analyzerJobEntity.getJobDefId());
+        basic.put("user", analyzerJobEntity.getUserId());
+        basic.put("status", analyzerJobEntity.getCurrentState());
+        basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s");
+        basic.put("start", DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getStartTime()));
+        basic.put("end", analyzerJobEntity.getEndTime() == 0
+                ? "0"
+                : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
+        basic.put("progress", analyzerJobEntity.getProgress() + "%");
+        basic.put("detail", getJobLink(analyzerJobEntity));
+
+
+        Map<String, Map<String, String>> extend = new HashMap<>();
+        Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
+        for (String evaluator : alertMessages.keySet()) {
+            List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
+            extend.put(evaluator, new HashMap<>());
+            for (Pair<Result.ResultLevel, String> message : messages) {
+                LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
+                        analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
+                extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+            }
+        }
+
+        Map<String, Object> alertData = new HashMap<>();
+        alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
+        alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
+
+        //TODO, override email config in job meta data
+        ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH);
+        String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId());
+        AlertEmailContext alertContext = emailService.buildEmailContext(subject);
+        emailService.onAlert(alertContext, alertData);
+    }
+
+    private String getJobLink(AnalyzerEntity analyzerJobEntity) {
+        return "http://"
+                + config.getString(Constants.HOST_PATH)
+                + ":"
+                + config.getInt(Constants.PORT_PATH)
+                + "/#/site/"
+                + analyzerJobEntity.getSiteId()
+                + "/jpm/detail/"
+                + analyzerJobEntity.getJobId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
new file mode 100644
index 0000000..2f42bf9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+
+public interface Publisher {
+    void publish(AnalyzerEntity analyzerJobEntity, Result result);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
new file mode 100644
index 0000000..a12f589
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Result {
+    //for EagleStorePublisher
+    private TaggedLogAPIEntity alertEntity = null;//TODO
+    //for EmailPublisher
+    private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>();
+
+    public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
+        Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+        for (Class<?> processorType : processorResults.keySet()) {
+            ProcessorResult processorResult = processorResults.get(processorType);
+            if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
+                continue;
+            }
+
+            String typeName = type.getName();
+            if (!alertMessages.containsKey(typeName)) {
+                alertMessages.put(typeName, new ArrayList<>());
+            }
+            alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+        }
+    }
+
+    public TaggedLogAPIEntity getAlertEntity() {
+        return alertEntity;
+    }
+
+    public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() {
+        return alertMessages;
+    }
+
+    /**
+     * Processor result.
+     */
+
+    public enum ResultLevel {
+        NONE,
+        NOTICE,
+        WARNING,
+        CRITICAL
+    }
+
+    public static class ProcessorResult {
+        private ResultLevel resultLevel;
+        private String message;
+
+        public ProcessorResult(ResultLevel resultLevel, String message) {
+            this.resultLevel = resultLevel;
+            this.message = message;
+        }
+
+        public ResultLevel getResultLevel() {
+            return resultLevel;
+        }
+
+        public void setResultLevel(ResultLevel resultLevel) {
+            this.resultLevel = resultLevel;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public void setMessage(String message) {
+            this.message = message;
+        }
+    }
+
+    /**
+     * Evaluator result.
+     */
+    public static class EvaluatorResult {
+        private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+
+        public void addProcessorResult(Class<?> type, ProcessorResult result) {
+            this.processorResults.put(type, result);
+        }
+
+        public Map<Class<?>, ProcessorResult> getProcessorResults() {
+            return this.processorResults;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
new file mode 100644
index 0000000..4b18f7c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface AlertDeduplicator {
+    boolean dedup(AnalyzerEntity analyzerJobEntity, Result result);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
new file mode 100644
index 0000000..09f1af6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * dedup by jobDefId.
+ */
+public class SimpleDeduplicator implements AlertDeduplicator, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class);
+
+    private Map<String, Long> lastUpdateTime = new HashMap<>();
+
+    @Override
+    public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) {
+        long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL;
+        if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) {
+            dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY);
+        }
+
+        dedupInterval = dedupInterval * 1000;
+        long currentTimeStamp = System.currentTimeMillis();
+        if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) {
+            if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) {
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
new file mode 100644
index 0000000..dc09202
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.resource;
+
+import com.google.inject.Inject;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.metadata.resource.RESTResponse;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+
+import static org.apache.eagle.jpm.analyzer.util.Constants.*;
+
+@Path(ANALYZER_PATH)
+public class AnalyzerResource {
+    @Inject
+    MetaManagementService metaManagementService;
+
+    public AnalyzerResource() {
+    }
+
+    @POST
+    @Path(META_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            jobMetaEntity.ensureDefault();
+            boolean ret = metaManagementService.addJobMeta(jobMetaEntity);
+            String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId();
+            if (!ret) {
+                message = "Failed to add job meta for " + jobMetaEntity.getJobDefId();
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @POST
+    @Path(JOB_META_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            jobMetaEntity.ensureDefault();
+            boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity);
+            String message = "Successfully update job meta for " + jobDefId;
+            if (!ret) {
+                message = "Failed to update job meta for " + jobDefId;
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @GET
+    @Path(JOB_META_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+        return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get();
+    }
+
+    @DELETE
+    @Path(JOB_META_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+        return RESTResponse.<Void>async((response) -> {
+            boolean ret = metaManagementService.deleteJobMeta(jobDefId);
+            String message = "Successfully delete job meta for " + jobDefId;
+            if (!ret) {
+                message = "Failed to delete job meta for " + jobDefId;
+            }
+
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @POST
+    @Path(PUBLISHER_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            publisherEntity.ensureDefault();
+            boolean ret = metaManagementService.addPublisherMeta(publisherEntity);
+            String message = "Successfully add publisher meta for " + publisherEntity.getUserId();
+            if (!ret) {
+                message = "Failed to add publisher meta for " + publisherEntity.getUserId();
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @DELETE
+    @Path(PUBLISHER_META_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) {
+        return RESTResponse.<Void>async((response) -> {
+            boolean ret = metaManagementService.deletePublisherMeta(userId);
+            String message = "Successfully delete publisher meta for " + userId;
+            if (!ret) {
+                message = "Failed to delete publisher meta for " + userId;
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @GET
+    @Path(PUBLISHER_META_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) {
+        return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
new file mode 100644
index 0000000..774e6d2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Constants {
+    public static final String HOST_PATH = "service.host";
+    public static final String PORT_PATH = "service.port";
+    public static final String USERNAME_PATH = "service.username";
+    public static final String PASSWORD_PATH = "service.password";
+    public static final String CONTEXT_PATH = "service.context";
+    public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds";
+
+    public static final String META_PATH = "/metadata";
+    public static final String ANALYZER_PATH = "/job/analyzer";
+    public static final String JOB_DEF_PATH = "jobDefId";
+    public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}";
+
+    public static final String PUBLISHER_PATH = "/publisher";
+    public static final String USER_PATH = "userId";
+    public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}";
+
+    public static final String PROCESS_NONE = "PROCESS_NONE";
+
+    public static final String EVALUATOR_TIME_LENGTH_KEY = "evaluator.timeLength";
+    public static final int DEFAULT_EVALUATOR_TIME_LENGTH = 7;//7 days
+
+    public static final String ALERT_THRESHOLD_KEY = "alert.threshold";
+    public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() {
+        {
+            put(Result.ResultLevel.NOTICE, 0.1);
+            put(Result.ResultLevel.WARNING, 0.3);
+            put(Result.ResultLevel.CRITICAL, 0.5);
+        }
+    };
+
+    public static final String DEDUP_INTERVAL_KEY = "alert.dedupInterval"; //seconds
+    public static final int DEFAULT_DEDUP_INTERVAL = 300;
+
+    public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport";
+    public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s";
+
+    public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
+    public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
new file mode 100644
index 0000000..66f7622
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.metadata.resource.RESTResponse;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URLEncoder;
+import java.util.*;
+
+public class Utils {
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public static List<JobMetaEntity> getJobMeta(Config config, String jobDefId) {
+        List<JobMetaEntity> result = new ArrayList<>();
+        String url = "http://"
+                + config.getString(Constants.HOST_PATH)
+                + ":"
+                + config.getInt(Constants.PORT_PATH)
+                + config.getString(Constants.CONTEXT_PATH)
+                + Constants.ANALYZER_PATH
+                + Constants.META_PATH
+                + "/"
+                + URLEncoder.encode(jobDefId);
+
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE);
+            LOG.info("get job meta from {}", url);
+            result = (List<JobMetaEntity>)OBJ_MAPPER.readValue(is, RESTResponse.class).getData();
+        } catch (Exception e) {
+            LOG.warn("failed to get job meta from {}", url, e);
+        } finally {
+            org.apache.eagle.jpm.util.Utils.closeInputStream(is);
+            return result;
+        }
+    }
+
+    public static <K, V extends Comparable<? super V>> List<Map.Entry<K, V>> sortByValue(Map<K, V> map) {
+        List<Map.Entry<K, V>> list = new LinkedList<>(map.entrySet());
+        Collections.sort(list, (e1, e2) -> e1.getValue().compareTo(e2.getValue()));
+        Collections.reverse(list);
+        return list;
+    }
+}


Mime
View raw message