eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [45/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
new file mode 100644
index 0000000..fd6b794
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.ConfigObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailGenerator{
+    private String tplFile;
+    private String sender;
+    private String recipients;
+    private String subject;
+    private ConfigObject eagleProps;
+
+    private ThreadPoolExecutor executorPool;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    public boolean sendAlertEmail(AlertAPIEntity entity) {
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertAPIEntity entity, String recipients) {
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
+        boolean sentSuccessfully = false;
+        AlertEmailContext email = new AlertEmailContext();
+
+        AlertEmailComponent component = new AlertEmailComponent();
+        AlertContext  context = AlertContext.fromJsonString(entity.getAlertContext());
+        component.setAlertContext(context);
+        List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
+        components.add(component);
+        email.setComponents(components);
+        if (context.getProperty(Constants.SUBJECT) != null) {
+            email.setSubject(context.getProperty(Constants.SUBJECT));
+        }
+        else email.setSubject(subject);
+        email.setVelocityTplFile(tplFile);
+        email.setRecipients(recipients);
+        email.setCc(cc);
+        email.setSender(sender);
+
+        /** asynchronized email sending */
+        @SuppressWarnings("rawtypes")
+        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
+
+        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+
+        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
+        Future future = this.executorPool.submit(thread);
+        try {
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email to %s", recipients));
+        } catch (InterruptedException | ExecutionException  e) {
+            sentSuccessfully = false;
+            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
+        } catch (TimeoutException e) {
+            sentSuccessfully = false;
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
+        }
+        return sentSuccessfully;
+    }
+
+    public String getTplFile() {
+        return tplFile;
+    }
+
+    public void setTplFile(String tplFile) {
+        this.tplFile = tplFile;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipients() {
+        return recipients;
+    }
+
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public ConfigObject getEagleProps() {
+        return eagleProps;
+    }
+
+    public void setEagleProps(ConfigObject eagleProps) {
+        this.eagleProps = eagleProps;
+    }
+
+    public void setExecutorPool(ThreadPoolExecutor executorPool) {
+        this.executorPool = executorPool;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..2e63dab
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.typesafe.config.ConfigObject;
+
+public class AlertEmailGeneratorBuilder {
+    private AlertEmailGenerator generator;
+    private AlertEmailGeneratorBuilder(){
+        generator = new AlertEmailGenerator();
+    }
+    public static AlertEmailGeneratorBuilder newBuilder(){
+        return new AlertEmailGeneratorBuilder();
+    }
+    public AlertEmailGeneratorBuilder withSubject(String subject){
+        generator.setSubject(subject);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withSender(String sender){
+        generator.setSender(sender);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withRecipients(String recipients){
+        generator.setRecipients(recipients);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withTplFile(String tplFile){
+        generator.setTplFile(tplFile);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
+        generator.setEagleProps(eagleProps);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+        generator.setExecutorPool(threadPoolExecutor);
+        return this;
+    }
+
+    public AlertEmailGenerator build(){
+        return this.generator;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
new file mode 100644
index 0000000..c2c4949
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.velocity.VelocityContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.email.EagleMailClient;
+import com.netflix.config.ConcurrentMapConfiguration;
+import com.typesafe.config.ConfigObject;
+import com.typesafe.config.ConfigValue;
+
+public class AlertEmailSender implements Runnable {
+
+    protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
+    protected final String configFileName;
+    protected final String subject;
+    protected final String sender;
+    protected final String recipents;
+    protected final String cc;
+    protected final String origin;
+    protected boolean sentSuccessfully = false;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+    private final static int MAX_RETRY_COUNT = 3;
+
+    private static final String MAIL_AUTH = "mail.smtp.auth";
+    private static final String MAIL_HOST = "mail.smtp.host";
+    private static final String MAIL_PORT = "mail.smtp.port";
+    private static final String MAIL_USER = "mail.user";
+    private static final String MAIL_PASSWORD = "mail.password";
+    private static final String MAIL_TLS_ENABLE = "mail.smtp.starttls.enable";
+    private static final String MAIL_DEBUG = "mail.debug";
+
+    private static final String CONF_KEY_MAIL_AUTH = "mailSmtpAuth";
+    private static final String CONF_KEY_MAIL_HOST = "mailHost";
+    private static final String CONF_KEY_MAIL_PORT = "mailSmtpPort";
+    private static final String CONF_KEY_MAIL_USER = "mailSmtpUser";
+    private static final String CONF_KEY_MAIL_PASSWORD = "mailSmtpPassword";
+    private static final String CONF_KEY_MAIL_SSL_ENABLE = "mailSmtpSslEnable";
+    private static final String CONF_KEY_MAIL_TLS_ENABLE = "mailSmtpTlsEnable";
+    private static final String CONF_KEY_MAIL_DEBUG = "mailDebug";
+
+    private ConfigObject eagleProps;
+
+
+    private String threadName;
+    /**
+     * Derived class may have some additional context properties to add
+     * @param context velocity context
+     * @param env environment
+     */
+    protected void additionalContext(VelocityContext context, String env) {
+        // By default there's no additional context added
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail){
+        this.recipents = alertEmail.getRecipients();
+        this.configFileName = alertEmail.getVelocityTplFile();
+        this.subject = alertEmail.getSubject();
+        this.sender = alertEmail.getSender();
+        this.cc = alertEmail.getCc();
+        for(AlertEmailComponent bean : alertEmail.getComponents()){
+            this.alertContexts.add(bean.getAlertContext().getProperties());
+        }
+        String tmp = ManagementFactory.getRuntimeMXBean().getName();
+        this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
+        threadName = Thread.currentThread().getName();
+        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipents+", velocity TPL file: " + this.configFileName);
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail, ConfigObject eagleProps){
+        this(alertEmail);
+        this.eagleProps = eagleProps;
+    }
+
+    @Override
+    public void run() {
+        int count = 0;
+        boolean success = false;
+        while(count++ < MAX_RETRY_COUNT && !success){
+            LOG.info("Sending email, tried: " + count+", max: "+ MAX_RETRY_COUNT);
+            try {
+                final EagleMailClient client;
+                if (eagleProps != null) {
+                    ConcurrentMapConfiguration con = new ConcurrentMapConfiguration();
+                    con.addProperty(MAIL_HOST, eagleProps.get(CONF_KEY_MAIL_HOST).unwrapped());
+                    con.addProperty(MAIL_PORT, eagleProps.get(CONF_KEY_MAIL_PORT).unwrapped());
+
+                    // Add authentication for email.
+                    ConfigValue authValue = eagleProps.get(CONF_KEY_MAIL_AUTH);
+                    if (authValue != null && Boolean.parseBoolean(String.valueOf(authValue.unwrapped()))) {
+                        con.addProperty(MAIL_AUTH, authValue.unwrapped());
+                        con.addProperty(MAIL_USER, eagleProps.get(CONF_KEY_MAIL_USER).unwrapped());
+                        con.addProperty(MAIL_PASSWORD, eagleProps.get(CONF_KEY_MAIL_PASSWORD).unwrapped());
+
+                        // Via SSL.
+                        ConfigValue sslValue = eagleProps.get(CONF_KEY_MAIL_SSL_ENABLE);
+                        if (sslValue != null && Boolean.parseBoolean(String.valueOf(sslValue.unwrapped()))) {
+                            con.addProperty("mail.smtp.socketFactory.port", "465");
+                            con.addProperty("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
+                        }
+
+                        // Via TLS.
+                        ConfigValue tlsValue = eagleProps.get(CONF_KEY_MAIL_TLS_ENABLE);
+                        if (tlsValue != null && Boolean.parseBoolean(String.valueOf(tlsValue.unwrapped()))) {
+                            con.addProperty(MAIL_TLS_ENABLE, tlsValue.unwrapped());
+                        }
+                    }
+
+                    ConfigValue debugValue = eagleProps.get(CONF_KEY_MAIL_DEBUG);
+                    if (debugValue != null && Boolean.parseBoolean(String.valueOf(debugValue.unwrapped()))) {
+                        con.addProperty(MAIL_DEBUG, debugValue.unwrapped());
+                    }
+
+                    client = new EagleMailClient(con);
+                } else {
+                    client = new EagleMailClient();
+                }
+                String env = "prod";
+                if (eagleProps != null && eagleProps.get("env") != null) {
+                    env = (String) eagleProps.get("env").unwrapped();
+                }
+
+                LOG.info("Env is: " + env);
+                final VelocityContext context = new VelocityContext();
+                generateCommonContext(context);
+                LOG.info("After calling generateCommonContext...");
+                additionalContext(context, env);
+
+                if (recipents == null || recipents.equals("")) {
+                    LOG.error("Recipients is null, skip sending emails ");
+                    return;
+                }
+                String title = subject;
+                if (!env.trim().equals("prod")) {
+                    title = "[" + env + "]" + title;
+                }
+                success = client.send(sender, recipents, cc, title, configFileName, context, null);
+                LOG.info("Success of sending email: " + success);
+                if(!success && count < MAX_RETRY_COUNT) {
+                    LOG.info("Sleep for a while before retrying");
+                    Thread.sleep(10*1000);
+                }
+            }
+            catch (Exception e){
+                LOG.warn("Sending mail exception", e);
+            }
+        }
+
+        if(success){
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email, thread: %s",threadName));
+        }else{
+            LOG.warn(String.format("Fail sending email after tries %s times, thread: %s",MAX_RETRY_COUNT,threadName));
+        }
+    }
+
+    private void generateCommonContext(VelocityContext context) {
+        context.put(Constants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds( System.currentTimeMillis() ));
+        context.put(Constants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size());
+        context.put(Constants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts);
+        context.put(Constants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
+    }
+
+    public boolean sentSuccessfully(){
+        return this.sentSuccessfully;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
new file mode 100644
index 0000000..e098256
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Alert API entity Persistor
+ */
+public class AlertEagleStorePersister {
+	private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
+	private String eagleServiceHost;
+	private int eagleServicePort;
+	private String username;
+	private String password;
+
+
+	public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort) {
+		this(eagleServiceHost, eagleServicePort, null, null);
+	}
+
+	public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort, String username, String password) {
+		this.eagleServiceHost = eagleServiceHost;
+		this.eagleServicePort = eagleServicePort;
+		this.username = username;
+		this.password = password;
+	}
+
+	public AlertEagleStorePersister(Config config ) {
+		this.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+		this.eagleServicePort = config.getInt("eagleProps.eagleService.port");
+		this.username = config.getString("eagleProps.eagleService.username");
+		this.password =config.getString("eagleProps.eagleService.password");
+	}
+
+	/**
+	 * Persist passes list of Entities
+	 * @param list
+	 * @return
+     */
+	public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
+		if (list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+			GenericServiceAPIResponseEntity<String> response = client.create(list);
+			client.close();
+			if (response.isSuccess()) {
+				LOG.info("Successfully create entities " + list.toString());
+				return true;
+			}
+			else {
+				LOG.error("Fail to create entities with exception " + response.getException());
+				return false;
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities", ex);
+			return false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
new file mode 100644
index 0000000..6013a80
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+/**
+ * Plugin to persist alerts to Eagle Storage
+ */
+public class AlertEagleStorePlugin implements NotificationPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
+    private List<NotificationStatus> statusList = new Vector<>();
+    private AlertEagleStorePersister persist;
+
+    @Override
+    public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+        this.persist = new AlertEagleStorePersister(config);
+        LOG.info("initialized plugin for EagleStorePlugin");
+    }
+
+    @Override
+    public void update(String policyId, List<Map<String,String>> notificationConfigCollection , boolean isPolicyDelete ) throws Exception {
+        if(isPolicyDelete){
+            LOG.info("Deleted policy ...");
+            return;
+        }
+        LOG.info("created/updated plugin ...");
+    }
+
+    @Override
+    public List<NotificationStatus> getStatusList() {
+        return this.statusList;
+    }
+
+    /**
+     * Persist AlertEntity to alert_details table
+     * @param alertEntity
+     */
+    @Override
+    public void onAlert(AlertAPIEntity alertEntity) {
+        LOG.info("write alert to eagle storage " + alertEntity);
+        NotificationStatus status = new NotificationStatus();
+        try{
+            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
+            list.add(alertEntity);
+            boolean result = persist.doPersist(list);
+            if(result) {
+                status.successful = true;
+                status.errorMessage = "";
+            }else{
+                status.successful = false;
+                status.errorMessage = "";
+            }
+        }catch (Exception ex ){
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+            LOG.error("Fail writing alert entity to Eagle Store", ex);
+        }
+        this.statusList.add(status);
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+        if(o == this)
+            return true;
+        if(!(o instanceof AlertEagleStorePlugin))
+            return false;
+        return true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
new file mode 100644
index 0000000..9acb38e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.email.AlertEmailGenerator;
+import org.apache.eagle.notification.email.AlertEmailGeneratorBuilder;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *  Send alert to email
+ */
+public class AlertEmailPlugin implements NotificationPlugin {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPlugin.class);
+	private Map<String, List<AlertEmailGenerator>> emailGenerators = new ConcurrentHashMap<>();
+	private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+	private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+	private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+	private transient ThreadPoolExecutor executorPool;
+	private Vector<NotificationStatus> statusList = new Vector<>();
+	private Config config;
+
+	@Override
+	public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+		this.config = config;
+		executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+		LOG.info(" Creating Email Generator... ");
+		for( AlertDefinitionAPIEntity  entity : initAlertDefs ){
+			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+			this.update(entity.getTags().get(Constants.POLICY_ID), configMaps, false);
+		}
+	}
+
+	/**
+	 * @param notificationConfigCollection
+	 * @throws Exception
+     */
+	@Override
+	public void update(String policyId, List<Map<String,String>> notificationConfigCollection, boolean isPolicyDelete) throws Exception {
+		if(isPolicyDelete){
+			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+			this.emailGenerators.remove(policyId);
+			return;
+		}
+		Vector<AlertEmailGenerator> generators = new Vector<>();
+		for(Map<String, String> notificationConf: notificationConfigCollection) {
+			String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+			if(notificationType == null || notificationType.equalsIgnoreCase(NotificationConstants.EMAIL_NOTIFICATION)) {
+				AlertEmailGenerator generator = createEmailGenerator(notificationConf);
+				generators.add(generator);
+			}
+		}
+		if(generators.size() != 0) {
+			this.emailGenerators.put(policyId, generators);
+			LOG.info("created/updated email generators for policy " + policyId);
+		}
+	}
+
+	/**
+	 * API to send email
+	 * @param alertEntity
+	 * @throws Exception
+     */
+	@Override
+	public void onAlert(AlertAPIEntity alertEntity) throws  Exception {
+		String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
+		List<AlertEmailGenerator> generators = this.emailGenerators.get(policyId);
+		for(AlertEmailGenerator generator: generators) {
+			boolean isSuccess = generator.sendAlertEmail(alertEntity);
+			NotificationStatus status = new NotificationStatus();
+			if( !isSuccess ) {
+				status.errorMessage = "Failed to send email";
+				status.successful = false;
+			}else {
+				status.errorMessage = "";
+				status.successful = true;
+			}
+			this.statusList.add(status);
+		}
+	}
+
+	@Override
+	public List<NotificationStatus> getStatusList() {
+		return this.statusList;
+	}
+
+	/**
+	 * @param notificationConfig
+	 * @return
+     */
+	private AlertEmailGenerator createEmailGenerator( Map<String,String> notificationConfig ) {
+		String tplFileName = notificationConfig.get(NotificationConstants.TPL_FILE_NAME);
+		if (tplFileName == null || tplFileName.equals("")) {
+			tplFileName = "ALERT_DEFAULT.vm";
+		}
+		AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+				withEagleProps(this.config.getObject("eagleProps")).
+				withSubject(notificationConfig.get(NotificationConstants.SUBJECT)).
+				withSender(notificationConfig.get(NotificationConstants.SENDER)).
+				withRecipients(notificationConfig.get(NotificationConstants.RECIPIENTS)).
+				withTplFile(tplFileName).
+				withExecutorPool(this.executorPool).build();
+		return gen;
+	}
+
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+	}
+
+	@Override
+	public boolean equals(Object o){
+		if(o == this)
+			return true;
+		if(!(o instanceof AlertEmailPlugin))
+			return false;
+		return true;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
new file mode 100644
index 0000000..683f2f5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  send alert to Kafka bus
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AlertKafkaPlugin implements NotificationPlugin {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPlugin.class);
+	private List<NotificationStatus> statusList = new Vector<>();
+	private Map<String, List<Map<String, String>>> kafaConfigs = new ConcurrentHashMap<>();
+	private Config config;
+
+	@Override
+	public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+		this.config = config;
+		for( AlertDefinitionAPIEntity entity : initAlertDefs ) {
+			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+			this.update(entity.getTags().get(Constants.POLICY_ID), configMaps, false);
+		}
+	}
+
+	/**
+	 * Update API to update policy delete/create/update in Notification Plug-ins
+	 * @param  notificationConfigCollection
+	 * @param isPolicyDelete
+	 * @throws Exception
+     */
+	@Override
+	public void update(String policyId, List<Map<String,String>> notificationConfigCollection, boolean isPolicyDelete ) throws Exception {
+		if( isPolicyDelete ){
+			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+			this.kafaConfigs.remove(policyId);
+			return;
+		}
+		Vector<Map<String, String>> kafkaConfigList = new Vector<>();
+		for(Map<String,String> notificationConfigMap : notificationConfigCollection){
+			String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+			if(notificationType == null){
+				LOG.error("invalid notificationType for this notification, ignoring and continue " + notificationConfigMap);
+				continue;
+			}else {
+				// single policy can have multiple configs , only load Kafka Config's
+				if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
+					kafkaConfigList.add(notificationConfigMap);
+				}
+			}
+		}
+		if(kafkaConfigList.size() != 0) {
+			kafaConfigs.put(policyId, kafkaConfigList);
+		}
+	}
+
+	/**
+	 * Post Notification to KafkaTopic
+	 * @param alertEntity
+     */
+	@Override
+	public void onAlert(AlertAPIEntity alertEntity) {
+		String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
+		for(Map<String, String> kafkaConfig: this.kafaConfigs.get(policyId)) {
+			NotificationStatus status = new NotificationStatus();
+			try{
+				KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(kafkaConfig);
+				producer.send(createRecord(alertEntity, kafkaConfig.get(NotificationConstants.TOPIC)));
+				status.successful = true;
+				status.errorMessage = "";
+			}catch(Exception ex ){
+				LOG.error("fail writing alert to Kafka bus", ex);
+				status.successful = false;
+				status.errorMessage = ex.getMessage();
+			}
+			this.statusList.add(status);
+		}
+	}
+
+	/**
+	 * To Create  KafkaProducer Record 
+	 * @param entity
+	 * @return
+	 * @throws Exception
+	 */
+	private ProducerRecord  createRecord(AlertAPIEntity entity, String topic) throws Exception {
+		ProducerRecord  record  = new ProducerRecord(topic, NotificationPluginUtils.objectToStr(entity));
+		return record;
+	}	
+	
+	@Override
+	public List<NotificationStatus> getStatusList() {
+		return statusList;
+	}
+
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+	}
+
+	@Override
+	public boolean equals(Object o){
+		if(o == this)
+			return true;
+		if(!(o instanceof AlertKafkaPlugin))
+			return false;
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
new file mode 100644
index 0000000..bc9e3c9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import java.util.Map;
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. 
+ */
+public enum KafkaProducerSingleton {
+	INSTANCE;	
+
+	public KafkaProducer<String, Object>  getProducer(Map<String, String> config) throws Exception{
+		Properties configMap = new Properties();
+		String broker_list = config.get(NotificationConstants.BROKER_LIST);
+		configMap.put("bootstrap.servers", broker_list);
+		configMap.put("metadata.broker.list", broker_list);
+		configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("request.required.acks", "1");	     
+		configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+		return producer;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
new file mode 100644
index 0000000..92ee0b5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 2/10/16.
+ * Notification Plug-in interface which provide abstraction layer to notify to different system
+ */
+public interface NotificationPlugin {
+    /**
+     * for initialization
+     * @throws Exception
+     */
+    void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws  Exception;
+
+    /**
+     * Update Plugin if any change in Policy Definition
+     * @param policy to be impacted
+     * @param  notificationConfCollection
+     * @throws Exception
+     */
+    void update(String policy, List<Map<String,String>> notificationConfCollection , boolean isPolicyDelete) throws  Exception;
+
+    /**
+     * Post a notification for the given alertEntity
+     * @param alertEntity
+     * @throws Exception
+     */
+
+    void onAlert(AlertAPIEntity alertEntity) throws  Exception;
+
+    /**
+     * Returns Status of Notification Post
+     * @return
+     */
+    List<NotificationStatus> getStatusList();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
new file mode 100644
index 0000000..4aa90c5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ * don't support dynamic discovery as of 2/10
+ */
+public class NotificationPluginLoader {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginLoader.class);
+    private static NotificationPluginLoader instance = new NotificationPluginLoader();
+    private static Map<String,NotificationPlugin> notificationMapping = new ConcurrentHashMap<>();
+
+    private Config config;
+    private boolean initialized = false;
+
+    public static NotificationPluginLoader getInstance(){
+        return instance;
+    }
+
+    public void init(Config config){
+        if(!initialized){
+            synchronized (this){
+                if(!initialized){
+                    internalInit(config);
+                    initialized = true;
+                }
+            }
+        }
+    }
+
+    private void internalInit(Config config){
+        this.config = config;
+        loadPlugins();
+    }
+
+    /**
+     * Scan & Load Plugins
+     */
+    private void loadPlugins(){
+        try {
+            LOG.info("Start loading Plugins from eagle service ...");
+            AlertNotificationDAO dao = new AlertNotificationDAOImpl(new EagleServiceConnector(config));
+            List<AlertNotificationEntity> activeNotificationPlugins = dao.findAlertNotificationTypes();
+            for(AlertNotificationEntity plugin : activeNotificationPlugins){
+                notificationMapping.put(plugin.getTags().get(NotificationConstants.NOTIFICATION_TYPE),
+                        (NotificationPlugin) Class.forName(plugin.getClassName()).newInstance());
+            }
+            LOG.info("successfully loaded Plugins from eagle service " + activeNotificationPlugins);
+        }catch ( Exception ex ){
+            LOG.error("Error in loading Notification Plugins: ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    public Map<String, NotificationPlugin> getNotificationMapping() {
+        ensureInitialized();
+        return notificationMapping;
+    }
+
+    private void ensureInitialized(){
+        if(!initialized)
+            throw new IllegalStateException("Plugin loader not initialized");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
new file mode 100644
index 0000000..fdf62d1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+/**
+ * Created on 2/10/16.
+ */
+public interface NotificationPluginManager {
+    /**
+     * notify alerts to plugins for one specific alert entity
+     * @param entity
+     */
+    void notifyAlert( AlertAPIEntity entity );
+
+    /**
+     * responds to changes of alert notification definition
+     * @param entity
+     * @param isDelete
+     */
+    void updateNotificationPlugins(AlertDefinitionAPIEntity entity , boolean isDelete );
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
new file mode 100644
index 0000000..8e9e3b2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ */
+public class NotificationPluginManagerImpl implements NotificationPluginManager {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginManagerImpl.class);
+    // mapping from policy Id to NotificationPlugin instance
+    private Map<String, Collection<NotificationPlugin>> policyNotificationMapping = new ConcurrentHashMap<>(1); //only one write thread
+    private Config config;
+
+    public NotificationPluginManagerImpl(Config config){
+        this.config = config;
+        internalInit();
+    }
+
+    private void internalInit(){
+        // iterate all policy ids, keep those notification which belong to plugins
+        PolicyDefinitionDAO policyDefinitionDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector( config ) , Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME);
+        String site = config.getString("eagleProps.site");
+        String application = config.getString("eagleProps.application");
+        try{
+            List<AlertDefinitionAPIEntity> activeAlertDefs = policyDefinitionDao.findActivePolicies( site , application);
+            // initialize all loaded plugins
+            NotificationPluginLoader.getInstance().init(config);
+            for(NotificationPlugin plugin : NotificationPluginLoader.getInstance().getNotificationMapping().values()){
+                plugin.init(config, activeAlertDefs);
+            }
+            // build policy and plugin mapping
+            for( AlertDefinitionAPIEntity entity : activeAlertDefs ){
+                Map<String, NotificationPlugin> plugins = pluginsForPolicy(entity);
+                policyNotificationMapping.put(entity.getTags().get(Constants.POLICY_ID) , plugins.values());
+            }
+        }catch (Exception ex ){
+            LOG.error("Error initializing policy/notification mapping ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void notifyAlert(AlertAPIEntity entity) {
+        String policyId = entity.getTags().get(Constants.POLICY_ID);
+        Collection<NotificationPlugin> plugins = policyNotificationMapping.get(policyId);
+        if(plugins == null || plugins.size() == 0) {
+            LOG.warn("no alert notification plugins found for policy " + policyId);
+            return;
+        }
+        for(NotificationPlugin plugin : plugins){
+            try {
+                LOG.info("execute notification plugin " + plugin);
+                plugin.onAlert(entity);
+            }catch(Exception ex){
+                LOG.error("fail invoking plugin's onAlert, continue ", ex);
+            }
+        }
+    }
+
+    @Override
+    public void updateNotificationPlugins(AlertDefinitionAPIEntity alertDef, boolean isDelete) {
+        try {
+            // Update Notification Plugin about the change in AlertDefinition
+            String policyId = alertDef.getTags().get(Constants.POLICY_ID);
+            if(isDelete){
+                // iterate all plugins and delete this policy
+                for(NotificationPlugin plugin : policyNotificationMapping.get(policyId)){
+                    plugin.update(policyId, null, true);
+                }
+                policyNotificationMapping.remove(policyId);
+                LOG.info("Deleted notifications for policy " + policyId);
+                return;
+            }
+
+            Map<String, NotificationPlugin> plugins = pluginsForPolicy(alertDef);
+            // calculate difference between current plugins and previous plugin
+            Collection<NotificationPlugin> previousPlugins = policyNotificationMapping.get(policyId);
+            if(previousPlugins != null) {
+                Collection<NotificationPlugin> deletedPlugins = CollectionUtils.subtract(previousPlugins, plugins.values());
+                LOG.info("Going to delete plugins " + deletedPlugins + ", for policy " + policyId);
+                for (NotificationPlugin plugin : deletedPlugins) {
+                    plugin.update(policyId, null, true);
+                }
+            }
+
+            // iterate current notifications and update it individually
+            List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(alertDef.getNotificationDef());
+            for(NotificationPlugin plugin: plugins.values()) {
+                plugin.update(policyId, notificationConfigCollection, false);
+            }
+
+            policyNotificationMapping.put(policyId, plugins.values());// update policy - notification types map
+            LOG.info("Successfully broadcast policy updates to all Notification Plugins ...");
+        } catch (Exception e) {
+            LOG.error("Error broadcasting policy notification changes ", e);
+        }
+    }
+
+    private Map<String, NotificationPlugin> pluginsForPolicy(AlertDefinitionAPIEntity policy) throws Exception{
+        NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+        loader.init(config);
+        Map<String, NotificationPlugin> plugins = loader.getNotificationMapping();
+        // mapping from notificationType to plugin
+        Map<String, NotificationPlugin>  notifications = new HashMap<>();
+        List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(policy.getNotificationDef());
+        for(Map<String,String> notificationConf : notificationConfigCollection ){
+            String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+            // for backward compatibility, by default notification type is email if notification type is not specified
+            if(notificationType == null){
+                LOG.warn("notificationType is null so use default notification type email for this policy  " + policy);
+                notifications.put(NotificationConstants.EMAIL_NOTIFICATION, plugins.get(NotificationConstants.EMAIL_NOTIFICATION));
+                notifications.put(NotificationConstants.EAGLE_STORE, plugins.get(NotificationConstants.EAGLE_STORE));
+            }else if(!plugins.containsKey(notificationType)){
+                LOG.warn("No NotificationPlugin supports this notificationType " + notificationType);
+            }else {
+                notifications.put(notificationType, plugins.get(notificationType));
+            }
+        }
+        return notifications;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
new file mode 100644
index 0000000..e490be3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.type.CollectionType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common methods for Notification Plugin
+ */
+public class NotificationPluginUtils {
+	private final static ObjectMapper OBJECT_MAPPER = TaggedLogAPIEntity.buildObjectMapper();
+	/**
+	 * Fetch Notification specific property value
+	 * @param key
+	 * @return
+	 * @throws Exception
+     */
+	public static String getPropValue(Config config, String key ) throws Exception {
+		if( config.getObject("eagleNotificationProps") == null )
+			throw new Exception("Eagle Notification Properties not found in application.conf ");
+		ConfigObject notificationConf = config.getObject("eagleNotificationProps");
+		return notificationConf.get(key).unwrapped().toString();
+	}
+
+	/**
+	 * Deserialize Notification Definition and convert all config to Key Value Pairs
+	 * @param notificationDef
+	 * @return
+	 * @throws Exception
+     */
+	public static List<Map<String,String>> deserializeNotificationConfig( String notificationDef ) throws Exception {
+		CollectionType mapCollectionType = OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, Map.class);
+		return OBJECT_MAPPER.readValue(notificationDef, mapCollectionType);
+	}
+
+	/**
+	 * Object to JSON String
+	 * @param obj
+	 * @return
+	 * @throws Exception
+     */
+	public static String objectToStr( Object obj ) throws  Exception {
+		return OBJECT_MAPPER.writeValueAsString(obj);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
new file mode 100644
index 0000000..3e29439
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
@@ -0,0 +1,266 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+	<head>
+		<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+		<meta name="viewport" content="width=device-width"/>
+		<style>
+			body {
+				width:100% !important;
+				min-width: 100%;
+				-webkit-text-size-adjust:100%;
+				-ms-text-size-adjust:100%;
+				margin:0;
+				padding:0;
+			}
+
+			table {
+				border-spacing: 0;
+				border-collapse: collapse;
+			}
+
+			table th,
+			table td {
+				padding: 3px 0 3px 0;
+			}
+
+			.body {
+				width: 100%;
+			}
+
+			p,a,h1,h2,h3,ul,ol,li {
+				font-family: Helvetica, Arial, sans-serif;
+				font-weight: normal;
+				margin: 0;
+				padding: 0;
+			}
+			p {
+				font-size: 14px;
+				line-height: 19px;
+			}
+			a {
+				color: #3294b1;
+			}
+			h1 {
+				font-size: 36px;
+				margin: 15px 0 5px 0;
+			}
+			h2 {
+				font-size: 32px;
+			}
+			h3 {
+				font-size: 28px;
+			}
+
+			ul,ol {
+				margin: 0 0 0 25px;
+				padding: 0;
+			}
+
+			.btn {
+				background: #2ba6cb !important;
+				border: 1px solid #2284a1;
+				padding: 10px 20px 10px 20px;
+				text-align: center;
+			}
+			.btn:hover {
+				background: #2795b6 !important;
+			}
+			.btn a {
+				color: #FFFFFF;
+				text-decoration: none;
+				font-weight: bold;
+				padding: 10px 20px 10px 20px;
+			}
+
+			.tableBordered {
+				border-top: 1px solid #b9e5ff;
+			}
+			.tableBordered th {
+				background: #ECF8FF;
+			}
+			.tableBordered th p {
+				font-weight: bold;
+				color: #3294b1;
+			}
+			.tableBordered th,
+			.tableBordered td {
+				color: #333333;
+				border-bottom: 1px solid #b9e5ff;
+				text-align: center;
+				padding-bottom: 5px;
+			}
+
+			.panel {
+				height: 100px;
+			}
+		</style>
+	</head>
+	<body>
+		#set ( $elem = $alertList[0] )
+		#set ( $alertUrl = $elem["alertDetailUrl"] )
+		#set ( $policyUrl = $elem["policyDetailUrl"] )
+		<table class="body">
+			<tr>
+				<td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+					<!-- Eagle Header -->
+					<table width="580">
+						<tr>
+							<td style="padding: 0 0 0 0;" align="left" >
+								<p style="color:#FFFFFF;font-weight: bold; font-size: 24px">Eagle</p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+
+			<tr>
+				<td align="center" valign="top">
+					<!-- Eagle Body -->
+					<table width="580">
+						<tr>
+							<!-- Title -->
+							<td align="center">
+								<h1>$elem["application"] Alert Detected</h1>
+							</td>
+						</tr>
+						<tr>
+							<!-- Time -->
+							<td>
+								<table width="580">
+									<tr>
+										<td>
+											<p><b>Detected Time: $elem["alertTimestamp"]</b></p>
+										</td>
+										#set ( $severity = $elem["severity"] )
+										#if (!$severity || ("$severity" == ""))
+											#set ( $elem["severity"] = "WARNING")
+										#end
+										<td align="right">
+											<p><b>
+												Severity:
+									            #if ($elem["severity"] == "WARNING")
+													<span>$elem["severity"]</span>												
+    											#else
+													<span style="color: #FF0000;">$elem["severity"]</span>
+    											#end
+											</b></p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Description -->
+							<td valign="top" style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
+								<p>$elem["alertMessage"]</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$alertUrl">View Alert Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Basic Information:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information Content -->
+							<td>
+								<table class="tableBordered" width="580">
+									<tr>
+										<th>
+											<p>Site</p>
+										</th>
+										<th>
+											<p>Data Source</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["site"]</p>
+										</td>
+										<td>
+											<p>$elem["application"]</p>
+										</td>
+									</tr>
+									<tr>
+										<th>
+											<p>Policy Name</p>
+										</th>
+										<th>
+											<p>Severity</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["policyId"]</p>
+										</td>
+										<td>
+											<p>$elem["severity"]</p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$policyUrl">View Policy Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>						
+						<tr>
+							<!-- Actions Required -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Actions Required:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Possible Root Causes Content -->
+							<td class="panel" valign="top" style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+								<p> $elem["application"] alert found, please check.</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Copyright -->
+							<td align="center">
+								<p><a href="<Eagle-Host>/alerts/alertlist.html">Apache Eagle</a></p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+		</table>
+	</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/application.conf
new file mode 100644
index 0000000..d57172b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/application.conf
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "cluster",
+    "topologyName" : "sandbox-hdfsAuditLog-topology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1,
+      "hdfsAuditLogAlertExecutor*" : 1
+    }
+  },
+  "dataSourceConfig": {
+    "topic" : "sandbox_hdfs_audit_log",
+    "zkConnection" : "127.0.0.1:2181",
+    "brokerZkPath" : "/brokers",
+    "zkConnectionTimeoutMS" : 15000,
+    "fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "transactionZKServers" : "127.0.0.1",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "consumerGroupId" : "eagle.hdfsaudit.consumer",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+     "hdfsAuditLogAlertExecutor" : {
+       "parallelism" : 1,
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
+       "needValidation" : "true"
+     }
+  },
+  "eagleProps" : {
+    "site" : "sandbox",
+    "dataSource": "hdfsAuditLog",
+  	"dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailhost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "eagleService": {
+      "host": "localhost",
+      "port": 9099,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+  "dynamicConfigSource" : {
+  	"enabled" : true,
+  	"initDelayMillis" : 0,
+  	"delayMillis" : 30000
+  },
+  "eagleNotificationProps" : {
+    "kafka_broker":"192.168.56.101:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
new file mode 100644
index 0000000..0293f9d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+#####################################################################
+#     Import notification plugin configuration into Eagle Service   #
+#####################################################################
+
+## AlertNotificationService : schema for notifcation plugin configuration
+echo ""
+echo "Importing notification plugin configurations ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertNotificationService" \
+ -d '
+ [
+     {
+       "prefix": "alertNotifications",
+       "tags": {
+         "notificationType": "email"
+       },
+       "className": "org.apache.eagle.notification.plugin.AlertEmailPlugin",
+       "description": "send alert to email",
+       "enabled":true
+     },
+     {
+       "prefix": "alertNotifications",
+       "tags": {
+         "notificationType": "kafka"
+       },
+       "className": "org.apache.eagle.notification.plugin.AlertKafkaPlugin",
+       "description": "send alert to kafka bus",
+       "enabled":true
+     },
+     {
+       "prefix": "alertNotifications",
+       "tags": {
+         "notificationType": "eagleStore"
+       },
+       "className": "org.apache.eagle.notification.plugin.AlertEagleStorePlugin",
+       "description": "send alert to eagle store",
+       "enabled":true
+     }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for alert notification plugins"
+
+exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
new file mode 100644
index 0000000..4a88964
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertEagleStorePlugin;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Created on 2/11/16.
+ */
+public class TestAlertEagleStorePlugin {
+    @Ignore // only work when eagle service is up
+    @Test
+    public void testEagleStorePlugin() throws Exception{
+        AlertEagleStorePlugin plugin = new AlertEagleStorePlugin();
+        Config config = ConfigFactory.load();
+        AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+        def.setNotificationDef("");
+        plugin.init(config, Arrays.asList(def));
+
+        AlertAPIEntity alert = new AlertAPIEntity();
+        alert.setDescription("");
+        alert.setAlertContext(new AlertContext().toJsonString());
+        plugin.onAlert(alert);
+        Assert.assertTrue(plugin.getStatusList().get(0).successful);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
new file mode 100644
index 0000000..9abed72
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertEmailPlugin;
+import org.apache.eagle.policy.common.Constants;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * Created on 2/11/16.
+ */
+public class TestAlertEmailPlugin {
+    @Ignore // only works when there is correct email setup and eagle service
+    @Test
+    public void testAlertEmailPlugin() throws Exception{
+        AlertEmailPlugin plugin = new AlertEmailPlugin();
+        Config config = ConfigFactory.load();
+        AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+        def.setTags(new HashMap<String, String>());
+        def.getTags().put(Constants.POLICY_ID, "testPolicyId");
+        def.setNotificationDef("[{\"notificationType\":\"email\",\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"tplFileName\":\"\"}]");
+        plugin.init(config, Arrays.asList(def));
+
+        AlertAPIEntity alert = new AlertAPIEntity();
+        alert.setTags(new HashMap<String, String>());
+        alert.getTags().put(Constants.POLICY_ID, "testPolicyId");
+        alert.setDescription("");
+        alert.setDecodedAlertContext(new AlertContext());
+        plugin.onAlert(alert);
+        Assert.assertTrue(plugin.getStatusList().get(0).successful);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
new file mode 100644
index 0000000..814956e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertKafkaPlugin;
+import org.apache.eagle.policy.common.Constants;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestAlertKafkaPlugin {
+	@Ignore // only work when kafka is ready for use
+	@Test
+	public void testAlertToKafkaBus() throws Exception
+	{
+		AlertKafkaPlugin plugin = new AlertKafkaPlugin();
+		Config config = ConfigFactory.load();
+		AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+		def.setTags(new HashMap<String, String>());
+		def.getTags().put(Constants.POLICY_ID, "testPolicyId");
+		def.setNotificationDef("[{\"notificationType\":\"kafka\",\"kafka_broker\":\"sandbox.hortonworks.com:6667\",\"topic\":\"sandbox_hdfs_audit_log\"}]");
+		plugin.init(config, Arrays.asList(def));
+
+		AlertAPIEntity alert = new AlertAPIEntity();
+		alert.setTags(new HashMap<String, String>());
+		alert.getTags().put(Constants.POLICY_ID, "testPolicyId");
+		alert.setDescription("");
+		alert.setAlertContext(new AlertContext().toJsonString());
+		plugin.onAlert(alert);
+		Thread.sleep(1000); // wait for message sent out
+		Assert.assertTrue(plugin.getStatusList().get(0).successful);
+	}
+}


Mime
View raw message