eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/2] incubator-eagle git commit: [EAGLE-762] Add AlertEmailPublisherTest and improve alert template
Date Fri, 11 Nov 2016 03:40:01 GMT
[EAGLE-762] Add AlertEmailPublisherTest and improve alert template

* Add AlertEmailPublisherTest
* Improve alert template
* Generate `alertId` in `AlertStreamEvent` instead of `AlertPublishEvent` to make sure alerts track-able among different publish plugin types.

https://issues.apache.org/jira/browse/EAGLE-762

Author: Hao Chen <hao@apache.org>

Closes #645 from haoch/EAGLE-762.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/38ec7fc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/38ec7fc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/38ec7fc2

Branch: refs/heads/master
Commit: 38ec7fc23d06c77b6e083ad2dfb9c61e729901e0
Parents: 75ab772
Author: Hao Chen <hao@apache.org>
Authored: Fri Nov 11 11:39:53 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Fri Nov 11 11:39:53 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/model/AlertPublishEvent.java   |  16 +-
 .../alert/engine/model/AlertStreamEvent.java    |  24 +-
 .../service/MetadataServiceClientImpl.java      | 596 +++++++++----------
 .../engine/model/AlertPublishEventTest.java     |   1 +
 .../eagle-alert/alert-engine/pom.xml            |  16 +-
 .../engine/publisher/PublishConstants.java      |  12 +-
 .../publisher/email/AlertEmailGenerator.java    |  67 ++-
 .../email/AlertEmailGeneratorBuilder.java       |  10 +
 .../publisher/impl/AlertEmailPublisher.java     |  14 +-
 .../publisher/impl/AlertPublisherImpl.java      |   2 +-
 .../src/main/resources/ALERT_DEFAULT.vm         |  94 +--
 .../engine/metric/MemoryUsageGaugeSetTest.java  |   2 +-
 .../publisher/AlertEmailPublisherTest.java      |  76 +++
 .../publisher/AlertKafkaPublisherTest.java      | 123 +---
 .../publisher/AlertPublisherTestHelper.java     | 106 ++++
 .../dedup/DefaultDedupWithoutStateTest.java     | 240 +++-----
 .../dedup/DefaultDeduplicatorTest.java          |  71 +--
 pom.xml                                         |   4 +-
 18 files changed, 780 insertions(+), 694 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
index 3453e55..6230731 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
@@ -18,9 +18,10 @@
 
 package org.apache.eagle.alert.engine.model;
 
