eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [3/3] incubator-eagle git commit: notification plugin framework
Date Wed, 17 Feb 2016 17:44:16 GMT
notification plugin framework


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/88d0126c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/88d0126c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/88d0126c

Branch: refs/heads/master
Commit: 88d0126cb2e8a74f01aef968c553de763d1e5212
Parents: caa2dfe
Author: yonzhang <yonzhang@ebay.com>
Authored: Wed Feb 17 09:43:40 2016 -0800
Committer: yonzhang <yonzhang@ebay.com>
Committed: Wed Feb 17 09:43:40 2016 -0800

----------------------------------------------------------------------
 eagle-assembly/src/assembly/eagle-bin.xml       |   8 -
 .../src/main/bin/eagle-create-table.rb          |   1 +
 .../eagle-alert-notification-plugin/pom.xml     |  90 ++++++
 .../base/NotificationConstants.java             |  31 +++
 .../notification/base/NotificationStatus.java   |  26 ++
 .../notification/dao/AlertNotificationDAO.java  |  36 +++
 .../dao/AlertNotificationDAOImpl.java           |  70 +++++
 .../notification/email/AlertEmailComponent.java |  32 +++
 .../notification/email/AlertEmailContext.java   |  69 +++++
 .../notification/email/AlertEmailGenerator.java | 136 +++++++++
 .../email/AlertEmailGeneratorBuilder.java       |  59 ++++
 .../notification/email/AlertEmailSender.java    | 157 +++++++++++
 .../plugin/AlertEagleStorePersister.java        |  85 ++++++
 .../plugin/AlertEagleStorePlugin.java           |  99 +++++++
 .../notification/plugin/AlertEmailPlugin.java   | 142 ++++++++++
 .../notification/plugin/AlertKafkaPlugin.java   | 132 +++++++++
 .../plugin/KafkaProducerSingleton.java          |  44 +++
 .../notification/plugin/NotificationPlugin.java |  59 ++++
 .../plugin/NotificationPluginLoader.java        |  92 +++++++
 .../plugin/NotificationPluginManager.java       |  39 +++
 .../plugin/NotificationPluginManagerImpl.java   | 158 +++++++++++
 .../utils/NotificationPluginUtils.java          |  68 +++++
 .../src/main/resources/ALERT_DEFAULT.vm         | 275 +++++++++++++++++++
 .../src/main/resources/application.conf         |  69 +++++
 .../main/resources/notification-plugins-init.sh |  66 +++++
 .../testcases/TestAlertEagleStorePlugin.java    |  48 ++++
 .../testcases/TestAlertEmailPlugin.java         |  56 ++++
 .../testcases/TestAlertKafkaPlugin.java         |  61 ++++
 .../testcases/TestGetAllNotifications.java      |  41 +++
 .../testcases/TestNotificationPluginLoader.java |  41 +++
 .../src/test/resources/application.conf         |  69 +++++
 .../src/test/resources/log4j.properties         |  35 +++
 .../eagle-alert/eagle-alert-process/pom.xml     |   5 +
 .../alert/notification/AlertEmailGenerator.java | 134 ---------
 .../AlertEmailGeneratorBuilder.java             |  59 ----
 .../notification/AlertNotificationExecutor.java | 160 +++--------
 eagle-core/eagle-alert/pom.xml                  |   3 +-
 .../eagle-stream-pipeline/pom.xml               |  11 +-
 .../eagle/dataproc/core/JsonSerDeserUtils.java  |   1 +
 .../org/apache/eagle/ml/MLPolicyEvaluator.java  |   3 +-
 .../alert/entity/AlertEntityRepository.java     |   1 +
 .../alert/entity/AlertNotificationEntity.java   |  66 +++++
 .../apache/eagle/policy/common/Constants.java   |   3 +-
 .../policy/executor/PolicyProcessExecutor.java  |  17 +-
 eagle-examples/eagle-topology-example/pom.xml   |  70 +++++
 .../eagle-topology-example-assembly.xml         |  63 +++++
 .../NotificationPluginTestMain.java             |  81 ++++++
 .../eagle/example/persist/MetricSerializer.java |  31 +++
 .../example/persist/PersistTopoTestMain.java    | 115 ++++++++
 .../example/persist/PersistTopoTestMain2.java   |  60 ++++
 .../add-notification-for-plugin-test.sh         |  50 ++++
 .../main/resources/application-plugintest.conf  |  59 ++++
 .../src/main/resources/application.conf         |  86 ++++++
 .../resources/create-policy-for-plugin-test.sh  |  99 +++++++
 .../resources/delete-email-for-plugin-test.sh   |  50 ++++
 .../src/main/resources/log4j.properties         |  40 +++
 .../main/resources/persist-test-topo-init.sh    | 194 +++++++++++++
 .../src/main/resources/persit-test-storm.yaml   |  18 ++
 eagle-examples/pom.xml                          |  35 +++
 eagle-samples/eagle-persist-sample/pom.xml      |  70 -----
 .../assembly/eagle-persist-sample-assembly.xml  |  63 -----
 .../eagle/persist/test/MetricSerializer.java    |  31 ---
 .../eagle/persist/test/PersistTopoTestMain.java | 115 --------
 .../persist/test/PersistTopoTestMain2.java      |  60 ----
 .../src/main/resources/application.conf         |  86 ------
 .../src/main/resources/log4j.properties         |  40 ---
 .../main/resources/persist-test-topo-init.sh    | 194 -------------
 .../src/main/resources/persit-test-storm.yaml   |  18 --
 eagle-samples/pom.xml                           |  36 ---
 eagle-topology-assembly/pom.xml                 |   5 +
 pom.xml                                         |  37 +--
 71 files changed, 3592 insertions(+), 1071 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-assembly/src/assembly/eagle-bin.xml
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/assembly/eagle-bin.xml b/eagle-assembly/src/assembly/eagle-bin.xml
index 1c1a00f..2a04fdc 100644
--- a/eagle-assembly/src/assembly/eagle-bin.xml
+++ b/eagle-assembly/src/assembly/eagle-bin.xml
@@ -218,14 +218,6 @@
             </excludes>
         </fileSet>
 
-        <!--<fileSet>-->
-            <!--<directory>${project.basedir}/../eagle-samples/eagle-persist-sample/target/</directory>-->
-            <!--<outputDirectory>lib/topology</outputDirectory>-->
-            <!--<includes>-->
-                <!--<include>eagle-persist-sample-*-assembly.jar</include>-->
-            <!--</includes>-->
-        <!--</fileSet>-->
-
         <fileSet>
             <directory>${project.basedir}/../eagle-external/eagle-ambari</directory>
             <outputDirectory>lib/ambari</outputDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-assembly/src/main/bin/eagle-create-table.rb
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-create-table.rb b/eagle-assembly/src/main/bin/eagle-create-table.rb
index 21dc030..cbef6f4 100644
--- a/eagle-assembly/src/main/bin/eagle-create-table.rb
+++ b/eagle-assembly/src/main/bin/eagle-create-table.rb
@@ -56,5 +56,6 @@ createEagleTable(admin, 'appCommand')
 createEagleTable(admin, 'appDefinition')
 createEagleTable(admin, 'serviceAudit')
 createEagleTable(admin, 'aggregatedef')
