eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinh...@apache.org
Subject [2/2] eagle git commit: [EAGLE-935] add jdbc storage support for sla job meta
Date Tue, 07 Mar 2017 02:07:10 GMT
[EAGLE-935] add jdbc storage support for sla job meta

Author: wujinhu <wujinhu920@126.com>

Closes #854 from wujinhu/EAGLE-935.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/d766f681
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/d766f681
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/d766f681

Branch: refs/heads/master
Commit: d766f68152d6cd120d3d0487907f7ee85f7a614a
Parents: c75eadd
Author: wujinhu <wujinhu920@126.com>
Authored: Tue Mar 7 10:07:01 2017 +0800
Committer: wujinhu <wujinhu920@126.com>
Committed: Tue Mar 7 10:07:01 2017 +0800

----------------------------------------------------------------------
 .../app/test/ApplicationSimulatorImpl.java      |  40 ++--
 eagle-jpm/eagle-jpm-analyzer/pom.xml            |   5 +
 .../analyzer/meta/MetaManagementService.java    |  16 +-
 .../impl/MetaManagementServiceJDBCImpl.java     | 180 +++++++++++++--
 .../impl/MetaManagementServiceMemoryImpl.java   |  79 ++++---
 .../meta/impl/orm/JobMetaEntityToRelation.java  |  62 ++++++
 .../meta/impl/orm/RelationToJobMetaEntity.java  |  93 ++++++++
 .../impl/orm/RelationToUserEmailEntity.java     |  37 ++++
 .../impl/orm/UserEmailEntityToRelation.java     |  57 +++++
 .../jpm/analyzer/meta/model/AnalyzerEntity.java |   6 +-
 .../jpm/analyzer/meta/model/JobMetaEntity.java  |  15 +-
 .../analyzer/meta/model/PublisherEntity.java    |  77 -------
 .../analyzer/meta/model/UserEmailEntity.java    |  91 ++++++++
 .../analyzer/mr/MRJobPerformanceAnalyzer.java   |  12 +-
 .../jpm/analyzer/mr/sla/SLAJobEvaluator.java    |  12 +-
 .../UnExpectedLongDurationJobProcessor.java     |  11 +-
 .../mr/suggestion/JobSuggestionEvaluator.java   |  10 +
 .../MapReduceQueueResourceProcessor.java        |   2 -
 .../MapReduceSplitSettingProcessor.java         |   3 -
 .../analyzer/publisher/EagleStorePublisher.java |   6 -
 .../jpm/analyzer/publisher/EmailPublisher.java  |  51 ++++-
 .../eagle/jpm/analyzer/publisher/Result.java    |  31 ++-
 .../dedup/impl/SimpleDeduplicator.java          |  29 +--
 .../jpm/analyzer/resource/AnalyzerResource.java |  87 +++++---
 .../eagle/jpm/analyzer/util/Constants.java      |  20 +-
 .../apache/eagle/jpm/analyzer/util/Utils.java   |  38 +++-
 .../main/resources/AnalyzerReportTemplate.vm    | 219 +++++++++++++++----
 .../src/main/resources/createTable.sql          |  27 +--
 .../MRHistoryJobApplicationProvider.java        |  13 +-
 .../history/crawler/JHFCrawlerDriverImpl.java   |   2 +-
 .../history/parser/JobSuggestionListener.java   |   9 +-
 .../jpm/mr/history/storm/JobHistorySpout.java   |   2 +-
 32 files changed, 1012 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
index a5f5a73..b10205f 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -26,12 +26,18 @@ import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.resource.SiteResource;
 import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class ApplicationSimulatorImpl extends ApplicationSimulator {
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationSimulatorImpl.class);
+
     private final Config config;
     private final SiteResource siteResource;
     private final ApplicationResource applicationResource;
@@ -74,26 +80,28 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         statusUpdateService.updateApplicationEntityStatus(applicationEntity);
-        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
-        int attempt = 0;
-        while (attempt < 10) {
-            attempt++;
-            statusUpdateService.updateApplicationEntityStatus(applicationEntity);
-            if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED
-                    || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) {
-                break;
-            } else {
+        Semaphore semp = new Semaphore(1);
+        Thread stopThread = new Thread(() -> {
+            applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
+            while (applicationEntity.getStatus() != ApplicationEntity.Status.INITIALIZED
+                    && applicationEntity.getStatus() != ApplicationEntity.Status.STOPPED) {
+                statusUpdateService.updateApplicationEntityStatus(applicationEntity);
                 try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e) {
-                    // Ignore
+                    Thread.sleep(1000);
+                } catch (Exception e) {
+                    LOG.warn("{}", e);
                 }
             }
+            semp.release();
+        });
+        stopThread.start();
+        try {
+            stopThread.join(60000L);
+            semp.tryAcquire(60, TimeUnit.SECONDS);
+            applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
+        } catch (Exception e) {
+            throw new IllegalStateException("Application status didn't become STOPPED");
         }
-        if (attempt >= 10 ) {
-            throw new IllegalStateException("Application status didn't become STOPPED in 10 attempts");
-        }
-        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
index 07f5766..a2943df 100644
--- a/eagle-jpm/eagle-jpm-analyzer/pom.xml
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -55,5 +55,10 @@
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-metadata-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
index 0935266..73b7b81 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
@@ -18,22 +18,24 @@
 package org.apache.eagle.jpm.analyzer.meta;
 
 import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
-import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
 
 import java.util.List;
 
 public interface MetaManagementService {
     boolean addJobMeta(JobMetaEntity jobMetaEntity);
 
-    boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity);
+    boolean updateJobMeta(JobMetaEntity jobMetaEntity);
 
-    List<JobMetaEntity> getJobMeta(String jobDefId);
+    List<JobMetaEntity> getJobMeta(String siteId, String jobDefId);
 
-    boolean deleteJobMeta(String jobDefId);
+    boolean deleteJobMeta(String siteId, String jobDefId);
 
-    boolean addPublisherMeta(PublisherEntity publisherEntity);
+    boolean addUserEmailMeta(UserEmailEntity userEmailEntity);
 
-    boolean deletePublisherMeta(String userId);
+    boolean updateUserEmailMeta(UserEmailEntity userEmailEntity);
 
