eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [33/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..e9635dd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class AlertEmailGeneratorBuilder {
+    private AlertEmailGenerator generator;
+    private AlertEmailGeneratorBuilder(){
+        generator = new AlertEmailGenerator();
+    }
+    public static AlertEmailGeneratorBuilder newBuilder(){
+        return new AlertEmailGeneratorBuilder();
+    }
+    public AlertEmailGeneratorBuilder withSubject(String subject){
+        generator.setSubject(subject);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withSender(String sender){
+        generator.setSender(sender);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withRecipients(String recipients){
+        generator.setRecipients(recipients);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withTplFile(String tplFile){
+        generator.setTplFile(tplFile);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withMailProps(Map<String, String> mailProps) {
+        generator.setProperties(mailProps);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+        generator.setExecutorPool(threadPoolExecutor);
+        return this;
+    }
+
+    public AlertEmailGenerator build(){
+        return this.generator;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
new file mode 100644
index 0000000..d0e5cf6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.velocity.VelocityContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailSender implements Runnable {
+
+    protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
+    protected final String configFileName;
+    protected final String subject;
+    protected final String sender;
+    protected final String recipients;
+    protected final String cc;
+    protected final String origin;
+    protected boolean sentSuccessfully = false;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+    private final static int MAX_RETRY_COUNT = 3;
+
+
+
+    private Map<String, String> mailProps;
+
+
+    private String threadName;
+    /**
+     * Derived class may have some additional context properties to add
+     * @param context velocity context
+     * @param env environment
+     */
+    protected void additionalContext(VelocityContext context, String env) {
+        // By default there's no additional context added
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail){
+        this.recipients = alertEmail.getRecipients();
+        this.configFileName = alertEmail.getVelocityTplFile();
+        this.subject = alertEmail.getSubject();
+        this.sender = alertEmail.getSender();
+        this.cc = alertEmail.getCc();
+
+        this.alertContexts.add(alertEmail.getAlertContext());
+        String tmp = ManagementFactory.getRuntimeMXBean().getName();
+        this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
+        threadName = Thread.currentThread().getName();
+        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipients +", velocity TPL file: " + this.configFileName);
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail, Map<String, String> mailProps){
+        this(alertEmail);
+        this.mailProps = mailProps;
+    }
+
+    private Properties parseMailClientConfig(Map<String, String> mailProps) {
+        if (mailProps == null) return null;
+        Properties props = new Properties();
+        String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_SERVER);
+        String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
+        if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
+            LOG.warn("SMTP server is unset, will exit");
+            return null;
+        }
+        props.put(AlertEmailConstants.MAIL_HOST, mailHost);
+        props.put(AlertEmailConstants.MAIL_PORT, mailPort);
+
+        String smtpAuth = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
+        props.put(AlertEmailConstants.MAIL_AUTH, smtpAuth);
+        if (Boolean.parseBoolean(smtpAuth)) {
+            props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
+            props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));
+        }
+
+        String smtpConn = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_CONN, AlertEmailConstants.CONN_PLAINTEXT);
+        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
+            props.put("mail.smtp.starttls.enable", "true");
+        }
+        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
+            props.put("mail.smtp.socketFactory.port", "465");
+            props.put("mail.smtp.socketFactory.class",
+                    "javax.net.ssl.SSLSocketFactory");
+        }
+        props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false"));
+        return props;
+    }
+
+    @Override
+    public void run() {
+        int count = 0;
+        boolean success = false;
+        while(count++ < MAX_RETRY_COUNT && !success){
+            LOG.info("Sending email, tried: " + count+", max: " + MAX_RETRY_COUNT);
+            try {
+                final EagleMailClient client;
+                if (mailProps != null) {
+                    Properties props = parseMailClientConfig(mailProps);
+                    client = new EagleMailClient(props);
+                }
+                else {
+                    client = new EagleMailClient();
+                }
+
+                final VelocityContext context = new VelocityContext();
+                generateCommonContext(context);
+                LOG.info("After calling generateCommonContext...");
+
+                if (recipients == null || recipients.equals("")) {
+                    LOG.error("Recipients is null, skip sending emails ");
+                    return;
+                }
+                String title = subject;
+
+                success = client.send(sender, recipients, cc, title, configFileName, context, null);
+                LOG.info("Success of sending email: " + success);
+                if(!success && count < MAX_RETRY_COUNT) {
+                    LOG.info("Sleep for a while before retrying");
+                    Thread.sleep(10 * 1000);
+                }
+            }
+            catch (Exception e){
+                LOG.warn("Sending mail exception", e);
+            }
+        }
+        if (success) {
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email, thread: %s", threadName));
+        } else{
+            LOG.warn(String.format("Fail sending email after tries %s times, thread: %s", MAX_RETRY_COUNT, threadName));
+        }
+    }
+
+    private void generateCommonContext(VelocityContext context) {
+        context.put(PublishConstants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
+        context.put(PublishConstants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size());
+        context.put(PublishConstants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts);
+        context.put(PublishConstants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
new file mode 100755
index 0000000..61194a2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.email;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.activation.FileDataSource;
+import javax.mail.Authenticator;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+import javax.mail.Multipart;
+import javax.mail.PasswordAuthentication;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.AddressException;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeBodyPart;
+import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.exception.ResourceNotFoundException;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EagleMailClient {
+//	private static final String CONFIG_FILE = "config.properties";
+	private static final String BASE_PATH = "templates/";
+
+	private VelocityEngine velocityEngine;
+	private Session session;
+	private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
+
+	public EagleMailClient() {
+		this(new Properties());
+	}
+	
+	public EagleMailClient(final Properties config) {
+		try {
+			velocityEngine = new VelocityEngine();
+			velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
+			velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+			velocityEngine.init();
+
+			config.put("mail.transport.protocol", "smtp");
+			if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){
+				session = Session.getDefaultInstance(config, new Authenticator() {
+					protected PasswordAuthentication getPasswordAuthentication() {
+						return new PasswordAuthentication(config.getProperty(AlertEmailConstants.CONF_AUTH_USER), config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD));
+					}
+				});
+			}
+			else session = Session.getDefaultInstance(config, new Authenticator() {});
+			final String debugMode =  config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
+			final boolean debug =  Boolean.parseBoolean(debugMode);
+			session.setDebug(debug);
+		} catch (Exception e) {
+            LOG.error("Failed connect to smtp server",e);
+		}
+	}
+
+	private boolean _send(String from, String to, String cc, String title,
+			String content) {
+		Message msg = new MimeMessage(session);
+		try {
+			msg.setFrom(new InternetAddress(from));
+			msg.setSubject(title);
+			if (to != null) {
+				msg.setRecipients(Message.RecipientType.TO,
+						InternetAddress.parse(to));
+			}
+			if (cc != null) {
+				msg.setRecipients(Message.RecipientType.CC,
+						InternetAddress.parse(cc));
+			}
+			//msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
+			msg.setContent(content, "text/html;charset=utf-8");
+			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
+			Transport.send(msg);
+			return true;
+		} catch (AddressException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		} catch (MessagingException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		}
+	}
+
+	private boolean _send(String from,String to,String cc,String title,String content,List<MimeBodyPart> attachments){
+		MimeMessage  mail = new MimeMessage(session);
+		try {
+			mail.setFrom(new InternetAddress(from));
+			mail.setSubject(title);
+			if (to != null) {
+				mail.setRecipients(Message.RecipientType.TO,
+						InternetAddress.parse(to));
+			}
+			if (cc != null) {
+				mail.setRecipients(Message.RecipientType.CC,
+						InternetAddress.parse(cc));
+			}
+			
+			//mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
+
+			MimeBodyPart mimeBodyPart = new MimeBodyPart();
+			mimeBodyPart.setContent(content,"text/html;charset=utf-8");
+
+			Multipart  multipart = new MimeMultipart();
+			multipart.addBodyPart(mimeBodyPart);
+
+			for(MimeBodyPart attachment:attachments){
+				multipart.addBodyPart(attachment);
+			}
+
+			mail.setContent(multipart);
+//			mail.setContent(content, "text/html;charset=utf-8");
+			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
+			Transport.send(mail);
+			return true;
+		} catch (AddressException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		} catch (MessagingException e) {
+			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			return false;
+		}
+	}
+
+	public boolean send(String from, String to, String cc, String title,
+			String content) {
+		return this._send(from, to, cc, title, content);
+	}
+
+	public boolean send(String from, String to, String cc, String title,
+			String templatePath, VelocityContext context) {
+		Template t = null;
+		try {
+			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
+		} catch (ResourceNotFoundException ex) {
+		}
+		if (t == null) {
+			try {
+				t = velocityEngine.getTemplate(templatePath);
+			} catch (ResourceNotFoundException e) {
+				t = velocityEngine.getTemplate("/" + templatePath);
+			}
+		}
+		final StringWriter writer = new StringWriter();
+		t.merge(context, writer);
+		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+		return this.send(from, to, cc, title, writer.toString());
+	}
+
+	public boolean send(String from, String to, String cc, String title,
+	                    String templatePath, VelocityContext context, Map<String,File> attachments) {
+		if (attachments == null || attachments.isEmpty()) {
+			return send(from, to, cc, title, templatePath, context);
+		}
+		Template t = null;
+
+		List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
+		Map<String,String> cid = new HashMap<String,String>();
+
+		for (Map.Entry<String,File> entry : attachments.entrySet()) {
+			final String attachment = entry.getKey();
+			final File attachmentFile  = entry.getValue();
+			final MimeBodyPart mimeBodyPart = new MimeBodyPart();
+			if(attachmentFile !=null && attachmentFile.exists()){
+				DataSource source = new FileDataSource(attachmentFile);
+				try {
+					mimeBodyPart.setDataHandler(new DataHandler(source));
+					mimeBodyPart.setFileName(attachment);
+					mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
+					mimeBodyPart.setContentID(attachment);
+					cid.put(attachment,mimeBodyPart.getContentID());
+					mimeBodyParts.add(mimeBodyPart);
+				} catch (MessagingException e) {
+					LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
+				}
+			}else{
+				LOG.error("Attachment: " + attachment + " is null or not exists");
+			}
+		}
+		//TODO remove cid, because not used at all
+		if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
+		context.put("cid", cid);
+
+		try {
+			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
+		} catch (ResourceNotFoundException ex) {
+//			LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex);
+		}
+
+		if (t == null) {
+			try {
+				t = velocityEngine.getTemplate(templatePath);
+			} catch (ResourceNotFoundException e) {
+				try {
+					t = velocityEngine.getTemplate("/" + templatePath);
+				}
+				catch (Exception ex) {
+					LOG.error("Template not found:"+ "/" + templatePath, ex);
+				}
+			}
+		}
+
+		final StringWriter writer = new StringWriter();
+		t.merge(context, writer);
+		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+		return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
new file mode 100644
index 0000000..2a4e332
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * Alert API entity Persistor
+ */
+public class AlertEagleStorePersister {
+	private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
+	private Config config;
+
+	public AlertEagleStorePersister(Config config) {
+		this.config = config;
+	}
+
+	/**
+	 * Persist passes list of Entities
+	 * @param list
+	 * @return
+     */
+	public boolean doPersist(List<? extends StreamEvent> list) {
+		if (list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		try {
+			IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+			// TODO: metadata service support
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities", ex);
+			return false;
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
new file mode 100644
index 0000000..807aacc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * Plugin to persist alerts to Eagle Storage
+ */
+public class AlertEagleStorePublisher implements AlertPublishPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePublisher.class);
+    private PublishStatus status;
+    private AlertEagleStorePersister persist;
+    private AlertDeduplicator deduplicator;
+
+    @Override
+    public void init(Config config, Publishment publishment) throws Exception {
+        this.persist = new AlertEagleStorePersister(config);
+        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+        LOG.info("initialized plugin for EagleStorePlugin");
+    }
+
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+    }
+
+    @Override
+    public PublishStatus getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return deduplicator.dedup(event);
+    }
+
+    /**
+     * Persist AlertEntity to alert_details table
+     * @param event
+     */
+    @Override
+    public void onAlert(AlertStreamEvent event) {
+        LOG.info("write alert to eagle storage " + event);
+        event = dedup(event);
+        if(event == null) {
+            return;
+        }
+        PublishStatus status = new PublishStatus();
+        try{
+            boolean result = persist.doPersist(Arrays.asList(event));
+            if(result) {
+                status.successful = true;
+                status.errorMessage = "";
+            }else{
+                status.successful = false;
+                status.errorMessage = "";
+            }
+        }catch (Exception ex ){
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+            LOG.error("Fail writing alert entity to Eagle Store", ex);
+        }
+        this.status = status;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+        if(o == this)
+            return true;
+        if(!(o instanceof AlertEagleStorePublisher))
+            return false;
+        return true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
new file mode 100644
index 0000000..efe29bc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
+import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertEmailPublisher implements AlertPublishPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
+    private AlertEmailGenerator emailGenerator;
+    private AlertDeduplicator deduplicator;
+    private Map<String, String> emailConfig;
+    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+    private transient ThreadPoolExecutor executorPool;
+    private PublishStatus status;
+
+    @Override
+    public void init(Config config, Publishment publishment) throws Exception {
+        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        LOG.info(" Creating Email Generator... ");
+        if (publishment.getProperties() != null) {
+            emailConfig = new HashMap<>(publishment.getProperties());
+            emailGenerator = createEmailGenerator(emailConfig);
+        }
+        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+    }
+
+    @Override
+    public void onAlert(AlertStreamEvent event) throws Exception {
+        if(emailGenerator == null) {
+            LOG.warn("emailGenerator is null due to the incorrect configurations");
+            return;
+        }
+        event = dedup(event);
+        if(event == null) {
+            return;
+        }
+        boolean isSuccess = emailGenerator.sendAlertEmail(event);
+        PublishStatus status = new PublishStatus();
+        if(!isSuccess) {
+            status.errorMessage = "Failed to send email";
+            status.successful = false;
+        } else {
+            status.errorMessage = "";
+            status.successful = true;
+        }
+        this.status = status;
+    }
+
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+        if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) {
+            emailConfig = new HashMap<>(pluginProperties);
+            emailGenerator = createEmailGenerator(pluginProperties);
+        }
+    }
+
+    @Override
+    public void close() {
+        this.executorPool.shutdown();
+    }
+
+    @Override
+    public PublishStatus getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return deduplicator.dedup(event);
+    }
+
+    /**
+     * @param notificationConfig
+     * @return
+     */
+    private AlertEmailGenerator createEmailGenerator(Map<String, String> notificationConfig) {
+        String tplFileName = notificationConfig.get(PublishConstants.TEMPLATE);
+        if (tplFileName == null || tplFileName.equals("")) {
+            tplFileName = "ALERT_DEFAULT.vm";
+        }
+        String subject = notificationConfig.get(PublishConstants.SUBJECT);
+        if (subject == null) {
+            subject = "No subject";
+        }
+        String sender = notificationConfig.get(PublishConstants.SENDER);
+        String recipients = notificationConfig.get(PublishConstants.RECIPIENTS);
+        if(sender == null || recipients == null) {
+            LOG.warn("email sender or recipients is null");
+            return null;
+        }
+        AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+                withMailProps(notificationConfig).
+                withSubject(subject).
+                withSender(sender).
+                withRecipients(recipients).
+                withTplFile(tplFileName).
+                withExecutorPool(this.executorPool).build();
+        return gen;
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+        if(o == this)
+            return true;
+        if(!(o instanceof AlertEmailPublisher))
+            return false;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
new file mode 100644
index 0000000..ea65298
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertKafkaPublisher implements AlertPublishPlugin {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class);
+    private AlertDeduplicator deduplicator;
+    private PublishStatus status;
+    @SuppressWarnings("rawtypes")
+    private KafkaProducer producer;
+    private String brokerList;
+    private String topic;
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    @Override
+    public void init(Config config, Publishment publishment) throws Exception {
+        deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin());
+        if (publishment.getProperties() != null) {
+            Map<String, String> kafkaConfig = new HashMap<>(publishment.getProperties());
+            brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim();
+            producer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+            topic = kafkaConfig.get(PublishConstants.TOPIC).trim();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onAlert(AlertStreamEvent event) throws Exception {
+        if (producer == null) {
+            LOG.warn("KafkaProducer is null due to the incorrect configurations");
+            return;
+        }
+        event = dedup(event);
+        if(event == null) {
+            return;
+        }
+        PublishStatus status = new PublishStatus();
+        try {
+            Future<?> future = producer.send(createRecord(event, topic));
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            status.successful = true;
+            status.errorMessage = "";
+            LOG.info("Successfully send message to Kafka: " + brokerList);
+        } catch (InterruptedException | ExecutionException e) {
+            status.successful = false;
+            status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e);
+            LOG.error(status.errorMessage, e);
+        } catch (Exception ex ) {
+            LOG.error("fail writing alert to Kafka bus", ex);
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+        }
+        this.status = status;
+    }
+
+    @Override
+    public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
+        deduplicator.setDedupIntervalMin(dedupIntervalMin);
+        String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim();
+        String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim();
+        if (!newBrokerList.equals(this.brokerList)) {
+            producer.close();
+            brokerList = newBrokerList;
+            KafkaProducer newProducer = null;
+            try {
+                newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList);
+            } catch (Exception e) {
+                LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties);
+            }
+            producer = newProducer;
+        }
+        topic = newTopic;
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * To Create  KafkaProducer Record
+     * @param event
+     * @return ProducerRecord
+     * @throws Exception
+     */
+    private ProducerRecord<String, String> createRecord(AlertStreamEvent event, String topic) throws Exception {
+        ProducerRecord<String, String>  record  = new ProducerRecord<>(topic, event.toString());
+        return record;
+    }
+
+    @Override
+    public PublishStatus getStatus() {
+        return this.status;
+    }
+
+    @Override
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        return this.deduplicator.dedup(event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
new file mode 100644
index 0000000..f538088
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * @Since on 5/11/16.
+ */
+public class AlertPublishPluginsFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class);
+
+    public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config) {
+        AlertPublishPlugin plugin = null;
+        String publisherType = publishment.getType();
+        try {
+            plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance();
+            plugin.init(config, publishment);
+        } catch (Exception ex) {
+            LOG.error("Error in loading AlertPublisherPlugin class: ", ex);
+            //throw new IllegalStateException(ex);
+        }
+        return plugin;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
new file mode 100644
index 0000000..fce22f1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections.ListUtils;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertPublisherImpl implements AlertPublisher {
+    private static final long serialVersionUID = 4809983246198138865L;
+    private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
+    private final String name;
+
+    private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1);
+    private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1);
+    private Config config;
+
+    public AlertPublisherImpl(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void init(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void nextEvent(AlertStreamEvent event) {
+        if(LOG.isDebugEnabled())
+            LOG.debug(event.toString());
+        notifyAlert(event);
+    }
+
+    private void notifyAlert(AlertStreamEvent event) {
+        String policyId = event.getPolicyId();
+        if(policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
+            LOG.warn("Policy {} does NOT subscribe any publishments", policyId);
+            return;
+        }
+        for(String id: policyPublishPluginMapping.get(policyId)) {
+            AlertPublishPlugin plugin = publishPluginMapping.get(id);
+            try {
+                if(LOG.isDebugEnabled()) LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
+                plugin.onAlert(event);
+            } catch (Exception ex) {
+                LOG.error("Fail invoking publisher's onAlert, continue ", ex);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        publishPluginMapping.values().forEach(plugin -> plugin.close());
+    }
+
+    @Override
+    public void onPublishChange(List<Publishment> added,
+                                List<Publishment> removed,
+                                List<Publishment> afterModified,
+                                List<Publishment> beforeModified) {
+        if (added == null) added = new ArrayList<>();
+        if (removed == null) removed = new ArrayList<>();
+        if (afterModified == null) afterModified = new ArrayList<>();
+        if (beforeModified == null) beforeModified = new ArrayList<>();
+
+        if (afterModified.size() != beforeModified.size()) {
+            LOG.warn("beforeModified size != afterModified size");
+            return;
+        }
+
+        for (Publishment publishment : added) {
+            AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config);
+            if(plugin != null) {
+                publishPluginMapping.put(publishment.getName(), plugin);
+                onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
+            } else {
+                LOG.error("Initialized alertPublisher {} failed due to invalid format", publishment);
+            }
+        }
+        for (Publishment publishment : removed) {
+            String pubName = publishment.getName();
+            onPolicyDeleted(publishment.getPolicyIds(), pubName);
+            publishPluginMapping.get(pubName).close();
+            publishPluginMapping.remove(publishment.getName());
+        }
+        for (int i = 0; i < afterModified.size(); i++) {
+            String pubName = afterModified.get(i).getName();
+            List<String> newPolicies = afterModified.get(i).getPolicyIds();
+            List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
+
+            if (! newPolicies.equals(oldPolicies)) {
+                List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
+                onPolicyDeleted(deletedPolicies, pubName);
+                List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
+                onPolicyAdded(addedPolicies, pubName);
+            }
+            Publishment newPub = afterModified.get(i);
+            publishPluginMapping.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties());
+        }
+    }
+
+    private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) {
+        if (addedPolicyIds == null || pubName == null) return;
+
+        for (String policyId : addedPolicyIds) {
+            if (policyPublishPluginMapping.get(policyId) == null) {
+                policyPublishPluginMapping.put(policyId, new ArrayList<>());
+            }
+            List<String> publishIds = policyPublishPluginMapping.get(policyId);
+            publishIds.add(pubName);
+        }
+    }
+
+    private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) {
+        if (deletedPolicyIds == null || pubName == null) return;
+
+        for (String policyId : deletedPolicyIds) {
+            List<String> publishIds = policyPublishPluginMapping.get(policyId);
+            publishIds.remove(pubName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
new file mode 100644
index 0000000..054d679
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultDeduplicator implements AlertDeduplicator {
+	private long dedupIntervalMin;
+	private volatile Map<EventUniq, Long> events = new HashMap<>();
+	private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+	public enum AlertDeduplicationStatus{
+		NEW,
+		DUPLICATED,
+		IGNORED
+	}
+	
+	public DefaultDeduplicator() {
+		this.dedupIntervalMin = 0;
+	}
+
+	public DefaultDeduplicator(String intervalMin) {
+		setDedupIntervalMin(intervalMin);
+	}
+	
+	public DefaultDeduplicator(long intervalMin) {
+		this.dedupIntervalMin = intervalMin;
+	}
+	
+	public void clearOldCache() {
+		List<EventUniq> removedkeys = new ArrayList<>();
+		for (Entry<EventUniq, Long> entry : events.entrySet()) {
+			EventUniq entity = entry.getKey();
+			if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
+				removedkeys.add(entry.getKey());
+			}
+		}
+		for (EventUniq alertKey : removedkeys) {
+			events.remove(alertKey);
+		}
+	}
+	
+	public AlertDeduplicationStatus checkDedup(EventUniq key) {
+		long current = key.timestamp;
+		if(!events.containsKey(key)) {
+			events.put(key, current);
+			return AlertDeduplicationStatus.NEW;
+		}
+		
+		long last = events.get(key);
+		if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
+			events.put(key, current);
+			return AlertDeduplicationStatus.IGNORED;
+		}
+		
+		return AlertDeduplicationStatus.DUPLICATED;
+	}
+	
+	public AlertStreamEvent dedup(AlertStreamEvent event) {
+        if (event == null) return null;
+		clearOldCache();
+		AlertStreamEvent result = null;
+		AlertDeduplicationStatus status = checkDedup(new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime()));
+		if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
+			result = event;
+		} else if(LOG.isDebugEnabled()){
+			LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
+		}
+		return result;
+	}
+
+	@Override
+	public void setDedupIntervalMin(String newDedupIntervalMin) {
+		if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
+			dedupIntervalMin = 0;
+            return;
+		}
+		try {
+			Period period = Period.parse(newDedupIntervalMin);
+			this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
+		} catch (Exception e) {
+			LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
+			this.dedupIntervalMin = 0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
new file mode 100644
index 0000000..df472d0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+/**
+ * 
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * @since Mar 19, 2015
+ */
+public class EventUniq {
+	public String streamId;
+	public String policyId;
+	public Long timestamp;	 // event's createTimestamp
+	public long createdTime; // created time, for cache removal;
+
+	public EventUniq(String streamId, String policyId, long timestamp) {
+		this.streamId = streamId;
+		this.timestamp = timestamp;
+		this.policyId = policyId;
+		this.createdTime = System.currentTimeMillis();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof EventUniq) {
+			EventUniq au = (EventUniq) obj;
+			return (this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId));
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		return new HashCodeBuilder()
+				.append(streamId)
+				.append(policyId)
+				.build();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
new file mode 100644
index 0000000..e8964a8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.publisher.impl;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
+ */
+public class KafkaProducerManager {
+	public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
+
+	public KafkaProducer<String, Object> getProducer(String brokerList) {
+		Properties configMap = new Properties();
+		configMap.put("bootstrap.servers", brokerList);
+		configMap.put("metadata.broker.list", brokerList);
+		configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("request.required.acks", "1");	     
+		configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+		return producer;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
new file mode 100644
index 0000000..c165686
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.publisher.impl;
+
+/**
+ * Object that holds the status of Notification Posted to Notification Plugin  
+ */
+public class PublishStatus {
+	public boolean successful;
+	public String errorMessage;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
new file mode 100644
index 0000000..e1f3e9c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Since 5/2/16.
+ */
+public interface AlertBoltSpecListener {
+    void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
new file mode 100644
index 0000000..aa40dc5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Since 5/3/16.
+ */
+public interface SpoutSpecListener {
+    void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
new file mode 100644
index 0000000..07518d9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
@@ -0,0 +1,85 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * <b></b>
+ * 1. Group by SingleStream[stream_1.col1]
+ *
+ * Shuffle(stream_1,[col1])
+ *
+ * <b></b>
+ * 2. Group by SingleStream[stream_1.col1,stream_1.col2]
+ *
+ * Shuffle(stream_1,[col1,col2])
+ *
+ * <b></b>
+ * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3]
+ *
+ * Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)
+ */
+public class StreamRoute implements Serializable{
+    private static final long serialVersionUID = 4649184902196034940L;
+
+    private String targetComponentId;
+    private int partitionKey;
+    private String partitionType;
+
+    public String getTargetComponentId() {
+        return targetComponentId;
+    }
+
+    public void setTargetComponentId(String targetComponentId) {
+        this.targetComponentId = targetComponentId;
+    }
+
+    public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type){
+        this.setTargetComponentId(targetComponentId);
+        this.setPartitionKey(partitionKey);
+        this.setPartitionType(type);
+    }
+
+    public int getPartitionKey() {
+        return partitionKey;
+    }
+
+    public void setPartitionKey(int partitionKey) {
+        this.partitionKey = partitionKey;
+    }
+
+    public StreamPartition.Type getPartitionType() {
+        return StreamPartition.Type.valueOf(partitionType);
+    }
+
+    public void setPartitionType(StreamPartition.Type partitionType) {
+        this.partitionType = partitionType.name();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(partitionKey).append(partitionType).append(targetComponentId).build();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Route[target=%s, key=%s, type=%s]", this.targetComponentId, this.partitionKey, this.partitionType);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
new file mode 100644
index 0000000..6dcd312
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner;
+
+public class StreamRoutePartitionFactory {
+    /**
+     * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner
+     *
+     * @param outputComponentIds
+     * @param streamDefinition
+     * @param partition
+     * @return
+     */
+    public static StreamRoutePartitioner createRoutePartitioner(List<String> outputComponentIds, StreamDefinition streamDefinition, StreamPartition partition) {
+        return new BasicStreamRoutePartitioner(outputComponentIds, streamDefinition, partition);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
new file mode 100644
index 0000000..213abef
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+public interface StreamRoutePartitioner {
+    /**
+     * @param event
+     * @return
+     */
+    List<StreamRoute> partition(StreamEvent event);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
new file mode 100644
index 0000000..abc465d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
@@ -0,0 +1,30 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamRouteSpecListener {
+    void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
+                                  Collection<StreamRouterSpec> removed,
+                                  Collection<StreamRouterSpec> modified,
+                                  Map<String, StreamDefinition> sds);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
new file mode 100644
index 0000000..da7fb7e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouter.java
@@ -0,0 +1,30 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.io.Serializable;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.StreamContext;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamRouter extends StreamSortSpecListener, Serializable {
+    void prepare(StreamContext context, PartitionedEventCollector outputCollector);
+    void nextEvent(PartitionedEvent event);
+    String getName();
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
new file mode 100644
index 0000000..cd4bfd3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouterBoltSpecListener.java
@@ -0,0 +1,33 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Since 5/1/16.
+ * Listen to change on StreamRouterBoltSpec
+ */
+public interface StreamRouterBoltSpecListener {
+    void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
new file mode 100644
index 0000000..2229099
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortHandler.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
+
+public interface StreamSortHandler extends StreamTimeClockListener {
+    /**
+     *
+     * @param streamId
+     * @param streamSortSpecSpec
+     * @param outputCollector
+     */
+    void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector);
+
+    /**
+     * @param event StreamEvent
+     */
+    void nextEvent(PartitionedEvent event);
+
+    /**
+     *
+     */
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
new file mode 100644
index 0000000..affa979
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamSortSpecListener.java
@@ -0,0 +1,28 @@
+package org.apache.eagle.alert.engine.router;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface StreamSortSpecListener {
+    void onStreamSortSpecChange(Map<StreamPartition, StreamSortSpec> added,
+                                Map<StreamPartition, StreamSortSpec> removed,
+                                Map<StreamPartition, StreamSortSpec> changed);
+}
\ No newline at end of file


Mime
View raw message