eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [11/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
Date Thu, 08 Sep 2016 07:14:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
index fc1fc28..b03305f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
@@ -1,11 +1,4 @@
-package org.apache.eagle.alert.engine.publisher;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -21,6 +14,13 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+import java.util.Map;
+
 public interface AlertPublishSpecListener {
     void onAlertPublishSpecChange(PublishSpec spec, Map<String, StreamDefinition> sds);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
index 5c0e597..cdd52db 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
@@ -1,14 +1,4 @@
-package org.apache.eagle.alert.engine.publisher;
-
-
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-import com.typesafe.config.Config;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -24,10 +14,21 @@ import com.typesafe.config.Config;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import com.typesafe.config.Config;
+
+import java.io.Serializable;
+import java.util.Map;
+
 public interface AlertPublisher extends AlertPublishListener, Serializable {
     @SuppressWarnings("rawtypes")
     void init(Config config, Map stormConfig);
+
     String getName();
+
     void nextEvent(AlertStreamEvent event);
+
     void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
index 91c9296..991bba0 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
@@ -1,8 +1,4 @@
-package org.apache.eagle.alert.engine.publisher;
-
-import java.io.Serializable;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,16 +14,12 @@ import java.io.Serializable;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.publisher;
+
+import java.io.Serializable;
+
 public interface AlertSink extends Serializable {
-    /**
-     *
-     * @throws Exception
-     */
     void open() throws Exception;
 
-    /**
-     *
-     * @throws Exception
-     */
     void close() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
index 8723283..77cb1a7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
@@ -21,7 +21,7 @@ import java.util.Map;
 
 /**
  * alert email bean
- * one email consists of a list of email component
+ * one email consists of a list of email component.
  */
 public class AlertEmailContext {
     private Map<String, String> alertContext;
@@ -38,33 +38,43 @@ public class AlertEmailContext {
     public void setAlertContext(Map<String, String> alertContext) {
         this.alertContext = alertContext;
     }
+
     public String getVelocityTplFile() {
         return velocityTplFile;
     }
+
     public void setVelocityTplFile(String velocityTplFile) {
         this.velocityTplFile = velocityTplFile;
     }
+
     public String getRecipients() {
         return recipients;
     }
+
     public void setRecipients(String recipients) {
         this.recipients = recipients;
     }
+
     public String getSender() {
         return sender;
     }
+
     public void setSender(String sender) {
         this.sender = sender;
     }
+
     public String getSubject() {
         return subject;
     }
+
     public void setSubject(String subject) {
         this.subject = subject;
     }
+
     public String getCc() {
         return cc;
     }
+
     public void setCc(String cc) {
         this.cc = cc;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 0c68629..06bd9ac 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -20,20 +20,16 @@
  */
 package org.apache.eagle.alert.engine.publisher.email;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+
 public class AlertEmailGenerator {
     private String tplFile;
     private String sender;
@@ -43,9 +39,9 @@ public class AlertEmailGenerator {
 
     private ThreadPoolExecutor executorPool;
 
-    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
 
-    private final static long MAX_TIMEOUT_MS =60000;
+    private static final long MAX_TIMEOUT_MS = 60000;
 
     public boolean sendAlertEmail(AlertStreamEvent entity) {
         return sendAlertEmail(entity, recipients, null);
@@ -64,11 +60,13 @@ public class AlertEmailGenerator {
         email.setSender(sender);
         email.setRecipients(recipients);
         email.setCc(cc);
-        
-        /** asynchronized email sending */
+
+        /** asynchronized email sending. */
         AlertEmailSender thread = new AlertEmailSender(email, properties);
 
-        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+        if (this.executorPool == null) {
+            throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+        }
 
         LOG.info("Sending email  in asynchronous to: " + recipients + ", cc: " + cc);
         Future<?> future = this.executorPool.submit(thread);
@@ -79,10 +77,10 @@ public class AlertEmailGenerator {
             //LOG.info(String.format("Successfully send email to %s", recipients));
         } catch (InterruptedException | ExecutionException e) {
             status = false;
-            LOG.error(String.format("Failed to send email to %s, due to:%s", recipients, e),e);
+            LOG.error(String.format("Failed to send email to %s, due to:%s", recipients, e), e);
         } catch (TimeoutException e) {
             status = false;
-            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS),e);
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS), e);
         }
         return status;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
index e9635dd..b894b23 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
@@ -22,38 +22,46 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 public class AlertEmailGeneratorBuilder {
     private AlertEmailGenerator generator;
-    private AlertEmailGeneratorBuilder(){
+
+    private AlertEmailGeneratorBuilder() {
         generator = new AlertEmailGenerator();
     }
-    public static AlertEmailGeneratorBuilder newBuilder(){
+
+    public static AlertEmailGeneratorBuilder newBuilder() {
         return new AlertEmailGeneratorBuilder();
     }
-    public AlertEmailGeneratorBuilder withSubject(String subject){
+
+    public AlertEmailGeneratorBuilder withSubject(String subject) {
         generator.setSubject(subject);
         return this;
     }
-    public AlertEmailGeneratorBuilder withSender(String sender){
+
+    public AlertEmailGeneratorBuilder withSender(String sender) {
         generator.setSender(sender);
         return this;
     }
-    public AlertEmailGeneratorBuilder withRecipients(String recipients){
+
+    public AlertEmailGeneratorBuilder withRecipients(String recipients) {
         generator.setRecipients(recipients);
         return this;
     }
-    public AlertEmailGeneratorBuilder withTplFile(String tplFile){
+
+    public AlertEmailGeneratorBuilder withTplFile(String tplFile) {
         generator.setTplFile(tplFile);
         return this;
     }
+
     public AlertEmailGeneratorBuilder withMailProps(Map<String, String> mailProps) {
         generator.setProperties(mailProps);
         return this;
     }
+
     public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
         generator.setExecutorPool(threadPoolExecutor);
         return this;
     }
 
-    public AlertEmailGenerator build(){
+    public AlertEmailGenerator build() {
         return this.generator;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
index 4091457..344b3aa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -17,18 +17,18 @@
  */
 package org.apache.eagle.alert.engine.publisher.email;
 
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.apache.velocity.VelocityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 public class AlertEmailSender implements Runnable {
 
     protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
@@ -40,25 +40,26 @@ public class AlertEmailSender implements Runnable {
     protected final String origin;
     protected boolean sentSuccessfully = false;
 
-    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
-    private final static int MAX_RETRY_COUNT = 3;
-
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+    private static final int MAX_RETRY_COUNT = 3;
 
 
     private Map<String, String> mailProps;
 
 
     private String threadName;
+
     /**
-     * Derived class may have some additional context properties to add
+     * Derived class may have some additional context properties to add.
+     *
      * @param context velocity context
-     * @param env environment
+     * @param env     environment
      */
     protected void additionalContext(VelocityContext context, String env) {
         // By default there's no additional context added
     }
 
-    public AlertEmailSender(AlertEmailContext alertEmail){
+    public AlertEmailSender(AlertEmailContext alertEmail) {
         this.recipients = alertEmail.getRecipients();
         this.configFileName = alertEmail.getVelocityTplFile();
         this.subject = alertEmail.getSubject();
@@ -69,16 +70,18 @@ public class AlertEmailSender implements Runnable {
         String tmp = ManagementFactory.getRuntimeMXBean().getName();
         this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
         threadName = Thread.currentThread().getName();
-        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipients +", velocity TPL file: " + this.configFileName);
+        LOG.info("Initialized " + threadName + ": origin is : " + this.origin + ", recipient of the email: " + this.recipients + ", velocity TPL file: " + this.configFileName);
     }
 
-    public AlertEmailSender(AlertEmailContext alertEmail, Map<String, String> mailProps){
+    public AlertEmailSender(AlertEmailContext alertEmail, Map<String, String> mailProps) {
         this(alertEmail);
         this.mailProps = mailProps;
     }
 
     private Properties parseMailClientConfig(Map<String, String> mailProps) {
-        if (mailProps == null) return null;
+        if (mailProps == null) {
+            return null;
+        }
         Properties props = new Properties();
         String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_HOST);
         String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
@@ -103,7 +106,7 @@ public class AlertEmailSender implements Runnable {
         if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
             props.put("mail.smtp.socketFactory.port", "465");
             props.put("mail.smtp.socketFactory.class",
-                    "javax.net.ssl.SSLSocketFactory");
+                "javax.net.ssl.SSLSocketFactory");
         }
         props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false"));
         return props;
@@ -113,15 +116,14 @@ public class AlertEmailSender implements Runnable {
     public void run() {
         int count = 0;
         boolean success = false;
-        while(count++ < MAX_RETRY_COUNT && !success){
-            LOG.info("Sending email, tried: " + count+", max: " + MAX_RETRY_COUNT);
+        while (count++ < MAX_RETRY_COUNT && !success) {
+            LOG.info("Sending email, tried: " + count + ", max: " + MAX_RETRY_COUNT);
             try {
                 final EagleMailClient client;
                 if (mailProps != null) {
                     Properties props = parseMailClientConfig(mailProps);
                     client = new EagleMailClient(props);
-                }
-                else {
+                } else {
                     client = new EagleMailClient();
                 }
 
@@ -137,19 +139,18 @@ public class AlertEmailSender implements Runnable {
 
                 success = client.send(sender, recipients, cc, title, configFileName, context, null);
                 LOG.info("Success of sending email: " + success);
-                if(!success && count < MAX_RETRY_COUNT) {
+                if (!success && count < MAX_RETRY_COUNT) {
                     LOG.info("Sleep for a while before retrying");
                     Thread.sleep(10 * 1000);
                 }
-            }
-            catch (Exception e){
+            } catch (Exception e) {
                 LOG.warn("Sending mail exception", e);
             }
         }
         if (success) {
             sentSuccessfully = true;
             LOG.info(String.format("Successfully send email, thread: %s", threadName));
-        } else{
+        } else {
             LOG.warn(String.format("Fail sending email after tries %s times, thread: %s", MAX_RETRY_COUNT, threadName));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
index 21fe354..7147894 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
@@ -17,219 +17,214 @@
  */
 package org.apache.eagle.alert.engine.publisher.email;
 
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.exception.ResourceNotFoundException;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
 import javax.activation.DataHandler;
 import javax.activation.DataSource;
 import javax.activation.FileDataSource;
-import javax.mail.Authenticator;
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Multipart;
-import javax.mail.PasswordAuthentication;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.AddressException;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
-import javax.mail.internet.MimeMultipart;
-
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.exception.ResourceNotFoundException;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.mail.*;
+import javax.mail.internet.*;
 
 public class EagleMailClient {
-	private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
-	private static final String BASE_PATH = "templates/";
-
-	private VelocityEngine velocityEngine;
-	private Session session;
-
-	public EagleMailClient() {
-		this(new Properties());
-	}
-	
-	public EagleMailClient(final Properties config) {
-		try {
-			velocityEngine = new VelocityEngine();
-			velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
-			velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
-			velocityEngine.init();
-
-			config.put("mail.transport.protocol", "smtp");
-			if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){
-				session = Session.getInstance(config, new Authenticator() {
-					protected PasswordAuthentication getPasswordAuthentication() {
-						return new PasswordAuthentication(
-								config.getProperty(AlertEmailConstants.CONF_AUTH_USER),
-								config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD)
-						);
-					}
-				});
-			} else {
-				session = Session.getInstance(config, new Authenticator() {});
-			}
-
-			final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
-			final boolean debug = Boolean.parseBoolean(debugMode);
+    private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
+    private static final String BASE_PATH = "templates/";
+
+    private VelocityEngine velocityEngine;
+    private Session session;
+
+    public EagleMailClient() {
+        this(new Properties());
+    }
+
+    public EagleMailClient(final Properties config) {
+        try {
+            velocityEngine = new VelocityEngine();
+            velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
+            velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+            velocityEngine.init();
+
+            config.put("mail.transport.protocol", "smtp");
+            if (Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))) {
+                session = Session.getInstance(config, new Authenticator() {
+                    protected PasswordAuthentication getPasswordAuthentication() {
+                        return new PasswordAuthentication(
+                            config.getProperty(AlertEmailConstants.CONF_AUTH_USER),
+                            config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD)
+                        );
+                    }
+                });
+            } else {
+                session = Session.getInstance(config, new Authenticator() {
+                });
+            }
+
+            final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
+            final boolean debug = Boolean.parseBoolean(debugMode);
             LOG.info("Set email debug mode: " + debugMode);
-			session.setDebug(debug);
-		} catch (Exception e) {
+            session.setDebug(debug);
+        } catch (Exception e) {
             LOG.error("Failed to connect to smtp server", e);
-		}
-	}
-
-	private boolean _send(String from, String to, String cc, String title, String content) {
-		Message msg = new MimeMessage(session);
-		try {
-			msg.setFrom(new InternetAddress(from));
-			msg.setSubject(title);
-			if (to != null) {
-				msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
-			}
-			if (cc != null) {
-				msg.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
-			}
-			//msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-			msg.setContent(content, "text/html;charset=utf-8");
-			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
-			Transport.send(msg);
-			return true;
-		} catch (AddressException e) {
-			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
-			return false;
-		} catch (MessagingException e) {
-			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
-			return false;
-		}
-	}
-
-	private boolean _send(String from,String to,String cc,String title,String content,List<MimeBodyPart> attachments){
-		MimeMessage  mail = new MimeMessage(session);
-		try {
-			mail.setFrom(new InternetAddress(from));
-			mail.setSubject(title);
-			if (to != null) {
-				mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
-			}
-			if (cc != null) {
-				mail.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
-			}
-			
-			//mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-
-			MimeBodyPart mimeBodyPart = new MimeBodyPart();
-			mimeBodyPart.setContent(content,"text/html;charset=utf-8");
-
-			Multipart  multipart = new MimeMultipart();
-			multipart.addBodyPart(mimeBodyPart);
-
-			for(MimeBodyPart attachment:attachments){
-				multipart.addBodyPart(attachment);
-			}
-
-			mail.setContent(multipart);
-//			mail.setContent(content, "text/html;charset=utf-8");
-			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
-			Transport.send(mail);
-			return true;
-		} catch (AddressException e) {
-			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
-			return false;
-		} catch (MessagingException e) {
-			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
-			return false;
-		}
-	}
-
-	public boolean send(String from, String to, String cc, String title,
-			String content) {
-		return this._send(from, to, cc, title, content);
-	}
-
-	public boolean send(String from, String to, String cc, String title,
-			String templatePath, VelocityContext context) {
-		Template t = null;
-		try {
-			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
-		} catch (ResourceNotFoundException ex) {
-		}
-		if (t == null) {
-			try {
-				t = velocityEngine.getTemplate(templatePath);
-			} catch (ResourceNotFoundException e) {
-				t = velocityEngine.getTemplate("/" + templatePath);
-			}
-		}
-		final StringWriter writer = new StringWriter();
-		t.merge(context, writer);
-		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
-
-		return this.send(from, to, cc, title, writer.toString());
-	}
-
-	public boolean send(String from, String to, String cc, String title,
-	                    String templatePath, VelocityContext context, Map<String,File> attachments) {
-		if (attachments == null || attachments.isEmpty()) {
-			return send(from, to, cc, title, templatePath, context);
-		}
-		Template t = null;
-
-		List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
-
-		for (Map.Entry<String,File> entry : attachments.entrySet()) {
-			final String attachment = entry.getKey();
-			final File attachmentFile  = entry.getValue();
-			final MimeBodyPart mimeBodyPart = new MimeBodyPart();
-			if(attachmentFile !=null && attachmentFile.exists()){
-				DataSource source = new FileDataSource(attachmentFile);
-				try {
-					mimeBodyPart.setDataHandler(new DataHandler(source));
-					mimeBodyPart.setFileName(attachment);
-					mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
-					mimeBodyPart.setContentID(attachment);
-					mimeBodyParts.add(mimeBodyPart);
-				} catch (MessagingException e) {
-					LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
-				}
-			}else{
-				LOG.error("Attachment: " + attachment + " is null or not exists");
-			}
-		}
-
-		try {
-			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
-		} catch (ResourceNotFoundException ex) {
-//			LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex);
-		}
-
-		if (t == null) {
-			try {
-				t = velocityEngine.getTemplate(templatePath);
-			} catch (ResourceNotFoundException e) {
-				try {
-					t = velocityEngine.getTemplate("/" + templatePath);
-				} catch (Exception ex) {
-					LOG.error("Template not found:"+ "/" + templatePath, ex);
-				}
-			}
-		}
-
-		final StringWriter writer = new StringWriter();
-		t.merge(context, writer);
-		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
-
-		return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
-	}
+        }
+    }
+
+    private boolean sendInternal(String from, String to, String cc, String title, String content) {
+        Message msg = new MimeMessage(session);
+        try {
+            msg.setFrom(new InternetAddress(from));
+            msg.setSubject(title);
+            if (to != null) {
+                msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
+            }
+            if (cc != null) {
+                msg.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
+            }
+            //msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
+            msg.setContent(content, "text/html;charset=utf-8");
+            LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
+            Transport.send(msg);
+            return true;
+        } catch (AddressException e) {
+            LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
+            return false;
+        } catch (MessagingException e) {
+            LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
+            return false;
+        }
+    }
+
+    private boolean sendInternal(String from, String to, String cc, String title, String content, List<MimeBodyPart> attachments) {
+        MimeMessage mail = new MimeMessage(session);
+        try {
+            mail.setFrom(new InternetAddress(from));
+            mail.setSubject(title);
+            if (to != null) {
+                mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
+            }
+            if (cc != null) {
+                mail.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
+            }
+
+            //mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
+
+            MimeBodyPart mimeBodyPart = new MimeBodyPart();
+            mimeBodyPart.setContent(content, "text/html;charset=utf-8");
+
+            Multipart multipart = new MimeMultipart();
+            multipart.addBodyPart(mimeBodyPart);
+
+            for (MimeBodyPart attachment : attachments) {
+                multipart.addBodyPart(attachment);
+            }
+
+            mail.setContent(multipart);
+            //  mail.setContent(content, "text/html;charset=utf-8");
+            LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
+            Transport.send(mail);
+            return true;
+        } catch (AddressException e) {
+            LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
+            return false;
+        } catch (MessagingException e) {
+            LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
+            return false;
+        }
+    }
+
+    public boolean send(String from, String to, String cc, String title,
+                        String content) {
+        return this.sendInternal(from, to, cc, title, content);
+    }
+
+    public boolean send(String from, String to, String cc, String title,
+                        String templatePath, VelocityContext context) {
+        Template t = null;
+        try {
+            t = velocityEngine.getTemplate(BASE_PATH + templatePath);
+        } catch (ResourceNotFoundException ex) {
+            // ignored
+        }
+        if (t == null) {
+            try {
+                t = velocityEngine.getTemplate(templatePath);
+            } catch (ResourceNotFoundException e) {
+                t = velocityEngine.getTemplate("/" + templatePath);
+            }
+        }
+        final StringWriter writer = new StringWriter();
+        t.merge(context, writer);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(writer.toString());
+        }
+
+        return this.send(from, to, cc, title, writer.toString());
+    }
+
+    public boolean send(String from, String to, String cc, String title,
+                        String templatePath, VelocityContext context, Map<String, File> attachments) {
+        if (attachments == null || attachments.isEmpty()) {
+            return send(from, to, cc, title, templatePath, context);
+        }
+        Template t = null;
+
+        List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
+
+        for (Map.Entry<String, File> entry : attachments.entrySet()) {
+            final String attachment = entry.getKey();
+            final File attachmentFile = entry.getValue();
+            final MimeBodyPart mimeBodyPart = new MimeBodyPart();
+            if (attachmentFile != null && attachmentFile.exists()) {
+                DataSource source = new FileDataSource(attachmentFile);
+                try {
+                    mimeBodyPart.setDataHandler(new DataHandler(source));
+                    mimeBodyPart.setFileName(attachment);
+                    mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
+                    mimeBodyPart.setContentID(attachment);
+                    mimeBodyParts.add(mimeBodyPart);
+                } catch (MessagingException e) {
+                    LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
+                }
+            } else {
+                LOG.error("Attachment: " + attachment + " is null or not exists");
+            }
+        }
+
+        try {
+            t = velocityEngine.getTemplate(BASE_PATH + templatePath);
+        } catch (ResourceNotFoundException ex) {
+            // ignored
+        }
+
+        if (t == null) {
+            try {
+                t = velocityEngine.getTemplate(templatePath);
+            } catch (ResourceNotFoundException e) {
+                try {
+                    t = velocityEngine.getTemplate("/" + templatePath);
+                } catch (Exception ex) {
+                    LOG.error("Template not found:" + "/" + templatePath, ex);
+                }
+            }
+        }
+
+        final StringWriter writer = new StringWriter();
+        t.merge(context, writer);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(writer.toString());
+        }
+
+        return this.sendInternal(from, to, cc, title, writer.toString(), mimeBodyParts);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 7a67011..31110ef 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -16,19 +16,18 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.codec.IEventSerializer;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 
 import java.util.Map;
 
 /**
- * @since Jun 3, 2016
- *
+ * @since Jun 3, 2016.
  */
 public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
 
@@ -47,9 +46,9 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin {
             Object obj = Class.forName(serializerClz).getConstructor(Map.class).newInstance(conf);
             if (!(obj instanceof IEventSerializer)) {
                 throw new Exception(String.format("serializer %s of publishment %s is not subclass to %s!",
-                        publishment.getSerializer(),
-                        publishment.getName(),
-                        IEventSerializer.class.getName()));
+                    publishment.getSerializer(),
+                    publishment.getName(),
+                    IEventSerializer.class.getName()));
             }
             serializer = (IEventSerializer) obj;
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index a87fc7d..65837dd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -18,13 +18,13 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator;
 import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder;
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +37,9 @@ import java.util.concurrent.TimeUnit;
 public class AlertEmailPublisher extends AbstractPublishPlugin {
 
     private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class);
-    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
-    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
-    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+    private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+    private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+    private static final long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
 
     private AlertEmailGenerator emailGenerator;
     private Map<String, String> emailConfig;
@@ -60,18 +60,18 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
 
     @Override
     public void onAlert(AlertStreamEvent event) throws Exception {
-        if(emailGenerator == null) {
+        if (emailGenerator == null) {
             LOG.warn("emailGenerator is null due to the incorrect configurations");
             return;
         }
         event = dedup(event);
-        if(event == null) {
+        if (event == null) {
             return;
         }
 
         boolean isSuccess = emailGenerator.sendAlertEmail(event);
         PublishStatus status = new PublishStatus();
-        if(!isSuccess) {
+        if (!isSuccess) {
             status.errorMessage = "Failed to send email";
             status.successful = false;
         } else {
@@ -85,7 +85,7 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
     public void update(String dedupIntervalMin, Map<String, String> pluginProperties) {
         super.update(dedupIntervalMin, pluginProperties);
 
-        if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) {
+        if (pluginProperties != null && !emailConfig.equals(pluginProperties)) {
             emailConfig = new HashMap<>(pluginProperties);
             emailGenerator = createEmailGenerator(pluginProperties);
         }
@@ -96,10 +96,6 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
         this.executorPool.shutdown();
     }
 
-    /**
-     * @param notificationConfig
-     * @return
-     */
     private AlertEmailGenerator createEmailGenerator(Map<String, String> notificationConfig) {
         String tplFileName = notificationConfig.get(PublishConstants.TEMPLATE);
         if (tplFileName == null || tplFileName.equals("")) {
@@ -111,31 +107,33 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
         }
         String sender = notificationConfig.get(PublishConstants.SENDER);
         String recipients = notificationConfig.get(PublishConstants.RECIPIENTS);
-        if(sender == null || recipients == null) {
+        if (sender == null || recipients == null) {
             LOG.warn("email sender or recipients is null");
             return null;
         }
-        AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
-                withMailProps(notificationConfig).
-                withSubject(subject).
-                withSender(sender).
-                withRecipients(recipients).
-                withTplFile(tplFileName).
-                withExecutorPool(this.executorPool).build();
+        AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder()
+            .withMailProps(notificationConfig)
+            .withSubject(subject)
+            .withSender(sender)
+            .withRecipients(recipients)
+            .withTplFile(tplFileName)
+            .withExecutorPool(this.executorPool).build();
         return gen;
     }
 
     @Override
-    public int hashCode(){
+    public int hashCode() {
         return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
     }
 
     @Override
-    public boolean equals(Object o){
-        if(o == this)
+    public boolean equals(Object o) {
+        if (o == this) {
             return true;
-        if(!(o instanceof AlertEmailPublisher))
+        }
+        if (!(o instanceof AlertEmailPublisher)) {
             return false;
+        }
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index ca46aeb..048424c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -18,21 +18,20 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
+import com.typesafe.config.Config;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class AlertKafkaPublisher extends AbstractPublishPlugin {
 
@@ -57,7 +56,7 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     @Override
     public void onAlert(AlertStreamEvent event) throws Exception {
         if (producer == null) {
@@ -65,7 +64,7 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             return;
         }
         event = dedup(event);
-        if(event == null) {
+        if (event == null) {
             return;
         }
         PublishStatus status = new PublishStatus();
@@ -86,7 +85,7 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             status.successful = false;
             status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e);
             LOG.error(status.errorMessage, e);
-        } catch (Exception ex ) {
+        } catch (Exception ex) {
             LOG.error("fail writing alert to Kafka bus", ex);
             status.successful = false;
             status.errorMessage = ex.getMessage();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
index 82be5a0..957d356 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java
@@ -18,17 +18,16 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.Map;
 
 /**
- * @Since on 5/11/16.
+ * @since on 5/11/16.
  */
 public class AlertPublishPluginsFactory {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index f9306dc..fe1438e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -17,25 +17,24 @@
 
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.collections.ListUtils;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.ListUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @SuppressWarnings("rawtypes")
 public class AlertPublisherImpl implements AlertPublisher {
     private static final long serialVersionUID = 4809983246198138865L;
-    private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class);
     private final String name;
 
     private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1);
@@ -60,21 +59,24 @@ public class AlertPublisherImpl implements AlertPublisher {
 
     @Override
     public void nextEvent(AlertStreamEvent event) {
-        if(LOG.isDebugEnabled())
+        if (LOG.isDebugEnabled()) {
             LOG.debug(event.toString());
+        }
         notifyAlert(event);
     }
 
     private void notifyAlert(AlertStreamEvent event) {
         String policyId = event.getPolicyId();
-        if(policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
+        if (policyId == null || !policyPublishPluginMapping.containsKey(policyId)) {
             LOG.warn("Policy {} does NOT subscribe any publishments", policyId);
             return;
         }
-        for(String id: policyPublishPluginMapping.get(policyId)) {
+        for (String id : policyPublishPluginMapping.get(policyId)) {
             AlertPublishPlugin plugin = publishPluginMapping.get(id);
             try {
-                if(LOG.isDebugEnabled()) LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName());
+                }
                 plugin.onAlert(event);
             } catch (Exception ex) {
                 LOG.error("Fail invoking publisher's onAlert, continue ", ex);
@@ -93,10 +95,18 @@ public class AlertPublisherImpl implements AlertPublisher {
                                 List<Publishment> removed,
                                 List<Publishment> afterModified,
                                 List<Publishment> beforeModified) {
-        if (added == null) added = new ArrayList<>();
-        if (removed == null) removed = new ArrayList<>();
-        if (afterModified == null) afterModified = new ArrayList<>();
-        if (beforeModified == null) beforeModified = new ArrayList<>();
+        if (added == null) {
+            added = new ArrayList<>();
+        }
+        if (removed == null) {
+            removed = new ArrayList<>();
+        }
+        if (afterModified == null) {
+            afterModified = new ArrayList<>();
+        }
+        if (beforeModified == null) {
+            beforeModified = new ArrayList<>();
+        }
 
         if (afterModified.size() != beforeModified.size()) {
             LOG.warn("beforeModified size != afterModified size");
@@ -109,7 +119,7 @@ public class AlertPublisherImpl implements AlertPublisher {
             }
 
             AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
-            if(plugin != null) {
+            if (plugin != null) {
                 publishPluginMapping.put(publishment.getName(), plugin);
                 onPolicyAdded(publishment.getPolicyIds(), publishment.getName());
             } else {
@@ -127,7 +137,7 @@ public class AlertPublisherImpl implements AlertPublisher {
             List<String> newPolicies = afterModified.get(i).getPolicyIds();
             List<String> oldPolicies = beforeModified.get(i).getPolicyIds();
 
-            if (! newPolicies.equals(oldPolicies)) {
+            if (!newPolicies.equals(oldPolicies)) {
                 List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies);
                 onPolicyDeleted(deletedPolicies, pubName);
                 List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies);
@@ -139,7 +149,9 @@ public class AlertPublisherImpl implements AlertPublisher {
     }
 
     private synchronized void onPolicyAdded(List<String> addedPolicyIds, String pubName) {
-        if (addedPolicyIds == null || pubName == null) return;
+        if (addedPolicyIds == null || pubName == null) {
+            return;
+        }
 
         for (String policyId : addedPolicyIds) {
             if (policyPublishPluginMapping.get(policyId) == null) {
@@ -151,7 +163,9 @@ public class AlertPublisherImpl implements AlertPublisher {
     }
 
     private synchronized void onPolicyDeleted(List<String> deletedPolicyIds, String pubName) {
-        if (deletedPolicyIds == null || pubName == null) return;
+        if (deletedPolicyIds == null || pubName == null) {
+            return;
+        }
 
         for (String policyId : deletedPolicyIds) {
             List<String> publishIds = policyPublishPluginMapping.get(policyId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index 43be2e1..258f613 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -17,10 +17,10 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import org.apache.commons.lang.time.DateUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+import org.apache.commons.lang.time.DateUtils;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,121 +32,118 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 public class DefaultDeduplicator implements AlertDeduplicator {
-	private long dedupIntervalMin;
-	private List<String> customDedupFields = new ArrayList<>();
-	private volatile Map<EventUniq, Long> events = new HashMap<>();
-	private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
-
-	public enum AlertDeduplicationStatus{
-		NEW,
-		DUPLICATED,
-		IGNORED
-	}
-	
-	public DefaultDeduplicator() {
-		this.dedupIntervalMin = 0;
-	}
-
-	public DefaultDeduplicator(String intervalMin) {
-		setDedupIntervalMin(intervalMin);
-	}
-	
-	public DefaultDeduplicator(long intervalMin) {
-		this.dedupIntervalMin = intervalMin;
-	}
-
-	public DefaultDeduplicator(String intervalMin, List<String> customDedupFields) {
-		setDedupIntervalMin(intervalMin);
-		if (customDedupFields != null){
-			this.customDedupFields = customDedupFields;
-		}
-	}
-	
-	public void clearOldCache() {
-		List<EventUniq> removedkeys = new ArrayList<>();
-		for (Entry<EventUniq, Long> entry : events.entrySet()) {
-			EventUniq entity = entry.getKey();
-			if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
-				removedkeys.add(entry.getKey());
-			}
-		}
-		for (EventUniq alertKey : removedkeys) {
-			events.remove(alertKey);
-		}
-	}
-
-	/***
-	 *
-	 * @param key
-	 * @return
-	 */
-	public AlertDeduplicationStatus checkDedup(EventUniq key) {
-		long current = key.timestamp;
-		if(!events.containsKey(key)) {
-			events.put(key, current);
-			return AlertDeduplicationStatus.NEW;
-		}
-		
-		long last = events.get(key);
-		if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
-			events.put(key, current);
-			return AlertDeduplicationStatus.IGNORED;
-		}
-
-		return AlertDeduplicationStatus.DUPLICATED;
-	}
-	
-	public AlertStreamEvent dedup(AlertStreamEvent event) {
-        if (event == null) return null;
-		clearOldCache();
-		AlertStreamEvent result = null;
-
-		// check custom field, and get the field values
-		StreamDefinition streamDefinition = event.getSchema();
-		HashMap<String, String> customFieldValues = new HashMap<>();
-		for (int i = 0; i < event.getData().length; i++){
-			if (i > streamDefinition.getColumns().size()) {
-				if (LOG.isWarnEnabled()) {
-					LOG.warn("output column does not found for event data, this indicate code error!");
-				}
-				continue;
-			}
-			String colName = streamDefinition.getColumns().get(i).getName();
-
-			for (String field : customDedupFields){
-				if (colName.equals(field)){
-					customFieldValues.put(field, event.getData()[i].toString());
-					break;
-				}
-			}
-		}
-
-		AlertDeduplicationStatus status = checkDedup(
-				new EventUniq(event.getStreamId(),
-						event.getPolicyId(),
-						event.getCreatedTime(),
-						customFieldValues));
-		if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
-			result = event;
-		} else if(LOG.isDebugEnabled()){
-			LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
-		}
-		return result;
-	}
-
-	@Override
-	public void setDedupIntervalMin(String newDedupIntervalMin) {
-		if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
-			dedupIntervalMin = 0;
+    private long dedupIntervalMin;
+    private List<String> customDedupFields = new ArrayList<>();
+    private volatile Map<EventUniq, Long> events = new HashMap<>();
+    private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class);
+
+    public enum AlertDeduplicationStatus {
+        NEW,
+        DUPLICATED,
+        IGNORED
+    }
+
+    public DefaultDeduplicator() {
+        this.dedupIntervalMin = 0;
+    }
+
+    public DefaultDeduplicator(String intervalMin) {
+        setDedupIntervalMin(intervalMin);
+    }
+
+    public DefaultDeduplicator(long intervalMin) {
+        this.dedupIntervalMin = intervalMin;
+    }
+
+    public DefaultDeduplicator(String intervalMin, List<String> customDedupFields) {
+        setDedupIntervalMin(intervalMin);
+        if (customDedupFields != null) {
+            this.customDedupFields = customDedupFields;
+        }
+    }
+
+    public void clearOldCache() {
+        List<EventUniq> removedkeys = new ArrayList<>();
+        for (Entry<EventUniq, Long> entry : events.entrySet()) {
+            EventUniq entity = entry.getKey();
+            if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) {
+                removedkeys.add(entry.getKey());
+            }
+        }
+        for (EventUniq alertKey : removedkeys) {
+            events.remove(alertKey);
+        }
+    }
+
+    public AlertDeduplicationStatus checkDedup(EventUniq key) {
+        long current = key.timestamp;
+        if (!events.containsKey(key)) {
+            events.put(key, current);
+            return AlertDeduplicationStatus.NEW;
+        }
+
+        long last = events.get(key);
+        if (current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) {
+            events.put(key, current);
+            return AlertDeduplicationStatus.IGNORED;
+        }
+
+        return AlertDeduplicationStatus.DUPLICATED;
+    }
+
+    public AlertStreamEvent dedup(AlertStreamEvent event) {
+        if (event == null) {
+            return null;
+        }
+        clearOldCache();
+        AlertStreamEvent result = null;
+
+        // check custom field, and get the field values
+        StreamDefinition streamDefinition = event.getSchema();
+        HashMap<String, String> customFieldValues = new HashMap<>();
+        for (int i = 0; i < event.getData().length; i++) {
+            if (i > streamDefinition.getColumns().size()) {
+                if (LOG.isWarnEnabled()) {
+                    LOG.warn("output column does not found for event data, this indicate code error!");
+                }
+                continue;
+            }
+            String colName = streamDefinition.getColumns().get(i).getName();
+
+            for (String field : customDedupFields) {
+                if (colName.equals(field)) {
+                    customFieldValues.put(field, event.getData()[i].toString());
+                    break;
+                }
+            }
+        }
+
+        AlertDeduplicationStatus status = checkDedup(
+            new EventUniq(event.getStreamId(),
+                event.getPolicyId(),
+                event.getCreatedTime(),
+                customFieldValues));
+        if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) {
+            result = event;
+        } else if (LOG.isDebugEnabled()) {
+            LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString());
+        }
+        return result;
+    }
+
+    @Override
+    public void setDedupIntervalMin(String newDedupIntervalMin) {
+        if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) {
+            dedupIntervalMin = 0;
             return;
-		}
-		try {
-			Period period = Period.parse(newDedupIntervalMin);
-			this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
-		} catch (Exception e) {
-			LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
-			this.dedupIntervalMin = 0;
-		}
-	}
+        }
+        try {
+            Period period = Period.parse(newDedupIntervalMin);
+            this.dedupIntervalMin = period.toStandardMinutes().getMinutes();
+        } catch (Exception e) {
+            LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e);
+            this.dedupIntervalMin = 0;
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
index 79fa6cc..1b90833 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -16,7 +16,7 @@
  *
  */
 /**
- * 
+ *
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
@@ -25,50 +25,50 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.util.HashMap;
 
 /**
- * @since Mar 19, 2015
+ * @since Mar 19, 2015.
  */
 public class EventUniq {
-	public String streamId;
-	public String policyId;
-	public Long timestamp;	 // event's createTimestamp
-	public long createdTime; // created time, for cache removal;
-	public HashMap<String, String> customFieldValues;
+    public String streamId;
+    public String policyId;
+    public Long timestamp;     // event's createTimestamp
+    public long createdTime; // created time, for cache removal;
+    public HashMap<String, String> customFieldValues;
 
-	public EventUniq(String streamId, String policyId, long timestamp) {
-		this.streamId = streamId;
-		this.timestamp = timestamp;
-		this.policyId = policyId;
-		this.createdTime = System.currentTimeMillis();
-	}
+    public EventUniq(String streamId, String policyId, long timestamp) {
+        this.streamId = streamId;
+        this.timestamp = timestamp;
+        this.policyId = policyId;
+        this.createdTime = System.currentTimeMillis();
+    }
 
-	public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) {
-		this.streamId = streamId;
-		this.timestamp = timestamp;
-		this.policyId = policyId;
-		this.createdTime = System.currentTimeMillis();
-		this.customFieldValues = customFieldValues;
-	}
+    public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) {
+        this.streamId = streamId;
+        this.timestamp = timestamp;
+        this.policyId = policyId;
+        this.createdTime = System.currentTimeMillis();
+        this.customFieldValues = customFieldValues;
+    }
 
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof EventUniq) {
-			EventUniq au = (EventUniq) obj;
-			boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId);
-			if (this.customFieldValues != null && au.customFieldValues != null) {
-				result = result & this.customFieldValues.equals(au.customFieldValues);
-			}
-			return result;
-		}
-		return false;
-	}
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof EventUniq) {
+            EventUniq au = (EventUniq) obj;
+            boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId);
+            if (this.customFieldValues != null && au.customFieldValues != null) {
+                result = result & this.customFieldValues.equals(au.customFieldValues);
+            }
+            return result;
+        }
+        return false;
+    }
 
-	@Override
-	public int hashCode() {
-		HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId);
+    @Override
+    public int hashCode() {
+        HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId);
 
-		if (customFieldValues != null){
-			builder.append(customFieldValues);
-		}
-		return builder.build();
-	}
+        if (customFieldValues != null) {
+            builder.append(customFieldValues);
+        }
+        return builder.build();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
index a448703..aa22884 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java
@@ -28,8 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * @since Jul 9, 2016
- *
+ * @since Jul 9, 2016.
  */
 public class JsonEventSerializer implements IEventSerializer {
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
index fb038ba..423017b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
@@ -17,13 +17,13 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.Properties;
+
 /**
  * The producer is thread safe and sharing a single producer instance across threads will generally be faster than
  * having multiple instances.
@@ -33,16 +33,16 @@ public class KafkaProducerManager {
 
     private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
     private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
-    
+
     private static final String VALUE_DESERIALIZER = "value.deserializer";
     private static final String VALUE_DESERIALIZER_UNDERSCORE = "value_deserializer";
-    
+
     private static final String VALUE_SERIALIZER = "value.serializer";
     private static final String VALUE_SERIALIZER_UNDERSCORE = "value_serializer";
-    
+
     private static final String KEY_DESERIALIZER = "key.deserializer";
     private static final String KEY_DESERIALIZER_UNDERSCORE = "key_deserializer";
-    
+
     private static final String KEY_SERIALIZER = "key.serializer";
     private static final String KEY_SERIALIZER_UNDERSCORE = "key_serializer";
 
@@ -73,7 +73,7 @@ public class KafkaProducerManager {
             configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER);
         }
         configMap.put("request.required.acks", "1");
-        
+
         // value deserializer
         if (kafkaConfig.containsKey(VALUE_DESERIALIZER_UNDERSCORE)) {
             configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER_UNDERSCORE));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
index c165686..44f902c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java
@@ -19,9 +19,9 @@
 package org.apache.eagle.alert.engine.publisher.impl;
 
 /**
- * Object that holds the status of Notification Posted to Notification Plugin  
+ * Object that holds the status of Notification Posted to Notification Plugin.
  */
 public class PublishStatus {
-	public boolean successful;
-	public String errorMessage;
+    public boolean successful;
+    public String errorMessage;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
index 012ebaa..5eb444d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java
@@ -16,17 +16,16 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.codec.IEventSerializer;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 
+import java.util.Map;
+
 /**
- * @since Jun 3, 2016
- *
+ * @since Jun 3, 2016.
  */
 public class StringEventSerializer implements IEventSerializer {
-    
+
     @SuppressWarnings("rawtypes")
     public StringEventSerializer(Map stormConf) {
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
index e1f3e9c..84345dd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java
@@ -19,11 +19,11 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
+import java.util.Map;
+
 /**
  * Since 5/2/16.
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
index aa40dc5..598ce18 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java
@@ -19,11 +19,11 @@
 
 package org.apache.eagle.alert.engine.router;
 
-import java.util.Map;
-
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 
+import java.util.Map;
+
 /**
  * Since 5/3/16.
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
index 07518d9..049e852 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java
@@ -1,42 +1,43 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
  *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.io.Serializable;
+
+/**
  * <b></b>
  * 1. Group by SingleStream[stream_1.col1]
  *
- * Shuffle(stream_1,[col1])
+ * <p>Shuffle(stream_1,[col1])</p>
  *
  * <b></b>
  * 2. Group by SingleStream[stream_1.col1,stream_1.col2]
  *
- * Shuffle(stream_1,[col1,col2])
+ * <p>Shuffle(stream_1,[col1,col2])</p>
  *
  * <b></b>
  * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3]
- *
- * Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)
+ * <p>Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)</p>
  */
-public class StreamRoute implements Serializable{
+public class StreamRoute implements Serializable {
     private static final long serialVersionUID = 4649184902196034940L;
 
     private String targetComponentId;
@@ -51,7 +52,7 @@ public class StreamRoute implements Serializable{
         this.targetComponentId = targetComponentId;
     }
 
-    public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type){
+    public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type) {
         this.setTargetComponentId(targetComponentId);
         this.setPartitionKey(partitionKey);
         this.setPartitionType(type);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
index 6dcd312..0d397e4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java
@@ -16,15 +16,15 @@
  */
 package org.apache.eagle.alert.engine.router;
 
-import java.util.List;
-
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner;
 
+import java.util.List;
+
 public class StreamRoutePartitionFactory {
     /**
-     * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner
+     * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner.
      *
      * @param outputComponentIds
      * @param streamDefinition

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
index 213abef..5b5632d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,14 +16,10 @@
  */
 package org.apache.eagle.alert.engine.router;
 
-import java.util.List;
-
 import org.apache.eagle.alert.engine.model.StreamEvent;
 
+import java.util.List;
+
 public interface StreamRoutePartitioner {
-    /**
-     * @param event
-     * @return
-     */
     List<StreamRoute> partition(StreamEvent event);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
index abc465d..dfd2cc4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java
@@ -1,12 +1,4 @@
-package org.apache.eagle.alert.engine.router;
-
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -22,6 +14,14 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+import java.util.Collection;
+import java.util.Map;
+
 public interface StreamRouteSpecListener {
     void onStreamRouterSpecChange(Collection<StreamRouterSpec> added,
                                   Collection<StreamRouterSpec> removed,


Mime
View raw message