-    List<PublisherEntity> getPublisherMeta(String userId);
+    boolean deleteUserEmailMeta(String siteId, String userId);
+
+    List<UserEmailEntity> getUserEmailMeta(String siteId, String userId);
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
index cfb5029..2048e97 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
@@ -17,61 +17,215 @@
 
 package org.apache.eagle.jpm.analyzer.meta.impl;
 
-import com.google.inject.Inject;
 import com.typesafe.config.Config;
+import org.apache.commons.lang.StringUtils;
 import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.impl.orm.JobMetaEntityToRelation;
+import org.apache.eagle.jpm.analyzer.meta.impl.orm.RelationToJobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.impl.orm.RelationToUserEmailEntity;
+import org.apache.eagle.jpm.analyzer.meta.impl.orm.UserEmailEntityToRelation;
 import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
-import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
+import org.apache.eagle.metadata.store.jdbc.JDBCMetadataQueryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 
 public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class);
 
+    private static final String addJobMetaSql = "INSERT INTO analysis_jobs(uuid, configuration, evaluators, createdtime, modifiedtime, siteId, jobDefId) VALUES (?, ?, ?, ?, ?, ?, ?)";
+    private static final String addUserEmailSql = "INSERT INTO analysis_email(uuid, mailAddress, createdtime, modifiedtime, siteId, userId) VALUES (?, ?, ?, ?, ?, ?)";
+
+    private static final String getJobMetaSql = "SELECT * FROM analysis_jobs where siteId = ? and jobDefId = ?";
+    private static final String getUserEmailSql = "SELECT * FROM analysis_email where siteId = ? and userId = ?";
+
+    private static final String deleteJobMetaSql = "DELETE FROM analysis_jobs where siteId = ? and jobDefId = ?";
+    private static final String deleteUserEmailSql = "DELETE FROM analysis_email where siteId = ? and userId = ?";
+
     @Inject
     Config config;
 
+    @Inject
+    JDBCMetadataQueryService queryService;
+
     @Override
     public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
-
+        if (getJobMeta(jobMetaEntity.getSiteId(), jobMetaEntity.getJobDefId()) != null) {
+            throw new IllegalArgumentException("Duplicated job meta: " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId());
+        }
+
+        List<JobMetaEntity> entities = new ArrayList<>(1);
+        entities.add(jobMetaEntity);
+        try {
+            queryService.insert(addJobMetaSql, entities, new JobMetaEntityToRelation());
+        } catch (SQLException e) {
+            LOG.error("Error to insert JobMetaEntity: {}", jobMetaEntity, e);
+            return false;
+        }
         return true;
     }
 
     @Override
-    public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
-
+    public boolean updateJobMeta(JobMetaEntity entity) {
+        String updateSql = "update analysis_jobs set ";
+        if (entity.getUuid() != null && !entity.getUuid().isEmpty()) {
+            updateSql += "uuid = ?, ";
+        }
+        if (entity.getConfiguration() != null) {
+            updateSql += "configuration = ?, ";
+        }
+        if (entity.getEvaluators() != null) {
+            updateSql += "evaluators = ?, ";
+        }
+        if (entity.getCreatedTime() > 0) {
+            updateSql += "createdtime = ?, ";
+        }
+        if (entity.getModifiedTime() > 0) {
+            updateSql += "modifiedtime = ?, ";
+        }
+        updateSql = updateSql.substring(0, updateSql.length() - 2);
+        if (StringUtils.isNotBlank(entity.getSiteId())) {
+            updateSql += " where siteId = ?";
+        }
+        if (StringUtils.isNotBlank(entity.getJobDefId())) {
+            updateSql += " and jobDefId = ?";
+        }
+
+        try {
+            if (queryService.update(updateSql, entity, new JobMetaEntityToRelation()) == 0) {
+                LOG.warn("failed to execute {}", updateSql);
+            }
+        } catch (SQLException e) {
+            LOG.warn("failed to execute {}, {}", updateSql, e);
+            return false;
+        }
         return true;
     }
 
     @Override