+createEagleTable(admin, 'alertNotifications')
 
 exit

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml b/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
new file mode 100644
index 0000000..b73d43b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
@@ -0,0 +1,90 @@
+<!--
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>eagle</groupId>
+    <artifactId>eagle-alert-parent</artifactId>
+    <version>0.3.0</version>
+  </parent>
+  <groupId>eagle</groupId>
+  <artifactId>eagle-alert-notification-plugin</artifactId>
+  <name>eagle-alert-notification-plugin</name>
+  <description>Apache Eagle Notification Plugin to  enable services to use custom or default notification </description>
+  <dependencies>
+  	<dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-policy-base</artifactId>
+          <version>${project.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>log4j-over-slf4j</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-simple</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+  	<dependency>
+	    <groupId>org.apache.kafka</groupId>
+	    <artifactId>kafka-clients</artifactId>
+	</dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.module</groupId>
+          <artifactId>jackson-module-scala_${scala.version}</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+      </dependency>
+
+      <dependency>
+          <groupId>org.reflections</groupId>
+          <artifactId>reflections</artifactId>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-simple</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+      </dependency>
+  </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
new file mode 100644
index 0000000..98d6b5c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.base;
+
+public class NotificationConstants { // shared string constants for the notification plugin module
+    public static final String NOTIFICATION_TYPE = "notificationType"; // NOTE(review): presumably the property key that selects a plugin, with the three values below as its options -- confirm against the plugin classes
+    public static final String EMAIL_NOTIFICATION = "email";
+    public static final String KAFKA_STORE = "kafka";
+    public static final String EAGLE_STORE = "eagleStore";
+
+    // email specific constants; NOTE(review): presumably property keys read by the email plugin -- confirm
+    public static final String SUBJECT = "subject";
+    public static final String SENDER = "sender";
+    public static final String RECIPIENTS = "recipients";
+    public static final String TPL_FILE_NAME = "tplFileName";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
new file mode 100644
index 0000000..d57dbf9
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.base;
+
+/**
+ * Holds the status of a notification posted to a notification plugin (plain public, mutable fields).
+ */
+public class NotificationStatus {
+	public boolean successful; // NOTE(review): presumably true when the plugin handled the notification -- confirm against the plugins
+	public String errorMessage; // NOTE(review): presumably only meaningful when successful is false -- confirm
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
new file mode 100644
index 0000000..bf2e75e
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.dao;
+
+
+
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+
+import java.util.List;
+
+/**
+ * Data access interface for alert notification entities.
+ */
+public interface AlertNotificationDAO {
+    /**
+     * Finds the alert notification types by querying the alertNotifications table.
+     * @return the {@link AlertNotificationEntity} records found
+     * @throws Exception as declared; the failure conditions are left to the implementation
+     */
+    List<AlertNotificationEntity> findAlertNotificationTypes() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
new file mode 100644
index 0000000..7251da3
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.dao;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * {@link AlertNotificationDAO} implementation that reads alert notification entities through the Eagle service client; only a read operation is defined here.
+ */
+public class AlertNotificationDAOImpl implements  AlertNotificationDAO {
+
+    private final Logger LOG = LoggerFactory.getLogger(AlertNotificationDAOImpl.class);
+    private final EagleServiceConnector connector; // handed to a new EagleServiceClientImpl on every query
+
+    public AlertNotificationDAOImpl(EagleServiceConnector connector){
+        this.connector = connector;
+    }
+
+    /**
+     * Queries the alert notification endpoint for every entity whose "enabled" field is "true".
+     * @return the object list carried by the service response
+     * @throws Exception declared for the interface; every failure inside this body is rethrown as IllegalStateException
+     */
+    @Override
+    public List<AlertNotificationEntity> findAlertNotificationTypes() throws Exception {
+        try{
+            IEagleServiceClient client = new EagleServiceClientImpl(connector);
+            String query = Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME+"[@enabled=\"true\"]{*}";
+            GenericServiceAPIResponseEntity response = client.search(query).startTime(0)
+                    .endTime(10 * DateUtils.MILLIS_PER_DAY) // fixed window: 0 .. 10 days in ms; NOTE(review): presumably a placeholder range -- confirm
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query) // NOTE(review): query was already passed to search(query); looks redundant -- confirm against the client API
+                    .send();
+            client.close(); // NOTE(review): not reached if search/send throws, so the client stays open on that path
+            if (response.getException() != null) {
+                throw new Exception("Got an exception when query eagle service: " + response.getException()); // caught by the catch below and wrapped
+            }
+            return response.getObj();
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception when query alert notification service ", ex);
+            throw new IllegalStateException(ex); // callers only ever see this unchecked type
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
new file mode 100644
index 0000000..3fe55bf
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import org.apache.eagle.common.metric.AlertContext;
+
+/**
+ * One part of an alert email, which could be an individual alert; a bean holding that alert's {@link AlertContext}.
+ */
+public class AlertEmailComponent {
+    private AlertContext alertContext; // null until setAlertContext is called
+    public AlertContext getAlertContext() {
+        return alertContext;
+    }
+    public void setAlertContext(AlertContext alertContext) {
+        this.alertContext = alertContext;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
new file mode 100644
index 0000000..f1642be
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.List;
+
+/**
+ * Alert email bean: a mutable holder with plain getters and setters and no validation.
+ * One email consists of a list of email components plus its addressing and template settings.
+ */
+public class AlertEmailContext {
+    private List<AlertEmailComponent> components;
+    private String sender;
+    private String subject;
+    private String recipients; // a single String; NOTE(review): presumably a delimiter-separated address list -- confirm against AlertEmailSender
+    private String velocityTplFile; // NOTE(review): presumably the Velocity template used to render the body -- confirm against AlertEmailSender
+    private String cc; // may be null: AlertEmailGenerator passes null from its two shorter overloads
+
+    public List<AlertEmailComponent> getComponents() {
+        return components;
+    }
+    public void setComponents(List<AlertEmailComponent> components) {
+        this.components = components; // stores the caller's list reference, no copy
+    }
+    public String getVelocityTplFile() {
+        return velocityTplFile;
+    }
+    public void setVelocityTplFile(String velocityTplFile) {
+        this.velocityTplFile = velocityTplFile;
+    }
+    public String getRecipients() {
+        return recipients;
+    }
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+    public String getSender() {
+        return sender;
+    }
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+    public String getSubject() {
+        return subject;
+    }
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+    public String getCc() {
+        return cc;
+    }
+    public void setCc(String cc) {
+        this.cc = cc;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
new file mode 100644
index 0000000..ddd3c96
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Generates alert notification emails and sends them through a thread pool.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.ConfigObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailGenerator{ // builds an AlertEmailContext per alert and runs an AlertEmailSender for it on the configured pool
+    private String tplFile;
+    private String sender;
+    private String recipients;
+    private String subject;
+    private ConfigObject eagleProps; // passed through to AlertEmailSender unchanged
+
+    private ThreadPoolExecutor executorPool; // must be set via setExecutorPool before sending, otherwise sendAlertEmail throws IllegalStateException
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+    private final static long MAX_TIMEOUT_MS =60000; // how long sendAlertEmail waits on the submitted task (60 s)
+
+    public boolean sendAlertEmail(AlertAPIEntity entity) { // uses the recipients configured on this generator, no cc
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertAPIEntity entity, String recipients) { // explicit recipients, no cc
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) { // returns true only if the sender task completes within MAX_TIMEOUT_MS
+        boolean sentSuccessfully = false;
+        AlertEmailContext email = new AlertEmailContext();
+
+        AlertEmailComponent component = new AlertEmailComponent();
+        component.setAlertContext(entity.getAlertContext());
+        List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
+        components.add(component);
+        email.setComponents(components);
+        if (entity.getAlertContext().getProperty(Constants.SUBJECT) != null) { // a subject carried by the alert context takes precedence over the generator default
+            email.setSubject(entity.getAlertContext().getProperty(Constants.SUBJECT));
+        }
+        else email.setSubject(subject);
+        email.setVelocityTplFile(tplFile);
+        email.setRecipients(recipients);
+        email.setCc(cc);
+        email.setSender(sender);
+
+        /** submitted to the executor pool, but this method then blocks on the result (see future.get below) */
+        @SuppressWarnings("rawtypes") // NOTE(review): applies to the declaration below; the raw type in this method is the Future further down
+        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
+
+        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet"); // checked only after the email and sender are built
+
+        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
+        Future future = this.executorPool.submit(thread);
+        try {
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); // blocks the caller until the task ends or the timeout elapses
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email to %s", recipients));
+        } catch (InterruptedException | ExecutionException  e) { // NOTE(review): the interrupt flag is not restored here
+            sentSuccessfully = false;
+            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
+        } catch (TimeoutException e) { // NOTE(review): the task is not cancelled on timeout, so the email may still go out later
+            sentSuccessfully = false;
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
+        }
+        return sentSuccessfully;
+    }
+
+    public String getTplFile() {
+        return tplFile;
+    }
+
+    public void setTplFile(String tplFile) {
+        this.tplFile = tplFile;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipients() {
+        return recipients;
+    }
+
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public ConfigObject getEagleProps() {
+        return eagleProps;
+    }
+
+    public void setEagleProps(ConfigObject eagleProps) {
+        this.eagleProps = eagleProps;
+    }
+
+    public void setExecutorPool(ThreadPoolExecutor executorPool) { // no getter is defined for the pool
+        this.executorPool = executorPool;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..2e63dab
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.typesafe.config.ConfigObject;
+
+/**
+ * Fluent builder that configures a single {@link AlertEmailGenerator} instance.
+ *
+ * Each {@code withXxx} call forwards to the matching setter on the wrapped generator
+ * and returns this builder; {@link #build()} hands back that same generator instance.
+ */
+public class AlertEmailGeneratorBuilder {
+
+    /** The generator being configured; created once per builder. */
+    private final AlertEmailGenerator generator = new AlertEmailGenerator();
+
+    /** Instances are obtained through {@link #newBuilder()}. */
+    private AlertEmailGeneratorBuilder() {
+    }
+
+    /**
+     * @return a fresh builder wrapping a new generator
+     */
+    public static AlertEmailGeneratorBuilder newBuilder() {
+        return new AlertEmailGeneratorBuilder();
+    }
+
+    /**
+     * @param subject subject to set on the generator
+     * @return this builder
+     */
+    public AlertEmailGeneratorBuilder withSubject(String subject) {
+        this.generator.setSubject(subject);
+        return this;
+    }
+
+    /**
+     * @param sender sender to set on the generator
+     * @return this builder
+     */
+    public AlertEmailGeneratorBuilder withSender(String sender) {
+        this.generator.setSender(sender);
+        return this;
+    }
+
+    /**
+     * @param recipients recipients to set on the generator
+     * @return this builder
+     */
+    public AlertEmailGeneratorBuilder withRecipients(String recipients) {
+        this.generator.setRecipients(recipients);
+        return this;
+    }
+
+    /**
+     * @param tplFile template file to set on the generator
+     * @return this builder
+     */
+    public AlertEmailGeneratorBuilder withTplFile(String tplFile) {
+        this.generator.setTplFile(tplFile);
+        return this;
+    }
+
+    /**
+     * @param eagleProps configuration object to set on the generator
+     * @return this builder
+     */
+    public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
+        this.generator.setEagleProps(eagleProps);
+        return this;
+    }
+
+    /**
+     * @param threadPoolExecutor executor pool to set on the generator
+     * @return this builder
+     */
+    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+        this.generator.setExecutorPool(threadPoolExecutor);
+        return this;
+    }
+
+    /**
+     * @return the generator configured by this builder (the same instance on every call)
+     */
+    public AlertEmailGenerator build() {
+        return this.generator;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
new file mode 100644
index 0000000..2b18f1e
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.velocity.VelocityContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.email.EagleMailClient;
+import com.netflix.config.ConcurrentMapConfiguration;
+import com.typesafe.config.ConfigObject;
+
+/**
+ * Runnable task that sends one alert email through {@link EagleMailClient},
+ * retrying up to {@code MAX_RETRY_COUNT} times. The outcome is exposed through
+ * {@link #sentSuccessfully()} once {@link #run()} has finished.
+ */
+public class AlertEmailSender implements Runnable {
+
+    protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
+    protected final String configFileName;
+    protected final String subject;
+    protected final String sender;
+    protected final String recipents;
+    protected final String cc;
+    protected final String origin;
+    protected boolean sentSuccessfully = false;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+    private final static int MAX_RETRY_COUNT = 3;
+    /** Pause between two failed send attempts. */
+    private final static long RETRY_INTERVAL_MS = 10 * 1000L;
+
+    private static final String MAIL_HOST = "mail.host";
+    private static final String MAIL_PORT = "mail.smtp.port";
+    private static final String MAIL_DEBUG = "mail.debug";
+
+    private static final String CONF_KEY_MAIL_HOST = "mailHost";
+    private static final String CONF_KEY_MAIL_PORT = "mailSmtpPort";
+    private static final String CONF_KEY_MAIL_DEBUG = "mailDebug";
+
+    private ConfigObject eagleProps;
+
+
+    private String threadName;
+    /**
+     * Derived class may have some additional context properties to add
+     * @param context velocity context
+     * @param env environment
+     */
+    protected void additionalContext(VelocityContext context, String env) {
+        // By default there's no additional context added
+    }
+
+    /**
+     * Creates a sender that uses the default {@link EagleMailClient} configuration.
+     *
+     * @param alertEmail email context supplying recipients, template, subject, sender, cc and alert components
+     */
+    public AlertEmailSender(AlertEmailContext alertEmail){
+        this.recipents = alertEmail.getRecipients();
+        this.configFileName = alertEmail.getVelocityTplFile();
+        this.subject = alertEmail.getSubject();
+        this.sender = alertEmail.getSender();
+        this.cc = alertEmail.getCc();
+        for(AlertEmailComponent bean : alertEmail.getComponents()){
+            this.alertContexts.add(bean.getAlertContext().getProperties());
+        }
+        this.origin = formatOrigin(ManagementFactory.getRuntimeMXBean().getName());
+        threadName = Thread.currentThread().getName();
+        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipents+", velocity TPL file: " + this.configFileName);
+    }
+
+    /**
+     * Creates a sender whose mail host, port and debug settings are read from {@code eagleProps}.
+     *
+     * @param alertEmail email context, see {@link #AlertEmailSender(AlertEmailContext)}
+     * @param eagleProps configuration object read in {@link #run()} for mail settings and "env"
+     */
+    public AlertEmailSender(AlertEmailContext alertEmail, ConfigObject eagleProps){
+        this(alertEmail);
+        this.eagleProps = eagleProps;
+    }
+
+    /**
+     * Turns a runtime name of the form {@code first@second} into {@code second(pid:first)}.
+     * The JVM does not guarantee that format, so a name without '@' is returned unchanged
+     * instead of failing with an index error.
+     */
+    private static String formatOrigin(String runtimeName) {
+        String[] parts = runtimeName.split("@");
+        if (parts.length < 2) {
+            return runtimeName;
+        }
+        return parts[1] + "(pid:" + parts[0] + ")";
+    }
+
+    /**
+     * Attempts to send the email, retrying on failure. Returns without sending when no
+     * recipients are set, and stops retrying as soon as the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        int count = 0;
+        boolean success = false;
+        while(count++ < MAX_RETRY_COUNT && !success){
+            LOG.info("Sending email, tried: " + count+", max: "+MAX_RETRY_COUNT);
+            try {
+                final EagleMailClient client;
+                if (eagleProps != null) {
+                    ConcurrentMapConfiguration con = new ConcurrentMapConfiguration();
+                    con.addProperty(MAIL_HOST, eagleProps.get(CONF_KEY_MAIL_HOST).unwrapped());
+                    con.addProperty(MAIL_PORT, eagleProps.get(CONF_KEY_MAIL_PORT).unwrapped());
+                    if (eagleProps.get(CONF_KEY_MAIL_DEBUG) != null) {
+                        con.addProperty(MAIL_DEBUG, eagleProps.get(CONF_KEY_MAIL_DEBUG).unwrapped());
+                    }
+                    client = new EagleMailClient(con);
+                }
+                else {
+                    client = new EagleMailClient();
+                }
+                String env = "prod";
+                if (eagleProps != null && eagleProps.get("env") != null) {
+                    env = (String) eagleProps.get("env").unwrapped();
+                }
+                LOG.info("Env is: " + env);
+                final VelocityContext context = new VelocityContext();
+                generateCommonContext(context);
+                LOG.info("After calling generateCommonContext...");
+                additionalContext(context, env);
+
+                if (recipents == null || recipents.equals("")) {
+                    LOG.error("Recipients is null, skip sending emails ");
+                    return;
+                }
+                String title = subject;
+                // Non-production environments are flagged in the subject line.
+                if (!env.trim().equals("prod")) {
+                    title = "[" + env + "]" + title;
+                }
+                success = client.send(sender, recipents, cc, title, configFileName, context, null);
+                LOG.info("Success of sending email: " + success);
+                if(!success && count < MAX_RETRY_COUNT) {
+                    LOG.info("Sleep for a while before retrying");
+                    Thread.sleep(RETRY_INTERVAL_MS);
+                }
+            }
+            catch (InterruptedException e){
+                // Restore the interrupt flag and stop retrying so the owning
+                // executor can shut this task down promptly.
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while waiting to retry sending email, giving up", e);
+                break;
+            }
+            catch (Exception e){
+                LOG.warn("Sending mail exception", e);
+            }
+        }
+
+        if(success){
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email, thread: %s",threadName));
+        }else{
+            LOG.warn(String.format("Fail sending email after tries %s times, thread: %s",MAX_RETRY_COUNT,threadName));
+        }
+    }
+
+    /**
+     * Populates the velocity context with the send time, alert count, alert list and origin.
+     */
+    private void generateCommonContext(VelocityContext context) {
+        context.put(Constants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds( System.currentTimeMillis() ));
+        context.put(Constants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size());
+        context.put(Constants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts);
+        context.put(Constants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
+    }
+
+    /**
+     * @return true once {@link #run()} has completed with a successful send
+     */
+    public boolean sentSuccessfully(){
+        return this.sentSuccessfully;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
new file mode 100644
index 0000000..e098256
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Alert API entity persister: writes a list of entities to the Eagle service
+ * through a short-lived {@link IEagleServiceClient}.
+ */
+public class AlertEagleStorePersister {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
+	private String eagleServiceHost;
+	private int eagleServicePort;
+	private String username;
+	private String password;
+
+
+	/**
+	 * Creates a persister that connects without credentials.
+	 * @param eagleServiceHost host of the Eagle service
+	 * @param eagleServicePort port of the Eagle service
+	 */
+	public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort) {
+		this(eagleServiceHost, eagleServicePort, null, null);
+	}
+
+	/**
+	 * @param eagleServiceHost host of the Eagle service
+	 * @param eagleServicePort port of the Eagle service
+	 * @param username user name passed to the service client, may be null
+	 * @param password password passed to the service client, may be null
+	 */
+	public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort, String username, String password) {
+		this.eagleServiceHost = eagleServiceHost;
+		this.eagleServicePort = eagleServicePort;
+		this.username = username;
+		this.password = password;
+	}
+
+	/**
+	 * Reads host, port and (optional) credentials from {@code eagleProps.eagleService.*}.
+	 * Host and port are mandatory; username and password default to null when absent,
+	 * matching the credential-less constructor.
+	 * @param config application configuration
+	 */
+	public AlertEagleStorePersister(Config config ) {
+		this.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+		this.eagleServicePort = config.getInt("eagleProps.eagleService.port");
+		this.username = config.hasPath("eagleProps.eagleService.username")
+				? config.getString("eagleProps.eagleService.username") : null;
+		this.password = config.hasPath("eagleProps.eagleService.password")
+				? config.getString("eagleProps.eagleService.password") : null;
+	}
+
+	/**
+	 * Persists the given list of entities.
+	 * @param list entities to create; a null or empty list is not sent
+	 * @return true when the service reports success; false for a null/empty list,
+	 *         a failure response, or any exception
+	 */
+	public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
+		if (list == null || list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		IEagleServiceClient client = null;
+		try {
+			client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+			GenericServiceAPIResponseEntity<String> response = client.create(list);
+			if (response.isSuccess()) {
+				LOG.info("Successfully create entities " + list.toString());
+				return true;
+			}
+			else {
+				LOG.error("Fail to create entities with exception " + response.getException());
+				return false;
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities", ex);
+			return false;
+		}
+		finally {
+			// Always release the client, including when create() throws.
+			closeQuietly(client);
+		}
+	}
+
+	/**
+	 * Closes the client, logging instead of propagating a close failure so it
+	 * cannot mask the outcome of the persist call.
+	 */
+	private void closeQuietly(IEagleServiceClient client) {
+		if (client == null) return;
+		try {
+			client.close();
+		}
+		catch (Exception ex) {
+			LOG.warn("Failed to close eagle service client", ex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
new file mode 100644
index 0000000..cb0df70
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plugin to persist alerts to Eagle Storage
+ */
+public class AlertEagleStorePlugin implements NotificationPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
+    // Initialised eagerly so getStatus() never returns null before init() is called.
+    private NotificationStatus status = new NotificationStatus();
+    private AlertEagleStorePersister persist;
+
+    /**
+     * Creates the persister from the given configuration and resets the status.
+     * @param config application configuration handed to {@link AlertEagleStorePersister}
+     * @param initAlertDefs initial alert definitions (not used by this plugin)
+     * @throws Exception if the persister cannot be created
+     */
+    @Override
+    public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+        this.persist = new AlertEagleStorePersister(config);
+        this.status = new NotificationStatus();
+        LOG.info("initialized plugin for EagleStorePlugin");
+    }
+
+    /**
+     * Policy changes only get logged; this plugin keeps no per-policy state.
+     * @param policyId id of the changed policy
+     * @param notificationConf notification configuration of the policy
+     * @param isPolicyDelete true when the policy was deleted
+     */
+    @Override
+    public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+        if( isPolicyDelete ){
+            LOG.info("Deleted policy ...");
+            return;
+        }
+        LOG.info("created/updated plugin ...");
+    }
+
+    /**
+     * @return the status recorded by the most recent {@link #onAlert(AlertAPIEntity)} call
+     */
+    @Override
+    public NotificationStatus getStatus() {
+        return this.status;
+    }
+
+    /**
+     * Persist AlertEntity to alert_details table
+     * @param alertEntity alert to persist; the outcome is recorded in the plugin status
+     */
+    @Override
+    public void onAlert(AlertAPIEntity alertEntity) {
+        LOG.info("write alert to eagle storage " + alertEntity);
+        try{
+            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
+            list.add(alertEntity);
+            boolean result = persist.doPersist( list );
+            if(result) {
+                status.successful = true;
+                status.errorMessage = "";
+            }else{
+                status.successful = false;
+                // Give status consumers a diagnostic instead of an empty message.
+                status.errorMessage = "Failed to persist alert entity to Eagle Store";
+            }
+        }catch (Exception ex ){
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+            LOG.error("Fail writing alert entity to Eagle Store", ex);
+        }
+    }
+
+    /**
+     * Hash derived from the class name only, so all instances of this plugin hash alike.
+     */
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    /**
+     * Any two instances of this plugin type are considered equal.
+     */
+    @Override
+    public boolean equals(Object o){
+        return o == this || o instanceof AlertEagleStorePlugin;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
new file mode 100644
index 0000000..0577f5c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.email.AlertEmailGenerator;
+import org.apache.eagle.notification.email.AlertEmailGeneratorBuilder;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *  Send alert to email
+ */
+public class AlertEmailPlugin implements NotificationPlugin {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPlugin.class);
+	private Map<String, AlertEmailGenerator> emailGenerators = new ConcurrentHashMap<>();
+	private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+	private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+	private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+	private transient ThreadPoolExecutor executorPool;
+	private NotificationStatus status = new NotificationStatus();
+
+	@Override
+	public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+		executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+		LOG.info(" Creating Email Generator... ");
+		for( AlertDefinitionAPIEntity  entity : initAlertDefs ){
+			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+			for( Map<String,String> notificationConfigMap : configMaps ){
+				String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+				// for backward compatibility, default notification is email
+				if(notificationType == null || notificationType.equalsIgnoreCase(NotificationConstants.EMAIL_NOTIFICATION)){
+					AlertEmailGenerator generator = createEmailGenerator(notificationConfigMap);
+						this.emailGenerators.put(entity.getTags().get(Constants.POLICY_ID), generator);
+						LOG.info("Successfully initialized email notification for policy " + entity.getTags().get(Constants.POLICY_ID) + ",with " + notificationConfigMap);
+				}
+			}
+		}
+	}
+
+	/**
+	 * @param notificationConf
+	 * @throws Exception
+     */
+	@Override
+	public void update(String policyId, Map<String,String> notificationConf  , boolean isPolicyDelete ) throws Exception {
+		if( isPolicyDelete ){
+			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+			this.emailGenerators.remove(policyId);
+			return;
+		}
+		AlertEmailGenerator generator = createEmailGenerator(notificationConf);
+		this.emailGenerators.put(policyId , generator );
+		LOG.info("created/updated email generator for updated policy " + policyId);
+	}
+
+	/**
+	 * API to send email
+	 * @param alertEntity
+	 * @throws Exception
+     */
+	@Override
+	public void onAlert(AlertAPIEntity alertEntity) throws  Exception {
+		String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
+		AlertEmailGenerator generator = this.emailGenerators.get(policyId);
+		boolean isSuccess = generator.sendAlertEmail(alertEntity);
+		if( !isSuccess ) {
+			status.errorMessage = "Failed to send email";
+			status.successful = false;
+		}else {
+			status.errorMessage = "";
+			status.successful = true;
+		}
+	}
+
+	@Override
+	public NotificationStatus getStatus() {
+		return this.status;
+	}
+
+	/**
+	 * @param notificationConfig
+	 * @return
+     */
+	private AlertEmailGenerator createEmailGenerator( Map<String,String> notificationConfig ) {
+		String tplFileName = notificationConfig.get(NotificationConstants.TPL_FILE_NAME);
+		if (tplFileName == null || tplFileName.equals("")) {
+			tplFileName = "ALERT_DEFAULT.vm";
+		}
+		AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+				withEagleProps(EagleConfigFactory.load().getConfig().getObject("eagleProps")).
+				withSubject(notificationConfig.get(NotificationConstants.SUBJECT)).
+				withSender(notificationConfig.get(NotificationConstants.SENDER)).
+				withRecipients(notificationConfig.get(NotificationConstants.RECIPIENTS)).
+				withTplFile(tplFileName).
+				withExecutorPool(this.executorPool).build();
+		return gen;
+	}
+
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+	}
+
+	@Override
+	public boolean equals(Object o){
+		if(o == this)
+			return true;
+		if(!(o instanceof AlertEmailPlugin))
+			return false;
+		return true;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
new file mode 100644
index 0000000..cc4f89d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  send alert to Kafka bus
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AlertKafkaPlugin implements NotificationPlugin {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPlugin.class);
+	private NotificationStatus status = new NotificationStatus();
+	private Map<String, Map<String, String>> kafaConfigs = new ConcurrentHashMap<>();
+	private Config config;
+
+	/**
+	 * Stores the configuration and records, per policy, the first notification
+	 * definition whose type is the Kafka store.
+	 * @param config application configuration, later used to obtain the producer
+	 * @param initAlertDefs alert definitions to scan for Kafka notification configs
+	 * @throws Exception if a notification definition cannot be processed
+	 */
+	@Override
+	public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+		this.config = config;
+		for( AlertDefinitionAPIEntity entity : initAlertDefs ) {
+			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+			for( Map<String,String> notificationConfigMap : configMaps ){
+				String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+				if(notificationType == null){
+					LOG.error("no notificationType field for this notification, ignoring and continue " + notificationConfigMap);
+					continue;
+				}
+				// single policy can have multiple configs , only load Kafka Config's
+				if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
+					kafaConfigs.put(entity.getTags().get(Constants.POLICY_ID), notificationConfigMap);
+					break;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Update API to update policy delete/create/update in Notification Plug-ins
+	 * @param policyId id of the changed policy
+	 * @param notificationConf notification configuration of the policy
+	 * @param isPolicyDelete true when the policy was deleted
+	 * @throws Exception declared by the plugin interface
+	 */
+	@Override
+	public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+		if( isPolicyDelete ){
+			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+			this.kafaConfigs.remove(policyId);
+			return;
+		}
+		kafaConfigs.put(policyId, notificationConf );
+	}
+
+	/**
+	 * Post Notification to KafkaTopic. Any failure, including a missing Kafka
+	 * configuration for the alert's policy, is recorded in the plugin status.
+	 * @param alertEntity alert to publish
+	 */
+	@Override
+	public void onAlert(AlertAPIEntity alertEntity) {
+		try{
+			KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(config);
+			producer.send(createRecord(alertEntity));
+			status.successful = true;
+			status.errorMessage = "";
+		}catch(Exception ex ){
+			LOG.error("fail writing alert to Kafka bus", ex);
+			status.successful = false;
+			status.errorMessage = ex.getMessage();
+		}
+	}
+
+	/**
+	 * To Create  KafkaProducer Record
+	 * @param entity alert to serialise into the record value
+	 * @return record addressed to the "topic" configured for the alert's policy
+	 * @throws IllegalStateException if the policy has no Kafka configuration or no topic
+	 * @throws Exception if the entity cannot be serialised
+	 */
+	private ProducerRecord  createRecord(AlertAPIEntity entity ) throws Exception {
+		String policyId = entity.getTags().get(Constants.POLICY_ID);
+		// ConcurrentHashMap rejects null keys, so guard the lookup.
+		Map<String, String> kafkaConfig = policyId == null ? null : this.kafaConfigs.get(policyId);
+		if (kafkaConfig == null) {
+			throw new IllegalStateException("No Kafka notification config found for policy " + policyId);
+		}
+		String topic = kafkaConfig.get("topic");
+		if (topic == null) {
+			throw new IllegalStateException("No Kafka topic configured for policy " + policyId);
+		}
+		ProducerRecord  record  = new ProducerRecord( topic, NotificationPluginUtils.objectToStr(entity));
+		return record;
+	}
+
+	/**
+	 * @return the status recorded by the most recent {@link #onAlert(AlertAPIEntity)} call
+	 */
+	@Override
+	public NotificationStatus getStatus() {
+		return status;
+	}
+
+	/**
+	 * Hash derived from the class name only, so all instances of this plugin hash alike.
+	 */
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+	}
+
+	/**
+	 * Any two instances of this plugin type are considered equal.
+	 */
+	@Override
+	public boolean equals(Object o){
+		return o == this || o instanceof AlertKafkaPlugin;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
new file mode 100644
index 0000000..53f3bb0
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * Holder of one shared {@link KafkaProducer}.
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
+ */
+public enum KafkaProducerSingleton {
+	INSTANCE;
+
+	/** Shared producer, created on the first successful getProducer call; guarded by the synchronized accessor. */
+	private KafkaProducer<String, Object> producer;
+
+	/**
+	 * Returns the shared producer, building it on the first call and reusing it afterwards,
+	 * so repeated calls do not allocate a new producer each time.
+	 * @param config application config; the "kafka_broker" value is read from it through
+	 *               NotificationPluginUtils.getPropValue. It is only consulted while no
+	 *               producer has been created yet.
+	 * @return the shared producer instance
+	 * @throws Exception if the broker property cannot be read or the producer cannot be constructed
+	 */
+	public synchronized KafkaProducer<String, Object>  getProducer(Config config) throws Exception{
+		if(producer == null){
+			Properties configMap = new Properties();
+			configMap.put("bootstrap.servers", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+			configMap.put("metadata.broker.list", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+			configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+			configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+			configMap.put("request.required.acks", "1");
+			configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+			configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+			// assign only after successful construction so a failed attempt is retried on the next call
+			producer = new KafkaProducer<>(configMap);
+		}
+		return producer;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
new file mode 100644
index 0000000..5780176
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 2/10/16.
+ * Notification plug-in contract: an abstraction layer for pushing alerts to different systems.
+ */
+public interface NotificationPlugin {
+    /**
+     * Initializes the plug-in.
+     * @param config configuration handed to the plug-in
+     * @param initAlertDefs alert definitions supplied at initialization time
+     * @throws Exception if initialization fails
+     */
+    void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception;
+
+    /**
+     * Informs the plug-in about a change in a policy definition.
+     * @param policy policy to be impacted
+     * @param notificationConf notification settings of that policy as key/value pairs
+     * @param isPolicyDelete whether the change is a deletion of the policy
+     * @throws Exception if the plug-in cannot apply the change
+     */
+    void update(String policy, Map<String,String> notificationConf, boolean isPolicyDelete) throws Exception;
+
+    /**
+     * Posts a notification for the given alert.
+     * @param alertEntity alert to notify about
+     * @throws Exception if the notification cannot be posted
+     */
+    void onAlert(AlertAPIEntity alertEntity) throws Exception;
+
+    /**
+     * Reports the status of the notification post.
+     * @return status of the notification post
+     */
+    NotificationStatus getStatus();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
new file mode 100644
index 0000000..4aa90c5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ * don't support dynamic discovery as of 2/10
+ *
+ * Singleton that reads the notification plugin definitions from eagle service once,
+ * instantiates each plugin class by name and exposes the instances keyed by notification type.
+ */
+public class NotificationPluginLoader {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginLoader.class);
+    private static final NotificationPluginLoader instance = new NotificationPluginLoader();
+    // notification type -> plugin instance
+    private static final Map<String,NotificationPlugin> notificationMapping = new ConcurrentHashMap<>();
+
+    private Config config;
+    // volatile because the flag is read without holding the lock (first check in init() and
+    // in ensureInitialized()); the write made inside the synchronized block must be visible there
+    private volatile boolean initialized = false;
+
+    /**
+     * @return the single loader instance
+     */
+    public static NotificationPluginLoader getInstance(){
+        return instance;
+    }
+
+    /**
+     * Loads the plugins on the first call; later calls return immediately.
+     * If loading fails the loader stays uninitialized, so a later call tries again.
+     * @param config configuration used to connect to eagle service
+     * @throws IllegalStateException if the plugins cannot be loaded
+     */
+    public void init(Config config){
+        if(!initialized){
+            synchronized (this){
+                // re-check under the lock: another thread may have finished loading meanwhile
+                if(!initialized){
+                    internalInit(config);
+                    initialized = true;
+                }
+            }
+        }
+    }
+
+    private void internalInit(Config config){
+        this.config = config;
+        loadPlugins();
+    }
+
+    /**
+     * Scan & Load Plugins
+     * Every failure (service lookup, class loading, instantiation) is logged and
+     * rethrown wrapped in an IllegalStateException.
+     */
+    private void loadPlugins(){
+        try {
+            LOG.info("Start loading Plugins from eagle service ...");
+            AlertNotificationDAO dao = new AlertNotificationDAOImpl(new EagleServiceConnector(config));
+            List<AlertNotificationEntity> activeNotificationPlugins = dao.findAlertNotificationTypes();
+            for(AlertNotificationEntity plugin : activeNotificationPlugins){
+                notificationMapping.put(plugin.getTags().get(NotificationConstants.NOTIFICATION_TYPE),
+                        (NotificationPlugin) Class.forName(plugin.getClassName()).newInstance());
+            }
+            LOG.info("successfully loaded Plugins from eagle service " + activeNotificationPlugins);
+        }catch ( Exception ex ){
+            LOG.error("Error in loading Notification Plugins: ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    /**
+     * @return the live mapping from notification type to plugin instance
+     * @throws IllegalStateException if init has not completed successfully
+     */
+    public Map<String, NotificationPlugin> getNotificationMapping() {
+        ensureInitialized();
+        return notificationMapping;
+    }
+
+    private void ensureInitialized(){
+        if(!initialized)
+            throw new IllegalStateException("Plugin loader not initialized");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
new file mode 100644
index 0000000..fdf62d1
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+/**
+ * Created on 2/10/16.
+ * Entry point for delivering alerts to notification plugins and for keeping
+ * the plugins in step with alert definition changes.
+ */
+public interface NotificationPluginManager {
+    /**
+     * Notifies the plugins about one specific alert entity.
+     * @param entity alert to deliver
+     */
+    void notifyAlert(AlertAPIEntity entity);
+
+    /**
+     * Responds to a change of an alert notification definition.
+     * @param entity alert definition that changed
+     * @param isDelete whether the definition was deleted
+     */
+    void updateNotificationPlugins(AlertDefinitionAPIEntity entity, boolean isDelete);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
new file mode 100644
index 0000000..887a6a9
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ * Keeps, for every policy id, the notification plugins selected by that policy's
+ * notification definition, dispatches alerts to them and propagates policy changes.
+ */
+public class NotificationPluginManagerImpl implements NotificationPluginManager {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginManagerImpl.class);
+    // mapping from policy Id to NotificationPlugin instance
+    private Map<String, Collection<NotificationPlugin>> policyNotificationMapping = new ConcurrentHashMap<>(1); //only one write thread
+    private Config config;
+
+    /**
+     * Loads the active policies and plugins and builds the policy/plugin mapping.
+     * @param config application config; "eagleProps.site" and "eagleProps.dataSource" are read from it
+     * @throws IllegalStateException if policies or plugins cannot be loaded
+     */
+    public NotificationPluginManagerImpl(Config config){
+        this.config = config;
+        internalInit();
+    }
+
+    private void internalInit(){
+        // iterate all policy ids, keep those notification which belong to plugins
+        PolicyDefinitionDAO policyDefinitionDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector( config ) , Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME);
+        String site = config.getString("eagleProps.site");
+        String dataSource = config.getString("eagleProps.dataSource");
+        try{
+            List<AlertDefinitionAPIEntity> activeAlertDefs = policyDefinitionDao.findActivePolicies( site , dataSource );
+            // initialize all loaded plugins
+            NotificationPluginLoader.getInstance().init(config);
+            for(NotificationPlugin plugin : NotificationPluginLoader.getInstance().getNotificationMapping().values()){
+                plugin.init(config, activeAlertDefs);
+            }
+            // build policy and plugin mapping
+            for( AlertDefinitionAPIEntity entity : activeAlertDefs ){
+                Map<String, NotificationPlugin> plugins = pluginsForPolicy(entity);
+                policyNotificationMapping.put(entity.getTags().get(Constants.POLICY_ID) , plugins.values());
+            }
+        }catch (Exception ex ){
+            LOG.error("Error initializing poliy/notification mapping ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    /**
+     * Hands the alert to every plugin mapped to the alert's policy id.
+     * A failing plugin is logged and does not prevent the remaining plugins from running.
+     * @param entity alert to deliver
+     */
+    @Override
+    public void notifyAlert(AlertAPIEntity entity) {
+        String policyId = entity.getTags().get(Constants.POLICY_ID);
+        Collection<NotificationPlugin> plugins = policyNotificationMapping.get(policyId);
+        if(plugins == null || plugins.size() == 0) {
+            LOG.debug("no plugin found for policy " + policyId);
+            return;
+        }
+        for(NotificationPlugin plugin : plugins){
+            try {
+                LOG.info("execute notification plugin " + plugin);
+                plugin.onAlert(entity);
+            }catch(Exception ex){
+                LOG.error("fail invoking plugin's onAlert, continue ", ex);
+            }
+        }
+    }
+
+    /**
+     * Propagates a created/changed/deleted alert definition to the affected plugins
+     * and refreshes the policy/plugin mapping. Errors are logged, not rethrown.
+     * @param alertDef alert definition that changed
+     * @param isDelete true when the policy was deleted
+     */
+    @Override
+    public void updateNotificationPlugins(AlertDefinitionAPIEntity alertDef, boolean isDelete) {
+        try {
+            // Update Notification Plugin about the change in AlertDefinition
+            String policyId = alertDef.getTags().get(Constants.POLICY_ID);
+            if(isDelete){
+                // iterate all plugins and delete this policy;
+                // a policy without any mapped plugin has nothing to notify
+                Collection<NotificationPlugin> registeredPlugins = policyNotificationMapping.get(policyId);
+                if(registeredPlugins != null){
+                    for(NotificationPlugin plugin : registeredPlugins){
+                        plugin.update(policyId, null, true);
+                    }
+                }
+                policyNotificationMapping.remove(policyId);
+                LOG.info("Deleted notifications for policy " + policyId);
+                return;
+            }
+
+            Map<String, NotificationPlugin> plugins = pluginsForPolicy(alertDef);
+            // calculate difference between current plugins and previous plugin
+            Collection<NotificationPlugin> previousPlugins = policyNotificationMapping.get(policyId);
+            if(previousPlugins != null) {
+                Collection<NotificationPlugin> deletedPlugins = CollectionUtils.subtract(previousPlugins, plugins.values());
+                LOG.info("Going to delete plugins " + deletedPlugins + ", for policy " + policyId);
+                for (NotificationPlugin plugin : deletedPlugins) {
+                    plugin.update(policyId, null, true);
+                }
+            }
+
+            // iterate current notifications and update it individually
+            List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(alertDef.getNotificationDef());
+            for( Map<String,String> notificationConf : notificationConfigCollection ) {
+                String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+                // for backward compatibility, use email for default notification type
+                if(notificationType == null){
+                    notificationType = NotificationConstants.EMAIL_NOTIFICATION;
+                }
+                NotificationPlugin plugin = plugins.get(notificationType);
+                if(plugin != null){
+                    plugin.update(policyId, notificationConf, false);
+                }
+            }
+
+            policyNotificationMapping.put(policyId, plugins.values());// update policy - notification types map
+            LOG.info("Successfully broadcasted policy updates to all Notification Plugins ...");
+        } catch (Exception e) {
+            LOG.error("Error broadcasting policy notification changes ", e);
+        }
+    }
+
+    /**
+     * Resolves the plugins referenced by a policy's notification definition.
+     * Entries without a notification type fall back to the email type; types for which
+     * no plugin is loaded are skipped with a warning, so the result never holds a null plugin.
+     * @param policy policy whose notification definition is parsed
+     * @return mapping from notification type to the loaded plugin for that type
+     * @throws Exception if the notification definition cannot be deserialized
+     */
+    private Map<String, NotificationPlugin> pluginsForPolicy(AlertDefinitionAPIEntity policy) throws Exception{
+        NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+        loader.init(config);
+        Map<String, NotificationPlugin> plugins = loader.getNotificationMapping();
+        // mapping from notificationType to plugin
+        Map<String, NotificationPlugin>  notifications = new HashMap<>();
+        List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(policy.getNotificationDef());
+        for( Map<String,String> notificationConf : notificationConfigCollection ){
+            String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+            // for backward compatibility, by default notification type is email if notification type is not specified
+            if(notificationType == null){
+                LOG.warn("notificationType is null so use default notification type email for this policy  " + policy);
+                notificationType = NotificationConstants.EMAIL_NOTIFICATION;
+            }
+            // the same lookup guards the default email type: if that plugin is not loaded
+            // it must not end up in the mapping as null
+            NotificationPlugin plugin = plugins.get(notificationType);
+            if(plugin == null){
+                LOG.warn("No NotificationPlugin supports this notificationType " + notificationType);
+            }else {
+                notifications.put(notificationType, plugin);
+            }
+        }
+        return notifications;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/88d0126c/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
new file mode 100644
index 0000000..350ecb5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.utils;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common methods for Notification Plugin
+ */
+public class NotificationPluginUtils {
+	/** Config path of the section holding the notification properties. */
+	private static final String NOTIFICATION_PROPS = "eagleNotificationProps";
+	/** Shared JSON mapper, reused instead of building a new one for every call. */
+	private static final ObjectMapper MAPPER = new ObjectMapper();
+
+	/**
+	 * Fetch Notification specific property value
+	 * @param config application config expected to contain an "eagleNotificationProps" object
+	 * @param key name of the property inside that object
+	 * @return the property value converted with toString()
+	 * @throws Exception if the "eagleNotificationProps" section or the requested key is absent or null
+     */
+	public static String getPropValue(Config config, String key ) throws Exception {
+		// getObject throws on a missing path rather than returning null, so test for presence first
+		if( !config.hasPath(NOTIFICATION_PROPS) )
+			throw new Exception("Eagle Notification Properties not found in application.conf ");
+		ConfigObject notificationConf = config.getObject(NOTIFICATION_PROPS);
+		// a key may be absent, or present with a null value; both would otherwise end in a NullPointerException
+		Object value = notificationConf.containsKey(key) ? notificationConf.get(key).unwrapped() : null;
+		if( value == null )
+			throw new Exception("Eagle Notification Property '" + key + "' not found in " + NOTIFICATION_PROPS);
+		return value.toString();
+	}
+
+	/**
+	 * Deserialize Notification Definition and convert all config to Key Value Pairs
+	 * @param notificationDef JSON text holding a list of objects
+	 * @return one map per list element
+	 * @throws Exception if the text cannot be parsed into a list of maps
+     */
+	public static List<Map<String,String>> deserializeNotificationConfig( String notificationDef ) throws Exception {
+		CollectionType mapCollectionType = MAPPER.getTypeFactory().constructCollectionType(List.class, Map.class);
+		return MAPPER.readValue( notificationDef , mapCollectionType);
+	}
+
+	/**
+	 * Object to JSON String
+	 * @param obj object to serialize
+	 * @return JSON representation of the object
+	 * @throws Exception if the object cannot be serialized
+     */
+	public static String objectToStr( Object obj ) throws  Exception {
+		return MAPPER.writeValueAsString(obj);
+	}
+}
\ No newline at end of file


Mime
View raw message