+import com.google.common.base.Preconditions;
+
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 public class AlertPublishEvent {
     private String alertId;
@@ -29,7 +30,7 @@ public class AlertPublishEvent {
     private String policyId;
     private String policyValue;
     private long alertTimestamp;
-    private Map<String, String> alertData;
+    private Map<String, Object> alertData;
 
     public static final String SITE_ID_KEY = "siteId";
     public static final String APP_IDS_KEY = "appIds";
@@ -84,17 +85,18 @@ public class AlertPublishEvent {
         this.policyId = policyId;
     }
 
-    public Map<String, String> getAlertData() {
+    public Map<String, Object> getAlertData() {
         return alertData;
     }
 
-    public void setAlertData(Map<String, String> alertData) {
+    public void setAlertData(Map<String, Object> alertData) {
         this.alertData = alertData;
     }
 
     public static AlertPublishEvent createAlertPublishEvent(AlertStreamEvent event) {
+        Preconditions.checkNotNull(event.getAlertId(), "alertId is not initialized before being published: " + event.toString());
         AlertPublishEvent alertEvent = new AlertPublishEvent();
-        alertEvent.setAlertId(UUID.randomUUID().toString());
+        alertEvent.setAlertId(event.getAlertId());
         alertEvent.setPolicyId(event.getPolicyId());
         alertEvent.setAlertTimestamp(event.getCreatedTime());
         if (event.getExtraData() != null && !event.getExtraData().isEmpty()) {
@@ -105,6 +107,4 @@ public class AlertPublishEvent {
         alertEvent.setAlertData(event.getDataMap());
         return alertEvent;
     }
-
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index e76c195..c0a709d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -22,10 +22,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
 import org.apache.commons.lang3.StringUtils;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * streamId stands for alert type instead of source event streamId.
@@ -33,6 +30,7 @@ import java.util.Map;
 public class AlertStreamEvent extends StreamEvent {
     private static final long serialVersionUID = 2392131134670106397L;
 
+    private String alertId;
     private String policyId;
     private StreamDefinition schema;
     private String createdBy;
@@ -44,6 +42,7 @@ public class AlertStreamEvent extends StreamEvent {
     }
 
     public AlertStreamEvent(AlertStreamEvent event) {
+        this.alertId = event.getAlertId();
         this.policyId = event.policyId;
         this.schema = event.schema;
         this.createdBy = event.createdBy;
@@ -102,15 +101,15 @@ public class AlertStreamEvent extends StreamEvent {
         this.createdTime = createdTime;
     }
 
-    public Map<String, String> getDataMap() {
-        Map<String, String> event = new HashMap<>();
+    public Map<String, Object> getDataMap() {
+        Map<String, Object> event = new HashMap<>();
         for (StreamColumn column : schema.getColumns()) {
             Object obj = this.getData()[schema.getColumnIndex(column.getName())];
             if (obj == null) {
                 event.put(column.getName(), null);
                 continue;
             }
-            event.put(column.getName(), obj.toString());
+            event.put(column.getName(), obj);
         }
         return event;
     }
@@ -123,4 +122,13 @@ public class AlertStreamEvent extends StreamEvent {
         this.extraData = extraData;
     }
 
-}
+    public String getAlertId() {
+        return alertId;
+    }
+
+    public void ensureAlertId() {
+        if (this.alertId == null) {
+            this.alertId = UUID.randomUUID().toString();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
index 099767e..209a3a6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/service/MetadataServiceClientImpl.java
@@ -1,298 +1,298 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.service;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import javax.ws.rs.core.MediaType;
-
-public class MetadataServiceClientImpl implements IMetadataServiceClient {
-    private static final long serialVersionUID = 3003976065082684128L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
-
-    private static final String METADATA_SCHEDULESTATES_PATH = "/metadata/schedulestates";
-    private static final String METADATA_PUBLISHMENTS_PATH = "/metadata/publishments";
-    private static final String METADATA_DATASOURCES_PATH = "/metadata/datasources";
-    private static final String METADATA_STREAMS_PATH = "/metadata/streams";
-    private static final String METADATA_POLICIES_PATH = "/metadata/policies";
-    private static final String METADATA_CLUSTERS_PATH = "/metadata/clusters";
-    private static final String METADATA_TOPOLOGY_PATH = "/metadata/topologies";
-    private static final String METADATA_ALERTS_PATH = "/metadata/alerts";
-
-    private static final String METADATA_PUBLISHMENTS_BATCH_PATH = "/metadata/publishments/batch";
-    private static final String METADATA_DATASOURCES_BATCH_PATH = "/metadata/datasources/batch";
-    private static final String METADATA_STREAMS_BATCH_PATH = "/metadata/streams/batch";
-    private static final String METADATA_POLICIES_BATCH_PATH = "/metadata/policies/batch";
-    private static final String METADATA_CLUSTERS_BATCH_PATH = "/metadata/clusters/batch";
-    private static final String METADATA_TOPOLOGY_BATCH_PATH = "/metadata/topologies/batch";
-    private static final String METADATA_ALERTS_BATCH_PATH = "/metadata/alerts/batch";
-
-    private static final String METADATA_CLEAR_PATH = "/metadata/clear";
-
-    private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
-    private static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
-    private static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
-
-    protected static final String CONTENT_TYPE = "Content-Type";
-
-    private String host;
-    private int port;
-    private String context;
-    private transient Client client;
-    private String basePath;
-
-    public MetadataServiceClientImpl(Config config) {
-        this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
-            .getString(EAGLE_CORRELATION_CONTEXT));
-        basePath = buildBasePath();
-    }
-
-    public MetadataServiceClientImpl(String host, int port, String context) {
-        this.host = host;
-        this.port = port;
-        this.context = context;
-        this.basePath = buildBasePath();
-        ClientConfig cc = new DefaultClientConfig();
-        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
-        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
-        cc.getClasses().add(JacksonJsonProvider.class);
-        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
-        this.client = Client.create(cc);
-        client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
-    }
-
-    private String buildBasePath() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("http://");
-        sb.append(host);
-        sb.append(":");
-        sb.append(port);
-        sb.append(context);
-        return sb.toString();
-    }
-
-    @Override
-    public void close() throws IOException {
-        client.destroy();
-    }
-
-    @Override
-    public List<SpoutSpec> listSpoutMetadata() {
-        ScheduleState state = getVersionedSpec();
-        return new ArrayList<>(state.getSpoutSpecs().values());
-    }
-
-    @Override
-    public List<StreamingCluster> listClusters() {
-        return list(METADATA_CLUSTERS_PATH, new GenericType<List<StreamingCluster>>() {
-        });
-    }
-
-    @Override
-    public List<PolicyDefinition> listPolicies() {
-        return list(METADATA_POLICIES_PATH, new GenericType<List<PolicyDefinition>>() {
-        });
-    }
-
-    @Override
-    public List<StreamDefinition> listStreams() {
-        return list(METADATA_STREAMS_PATH, new GenericType<List<StreamDefinition>>() {
-        });
-    }
-
-    @Override
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return list(METADATA_DATASOURCES_PATH, new GenericType<List<Kafka2TupleMetadata>>() {
-        });
-    }
-
-    private <T> List<T> list(String path, GenericType<List<T>> type) {
-        WebResource r = client.resource(basePath + path);
-        LOG.info("query URL {}", basePath + path);
-        List<T> ret = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).get(type);
-        return ret;
-    }
-
-    @Override
-    public List<Publishment> listPublishment() {
-        return list(METADATA_PUBLISHMENTS_PATH, new GenericType<List<Publishment>>() {
-        });
-    }
-
-    @Override
-    public ScheduleState getVersionedSpec(String version) {
-        return listOne(METADATA_SCHEDULESTATES_PATH + "/" + version, ScheduleState.class);
-    }
-
-    @Override
-    public ScheduleState getVersionedSpec() {
-        return listOne(METADATA_SCHEDULESTATES_PATH, ScheduleState.class);
-    }
-
-    private <T> T listOne(String path, Class<T> tClz) {
-        LOG.info("query URL {}", basePath + path);
-        WebResource r = client.resource(basePath + path);
-
-        ClientResponse resp = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON)
-            .get(ClientResponse.class);
-        if (resp.getStatus() < 300) {
-            try {
-                return resp.getEntity(tClz);
-            } catch (Exception e) {
-                LOG.warn(" list one entity failed, ignored and continute, path {}, message {}!", path, e.getMessage());
-            }
-        } else {
-            LOG.warn("fail querying metadata service {} with http status {}", basePath + path, resp.getStatus());
-        }
-        return null;
-    }
-
-    @Override
-    public void addScheduleState(ScheduleState state) {
-        WebResource r = client.resource(basePath + METADATA_SCHEDULESTATES_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(state);
-    }
-
-    @Override
-    public List<Topology> listTopologies() {
-        return list(METADATA_TOPOLOGY_PATH, new GenericType<List<Topology>>() {
-        });
-    }
-
-    @Override
-    public void addStreamingCluster(StreamingCluster cluster) {
-        WebResource r = client.resource(basePath + METADATA_CLUSTERS_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(cluster);
-    }
-
-    @Override
-    public void addStreamingClusters(List<StreamingCluster> clusters) {
-        WebResource r = client.resource(basePath + METADATA_CLUSTERS_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(clusters);
-    }
-
-    @Override
-    public void addTopology(Topology t) {
-        WebResource r = client.resource(basePath + METADATA_TOPOLOGY_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(t);
-    }
-
-    @Override
-    public void addTopologies(List<Topology> topologies) {
-        WebResource r = client.resource(basePath + METADATA_TOPOLOGY_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(topologies);
-    }
-
-    @Override
-    public void addPolicy(PolicyDefinition policy) {
-        WebResource r = client.resource(basePath + METADATA_POLICIES_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(policy);
-    }
-
-    @Override
-    public void addPolicies(List<PolicyDefinition> policies) {
-        WebResource r = client.resource(basePath + METADATA_POLICIES_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(policies);
-    }
-
-    @Override
-    public void addStreamDefinition(StreamDefinition streamDef) {
-        WebResource r = client.resource(basePath + METADATA_STREAMS_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(streamDef);
-    }
-
-    @Override
-    public void addStreamDefinitions(List<StreamDefinition> streamDefs) {
-        WebResource r = client.resource(basePath + METADATA_STREAMS_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(streamDefs);
-    }
-
-    @Override
-    public void addDataSource(Kafka2TupleMetadata k2t) {
-        WebResource r = client.resource(basePath + METADATA_DATASOURCES_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(k2t);
-    }
-
-    @Override
-    public void addDataSources(List<Kafka2TupleMetadata> k2ts) {
-        WebResource r = client.resource(basePath + METADATA_DATASOURCES_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(k2ts);
-    }
-
-    @Override
-    public void addPublishment(Publishment pub) {
-        WebResource r = client.resource(basePath + METADATA_PUBLISHMENTS_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(pub);
-    }
-
-    @Override
-    public void addPublishments(List<Publishment> pubs) {
-        WebResource r = client.resource(basePath + METADATA_PUBLISHMENTS_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(pubs);
-    }
-
-    @Override
-    public void clear() {
-        WebResource r = client.resource(basePath + METADATA_CLEAR_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post();
-    }
-
-    @Override
-    public List<AlertPublishEvent> listAlertPublishEvent() {
-        return list(METADATA_ALERTS_PATH, new GenericType<List<AlertPublishEvent>>(){});
-    }
-
-    @Override
-    public void addAlertPublishEvent(AlertPublishEvent event) {
-        WebResource r = client.resource(basePath + METADATA_ALERTS_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(event);
-    }
-
-    @Override
-    public void addAlertPublishEvents(List<AlertPublishEvent> events) {
-        WebResource r = client.resource(basePath + METADATA_ALERTS_BATCH_PATH);
-        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(events);
-    }
-
-}
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.service;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+
+public class MetadataServiceClientImpl implements IMetadataServiceClient {
+    private static final long serialVersionUID = 3003976065082684128L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
+
+    private static final String METADATA_SCHEDULESTATES_PATH = "/metadata/schedulestates";
+    private static final String METADATA_PUBLISHMENTS_PATH = "/metadata/publishments";
+    private static final String METADATA_DATASOURCES_PATH = "/metadata/datasources";
+    private static final String METADATA_STREAMS_PATH = "/metadata/streams";
+    private static final String METADATA_POLICIES_PATH = "/metadata/policies";
+    private static final String METADATA_CLUSTERS_PATH = "/metadata/clusters";
+    private static final String METADATA_TOPOLOGY_PATH = "/metadata/topologies";
+    private static final String METADATA_ALERTS_PATH = "/metadata/alerts";
+
+    private static final String METADATA_PUBLISHMENTS_BATCH_PATH = "/metadata/publishments/batch";
+    private static final String METADATA_DATASOURCES_BATCH_PATH = "/metadata/datasources/batch";
+    private static final String METADATA_STREAMS_BATCH_PATH = "/metadata/streams/batch";
+    private static final String METADATA_POLICIES_BATCH_PATH = "/metadata/policies/batch";
+    private static final String METADATA_CLUSTERS_BATCH_PATH = "/metadata/clusters/batch";
+    private static final String METADATA_TOPOLOGY_BATCH_PATH = "/metadata/topologies/batch";
+    private static final String METADATA_ALERTS_BATCH_PATH = "/metadata/alerts/batch";
+
+    private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+
+    private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
+    public static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
+    public static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
+
+    protected static final String CONTENT_TYPE = "Content-Type";
+
+    private String host;
+    private int port;
+    private String context;
+    private transient Client client;
+    private String basePath;
+
+    public MetadataServiceClientImpl(Config config) {
+        this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
+            .getString(EAGLE_CORRELATION_CONTEXT));
+        basePath = buildBasePath();
+    }
+
+    public MetadataServiceClientImpl(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = buildBasePath();
+        ClientConfig cc = new DefaultClientConfig();
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+        cc.getClasses().add(JacksonJsonProvider.class);
+        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+        this.client = Client.create(cc);
+        client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+    }
+
+    private String buildBasePath() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("http://");
+        sb.append(host);
+        sb.append(":");
+        sb.append(port);
+        sb.append(context);
+        return sb.toString();
+    }
+
+    @Override
+    public void close() throws IOException {
+        client.destroy();
+    }
+
+    @Override
+    public List<SpoutSpec> listSpoutMetadata() {
+        ScheduleState state = getVersionedSpec();
+        return new ArrayList<>(state.getSpoutSpecs().values());
+    }
+
+    @Override
+    public List<StreamingCluster> listClusters() {
+        return list(METADATA_CLUSTERS_PATH, new GenericType<List<StreamingCluster>>() {
+        });
+    }
+
+    @Override
+    public List<PolicyDefinition> listPolicies() {
+        return list(METADATA_POLICIES_PATH, new GenericType<List<PolicyDefinition>>() {
+        });
+    }
+
+    @Override
+    public List<StreamDefinition> listStreams() {
+        return list(METADATA_STREAMS_PATH, new GenericType<List<StreamDefinition>>() {
+        });
+    }
+
+    @Override
+    public List<Kafka2TupleMetadata> listDataSources() {
+        return list(METADATA_DATASOURCES_PATH, new GenericType<List<Kafka2TupleMetadata>>() {
+        });
+    }
+
+    private <T> List<T> list(String path, GenericType<List<T>> type) {
+        WebResource r = client.resource(basePath + path);
+        LOG.info("query URL {}", basePath + path);
+        List<T> ret = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).get(type);
+        return ret;
+    }
+
+    @Override
+    public List<Publishment> listPublishment() {
+        return list(METADATA_PUBLISHMENTS_PATH, new GenericType<List<Publishment>>() {
+        });
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec(String version) {
+        return listOne(METADATA_SCHEDULESTATES_PATH + "/" + version, ScheduleState.class);
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec() {
+        return listOne(METADATA_SCHEDULESTATES_PATH, ScheduleState.class);
+    }
+
+    private <T> T listOne(String path, Class<T> tClz) {
+        LOG.info("query URL {}", basePath + path);
+        WebResource r = client.resource(basePath + path);
+
+        ClientResponse resp = r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON)
+            .get(ClientResponse.class);
+        if (resp.getStatus() < 300) {
+            try {
+                return resp.getEntity(tClz);
+            } catch (Exception e) {
+                LOG.warn(" list one entity failed, ignored and continute, path {}, message {}!", path, e.getMessage());
+            }
+        } else {
+            LOG.warn("fail querying metadata service {} with http status {}", basePath + path, resp.getStatus());
+        }
+        return null;
+    }
+
+    @Override
+    public void addScheduleState(ScheduleState state) {
+        WebResource r = client.resource(basePath + METADATA_SCHEDULESTATES_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(state);
+    }
+
+    @Override
+    public List<Topology> listTopologies() {
+        return list(METADATA_TOPOLOGY_PATH, new GenericType<List<Topology>>() {
+        });
+    }
+
+    @Override
+    public void addStreamingCluster(StreamingCluster cluster) {
+        WebResource r = client.resource(basePath + METADATA_CLUSTERS_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(cluster);
+    }
+
+    @Override
+    public void addStreamingClusters(List<StreamingCluster> clusters) {
+        WebResource r = client.resource(basePath + METADATA_CLUSTERS_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(clusters);
+    }
+
+    @Override
+    public void addTopology(Topology t) {
+        WebResource r = client.resource(basePath + METADATA_TOPOLOGY_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(t);
+    }
+
+    @Override
+    public void addTopologies(List<Topology> topologies) {
+        WebResource r = client.resource(basePath + METADATA_TOPOLOGY_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(topologies);
+    }
+
+    @Override
+    public void addPolicy(PolicyDefinition policy) {
+        WebResource r = client.resource(basePath + METADATA_POLICIES_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(policy);
+    }
+
+    @Override
+    public void addPolicies(List<PolicyDefinition> policies) {
+        WebResource r = client.resource(basePath + METADATA_POLICIES_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(policies);
+    }
+
+    @Override
+    public void addStreamDefinition(StreamDefinition streamDef) {
+        WebResource r = client.resource(basePath + METADATA_STREAMS_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(streamDef);
+    }
+
+    @Override
+    public void addStreamDefinitions(List<StreamDefinition> streamDefs) {
+        WebResource r = client.resource(basePath + METADATA_STREAMS_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(streamDefs);
+    }
+
+    @Override
+    public void addDataSource(Kafka2TupleMetadata k2t) {
+        WebResource r = client.resource(basePath + METADATA_DATASOURCES_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(k2t);
+    }
+
+    @Override
+    public void addDataSources(List<Kafka2TupleMetadata> k2ts) {
+        WebResource r = client.resource(basePath + METADATA_DATASOURCES_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(k2ts);
+    }
+
+    @Override
+    public void addPublishment(Publishment pub) {
+        WebResource r = client.resource(basePath + METADATA_PUBLISHMENTS_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(pub);
+    }
+
+    @Override
+    public void addPublishments(List<Publishment> pubs) {
+        WebResource r = client.resource(basePath + METADATA_PUBLISHMENTS_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(pubs);
+    }
+
+    @Override
+    public void clear() {
+        WebResource r = client.resource(basePath + METADATA_CLEAR_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post();
+    }
+
+    @Override
+    public List<AlertPublishEvent> listAlertPublishEvent() {
+        return list(METADATA_ALERTS_PATH, new GenericType<List<AlertPublishEvent>>(){});
+    }
+
+    @Override
+    public void addAlertPublishEvent(AlertPublishEvent event) {
+        WebResource r = client.resource(basePath + METADATA_ALERTS_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(event);
+    }
+
+    @Override
+    public void addAlertPublishEvents(List<AlertPublishEvent> events) {
+        WebResource r = client.resource(basePath + METADATA_ALERTS_BATCH_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(events);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
index d534e3b..01eaa3c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
@@ -76,6 +76,7 @@ public class AlertPublishEventTest {
         alertStreamEvent.setSchema(streamDefinition);
         alertStreamEvent.setPolicyId("setPolicyId");
         alertStreamEvent.setCreatedTime(1234);
+        alertStreamEvent.ensureAlertId();
         AlertPublishEvent alertPublishEvent = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
         Assert.assertEquals(null, alertPublishEvent.getSiteId());
         Assert.assertTrue(alertPublishEvent.getAlertId() != null);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
index 649125c..84aee88 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -130,15 +130,21 @@
             <groupId>org.mongodb</groupId>
             <artifactId>mongo-java-driver</artifactId>
         </dependency>
-		<dependency>
- 			<groupId>de.flapdoodle.embed</groupId>
- 			<artifactId>de.flapdoodle.embed.mongo</artifactId>
- 			<scope>test</scope>
- 		</dependency>
         <dependency>
             <groupId>com.ullink.slack</groupId>
             <artifactId>simpleslackapi</artifactId>
         </dependency>
+        <dependency>
+            <groupId>de.flapdoodle.embed</groupId>
+            <artifactId>de.flapdoodle.embed.mongo</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>dumbster</groupId>
+            <artifactId>dumbster</artifactId>
+            <version>1.6</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index b509092..f2168bc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -47,9 +47,15 @@ public class PublishConstants {
     public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
 
     public static final String ALERT_EMAIL_MESSAGE = "alertMessage";
-    public static final String ALERT_EMAIL_STREAM = "streamId";
+    public static final String ALERT_EMAIL_STREAM_ID = "streamId";
     public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
-    public static final String ALERT_EMAIL_POLICY = "policyId";
+    public static final String ALERT_EMAIL_POLICY_ID = "policyId";
+    public static final String ALERT_EMAIL_ALERT_ID = "alertId";
+    public static final String ALERT_EMAIL_ALERT_DATA = "alertData";
+    public static final String ALERT_EMAIL_ALERT_DATA_DESC = "alertDataDesc";
     public static final String ALERT_EMAIL_CREATOR = "creator";
-
+    public static final String ALERT_EMAIL_ALERT_DETAIL_URL = "alertDetailUrl";
+    public static final String ALERT_EMAIL_POLICY_DETAIL_URL = "policyDetailUrl";
+    public static final String ALERT_EMAIL_HOME_URL = "homeUrl";
+    public static final String ALERT_EMAIL_VERSION = "version";
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 6b5295c..8a69c37 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -20,10 +20,13 @@
  */
 package org.apache.eagle.alert.engine.publisher.email;
 
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 
+import org.apache.eagle.common.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +39,10 @@ public class AlertEmailGenerator {
     private String sender;
     private String recipients;
     private String subject;
+
+    private String serverHost = "localhost";
+    private int serverPort = 80;
+
     private Map<String, Object> properties;
 
     private ThreadPoolExecutor executorPool;
@@ -86,13 +93,51 @@ public class AlertEmailGenerator {
         return status;
     }
 
+    /**
+     * TODO Support template-based alert message.
+     */
+    private String renderAlertMessage(AlertStreamEvent event) {
+        return String.format("Alert policy \"%s\" was triggered: %s",event.getPolicyId(), generateAlertDataDesc(event));
+    }
+
+    private String generateAlertDataDesc(AlertStreamEvent event) {
+        if (event.getDataMap() == null) {
+            return "N/A";
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String,Object> entry : event.getDataMap().entrySet()) {
+            sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+        }
+        return sb.toString();
+    }
+
     private Map<String, String> buildAlertContext(AlertStreamEvent event) {
         Map<String, String> alertContext = new HashMap<>();
-        alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, event.toString());
-        alertContext.put(PublishConstants.ALERT_EMAIL_POLICY, event.getPolicyId());
+        alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, renderAlertMessage(event));
+        alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_ID, event.getPolicyId());
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_ID, event.getAlertId());
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA, event.getDataMap().toString());
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event));
         alertContext.put(PublishConstants.ALERT_EMAIL_TIMESTAMP, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
-        alertContext.put(PublishConstants.ALERT_EMAIL_STREAM, event.getStreamId());
+        alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId());
         alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy());
+        alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version);
+
+        String rootUrl = this.getServerPort() == 80 ? String.format("http://%s",this.getServerHost())
+            : String.format("http://%s:%s",this.getServerHost(), this.getServerPort());
+        try {
+            alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DETAIL_URL,
+                String.format("%s/#/alert/detail/%s", rootUrl, URIUtil.encodeQuery(event.getAlertId(),"UTF-8")));
+            alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_DETAIL_URL,
+                String.format("%s/#/policy/detail/%s",rootUrl, URIUtil.encodeQuery(event.getPolicyId(),"UTF-8")));
+        } catch (URIException e) {
+            LOG.warn(e.getMessage(),e);
+            alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DETAIL_URL,
+                String.format("%s/#/alert/detail/%s", rootUrl, event.getAlertId()));
+            alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_DETAIL_URL,
+                String.format("%s/#/policy/detail/%s",rootUrl, event.getPolicyId()));
+        }
+        alertContext.put(PublishConstants.ALERT_EMAIL_HOME_URL,rootUrl);
         return alertContext;
     }
 
@@ -139,4 +184,20 @@ public class AlertEmailGenerator {
     public void setExecutorPool(ThreadPoolExecutor executorPool) {
         this.executorPool = executorPool;
     }
+
+    public String getServerHost() {
+        return serverHost;
+    }
+
+    public void setServerHost(String serverHost) {
+        this.serverHost = serverHost;
+    }
+
+    public int getServerPort() {
+        return serverPort;
+    }
+
+    public void setServerPort(int serverPort) {
+        this.serverPort = serverPort;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
index f817774..b018d5c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
@@ -61,6 +61,16 @@ public class AlertEmailGeneratorBuilder {
         return this;
     }
 
+    public AlertEmailGeneratorBuilder withServerHost(String serverHost) {
+        generator.setServerHost(serverHost);
+        return this;
+    }
+
+    public AlertEmailGeneratorBuilder withServerPort(int serverPort) {
+        generator.setServerPort(serverPort);
+        return this;
+    }
+
     public AlertEmailGenerator build() {
         return this.generator;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 70c8cf2..40237ee 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -25,6 +25,7 @@ import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
 import com.typesafe.config.Config;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,17 +47,25 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
     private Map<String, Object> emailConfig;
 
     private transient ThreadPoolExecutor executorPool;
+    private String serverHost;
+    private int serverPort;
 
     @Override
     @SuppressWarnings("rawtypes")
     public void init(Config config, Publishment publishment, Map conf) throws Exception {
         super.init(config, publishment, conf);
+        this.serverHost = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST)
+            ? config.getString(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_HOST) : "localhost";
+        this.serverPort = config.hasPath(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT)
+            ? config.getInt(MetadataServiceClientImpl.EAGLE_CORRELATION_SERVICE_PORT) : 80;
+
         executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         LOG.info(" Creating Email Generator... ");
         if (publishment.getProperties() != null) {
             emailConfig = new HashMap<>(publishment.getProperties());
             emailGenerator = createEmailGenerator(emailConfig);
         }
+
     }
 
     @Override
@@ -123,7 +132,10 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
             .withSender(sender)
             .withRecipients(recipients)
             .withTplFile(tplFileName)
-            .withExecutorPool(this.executorPool).build();
+            .withExecutorPool(this.executorPool)
+            .withServerHost(this.serverHost)
+            .withServerPort(this.serverPort)
+            .build();
         return gen;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 7cb4a73..4709180 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -91,7 +91,7 @@ public class AlertPublisherImpl implements AlertPublisher {
             LOG.warn("Policy {} Stream {} does *NOT* subscribe any publishment!", policyId, event.getStreamId());
             return;
         }
-
+        event.ensureAlertId();
         for (String pubId : pubIds) {
             @SuppressWarnings("resource")
             AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) : null;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
index 965480e..fad3aa9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
@@ -123,8 +123,6 @@
 </head>
 <body>
     #set ( $elem = $alertList[0] )
-    #set ( $alertUrl = $elem["alertDetailUrl"] )
-    #set ( $policyUrl = $elem["policyDetailUrl"] )
 <table class="body">
     <tr>
         <td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
@@ -132,7 +130,7 @@
             <table width="580">
                 <tr>
                     <td style="padding: 0 0 0 0;" align="left">
-                        <p style="color:#FFFFFF;font-weight: bold; font-size: 22px">UMP Alerts</p>
+                        <p style="color:#FFFFFF;font-weight: bold; font-size: 22px">Eagle Alert Notification</p>
                     </td>
                 </tr>
             </table>
@@ -146,7 +144,7 @@
                 <tr>
                     <!-- Title -->
                     <td align="center">
-                        <h1>$elem["streamId"] Alert Detected</h1>
+                        <h2>[Alert] $elem["policyId"]</h2>
                     </td>
                 </tr>
                 <tr>
@@ -175,29 +173,24 @@
                         </table>
                     </td>
                 </tr>
+
                 <tr>
-                    <!-- Description -->
-                    <td valign="top"
-                        style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
-                        <p>$elem["alertMessage"]</p>
+                    <!-- Basic Information -->
+                    <td style="padding: 20px 0 10px 0;">
+                        <p><b>Alert Message </b></p>
                     </td>
                 </tr>
                 <tr>
-                    <!-- View Detail -->
-                    <td align="center" style="padding: 10px 0 0 0;">
-                        <table width="580">
-                            <tr>
-                                <td class="btn">
-                                    <a href="$alertUrl">View Alert Details on Eagle Web</a>
-                                </td>
-                            </tr>
-                        </table>
+                    <!-- Description -->
+                    <td valign="top"
+                        style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 20px;">
+                        <p>$elem["alertMessage"]</p>
                     </td>
                 </tr>
                 <tr>
                     <!-- Basic Information -->
-                    <td style="padding: 20px 0 0 0;">
-                        <p><b>Basic Information:</b></p>
+                    <td style="padding: 20px 0 10px 0;">
+                        <p><b>Alert Detail</b></p>
                     </td>
                 </tr>
                 <tr>
@@ -208,45 +201,65 @@
                                 <th>
                                     <p>Policy Name</p>
                                 </th>
-                                <th>
-                                    <p>Data Source</p>
-                                </th>
+                                <td>
+                                    <p><a href="$elem["policyDetailUrl"]">$elem["policyId"]</a></p>
+                                </td>
                             </tr>
                             <tr>
+                                <th>
+                                    <p>Severity Level</p>
+                                </th>
                                 <td>
-                                    <p>$elem["policyId"]</p>
+                                    <p>$elem["severity"]</p>
                                 </td>
+                            </tr>
+                            <tr>
+                                <th>
+                                    <p>Alert Stream</p>
+                                </th>
                                 <td>
                                     <p>$elem["streamId"]</p>
                                 </td>
                             </tr>
                             <tr>
-
                                 <th>
-                                    <p>Creator</p>
-                                </th>
-                                <th>
-                                    <p>Severity</p>
+                                    <p>Created Time</p>
                                 </th>
+                                <td>
+                                    <p>$elem["alertTime"]</p>
+                                </td>
                             </tr>
                             <tr>
+                                <th>
+                                    <p>Created By</p>
+                                </th>
                                 <td>
                                     <p>$elem["creator"]</p>
                                 </td>
-                                <td>
-                                    <p>$elem["severity"]</p>
-                                </td>
                             </tr>
                         </table>
                     </td>
                 </tr>
+##                <tr>
+##                    <!-- View Detail -->
+##                    <td align="center" style="padding: 10px 0 0 0;">
+##                        <table width="580">
+##                            <tr>
+##                                <td class="btn">
+##                                    <a href="$elem["policyDetailUrl"]">View Policy Details</a>
+##                                </td>
+##                            </tr>
+##                        </table>
+##                    </td>
+##                </tr>
+
                 <tr>
                     <!-- View Detail -->
                     <td align="center" style="padding: 10px 0 0 0;">
                         <table width="580">
                             <tr>
                                 <td class="btn">
-                                    <a href="$policyUrl">View Policy Details on Eagle Web</a>
+                                    <a href="$elem["alertDetailUrl"]">View Alert on Eagle</a>
                                 </td>
                             </tr>
                         </table>
@@ -254,21 +267,30 @@
                 </tr>
                 <tr>
                     <!-- Actions Required -->
-                    <td style="padding: 20px 0 0 0;">
-                        <p><b>Actions Required:</b></p>
+                    <td style="padding: 20px 0 10px 0;">
+                        <p><b>Actions Required</b></p>
                     </td>
                 </tr>
                 <tr>
                     <!-- Possible Root Causes Content -->
                     <td class="panel" valign="top"
                         style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
-                        <p> $elem["streamId"] alert found, please check.</p>
+                        <p>
+                            The alert notification was automatically detected and sent by Eagle according to policy: $elem["policyId"].
+                            To follow-up on this, please verify the alert and diagnose the root cause with Eagle:
+                        </p>
+                        <p></p>
+                        <ul>
+                            <li><p><a href="$elem["alertDetailUrl"]">View alert detail</a></p></li>
+                            <li><p><a href="$elem["policyDetailUrl"]">View policy detail</a></p></li>
+                            <li><p><a href="$elem["homeUrl"]">View eagle home</a></p></li>
+                        </ul>
                     </td>
                 </tr>
                 <tr>
                     <!-- Copyright -->
                     <td align="center">
-                        <p><a href="<Eagle-Host>/alerts/alertlist.html">UMP Alert Engine</a></p>
+                        <p><i>Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $elem["version"])</i></p>
                     </td>
                 </tr>
             </table>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
index a1a9cdd..2305a0f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
@@ -41,6 +41,6 @@ public class MemoryUsageGaugeSetTest {
         metrics.registerAll(new MemoryUsageGaugeSet());
         metrics.register("sample", (Gauge<Double>) () -> 0.1234);
         reporter.start(1, TimeUnit.SECONDS);
-        Thread.sleep(5000);
+        reporter.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
new file mode 100644
index 0000000..3f3141a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import com.dumbster.smtp.SimpleSmtpServer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants;
+import org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AlertEmailPublisherTest {
+    private static final String EMAIL_PUBLISHER_TEST_POLICY = "Test Policy Alert";
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisherTest.class);
+    private static final int SMTP_PORT = 5025;
+    private Config config;
+    private SimpleSmtpServer server;
+
+    @Before
+    public void setUp(){
+        config = ConfigFactory.load();
+        server = SimpleSmtpServer.start(SMTP_PORT);
+    }
+
+    @After
+    public void clear(){
+        if(server!=null) {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testAlertEmailPublisher() throws Exception {
+        AlertEmailPublisher publisher = new AlertEmailPublisher();
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(PublishConstants.SUBJECT,EMAIL_PUBLISHER_TEST_POLICY);
+        properties.put(PublishConstants.SENDER,"eagle@localhost");
+        properties.put(PublishConstants.RECIPIENTS,"somebody@localhost");
+        properties.put(AlertEmailConstants.CONF_MAIL_HOST,"localhost");
+        properties.put(AlertEmailConstants.CONF_MAIL_PORT,String.valueOf(SMTP_PORT));
+        Publishment publishment = new Publishment();
+        publishment.setName("testEmailPublishment");
+        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());
+        publishment.setPolicyIds(Collections.singletonList(EMAIL_PUBLISHER_TEST_POLICY));
+        publishment.setDedupIntervalMin("PT0M");
+        publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName());
+        publishment.setProperties(properties);
+        Map<String, String> conf = new HashMap<>();
+        publisher.init(config, publishment,conf);
+        publisher.onAlert(AlertPublisherTestHelper.mockEvent(EMAIL_PUBLISHER_TEST_POLICY));
+        Assert.assertEquals(1,server.getReceivedEmailSize());
+        Assert.assertTrue(server.getReceivedEmail().hasNext());
+        LOG.info("EMAIL:\n {}", server.getReceivedEmail().next());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
index 61de2e4..aaa1e80 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java
@@ -16,12 +16,7 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
@@ -31,6 +26,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
 import org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher;
+import org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer;
 import org.apache.eagle.alert.utils.KafkaEmbedded;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -50,7 +46,9 @@ import kafka.message.MessageAndMetadata;
 public class AlertKafkaPublisherTest {
 
     private static final String TEST_TOPIC_NAME = "test";
-
+    private static final String TEST_POLICY_ID = "testPolicy";
+    private static final int TEST_KAFKA_BROKER_PORT = 59092;
+    private static final int TEST_KAFKA_ZOOKEEPER_PORT = 52181;
     private static KafkaEmbedded kafka;
     private static Config config;
 
@@ -58,11 +56,9 @@ public class AlertKafkaPublisherTest {
 
     @BeforeClass
     public static void setup() {
-        kafka = new KafkaEmbedded(9092, 2181);
-
+        kafka = new KafkaEmbedded(TEST_KAFKA_BROKER_PORT, TEST_KAFKA_ZOOKEEPER_PORT);
         System.setProperty("config.resource", "/simple/application-integration.conf");
         config = ConfigFactory.load();
-
         consumeWithOutput(outputMessages);
     }
 
@@ -75,12 +71,9 @@ public class AlertKafkaPublisherTest {
 
     @Test
     public void testAsync() throws Exception {
-        StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
-
         AlertKafkaPublisher publisher = new AlertKafkaPublisher();
         Map<String, Object> properties = new HashMap<>();
-        properties.put(PublishConstants.BROKER_LIST, "localhost:9092");
+        properties.put(PublishConstants.BROKER_LIST, "localhost:" + TEST_KAFKA_BROKER_PORT);
         properties.put(PublishConstants.TOPIC, TEST_TOPIC_NAME);
         
         List<Map<String, Object>> kafkaClientConfig = new ArrayList<Map<String, Object>>();
@@ -92,62 +85,48 @@ public class AlertKafkaPublisherTest {
 
         Publishment publishment = new Publishment();
         publishment.setName("testAsyncPublishment");
-        publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
-        publishment.setPolicyIds(Arrays.asList(policy.getName()));
+        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());
+        publishment.setPolicyIds(Arrays.asList(TEST_POLICY_ID));
         publishment.setDedupIntervalMin("PT0M");
-        publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName());
         publishment.setProperties(properties);
         
         Map<String, String> conf = new HashMap<String, String>();
         publisher.init(config, publishment, conf);
 
-        AlertStreamEvent event = createEvent(stream, policy,
-            new Object[] {System.currentTimeMillis(), "host1", "testPolicy-host1-01", "open", 0, 0});
+        AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID);
 
         outputMessages.clear();
 
         publisher.onAlert(event);
         Thread.sleep(3000);
-
         Assert.assertEquals(1, outputMessages.size());
-
         publisher.close();
     }
 
     @Test
     public void testSync() throws Exception {
-        StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
-
         AlertKafkaPublisher publisher = new AlertKafkaPublisher();
         Map<String, Object> properties = new HashMap<>();
-        properties.put(PublishConstants.BROKER_LIST, "localhost:9092");
+        properties.put(PublishConstants.BROKER_LIST, "localhost:" + TEST_KAFKA_BROKER_PORT);
         properties.put(PublishConstants.TOPIC, TEST_TOPIC_NAME);
-
         List<Map<String, Object>> kafkaClientConfig = new ArrayList<Map<String, Object>>();
         kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync"));
         properties.put("kafka_client_config", kafkaClientConfig);
-        
         Publishment publishment = new Publishment();
         publishment.setName("testAsyncPublishment");
-        publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
-        publishment.setPolicyIds(Arrays.asList(policy.getName()));
+        publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName());
+        publishment.setPolicyIds(Collections.singletonList(TEST_POLICY_ID));
         publishment.setDedupIntervalMin("PT0M");
-        publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment.setSerializer(JsonEventSerializer.class.getName());
         publishment.setProperties(properties);
-        Map<String, String> conf = new HashMap<String, String>();
+        Map<String, String> conf = new HashMap<>();
         publisher.init(config, publishment, conf);
-
-        AlertStreamEvent event = createEvent(stream, policy,
-            new Object[] {System.currentTimeMillis(), "host1", "testPolicy-host1-01", "open", 0, 0});
-
+        AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID);
         outputMessages.clear();
-
         publisher.onAlert(event);
         Thread.sleep(3000);
-
         Assert.assertEquals(1, outputMessages.size());
-
         publisher.close();
     }
 
@@ -157,7 +136,7 @@ public class AlertKafkaPublisherTest {
             public void run() {
                 Properties props = new Properties();
                 props.put("group.id", "B");
-                props.put("zookeeper.connect", "127.0.0.1:2181");
+                props.put("zookeeper.connect", "127.0.0.1:" + + TEST_KAFKA_ZOOKEEPER_PORT);
                 props.put("zookeeper.session.timeout.ms", "4000");
                 props.put("zookeeper.sync.time.ms", "2000");
                 props.put("auto.commit.interval.ms", "1000");
@@ -190,71 +169,5 @@ public class AlertKafkaPublisherTest {
         t.start();
     }
 
-    private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        event.setPolicyId(policy.getName());
-        event.setSchema(stream);
-        event.setStreamId(stream.getStreamId());
-        event.setTimestamp(System.currentTimeMillis());
-        event.setCreatedTime(System.currentTimeMillis());
-        event.setData(data);
-        return event;
-    }
-
-    private StreamDefinition createStream() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("host");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn alertKeyColumn = new StreamColumn();
-        alertKeyColumn.setName("alertKey");
-        alertKeyColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn stateColumn = new StreamColumn();
-        stateColumn.setName("state");
-        stateColumn.setType(StreamColumn.Type.STRING);
-
-        // dedupCount, dedupFirstOccurrence
-
-        StreamColumn dedupCountColumn = new StreamColumn();
-        dedupCountColumn.setName("dedupCount");
-        dedupCountColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
-        dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
-        dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn,
-            dedupFirstOccurrenceColumn));
-        sd.setDataSource("testDatasource");
-        sd.setStreamId("testStream");
-        sd.setDescription("test stream");
-        return sd;
-    }
-
-    private PolicyDefinition createPolicy(String streamName, String policyName) {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        // expression, something like "PT5S,dynamic,1,host"
-        def.setValue("test");
-        def.setType("siddhi");
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList("inputStream"));
-        pd.setOutputStreams(Arrays.asList("outputStream"));
-        pd.setName(policyName);
-        pd.setDescription(String.format("Test policy for stream %s", streamName));
-
-        StreamPartition sp = new StreamPartition();
-        sp.setStreamId(streamName);
-        sp.setColumns(Arrays.asList("host"));
-        sp.setType(StreamPartition.Type.GROUPBY);
-        pd.addPartition(sp);
-        return pd;
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/38ec7fc2/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
new file mode 100644
index 0000000..ee865b3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+public class AlertPublisherTestHelper {
+
+    public static AlertStreamEvent mockEvent(String policyId){
+        StreamDefinition stream = createStream();
+        PolicyDefinition policy = createPolicy(stream.getStreamId(), policyId);
+        return  createEvent(stream, policy,
+            new Object[] {System.currentTimeMillis(), "host1", "testPolicy-host1-01", "open", 0, 0});
+    }
+
+    public static AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setPolicyId(policy.getName());
+        event.setSchema(stream);
+        event.setStreamId(stream.getStreamId());
+        event.setTimestamp(System.currentTimeMillis());
+        event.setCreatedTime(System.currentTimeMillis());
+        event.setData(data);
+        event.ensureAlertId();
+        Assert.assertNotNull(event.getAlertId());
+        return event;
+    }
+
+    public static StreamDefinition createStream() {
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn tsColumn = new StreamColumn();
+        tsColumn.setName("timestamp");
+        tsColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("host");
+        hostColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn alertKeyColumn = new StreamColumn();
+        alertKeyColumn.setName("alertKey");
+        alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn stateColumn = new StreamColumn();
+        stateColumn.setName("state");
+        stateColumn.setType(StreamColumn.Type.STRING);
+
+        // dedupCount, dedupFirstOccurrence
+
+        StreamColumn dedupCountColumn = new StreamColumn();
+        dedupCountColumn.setName("dedupCount");
+        dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+        StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+        dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE);
+        dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+        sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn,
+            dedupFirstOccurrenceColumn));
+        sd.setDataSource("testDatasource");
+        sd.setStreamId("testStream");
+        sd.setDescription("test stream");
+        return sd;
+    }
+
+    public static  PolicyDefinition createPolicy(String streamName, String policyName) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        // expression, something like "PT5S,dynamic,1,host"
+        def.setValue("test");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));
+        pd.setOutputStreams(Arrays.asList("outputStream"));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
+    }
+}


Mime
View raw message