-    public List<JobMetaEntity> getJobMeta(String jobDefId) {
-
-        return null;
+    public List<JobMetaEntity> getJobMeta(String siteId, String jobDefId) {
+        JobMetaEntity jobMetaEntity = new JobMetaEntity();
+        jobMetaEntity.setSiteId(siteId);
+        jobMetaEntity.setJobDefId(jobDefId);
+
+        List<JobMetaEntity> results;
+        try {
+            results = queryService.queryWithCond(getJobMetaSql, jobMetaEntity, new JobMetaEntityToRelation(), new RelationToJobMetaEntity());
+        } catch (SQLException e) {
+            LOG.error("Error to getJobMeta : {}", e);
+            return null;
+        }
+        if (results.isEmpty()) {
+            return null;
+        }
+
+        return results;
     }
 
     @Override
-    public boolean deleteJobMeta(String jobDefId) {
+    public boolean deleteJobMeta(String siteId, String jobDefId) {
+        JobMetaEntity entity = new JobMetaEntity();
+        entity.setSiteId(siteId);
+        entity.setJobDefId(jobDefId);
+        try {
+            queryService.update(deleteJobMetaSql, entity, new JobMetaEntityToRelation());
+        } catch (SQLException e) {
+            LOG.error("Error to delete JobMetaEntity: {}", entity, e);
+            return false;
+        }
 
         return true;
     }
 
     @Override
-    public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+    public boolean addUserEmailMeta(UserEmailEntity userEmailEntity) {
+        if (getUserEmailMeta(userEmailEntity.getSiteId(), userEmailEntity.getUserId()) != null) {
+            throw new IllegalArgumentException("Duplicated user meta: " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId());
+        }
+
+        List<UserEmailEntity> entities = new ArrayList<>(1);
+        entities.add(userEmailEntity);
+        try {
+            queryService.insert(addUserEmailSql, entities, new UserEmailEntityToRelation());
+        } catch (SQLException e) {
+            LOG.error("Error to insert UserEmailEntity: {}", userEmailEntity, e);
+            return false;
+        }
+        return true;
+    }
 
+    @Override
+    public  boolean updateUserEmailMeta(UserEmailEntity entity) {
+        String updateSql = "update analysis_email set ";
+        if (entity.getUuid() != null && !entity.getUuid().isEmpty()) {
+            updateSql += "uuid = ?, ";
+        }
+        if (entity.getMailAddress() != null && !entity.getMailAddress().isEmpty()) {
+            updateSql += "mailAddress = ?, ";
+        }
+        if (entity.getCreatedTime() > 0) {
+            updateSql += "createdtime = ?, ";
+        }
+        if (entity.getModifiedTime() > 0) {
+            updateSql += "modifiedtime = ?, ";
+        }
+        updateSql = updateSql.substring(0, updateSql.length() - 2);
+        if (StringUtils.isNotBlank(entity.getSiteId())) {
+            updateSql += " where siteId = ?";
+        }
+        if (StringUtils.isNotBlank(entity.getUserId())) {
+            updateSql += " and userId = ?";
+        }
+
+        try {
+            if (queryService.update(updateSql, entity, new UserEmailEntityToRelation()) == 0) {
+                LOG.warn("failed to execute {}", updateSql);
+            }
+        } catch (SQLException e) {
+            LOG.warn("failed to execute {}, {}", updateSql, e);
+            return false;
+        }
         return true;
     }
 
     @Override
-    public boolean deletePublisherMeta(String userId) {
+    public boolean deleteUserEmailMeta(String siteId, String userId) {
+        UserEmailEntity entity = new UserEmailEntity();
+        entity.setSiteId(siteId);
+        entity.setUserId(userId);
+        try {
+            queryService.update(deleteUserEmailSql, entity, new UserEmailEntityToRelation());
+        } catch (SQLException e) {
+            LOG.error("Error to delete UserEmailEntity: {}", entity, e);
+            return false;
+        }
 
         return true;
     }
 
     @Override
-    public List<PublisherEntity> getPublisherMeta(String userId) {
-        return null;
+    public List<UserEmailEntity> getUserEmailMeta(String siteId, String userId) {
+        UserEmailEntity userEmailEntity = new UserEmailEntity();
+        userEmailEntity.setSiteId(siteId);
+        userEmailEntity.setUserId(userId);
+
+        List<UserEmailEntity> results;
+        try {
+            results = queryService.queryWithCond(getUserEmailSql, userEmailEntity, new UserEmailEntityToRelation(), new RelationToUserEmailEntity());
+        } catch (SQLException e) {
+            LOG.error("Error to getJobMeta : {}", e);
+            return null;
+        }
+        if (results.isEmpty()) {
+            return null;
+        }
+
+        return results;
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
index 85e8358..b7582c1 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
@@ -17,94 +17,105 @@
 
 package org.apache.eagle.jpm.analyzer.meta.impl;
 
-import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
 import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
-import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.inject.Inject;
 import java.io.Serializable;
 import java.util.*;
 
 public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class);
 
-    private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>();
-    private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>();
+    private final Map<String, Map<String, JobMetaEntity>> jobMetaEntities = new HashMap<>();
+    private final Map<String, Map<String, UserEmailEntity>> publisherEntities = new HashMap<>();
 
     @Inject
     Config config;
 
     @Override
     public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
-        if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
-            LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId());
-            return false;
+        if (!jobMetaEntities.containsKey(jobMetaEntity.getSiteId())) {
+            jobMetaEntities.put(jobMetaEntity.getSiteId(), new HashMap<>());
         }
 
-        jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity);
+        jobMetaEntities.get(jobMetaEntity.getSiteId()).put(jobMetaEntity.getJobDefId(), jobMetaEntity);
         LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId());
         return true;
     }
 
     @Override
-    public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
-        if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
-            LOG.warn("does not contain job {}, update job meta failed", jobDefId);
+    public boolean updateJobMeta(JobMetaEntity jobMetaEntity) {
+        if (!jobMetaEntities.containsKey(jobMetaEntity.getSiteId())) {
+            LOG.warn("does not contain siteId {}, update job meta failed", jobMetaEntity.getSiteId());
             return false;
         }
 
-        jobMetaEntities.put(jobDefId, jobMetaEntity);
-        LOG.info("Successfully update job {} meta", jobDefId);
+        jobMetaEntities.get(jobMetaEntity.getSiteId()).put(jobMetaEntity.getJobDefId(), jobMetaEntity);
+        LOG.info("Successfully update job {} meta", jobMetaEntity.getJobDefId());
         return true;
     }
 
     @Override
