eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [44/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:23 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
new file mode 100644
index 0000000..2749648
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestGetAllNotifications {
+    @Ignore
+    @Test
+    public void  getAllNotification() throws Exception {
+        Config config = EagleConfigFactory.load().getConfig();
+        AlertNotificationDAO dao = new AlertNotificationDAOImpl( new EagleServiceConnector(config));
+        List<AlertNotificationEntity> list = dao.findAlertNotificationTypes();
+        System.out.println(" Fetch all Notifications : "+list);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
new file mode 100644
index 0000000..624343b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.plugin.NotificationPluginLoader;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created on 2/10/16.
+ */
+public class TestNotificationPluginLoader {
+    @Ignore //only work when connected to eagle service
+    @Test
+    public void testLoader(){
+        Config config = ConfigFactory.load();
+        NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+        loader.init(config);
+        Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.EAGLE_STORE));
+        Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.KAFKA_STORE));
+        Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.EMAIL_NOTIFICATION));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java
new file mode 100644
index 0000000..2b6f25d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.NotificationPluginManager;
+import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
+import org.apache.eagle.policy.common.Constants;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class TestNotificationPluginManager {
+    @Ignore
+    @Test
+    public void testUpdateNotificationPlugins() {
+        boolean isDelete = false;
+        AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+        alertDef.setTags(new HashMap<String, String>());
+        alertDef.getTags().put(Constants.POLICY_ID, "testPlugin");
+        alertDef.setNotificationDef("[]");
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.updateNotificationPlugins(alertDef, isDelete);
+        Assert.assertTrue(true);
+    }
+    @Ignore
+    @Test
+    public void testUpdateNotificationPlugins2() {
+        boolean isDelete = false;
+        AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+        alertDef.setTags(new HashMap<String, String>());
+        alertDef.getTags().put(Constants.POLICY_ID, "testEmptyPlugins");
+        alertDef.setNotificationDef("[{\"notificationType\":\"eagleStore\"},{\"notificationType\":\"kafka\",\"kafka_broker\":\"sandbox.hortonworks.com:6667\",\"topic\":\"testTopic\"}]");
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.updateNotificationPlugins(alertDef, isDelete);
+        Assert.assertTrue(true);
+    }
+
+    @Ignore
+    @Test
+    public void testUpdateNotificationPluginsWithDelete() {
+        boolean isDelete = true;
+        AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+        alertDef.setTags(new HashMap<String, String>());
+        alertDef.getTags().put(Constants.POLICY_ID, "testEmptyPlugins");
+        alertDef.setNotificationDef("[]");
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.updateNotificationPlugins(alertDef, isDelete);
+        Assert.assertTrue(true);
+    }
+
+    @Ignore
+    @Test
+    public void testMultipleNotificationInstance() {
+        AlertAPIEntity alert = new AlertAPIEntity();
+        alert.setTags(new HashMap<String, String>());
+        alert.getTags().put(Constants.POLICY_ID, "testPlugin");
+        alert.setDescription("");
+        alert.setAlertContext(new AlertContext().toJsonString());
+
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.notifyAlert(alert);
+        Assert.assertTrue(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java
new file mode 100644
index 0000000..022526c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+
+public class TestNotificationPluginUtils {
+    @Ignore
+    @Test
+    public void testDeserializeNotificationConfig() throws Exception {
+        String notificationDef = "[]";
+        List<Map<String,String>> list = NotificationPluginUtils.deserializeNotificationConfig(notificationDef);
+        Assert.assertTrue(list.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/application.conf
new file mode 100644
index 0000000..2c5e770
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/application.conf
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "cluster",
+    "topologyName" : "sandbox-hdfsAuditLog-topology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1,
+      "hdfsAuditLogAlertExecutor*" : 1
+    }
+  },
+  "dataSourceConfig": {
+    "topic" : "sandbox_hdfs_audit_log",
+    "zkConnection" : "127.0.0.1:2181",
+    "brokerZkPath" : "/brokers",
+    "zkConnectionTimeoutMS" : 15000,
+    "fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "transactionZKServers" : "127.0.0.1",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "consumerGroupId" : "eagle.hdfsaudit.consumer",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+     "hdfsAuditLogAlertExecutor" : {
+       "parallelism" : 1,
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
+       "needValidation" : "true"
+     }
+  },
+  "eagleProps" : {
+    "site" : "sandbox",
+    "application": "hdfsAuditLog",
+  	"dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailhost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "eagleService": {
+      "host": "localhost",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+  "dynamicConfigSource" : {
+  	"enabled" : true,
+  	"initDelayMillis" : 0,
+  	"delayMillis" : 30000
+  },
+  "eagleNotificationProps" : {
+    "kafka_broker":"192.168.56.101:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3499c46
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+ eagle.log.dir=./logs
+ eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert-process/pom.xml
new file mode 100644
index 0000000..cdf321a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+	xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+	<modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.eagle</groupId>
+        <artifactId>eagle-alert-parent</artifactId>
+        <version>0.4.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+	<packaging>jar</packaging>
+	<artifactId>eagle-alert-process</artifactId>
+    <name>eagle-alert-process</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+    		<artifactId>eagle-embed-server</artifactId>
+    		<version>${project.version}</version>
+            <scope>test</scope>
+    	</dependency>
+		<dependency>
+        	<groupId>org.apache.eagle</groupId>
+    		<artifactId>eagle-embed-server</artifactId>
+    		<version>${project.version}</version>
+    	    <classifier>tests</classifier>
+    	    <scope>test</scope>
+    	</dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-embed-hbase</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-embed-hbase</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>eagle-alert-base</artifactId>
+			<version>${project.version}</version>
+        </dependency>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>eagle-metric</artifactId>
+            <version>${project.version}</version>
+		</dependency>
+	  	<dependency>
+  			<groupId>org.wso2.siddhi</groupId>
+  			<artifactId>siddhi-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.log4j.wso2</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+            </exclusions>
+  		</dependency>
+	  	<dependency>
+  			<groupId>org.wso2.siddhi</groupId>
+  			<artifactId>siddhi-extension-string</artifactId>
+  		</dependency>
+  		<dependency>
+          	<groupId>log4j</groupId>
+  			<artifactId>log4j</artifactId>
+        </dependency>
+      	<dependency>
+       		<groupId>org.slf4j</groupId>
+			<artifactId>log4j-over-slf4j</artifactId>
+      	</dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-process-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+	<dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-alert-notification-plugin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+	</dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>test-jar</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java
new file mode 100644
index 0000000..1e65408
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/DeduplicatorConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class DeduplicatorConfig implements Serializable{
+	private static final long serialVersionUID = 1L;
+
+	private int alertDedupIntervalMin;
+	private List<String> fields;
+
+	public int getAlertDedupIntervalMin() {
+		return alertDedupIntervalMin;
+	}
+
+	public void setAlertDedupIntervalMin(int alertDedupIntervalMin) {
+		this.alertDedupIntervalMin = alertDedupIntervalMin;
+	}
+	public List<String> getFields() {
+		return fields;
+	}
+
+	public void setFields(List<String> fields) {
+		this.fields = fields;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
new file mode 100644
index 0000000..f543d63
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+@Deprecated
+public class EmailNotificationConfig extends NotificationConfig{
+	private static final long serialVersionUID = 1L;
+	private String sender;
+	private String recipients;
+	private String tplFileName;
+	private String subject;
+	public String getSubject() {
+		return subject;
+	}
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+	public String getRecipients() {
+		return recipients;
+	}
+	public void setRecipients(String recipients) {
+		this.recipients = recipients;
+	}
+	public String getSender() {
+		return sender;
+	}
+	public void setSender(String sender) {
+		this.sender = sender;
+	}
+	public String getTplFileName() {
+		return tplFileName;
+	}
+	public void setTplFileName(String tplFileName) {
+		this.tplFileName = tplFileName;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
new file mode 100644
index 0000000..60c87dc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+@Deprecated
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true)
+public class NotificationConfig implements Serializable{
+	private static final long serialVersionUID = 1L;
+	private String id;
+	private String flavor;
+
+	public String getId() {
+		return id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	public String getFlavor() {
+		return flavor;
+	}
+
+	public void setFlavor(String flavor) {
+		this.flavor = flavor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java
new file mode 100644
index 0000000..ec52508
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/Remediation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+
+public class Remediation implements Serializable{
+	private static final long serialVersionUID = 1L;
+	private String id;
+
+	public String getId() {
+		return id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
new file mode 100644
index 0000000..25e5cff
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.config.DeduplicatorConfig;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.DynamicPolicyLoader;
+import org.apache.eagle.policy.PolicyLifecycleMethods;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import scala.Tuple2;
+
+public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
+	private static final long serialVersionUID = 1L;
+	private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
+	protected Config config;
+	protected DEDUP_TYPE dedupType;
+
+	private List<String> alertExecutorIdList;
+	private volatile CopyOnWriteHashMap<String, DefaultDeduplicator> alertDedups;
+	private PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao;
+
+	public enum DEDUP_TYPE {
+		ENTITY,
+		EMAIL
+	}
+
+	public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
+		this.alertExecutorIdList = alertExecutorIdList;
+		this.dedupType = dedupType;
+		this.dao = dao;
+	}
+	
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+	
+	public DefaultDeduplicator createAlertDedup(AlertDefinitionAPIEntity alertDef) {
+		DeduplicatorConfig dedupConfig = null;
+		try {
+			dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class);
+		}
+		catch (Exception ex) {
+			LOG.warn("Initial dedup Config error, " + ex.getMessage());
+		}
+
+        if (dedupConfig != null) {
+			return new DefaultDeduplicator(dedupConfig.getAlertDedupIntervalMin(), dedupConfig.getFields());
+		}
+
+		return null;
+	}
+	
+	@Override
+	public void init() {		
+        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
+        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
+	    Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;	    	    
+	    try {
+	 		initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
+	    }
+	    catch (Exception ex) {
+ 			LOG.error("fail to initialize initialAlertDefs: ", ex);
+	        throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
+        }
+	    Map<String, DefaultDeduplicator> tmpDeduplicators = new HashMap<>();
+        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
+            LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource);
+        } else {
+		    for (String alertExecutorId: alertExecutorIdList) {
+			    if(initialAlertDefs.containsKey(alertExecutorId)){
+                    for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){
+                       try {
+                          DefaultDeduplicator deduplicator = createAlertDedup(alertDef);
+                          if (deduplicator != null)
+                              tmpDeduplicators.put(alertDef.getTags().get(Constants.POLICY_ID), deduplicator);
+                          else LOG.warn("The dedup interval is not set, alertDef: " + alertDef);
+                        }
+                        catch (Throwable t) {
+                            LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef);
+                        }
+                    }
+                } else {
+                    LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
+                }
+		    }
+        }
+
+		alertDedups = new CopyOnWriteHashMap<>();
+		alertDedups.putAll(tmpDeduplicators);
+		DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
+		policyLoader.init(initialAlertDefs, dao, config);
+		for (String alertExecutorId : alertExecutorIdList) {
+		 	policyLoader.addPolicyChangeListener(alertExecutorId, this);
+		}
+	}
+
+    @Override
+    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
+        String policyId = (String) input.get(0);
+        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
+        DefaultDeduplicator dedup;
+        synchronized(alertDedups) {
+            dedup = alertDedups.get(policyId);
+        }
+
+        List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
+        if (dedup == null) {
+            LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config");
+        } else {
+            if (dedup.getDedupIntervalMin() == -1) {
+                LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)");
+                return;
+            }
+            ret = dedup.dedup(ret);
+        }
+        for (AlertAPIEntity entity : ret) {
+            outputCollector.collect(new Tuple2(policyId, entity));
+        }
+    }
+
+	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+		if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be added : " + added);
+		for(AlertDefinitionAPIEntity alertDef : added.values()){
+			LOG.info("Alert dedup config really added " + alertDef);
+			DefaultDeduplicator dedup = createAlertDedup(alertDef);
+			if (dedup != null) {
+				synchronized(alertDedups) {		
+					alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
+				}
+			}
+		}
+	}
+	
+	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+		LOG.info("Alert dedup config changed : " + changed);
+		for(AlertDefinitionAPIEntity alertDef : changed.values()){
+			LOG.info("Alert dedup config really changed " + alertDef);
+			DefaultDeduplicator dedup = createAlertDedup(alertDef);
+			if (dedup != null) {
+				synchronized(alertDedups) {
+					alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
+				}
+			}
+		}
+	}
+	
+	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+		LOG.info("alert dedup config deleted : " + deleted);
+		for(AlertDefinitionAPIEntity alertDef : deleted.values()){
+			LOG.info("alert dedup config deleted " + alertDef);
+			// no cleanup to do, just remove it
+			synchronized(alertDedups) {		
+				alertDedups.remove(alertDef.getTags().get(Constants.POLICY_ID));
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
new file mode 100644
index 0000000..8947d2c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+
+import java.util.List;
+
+public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase {
+
+	private static final long serialVersionUID = 1L;
+
+	public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
+		super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
new file mode 100644
index 0000000..b30dbda
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.List;
+
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase {
+
+	private static final long serialVersionUID = 1L;
+
+	public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
+		super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
new file mode 100644
index 0000000..1d79f9f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultDeduplicator implements EntityDeduplicator {
+	protected long dedupIntervalMin;
+	protected List<String> fields;
+	protected Map<EntityDedupKey, Long> entites = new HashMap<>();
+	public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+	
+	public static enum AlertDeduplicationStatus{
+		NEW,
+		DUPLICATED,
+		IGNORED
+	}
+	
+	public DefaultDeduplicator() {
+		this.dedupIntervalMin = 0;
+		fields = null;
+	}
+
+	public DefaultDeduplicator(long intervalMin, List<String> fields) {
+		this.dedupIntervalMin = intervalMin;
+		this.fields = fields;
+	}
+	
+	public void clearOldCache() {
+		List<EntityDedupKey> removedkeys = new ArrayList<>();
+		for (Entry<EntityDedupKey, Long> entry : entites.entrySet()) {
+			EntityDedupKey entity = entry.getKey();
+			if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
+				removedkeys.add(entry.getKey());
+			}
+		}
+		for (EntityDedupKey alertKey : removedkeys) {
+			entites.remove(alertKey);
+		}
+	}
+	
+	public AlertDeduplicationStatus checkDedup(EntityDedupKey key){
+		long current = key.timestamp;
+		if(!entites.containsKey(key)){
+			entites.put(key, current);
+			return AlertDeduplicationStatus.NEW;
+		}
+		
+		long last = entites.get(key);
+		if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE){
+			entites.put(key, current);
+			return AlertDeduplicationStatus.DUPLICATED;
+		}
+		
+		return AlertDeduplicationStatus.IGNORED;
+	}
+
+	private List<String> getKeyList(AlertAPIEntity entity) {
+		List<String> keys = new ArrayList<>(entity.getTags().values());
+		if(fields != null && !fields.isEmpty()) {
+			for (String field: fields) {
+				AlertContext context = entity.getWrappedAlertContext();
+				keys.add(context.getProperty(field));
+			}
+		}
+		return keys;
+	}
+
+	public List<AlertAPIEntity> dedup(List<AlertAPIEntity> list) {
+		clearOldCache();
+		List<AlertAPIEntity> dedupList = new ArrayList<>();
+        int totalCount = list.size();
+        int dedupedCount = 0;
+		for(AlertAPIEntity entity: list) {
+			if (entity.getTags() == null) {
+				if(LOG.isDebugEnabled()) LOG.debug("Tags is null, don't know how to deduplicate, do nothing");
+			} else {
+                AlertDeduplicationStatus status = checkDedup(new EntityDedupKey(getKeyList(entity), entity.getTimestamp()));
+                if (!status.equals(AlertDeduplicationStatus.IGNORED)) {
+                    dedupList.add(entity);
+                } else {
+                    dedupedCount++;
+                    if (LOG.isDebugEnabled())
+                        LOG.debug(String.format("Entity is skipped because it's duplicated: " + entity.toString()));
+                }
+            }
+		}
+
+        if(dedupedCount>0){
+            LOG.info(String.format("Skipped %s of %s alerts because they are duplicated", dedupedCount, totalCount));
+        }else if(LOG.isDebugEnabled()){
+            LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount));
+        }
+
+		return dedupList;
+	}
+
+	public EntityDeduplicator setDedupIntervalMin(long dedupIntervalMin) {
+		this.dedupIntervalMin = dedupIntervalMin;
+		return this;
+	}
+	
+	public long getDedupIntervalMin() {
+		return dedupIntervalMin;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
new file mode 100644
index 0000000..36b83e1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.dedup;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class EntityDedupKey {
+    public List<String> values;
+    public Long timestamp;     // entity's timestamp
+    public long createdTime; // entityTagsUniq's created time, for cache removal;
+
+    private static final Logger LOG = LoggerFactory.getLogger(EntityDedupKey.class);
+
+    public EntityDedupKey(List<String> values, long timestamp) {
+        this.values = new ArrayList<>(values);
+        this.timestamp = timestamp;
+        this.createdTime = System.currentTimeMillis();
+    }
+
+    public boolean equals(Object obj) {
+        if (obj instanceof EntityDedupKey) {
+            EntityDedupKey key = (EntityDedupKey) obj;
+            if (key == null || key.values.size() != values.size()) {
+                return false;
+            }
+            return values.equals(key.values);
+        }
+        return false;
+    }
+
+    public int hashCode() {
+        HashCodeBuilder builder = new HashCodeBuilder();
+        for (String value : values) {
+            builder.append(value);
+        }
+        return builder.build();
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
new file mode 100644
index 0000000..85dd19a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.List;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+/**
+ * Dedup Eagle entities.
+ */
+public interface EntityDeduplicator {
+	
+	EntityDeduplicator setDedupIntervalMin(long intervalMin);
+	
+	long getDedupIntervalMin();
+	
+	List dedup(List<AlertAPIEntity> list);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
new file mode 100644
index 0000000..81c8ba6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.dedup;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Mar 19, 2015
+ */
+public class EntityTagsUniq {
+	public Map<String, String> tags;
+	public Long timestamp;	 // entity's timestamp
+	public long createdTime; // entityTagsUniq's created time, for cache removal;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class);
+	
+	public EntityTagsUniq(Map<String, String> tags, long timestamp) {
+		this.tags = new HashMap<String, String>(tags);
+		this.timestamp = timestamp;
+		this.createdTime = System.currentTimeMillis();
+	}
+	
+	@Override	
+	public boolean equals(Object obj) {		
+		if (obj instanceof EntityTagsUniq) {
+			EntityTagsUniq au = (EntityTagsUniq) obj;
+			if (tags.size() != au.tags.size()) return false;
+			for (Entry<String, String> keyValue : au.tags.entrySet()) {
+				boolean keyExist = tags.containsKey(keyValue.getKey());
+				// sanity check
+				if (tags.get(keyValue.getKey()) == null || keyValue.getValue() == null) {
+					return true;
+				}
+				if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) {				
+					return false;
+				}
+			}
+			return true; 
+		}
+		return false;
+	}
+	
+	@Override
+	public int hashCode() {	
+		int hashCode = 0;
+		for (Map.Entry<String,String> entry : tags.entrySet()) {
+            if(entry.getValue() == null) {
+                LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code");
+            }else {
+                try {
+                    hashCode ^= entry.getValue().hashCode();
+                } catch (Throwable t) {
+                    LOG.info("Got exception because of entry: " + entry, t);
+                }
+            }
+		}
+		return hashCode;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
new file mode 100644
index 0000000..2eee6c5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.executor;
+
+import org.apache.eagle.policy.ResultRender;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.PolicyPartitioner;
+import org.apache.eagle.policy.executor.PolicyProcessExecutor;
+import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
+
+public class AlertExecutor extends PolicyProcessExecutor<AlertDefinitionAPIEntity, AlertAPIEntity> {
+
+	private final SiddhiAlertAPIEntityRender resultRender = new SiddhiAlertAPIEntityRender();
+
+	public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
+			PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
+		super(alertExecutorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
+				AlertDefinitionAPIEntity.class);
+	}
+
+	@Override
+	public ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity> getResultRender() {
+		return resultRender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
new file mode 100644
index 0000000..8ab290e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.executor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.DefaultPolicyPartitioner;
+import org.apache.eagle.policy.PolicyPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
+/**
+ * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
+ *
+ * <br/><br/>
+ * Explanations for programId, alertExecutorId and policy<br/><br/>
+ * - programId - distributed or single-process program for example one storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is like following:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+public class AlertExecutorCreationUtils {
+	private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
+
+
+    /**
+     * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
+     *
+     * <h3>Require configuration:</h3>
+     *
+     * <ul>
+     * <li>eagleProps.site: program site id.</li>
+     * <li>eagleProps.dataSource: program data source.</li>
+     * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li>
+     * </ul>
+     *
+     * <h3>Steps:</h3>
+     *
+     * <ol>
+     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
+     * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
+     * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
+     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
+     * </ol>
+     */
+	public static AlertExecutor[] createAlertExecutors(Config config, PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefDAO,
+			List<String> streamNames, String alertExecutorId) throws Exception{
+		// Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
+        int numPartitions =1;
+        String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
+        String alertExecutorConfigsKey = "alertExecutorConfigs";
+        if(config.hasPath(alertExecutorConfigsKey)) {
+            Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
+            if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) {
+                Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
+                int parts = 0;
+                if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
+                numPartitions = parts == 0 ? 1 : parts;
+                if(alertExecutorConfig.containsKey("partitioner")) partitionerCls = (String) alertExecutorConfig.get("partitioner");
+            }
+        }
+
+        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
+	}
+
+    /**
+     * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
+     */
+	public static AlertExecutor[] createAlertExecutors(PolicyDefinitionDAO alertDefDAO, List<String> sourceStreams,
+                                                          String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
+		LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
+
+        // TODO: Create sourceStreams with alertExecutorID into AlertExecutorService
+
+		PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
+		AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
+        String[] _sourceStreams = sourceStreams.toArray(new String[0]);
+
+		for(int i = 0; i < numPartitions; i++){
+			alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
+		}	
+		return alertExecutors;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
new file mode 100644
index 0000000..af42dd3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.notification;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.common.AlertEmailSender;
+import org.apache.eagle.alert.email.AlertEmailComponent;
+import org.apache.eagle.alert.email.AlertEmailContext;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.ConfigObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public class AlertEmailGenerator{
+	private String tplFile;
+	private String sender;
+	private String recipients;
+	private String subject;
+	private ConfigObject eagleProps;
+
+    private ThreadPoolExecutor executorPool;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    public void sendAlertEmail(AlertAPIEntity entity) {
+		sendAlertEmail(entity, recipients, null);
+	}
+	
+	public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
+		sendAlertEmail(entity, recipients, null);	
+	}
+	
+	public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
+		AlertEmailContext email = new AlertEmailContext();
+		
+		AlertEmailComponent component = new AlertEmailComponent();
+		component.setAlertContext(AlertContext.fromJsonString(entity.getAlertContext()));
+		List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
+		components.add(component);		
+		email.setComponents(components);
+		if (AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT) != null) {
+			email.setSubject(AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT));
+		}
+		else email.setSubject(subject);
+		email.setVelocityTplFile(tplFile);
+		email.setRecipients(recipients);
+		email.setCc(cc);
+		email.setSender(sender);
+		
+		/** asynchronized email sending */
+		@SuppressWarnings("rawtypes")
+        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
+
+        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+
+        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
+        Future future = this.executorPool.submit(thread);
+        try {
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            LOG.info(String.format("Successfully send email to %s", recipients));
+        } catch (InterruptedException | ExecutionException  e) {
+            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
+        } catch (TimeoutException e) {
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
+        }
+    }
+	
+	public String getTplFile() {
+		return tplFile;
+	}
+	
+	public void setTplFile(String tplFile) {
+		this.tplFile = tplFile;
+	}
+
+	public String getSender() {
+		return sender;
+	}
+
+	public void setSender(String sender) {
+		this.sender = sender;
+	}
+
+	public String getRecipients() {
+		return recipients;
+	}
+
+	public void setRecipients(String recipients) {
+		this.recipients = recipients;
+	}
+
+	public String getSubject() {
+		return subject;
+	}
+
+	public void setSubject(String subject) {
+		this.subject = subject;
+	}
+
+	public ConfigObject getEagleProps() {
+		return eagleProps;
+	}
+
+	public void setEagleProps(ConfigObject eagleProps) {
+		this.eagleProps = eagleProps;
+	}
+
+    public void setExecutorPool(ThreadPoolExecutor executorPool) {
+        this.executorPool = executorPool;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
new file mode 100644
index 0000000..b024c39
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.notification;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.DynamicPolicyLoader;
+import org.apache.eagle.policy.PolicyLifecycleMethods;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import scala.Tuple1;
+
+/**
+ * notify alert by email, kafka message, storage or other means
+ */
+public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
+	private static final long serialVersionUID = 1690354365435407034L;
+	private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
+	private Config config;
+	/** Notification Manager - Responsible for forward and invoke configured Notification Plugin **/
+	private NotificationPluginManagerImpl notificationManager;
+
+	private List<String> alertExecutorIdList;
+	private PolicyDefinitionDAO dao;
+
+
+    public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
+		this.alertExecutorIdList = alertExecutorIdList;
+		this.dao = dao;
+	}
+
+	@Override
+	public void init() {
+		String site = config.getString("eagleProps.site");
+		String application = config.getString("eagleProps.application");
+		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
+		try {
+			initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId( site, application );
+		}
+		catch (Exception ex) {
+			LOG.error("fail to initialize initialAlertDefs: ", ex);
+			throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
+		}
+
+		if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
+			LOG.warn("No alert definitions found for site: "+site+", application: "+ application);
+		}
+		try{
+			notificationManager = new NotificationPluginManagerImpl(config);
+		}catch (Exception ex ){
+			LOG.error("Fail to initialize NotificationManager: ", ex);
+			throw new IllegalStateException("Fail to initialize NotificationManager: ", ex);
+		}
+
+		DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
+		policyLoader.init(initialAlertDefs, dao, config);
+		for (String alertExecutorId : alertExecutorIdList) {
+			policyLoader.addPolicyChangeListener(alertExecutorId, this);
+		}
+	}
+
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+
+	@Override
+	public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
+		AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
+		processAlerts(Arrays.asList(alertEntity));
+	}
+
+	private void processAlerts(List<AlertAPIEntity> list) {
+		for (AlertAPIEntity entity : list) {
+			notificationManager.notifyAlert(entity);
+		}
+	}
+
+	@Override
+	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+		if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
+		for(AlertDefinitionAPIEntity alertDef : added.values()){
+			LOG.info("alert notification config really changed " + alertDef);
+			notificationManager.updateNotificationPlugins( alertDef , false );
+		}
+	}
+
+	@Override
+	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
+		for(AlertDefinitionAPIEntity alertDef : changed.values()){
+			LOG.info("alert notification config really added " + alertDef);
+			notificationManager.updateNotificationPlugins( alertDef , false );
+		}
+	}
+
+	@Override
+	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
+		for(AlertDefinitionAPIEntity alertDef : deleted.values()){
+			LOG.info("alert notification config really deleted " + alertDef);
+			notificationManager.updateNotificationPlugins( alertDef , true );
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
new file mode 100644
index 0000000..61bb7dc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.persist;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import scala.Tuple1;
+
+import java.util.Arrays;
+
+public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
+
+	private static final long serialVersionUID = 1L;
+	private Config config;
+	private EaglePersist persist;
+
+	public AlertPersistExecutor(){
+	}
+    @Override
+	public void prepareConfig(Config config) {
+		this.config = config;		
+	}
+
+    @Override
+	public void init() {
+		String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+		int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+		String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME)
+				? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
+		String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)
+				? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
+		this.persist = new EaglePersist(host, port, username, password);
+	}
+
+    @Override
+    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
+        persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1))));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
new file mode 100644
index 0000000..ebba518
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.persist;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class EaglePersist {
+		
+	private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
+	private String eagleServiceHost;
+	private int eagleServicePort;
+	private String username;
+	private String password;
+
+	public EaglePersist(String eagleServiceHost, int eagleServicePort) {
+		this(eagleServiceHost, eagleServicePort, null, null);
+	}
+
+	public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) {
+		this.eagleServiceHost = eagleServiceHost;
+		this.eagleServicePort = eagleServicePort;
+		this.username = username;
+		this.password = password;
+	}
+	
+	public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
+		if (list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+			GenericServiceAPIResponseEntity<String> response = client.create(list);
+			client.close();
+			if (response.isSuccess()) {
+				LOG.info("Successfully create entities " + list.toString());
+				return true;
+			}
+			else {
+				LOG.error("Fail to create entities");
+				return false;
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex);
+			return false;
+		}
+	}
+}


Mime
View raw message