-    public List<JobMetaEntity> getJobMeta(String jobDefId) {
-        if (!jobMetaEntities.containsKey(jobDefId)) {
-            LOG.warn("does not contain job {}, get job meta failed", jobDefId);
+    public List<JobMetaEntity> getJobMeta(String siteId, String jobDefId) {
+        if (!jobMetaEntities.containsKey(siteId)) {
+            LOG.warn("does not contain site {}, get job meta failed", siteId);
             return new ArrayList<>();
         }
 
-        return Arrays.asList(jobMetaEntities.get(jobDefId));
+        return Arrays.asList(jobMetaEntities.get(siteId).get(jobDefId));
     }
 
     @Override
-    public boolean deleteJobMeta(String jobDefId) {
-        if (!jobMetaEntities.containsKey(jobDefId)) {
-            LOG.warn("does not contain job {}, delete job meta failed", jobDefId);
+    public boolean deleteJobMeta(String siteId, String jobDefId) {
+        if (!jobMetaEntities.containsKey(siteId)) {
+            LOG.warn("does not contain siteId {}, delete job meta failed", siteId);
             return false;
         }
 
-        jobMetaEntities.remove(jobDefId);
+        jobMetaEntities.get(siteId).remove(jobDefId);
         LOG.info("Successfully delete job {} meta", jobDefId);
         return true;
     }
 
     @Override
-    public boolean addPublisherMeta(PublisherEntity publisherEntity) {
-        if (publisherEntities.containsKey(publisherEntity.getUserId())) {
-            for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) {
-                if (entity.equals(publisherEntity)) {
+    public boolean addUserEmailMeta(UserEmailEntity userEmailEntity) {
+        if (publisherEntities.containsKey(userEmailEntity.getSiteId())) {
+            for (UserEmailEntity entity : publisherEntities.get(userEmailEntity.getSiteId()).values()) {
+                if (entity.equals(userEmailEntity)) {
                     LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress());
                     return false;
                 }
             }
         }
 
-        if (!publisherEntities.containsKey(publisherEntity.getUserId())) {
-            publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>());
+        if (!publisherEntities.containsKey(userEmailEntity.getSiteId())) {
+            publisherEntities.put(userEmailEntity.getSiteId(), new HashMap<>());
         }
 
-        publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity);
-        LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress());
+        publisherEntities.get(userEmailEntity.getSiteId()).put(userEmailEntity.getUserId(), userEmailEntity);
+        LOG.info("Successfully add publisher user {}, mailAddress {}", userEmailEntity.getUserId(), userEmailEntity.getMailAddress());
         return true;
     }
 
     @Override
-    public boolean deletePublisherMeta(String userId) {
+    public boolean updateUserEmailMeta(UserEmailEntity userEmailEntity) {
+        if (!publisherEntities.containsKey(userEmailEntity.getSiteId())) {
+            LOG.warn("does not contain siteId {}, update user email meta failed", userEmailEntity.getSiteId());
+            return false;
+        }
+
+        publisherEntities.get(userEmailEntity.getSiteId()).put(userEmailEntity.getUserId(), userEmailEntity);
+        LOG.info("Successfully update user {} meta", userEmailEntity.getUserId());
+        return true;
+    }
+
+    @Override
+    public boolean deleteUserEmailMeta(String siteId, String userId) {
         if (!publisherEntities.containsKey(userId)) {
             LOG.warn("does not contain user {}, failed to delete publisher", userId);
             return false;
@@ -116,12 +127,12 @@ public class MetaManagementServiceMemoryImpl implements MetaManagementService, S
     }
 
     @Override
-    public List<PublisherEntity> getPublisherMeta(String userId) {
-        if (!publisherEntities.containsKey(userId)) {
-            LOG.warn("does not contain user {}, failed to get publisher", userId);
+    public List<UserEmailEntity> getUserEmailMeta(String siteId, String userId) {
+        if (!publisherEntities.containsKey(siteId)) {
+            LOG.warn("does not contain siteId {}, failed to get publisher", siteId);
             return new ArrayList<>();
         }
 
-        return publisherEntities.get(userId);
+        return Arrays.asList(publisherEntities.get(siteId).get(userId));
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java
new file mode 100644
index 0000000..5053b50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl.orm;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.eagle.common.function.ThrowableConsumer2;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class JobMetaEntityToRelation implements ThrowableConsumer2<PreparedStatement, JobMetaEntity, SQLException> {
+    @Override
+    public void accept(PreparedStatement statement, JobMetaEntity entity) throws SQLException {
+        int parameterIndex = 1;
+        if (entity.getUuid() != null && StringUtils.isNotBlank(entity.getUuid())) {
+            statement.setString(parameterIndex, entity.getUuid());
+            parameterIndex++;
+        }
+        if (entity.getConfiguration() != null) {
+            statement.setString(parameterIndex, JSONObject.toJSONString(entity.getConfiguration()));
+            parameterIndex++;
+        }
+        if (entity.getEvaluators() != null) {
+            statement.setString(parameterIndex, JSONArray.toJSONString(entity.getEvaluators()));
+            parameterIndex++;
+        }
+        if (entity.getCreatedTime() > 0) {
+            statement.setLong(parameterIndex, entity.getCreatedTime());
+            parameterIndex++;
+        }
+        if (entity.getModifiedTime() > 0) {
+            statement.setLong(parameterIndex, entity.getModifiedTime());
+            parameterIndex++;
+        }
+        if (StringUtils.isNotBlank(entity.getSiteId())) {
+            statement.setString(parameterIndex, entity.getSiteId());
+            parameterIndex++;
+        }
+        if (StringUtils.isNotBlank(entity.getJobDefId())) {
+            statement.setString(parameterIndex, entity.getJobDefId());
+            parameterIndex++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java
new file mode 100644
index 0000000..180eb8d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl.orm;
+
+import org.apache.eagle.common.function.ThrowableFunction;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+
+public class RelationToJobMetaEntity implements ThrowableFunction<ResultSet, JobMetaEntity, SQLException> {
+    private static final Logger LOG = LoggerFactory.getLogger(RelationToJobMetaEntity.class);
+
+    @Override
+    public JobMetaEntity apply(ResultSet resultSet) throws SQLException {
+        JobMetaEntity jobMetaEntity = new JobMetaEntity();
+        jobMetaEntity.setUuid(resultSet.getString(1));
+        jobMetaEntity.setJobDefId(resultSet.getString(2));
+        jobMetaEntity.setSiteId(resultSet.getString(3));
+        jobMetaEntity.setConfiguration(parse(resultSet.getString(4)));
+        jobMetaEntity.setEvaluators(new ArrayList<>());
+        try {
+            JSONArray jsonArray = new JSONArray(resultSet.getString(5));
+            for (int i = 0; i < jsonArray.length(); ++i) {
+                jobMetaEntity.getEvaluators().add(jsonArray.getString(i));
+            }
+        } catch (Exception e) {
+            LOG.warn("{}", e);
+        }
+        jobMetaEntity.setCreatedTime(resultSet.getLong(6));
+        jobMetaEntity.setModifiedTime(resultSet.getLong(7));
+
+        return jobMetaEntity;
+    }
+
+    private Map<String, Object> parse(String field) {
+        Map<String, Object> items = new java.util.HashMap<>();
+        try {
+            JSONObject jsonObject = new JSONObject(field);
+
+            Iterator<String> keyItemItr = jsonObject.keys();
+            while (keyItemItr.hasNext()) {
+                String itemKey = keyItemItr.next();
+                if (canParseToMap(jsonObject.getString(itemKey))) {
+                    items.put(itemKey, parse(jsonObject.getString(itemKey)));
+                } else {
+                    items.put(itemKey, jsonObject.get(itemKey));
+                }
+            }
+
+        } catch (Exception e) {
+            LOG.warn("{}", e);
+        }
+
+        return items;
+    }
+
+    private boolean canParseToMap(String field) {
+        try {
+            JSONObject jsonObject = new JSONObject(field);
+            Iterator<String> keyItemItr = jsonObject.keys();
+            while (keyItemItr.hasNext()) {
+                keyItemItr.next();
+            }
+            return true;
+        } catch (Exception e) {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java
new file mode 100644
index 0000000..ec86506
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.analyzer.meta.impl.orm;
+
+import org.apache.eagle.common.function.ThrowableFunction;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class RelationToUserEmailEntity implements ThrowableFunction<ResultSet, UserEmailEntity, SQLException> {
+    @Override
+    public UserEmailEntity apply(ResultSet resultSet) throws SQLException {
+        UserEmailEntity userEmailEntity = new UserEmailEntity();
+        userEmailEntity.setUuid(resultSet.getString(1));
+        userEmailEntity.setUserId(resultSet.getString(2));
+        userEmailEntity.setSiteId(resultSet.getString(3));
+        userEmailEntity.setMailAddress(resultSet.getString(4));
+        userEmailEntity.setCreatedTime(resultSet.getLong(5));
+        userEmailEntity.setModifiedTime(resultSet.getLong(6));
+        return userEmailEntity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java
new file mode 100644
index 0000000..29958b0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl.orm;
+
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.eagle.common.function.ThrowableConsumer2;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public class UserEmailEntityToRelation implements ThrowableConsumer2<PreparedStatement, UserEmailEntity, SQLException> {
+    @Override
+    public void accept(PreparedStatement statement, UserEmailEntity entity) throws SQLException {
+        int parameterIndex = 1;
+        if (StringUtils.isNotBlank(entity.getUuid())) {
+            statement.setString(parameterIndex, entity.getUuid());
+            parameterIndex++;
+        }
+        if (StringUtils.isNotBlank(entity.getMailAddress())) {
+            statement.setString(parameterIndex, entity.getMailAddress());
+            parameterIndex++;
+        }
+        if (entity.getCreatedTime() > 0) {
+            statement.setLong(parameterIndex, entity.getCreatedTime());
+            parameterIndex++;
+        }
+        if (entity.getModifiedTime() > 0) {
+            statement.setLong(parameterIndex, entity.getModifiedTime());
+            parameterIndex++;
+        }
+        if (StringUtils.isNotBlank(entity.getSiteId())) {
+            statement.setString(parameterIndex, entity.getSiteId());
+            parameterIndex++;
+        }
+        if (StringUtils.isNotBlank(entity.getUserId())) {
+            statement.setString(parameterIndex, entity.getUserId());
+            parameterIndex++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
index 189d85d..9497140 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
@@ -38,7 +38,7 @@ public class AnalyzerEntity {
 
     private Map<String, Object> jobConfig = new HashMap<>();
 
-    private Map<String, Object> jobMeta = new HashMap<>();
+    private JobMetaEntity jobMeta;
 
     public String getJobDefId() {
         return jobDefId;
@@ -112,11 +112,11 @@ public class AnalyzerEntity {
         this.jobConfig = jobConfig;
     }
 
-    public Map<String, Object> getJobMeta() {
+    public JobMetaEntity getJobMeta() {
         return jobMeta;
     }
 
-    public void setJobMeta(Map<String, Object> jobMeta) {
+    public void setJobMeta(JobMetaEntity jobMeta) {
         this.jobMeta = jobMeta;
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
index 2e15c17..8d4af8e 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
@@ -20,17 +20,14 @@ package org.apache.eagle.jpm.analyzer.meta.model;
 import org.apache.eagle.metadata.persistence.PersistenceEntity;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class JobMetaEntity extends PersistenceEntity {
     private String jobDefId;
     private String siteId;
-    private Map<String, Object> configuration = new HashMap<>();
-    private Set<String> evaluators = new HashSet<>();
+    private Map<String, Object> configuration;
+    private List<String> evaluators;
 
     public JobMetaEntity() {
 
@@ -39,7 +36,7 @@ public class JobMetaEntity extends PersistenceEntity {
     public JobMetaEntity(String jobDefId,
                          String siteId,
                          Map<String, Object> configuration,
-                         Set<String> evaluators) {
+                         List<String> evaluators) {
         this.jobDefId = jobDefId;
         this.siteId = siteId;
         this.configuration = configuration;
@@ -75,11 +72,11 @@ public class JobMetaEntity extends PersistenceEntity {
         this.configuration = configuration;
     }
 
-    public Set<String> getEvaluators() {
+    public List<String> getEvaluators() {
         return evaluators;
     }
 
-    public void setEvaluators(Set<String> evaluators) {
+    public void setEvaluators(List<String> evaluators) {
         this.evaluators = evaluators;
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
deleted file mode 100644
index bca7ab1..0000000
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.analyzer.meta.model;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.metadata.persistence.PersistenceEntity;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class PublisherEntity extends PersistenceEntity {
-    private String userId;
-    private String mailAddress;
-
-    public PublisherEntity(String userId, String mailAddress) {
-        this.userId = userId;
-        this.mailAddress = mailAddress;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress);
-    }
-
-    public String getUserId() {
-        return userId;
-    }
-
-    public void setUserId(String userId) {
-        this.userId = userId;
-    }
-
-    public String getMailAddress() {
-        return mailAddress;
-    }
-
-    public void setMailAddress(String mailAddress) {
-        this.mailAddress = mailAddress;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(userId)
-                .append(mailAddress)
-                .build();
-    }
-
-    @Override
-    public boolean equals(Object that) {
-        if (that == this) {
-            return true;
-        }
-
-        if (!(that instanceof PublisherEntity)) {
-            return false;
-        }
-
-        PublisherEntity another = (PublisherEntity)that;
-
-        return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress);
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java
new file mode 100644
index 0000000..cbac4d0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserEmailEntity extends PersistenceEntity {
+    private String userId;
+    private String siteId;
+    private String mailAddress;
+
+    public UserEmailEntity() {
+    }
+
+    public UserEmailEntity(String userId, String siteId, String mailAddress) {
+        this.userId = userId;
+        this.siteId = siteId;
+        this.mailAddress = mailAddress;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("UserEmailEntity[userId=%s, siteId=%s, mailAddress=%s]", userId, siteId, mailAddress);
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public String getMailAddress() {
+        return mailAddress;
+    }
+
+    public void setMailAddress(String mailAddress) {
+        this.mailAddress = mailAddress;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(userId)
+                .append(siteId)
+                .append(mailAddress)
+                .build();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (that == this) {
+            return true;
+        }
+
+        if (!(that instanceof UserEmailEntity)) {
+            return false;
+        }
+
+        UserEmailEntity another = (UserEmailEntity)that;
+
+        return another.userId.equals(this.userId) && another.siteId.equals(this.siteId) && another.mailAddress.equals(this.mailAddress);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
index 57e1765..34365dc 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -27,6 +27,8 @@ import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
 import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher;
 import org.apache.eagle.jpm.analyzer.publisher.Publisher;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,7 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn
     private List<Publisher> publishers = new ArrayList<>();
 
     private Config config;
+    private AlertDeduplicator alertDeduplicator;
 
     public MRJobPerformanceAnalyzer(Config config) {
         this.config = config;
@@ -48,7 +51,9 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn
         evaluators.add(new JobSuggestionEvaluator(config));
 
         publishers.add(new EagleStorePublisher(config));
-        //publishers.add(new EmailPublisher(config));
+        publishers.add(new EmailPublisher(config));
+
+        this.alertDeduplicator = new SimpleDeduplicator();
     }
 
     @Override
@@ -62,6 +67,11 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn
             }
         }
 
+        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+            LOG.info("skip publish job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+            return;
+        }
+
         for (Publisher publisher : publishers) {
             publisher.publish(analyzerJobEntity, result);
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
index a77e55d..ec7a641 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -48,20 +48,16 @@ public class SLAJobEvaluator implements Evaluator, Serializable {
 
     @Override
     public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
-        if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) {
-            return null;
-        }
-
         Result.EvaluatorResult result = new Result.EvaluatorResult();
 
-        List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());
-        if (jobMetaEntities.size() == 0
-                || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) {
+        List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getJobDefId());
+        if (jobMetaEntities == null || jobMetaEntities.size() == 0
+                || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getSimpleName())) {
             LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId());
             return result;
         }
 
-        analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration());
+        analyzerJobEntity.setJobMeta(jobMetaEntities.get(0));
 
         for (Processor processor : processors) {
             result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity));

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
index f7748f8..88e799d 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -47,22 +47,27 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
     public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
         LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId());
 
-        Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
+        Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration();
         long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
+
         if (avgDurationTime == 0L) {
             return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
         }
 
         Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
         if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) {
-            alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY);
+            Map<String, Double> alertThresholds = (Map<String, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY);
+            for (String level : alertThresholds.keySet()) {
+                alertThreshold.put(Result.ResultLevel.fromString(level), alertThresholds.get(level));
+            }
         }
         List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold);
 
         double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
         for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
             if (expirePercent >= entry.getValue()) {
-                return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+                return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(),
+                        String.format("Job duration exceeds average duration(calculated by historical executions of this job) by %d%%, average duration is %ds",
                         (int)(expirePercent * 100), avgDurationTime / 1000));
             }
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
index ea60ff9..e1a357a 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -63,6 +63,16 @@ public class JobSuggestionEvaluator implements Evaluator<MapReduceAnalyzerEntity
             return null;
         }
 
+
+        if (analyzerEntity.getTotalCounters() == null) {
+            LOG.warn("Total counters of Job {} is null", analyzerEntity.getJobId());
+            return null;
+        }
+        if (analyzerEntity.getMapCounters() == null && analyzerEntity.getReduceCounters() == null) {
+            LOG.warn("Map/Reduce task counters of Job {} are null", analyzerEntity.getJobId());
+            return null;
+        }
+
         MapReduceJobSuggestionContext jobContext = new MapReduceJobSuggestionContext(analyzerEntity);
         if (jobContext.getNumMaps() == 0) {
             return null;

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
index a1b57bf..a86eb72 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
@@ -26,8 +26,6 @@ import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-
 /*
  * Criterion: (TimeElapsed / (numTasks / 500 * avgTaskTime)) > 20
  */

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
index 8eba468..28e1129 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
@@ -22,9 +22,6 @@ import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class MapReduceSplitSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
 
     private MapReduceJobSuggestionContext context;

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
index 0d7d2d7..1c5a033 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -37,11 +37,9 @@ public class EagleStorePublisher implements Publisher, Serializable {
 
     private Config config;
     private IEagleServiceClient client;
-    private AlertDeduplicator alertDeduplicator;
 
     public EagleStorePublisher(Config config) {
         this.config = config;
-        this.alertDeduplicator = new SimpleDeduplicator();
     }
 
     @Override
@@ -51,10 +49,6 @@ public class EagleStorePublisher implements Publisher, Serializable {
         }
 
         LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId());
-        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
-            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
-            return;
-        }
 
         try {
             this.client = new EagleServiceClientImpl(config);

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
index 842e0ac..471dbf8 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -18,13 +18,16 @@
 package org.apache.eagle.jpm.analyzer.publisher;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.app.service.ApplicationEmailService;
 import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.mail.AlertEmailConstants;
 import org.apache.eagle.common.mail.AlertEmailContext;
 import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
-import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
-import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
 import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.apache.eagle.jpm.analyzer.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,25 +40,25 @@ public class EmailPublisher implements Publisher, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class);
 
     private Config config;
-    private AlertDeduplicator alertDeduplicator;
 
     public EmailPublisher(Config config) {
         this.config = config;
-        this.alertDeduplicator = new SimpleDeduplicator();
     }
 
     @Override
+    //will refactor, just work now
     public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
-        if (result.getAlertMessages().size() == 0) {
+        if (!config.hasPath(Constants.ANALYZER_REPORT_CONFIG_PATH)) {
+            LOG.warn("no email configuration, skip send email");
             return;
         }
 
-        LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
-        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
-            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+        if (result.getAlertMessages().size() == 0) {
             return;
         }
 
+        LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
+
         Map<String, String> basic = new HashMap<>();
         basic.put("site", analyzerJobEntity.getSiteId());
         basic.put("name", analyzerJobEntity.getJobDefId());
@@ -71,20 +74,29 @@ public class EmailPublisher implements Publisher, Serializable {
         basic.put("detail", getJobLink(analyzerJobEntity));
 
         Map<String, List<Result.ProcessorResult>> extend = result.getAlertMessages();
+        Map<String, Object> alertData = new HashMap<>();
         for (String evaluator : extend.keySet()) {
             for (Result.ProcessorResult message : extend.get(evaluator)) {
+                setAlertLevel(alertData, message.getResultLevel());
                 LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
                         analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator);
             }
         }
 
-        Map<String, Object> alertData = new HashMap<>();
         alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
         alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
-
-        //TODO, override email config in job meta data
-        ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH);
+        Config cloneConfig = ConfigFactory.empty().withFallback(config);
+        if (analyzerJobEntity.getUserId() != null) {
+            List<UserEmailEntity> users = Utils.getUserMail(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getUserId());
+            if (users != null && users.size() > 0) {
+                Map<String, String> additionalConfig = new HashMap<>();
+                additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.RECIPIENTS, users.get(0).getMailAddress());
+                cloneConfig = ConfigFactory.parseMap(additionalConfig).withFallback(cloneConfig);
+            }
+        }
+        ApplicationEmailService emailService = new ApplicationEmailService(cloneConfig, Constants.ANALYZER_REPORT_CONFIG_PATH);
         String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId());
+        alertData.put(PublishConstants.ALERT_EMAIL_SUBJECT, subject);
         AlertEmailContext alertContext = emailService.buildEmailContext(subject);
         emailService.onAlert(alertContext, alertData);
     }
@@ -99,4 +111,19 @@ public class EmailPublisher implements Publisher, Serializable {
                 + "/jpm/detail/"
                 + analyzerJobEntity.getJobId();
     }
+
+    private void setAlertLevel(Map<String, Object> alertData, Result.ResultLevel level) {
+        if (!alertData.containsKey(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY)) {
+            alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, Result.ResultLevel.INFO.toString());
+        }
+
+        if (level.equals(Result.ResultLevel.CRITICAL)) {
+            alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString());
+        }
+
+        if (level.equals(Result.ResultLevel.WARNING)
+                && !alertData.get(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY).equals(Result.ResultLevel.CRITICAL.toString())) {
+            alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
index 7d7442b..748a5d5 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -17,7 +17,6 @@
 
 package org.apache.eagle.jpm.analyzer.publisher;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
 import java.util.ArrayList;
@@ -47,7 +46,7 @@ public class Result {
                 alertMessages.put(typeName, new ArrayList<>());
                 alertEntities.put(typeName, new ArrayList<>());
             }
-            normalizeResult(processorResult);
+            //normalizeResult(processorResult);
             alertMessages.get(typeName).add(processorResult);
             alertEntities.get(typeName).add(processorEntities.get(processorType));
 
@@ -63,11 +62,11 @@ public class Result {
     }
 
     private void normalizeResult(ProcessorResult processorResult) {
-        String settingList = "";
+        /*String settingList = "";
         if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty()) {
             settingList = StringUtils.join(processorResult.getSettings(), "\n");
         }
-        processorResult.setSettingList(settingList);
+        processorResult.setSettingList(settingList);*/
     }
 
     /**
@@ -77,9 +76,19 @@ public class Result {
     public enum ResultLevel {
         NONE,
         INFO,
-        NOTICE,
         WARNING,
-        CRITICAL
+        CRITICAL;
+
+        private static final Map<String, ResultLevel> stringToLevels = new HashMap<>();
+        static {
+            for (ResultLevel level : values()) {
+                stringToLevels.put(level.toString(), level);
+            }
+        }
+
+        public static ResultLevel fromString(String levelString) {
+            return stringToLevels.get(levelString);
+        }
     }
 
     public enum RuleType {
@@ -100,20 +109,20 @@ public class Result {
         private ResultLevel resultLevel;
         private String message;
         private List<String> settings;
-        private String settingList;
+        //private String settingList;
 
         public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message, List<String> settings) {
             this.ruleType = ruleType;
             this.resultLevel = resultLevel;
             this.message = message;
-            this.settings = settings;
+            //this.settings = settings;
         }
 
         public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message) {
             this.ruleType = ruleType;
             this.resultLevel = resultLevel;
             this.message = message;
-            this.settings = new ArrayList<>();
+            //this.settings = new ArrayList<>();
         }
 
         public RuleType getRuleType() {
@@ -148,13 +157,13 @@ public class Result {
             this.settings = settings;
         }
 
-        public String getSettingList() {
+        /*public String getSettingList() {
             return settingList;
         }
 
         public void setSettingList(String settingList) {
             this.settingList = settingList;
-        }
+        }*/
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
index b139b3c..f8155f1 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -34,26 +34,29 @@ import java.util.Map;
 public class SimpleDeduplicator implements AlertDeduplicator, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class);
 
-    private Map<String, Long> lastUpdateTime = new HashMap<>();
+    private static Map<String, Long> lastUpdateTime = new HashMap<>();
 
     @Override
     public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) {
-        long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL;
-        if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) {
-            dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY);
-        }
+        synchronized (lastUpdateTime) {
+            long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL;
+            if (analyzerJobEntity.getJobMeta().getConfiguration().containsKey(Constants.DEDUP_INTERVAL_KEY)) {
+                dedupInterval = (Integer)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY);
+            }
 
-        dedupInterval = dedupInterval * 1000;
-        long currentTimeStamp = System.currentTimeMillis();
-        if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) {
-            if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) {
-                return true;
+            dedupInterval = dedupInterval * 1000;
+            long currentTimeStamp = System.currentTimeMillis();
+            if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) {
+                if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) {
+                    return true;
+                } else {
+                    lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
+                    return false;
+                }
             } else {
+                lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
                 return false;
             }
-        } else {
-            lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
-            return false;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
index 80d9fb7..a9c3171 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
@@ -20,8 +20,9 @@ package org.apache.eagle.jpm.analyzer.resource;
 import com.google.inject.Inject;
 import org.apache.eagle.common.rest.RESTResponse;
 import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
 import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
-import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.jpm.analyzer.util.Constants;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -38,16 +39,16 @@ public class AnalyzerResource {
     }
 
     @POST
-    @Path(META_PATH)
+    @Path(JOB_META_ROOT_PATH)
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
     public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) {
         return RESTResponse.<Void>async((response) -> {
             jobMetaEntity.ensureDefault();
             boolean ret = metaManagementService.addJobMeta(jobMetaEntity);
-            String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId();
+            String message = "Successfully add job meta for " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId();
             if (!ret) {
-                message = "Failed to add job meta for " + jobMetaEntity.getJobDefId();
+                message = "Failed to add job meta for " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId();
             }
             response.success(ret).message(message);
         }).get();
@@ -56,13 +57,17 @@ public class AnalyzerResource {
     @POST
     @Path(JOB_META_PATH)
     @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) {
+    public RESTResponse<Void> updateJobMeta(@PathParam(Constants.SITE_ID) String siteId,
+                                            @PathParam(Constants.JOB_DEF_ID) String jobDefId,
+                                            JobMetaEntity jobMetaEntity) {
         return RESTResponse.<Void>async((response) -> {
-            jobMetaEntity.ensureDefault();
-            boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity);
-            String message = "Successfully update job meta for " + jobDefId;
+            jobMetaEntity.setModifiedTime(System.currentTimeMillis());
+            jobMetaEntity.setSiteId(siteId);
+            jobMetaEntity.setJobDefId(jobDefId);
+            boolean ret = metaManagementService.updateJobMeta(jobMetaEntity);
+            String message = "Successfully update job meta for " + siteId + ":" + jobDefId;
             if (!ret) {
-                message = "Failed to update job meta for " + jobDefId;
+                message = "Failed to update job meta for " + siteId + ":" + jobDefId;
             }
             response.success(ret).message(message);
         }).get();
@@ -71,20 +76,22 @@ public class AnalyzerResource {
     @GET
     @Path(JOB_META_PATH)
     @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
-        return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get();
+    public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(Constants.SITE_ID) String siteId,
+                                                        @PathParam(Constants.JOB_DEF_ID) String jobDefId) {
+        return RESTResponse.async(() -> metaManagementService.getJobMeta(siteId, jobDefId)).get();
     }
 
     @DELETE
     @Path(JOB_META_PATH)
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+    public RESTResponse<Void> deleteJobMeta(@PathParam(Constants.SITE_ID) String siteId,
+                                            @PathParam(Constants.JOB_DEF_ID) String jobDefId) {
         return RESTResponse.<Void>async((response) -> {
-            boolean ret = metaManagementService.deleteJobMeta(jobDefId);
-            String message = "Successfully delete job meta for " + jobDefId;
+            boolean ret = metaManagementService.deleteJobMeta(siteId, jobDefId);
+            String message = "Successfully delete job meta for " + siteId + ": " + jobDefId;
             if (!ret) {
-                message = "Failed to delete job meta for " + jobDefId;
+                message = "Failed to delete job meta for " + siteId + ": " + jobDefId;
             }
 
             response.success(ret).message(message);
@@ -92,40 +99,62 @@ public class AnalyzerResource {
     }
 
     @POST
-    @Path(PUBLISHER_PATH)
+    @Path(USER_META_ROOT_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> addEmailPublisherMeta(UserEmailEntity userEmailEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            userEmailEntity.ensureDefault();
+            boolean ret = metaManagementService.addUserEmailMeta(userEmailEntity);
+            String message = "Successfully add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId();
+            if (!ret) {
+                message = "Failed to add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId();
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @POST
+    @Path(USER_META_PATH)
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) {
+    public RESTResponse<Void> updateEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId,
+                                                       @PathParam(Constants.USER_ID) String userId,
+                                                       UserEmailEntity userEmailEntity) {
         return RESTResponse.<Void>async((response) -> {
-            publisherEntity.ensureDefault();
-            boolean ret = metaManagementService.addPublisherMeta(publisherEntity);
-            String message = "Successfully add publisher meta for " + publisherEntity.getUserId();
+            userEmailEntity.setSiteId(siteId);
+            userEmailEntity.setUserId(userId);
+            userEmailEntity.setModifiedTime(System.currentTimeMillis());
+            boolean ret = metaManagementService.updateUserEmailMeta(userEmailEntity);
+            String message = "Successfully update user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId();
             if (!ret) {
-                message = "Failed to add publisher meta for " + publisherEntity.getUserId();
+                message = "Failed to update user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId();
             }
             response.success(ret).message(message);
         }).get();
     }
 
     @DELETE
-    @Path(PUBLISHER_META_PATH)
+    @Path(USER_META_PATH)
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) {
+    public RESTResponse<Void> deleteEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId,
+                                                       @PathParam(Constants.USER_ID) String userId) {
         return RESTResponse.<Void>async((response) -> {
-            boolean ret = metaManagementService.deletePublisherMeta(userId);
-            String message = "Successfully delete publisher meta for " + userId;
+            boolean ret = metaManagementService.deleteUserEmailMeta(siteId, userId);
+            String message = "Successfully delete user meta for " + siteId + ":" + userId;
             if (!ret) {
-                message = "Failed to delete publisher meta for " + userId;
+                message = "Failed to delete user meta for " + siteId + ":" + userId;
             }
             response.success(ret).message(message);
         }).get();
     }
 
     @GET
-    @Path(PUBLISHER_META_PATH)
+    @Path(USER_META_PATH)
     @Produces(MediaType.APPLICATION_JSON)
-    public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) {
-        return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get();
+    public RESTResponse<List<UserEmailEntity>> getEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId,
+                                                                     @PathParam(Constants.USER_ID) String userId) {
+        return RESTResponse.async(() -> metaManagementService.getUserEmailMeta(siteId, userId)).get();
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
index 4c6661a..4ddc27e 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -31,14 +31,16 @@ public class Constants {
     public static final String CONTEXT_PATH = "service.context";
     public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds";
 
-    public static final String META_PATH = "/metadata";
-    public static final String ANALYZER_PATH = "/job/analyzer";
-    public static final String JOB_DEF_PATH = "jobDefId";
-    public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}";
+    public static final String ANALYZER_PATH = "/analyzer";
 
-    public static final String PUBLISHER_PATH = "/publisher";
-    public static final String USER_PATH = "userId";
-    public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}";
+    public static final String SITE_ID = "siteId";
+    public static final String JOB_META_ROOT_PATH = "/jobmeta";
+    public static final String JOB_DEF_ID = "jobDefId";
+    public static final String JOB_META_PATH = JOB_META_ROOT_PATH + "/{" + SITE_ID + "}/" + "{" + JOB_DEF_ID + "}";
+
+    public static final String USER_META_ROOT_PATH = "/usermeta";
+    public static final String USER_ID = "userId";
+    public static final String USER_META_PATH = USER_META_ROOT_PATH + "/{" + SITE_ID + "}/" + "{" + USER_ID + "}";
 
     public static final String PROCESS_NONE = "PROCESS_NONE";
 
@@ -48,7 +50,7 @@ public class Constants {
     public static final String ALERT_THRESHOLD_KEY = "alert.threshold";
     public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() {
         {
-            put(Result.ResultLevel.NOTICE, 0.1);
+            put(Result.ResultLevel.INFO, 0.1);
             put(Result.ResultLevel.WARNING, 0.3);
             put(Result.ResultLevel.CRITICAL, 0.5);
         }
@@ -58,7 +60,7 @@ public class Constants {
     public static final int DEFAULT_DEDUP_INTERVAL = 300;
 
     public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport";
-    public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s";
+    public static final String ANALYZER_REPORT_SUBJECT = "Performance Insights For %s";
 
     public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
     public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";


Mime
View raw message