Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7A3DD200B7C for ; Thu, 8 Sep 2016 09:14:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 78D7A160AA5; Thu, 8 Sep 2016 07:14:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ECC6B160AD1 for ; Thu, 8 Sep 2016 09:14:26 +0200 (CEST) Received: (qmail 88312 invoked by uid 500); 8 Sep 2016 07:14:26 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 88293 invoked by uid 99); 8 Sep 2016 07:14:26 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 07:14:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 858FCC0C10 for ; Thu, 8 Sep 2016 07:14:25 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.344 X-Spam-Level: X-Spam-Status: No, score=-4.344 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.124] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id jQntfkNSZpId for ; Thu, 8 Sep 2016 07:14:18 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 7BAFE60E30 for ; Thu, 8 Sep 2016 07:14:04 +0000 (UTC) Received: (qmail 86804 invoked by uid 99); 8 Sep 2016 07:14:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 07:14:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0C95FE04BA; Thu, 8 Sep 2016 07:14:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Date: Thu, 08 Sep 2016 07:14:12 -0000 Message-Id: <24ad7832f28c474397621bfecfc25e99@git.apache.org> In-Reply-To: <4ff9bd06edd34702805de83004f2559e@git.apache.org> References: <4ff9bd06edd34702805de83004f2559e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation archived-at: Thu, 08 Sep 2016 07:14:29 -0000 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java index fc1fc28..b03305f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java @@ -1,11 +1,4 @@ -package org.apache.eagle.alert.engine.publisher; - -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -21,6 +14,13 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.publisher; + +import org.apache.eagle.alert.coordination.model.PublishSpec; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +import java.util.Map; + public interface AlertPublishSpecListener { void onAlertPublishSpecChange(PublishSpec spec, Map sds); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java index 5c0e597..cdd52db 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java @@ -1,14 +1,4 @@ -package org.apache.eagle.alert.engine.publisher; - - -import java.io.Serializable; -import java.util.Map; - -import org.apache.eagle.alert.engine.model.AlertStreamEvent; - -import com.typesafe.config.Config; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -24,10 +14,21 @@ import com.typesafe.config.Config; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.publisher; + +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import com.typesafe.config.Config; + +import java.io.Serializable; +import java.util.Map; + public interface AlertPublisher extends AlertPublishListener, Serializable { @SuppressWarnings("rawtypes") void init(Config config, Map stormConfig); + String getName(); + void nextEvent(AlertStreamEvent event); + void close(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java index 91c9296..991bba0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java @@ -1,8 +1,4 @@ -package org.apache.eagle.alert.engine.publisher; - -import java.io.Serializable; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -18,16 +14,12 @@ import java.io.Serializable; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.publisher; + +import java.io.Serializable; + public interface AlertSink extends Serializable { - /** - * - * @throws Exception - */ void open() throws Exception; - /** - * - * @throws Exception - */ void close() throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java index 8723283..77cb1a7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java @@ -21,7 +21,7 @@ import java.util.Map; /** * alert email bean - * one email consists of a list of email component + * one email consists of a list of email component. */ public class AlertEmailContext { private Map alertContext; @@ -38,33 +38,43 @@ public class AlertEmailContext { public void setAlertContext(Map alertContext) { this.alertContext = alertContext; } + public String getVelocityTplFile() { return velocityTplFile; } + public void setVelocityTplFile(String velocityTplFile) { this.velocityTplFile = velocityTplFile; } + public String getRecipients() { return recipients; } + public void setRecipients(String recipients) { this.recipients = recipients; } + public String getSender() { return sender; } + public void setSender(String sender) { this.sender = sender; } + public String getSubject() { return subject; } + public void setSubject(String subject) { this.subject = subject; } + public String getCc() { return cc; } + public void setCc(String cc) { this.cc = cc; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java index 0c68629..06bd9ac 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java @@ -20,20 +20,16 @@ */ package org.apache.eagle.alert.engine.publisher.email; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.alert.utils.DateTimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; + public class AlertEmailGenerator { private String tplFile; private String sender; @@ -43,9 +39,9 @@ public class AlertEmailGenerator { private ThreadPoolExecutor executorPool; - private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class); + private static final Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class); - private final static long MAX_TIMEOUT_MS =60000; + private static final long MAX_TIMEOUT_MS = 60000; public boolean sendAlertEmail(AlertStreamEvent entity) { return sendAlertEmail(entity, recipients, null); @@ -64,11 +60,13 @@ public class AlertEmailGenerator { email.setSender(sender); email.setRecipients(recipients); email.setCc(cc); - - /** asynchronized email sending */ + + /** asynchronized email sending. */ AlertEmailSender thread = new AlertEmailSender(email, properties); - if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet"); + if (this.executorPool == null) { + throw new IllegalStateException("Invoking thread executor pool but it's is not set yet"); + } LOG.info("Sending email in asynchronous to: " + recipients + ", cc: " + cc); Future future = this.executorPool.submit(thread); @@ -79,10 +77,10 @@ public class AlertEmailGenerator { //LOG.info(String.format("Successfully send email to %s", recipients)); } catch (InterruptedException | ExecutionException e) { status = false; - LOG.error(String.format("Failed to send email to %s, due to:%s", recipients, e),e); + LOG.error(String.format("Failed to send email to %s, due to:%s", recipients, e), e); } catch (TimeoutException e) { status = false; - LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS),e); + LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS), e); } return status; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java index e9635dd..b894b23 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java @@ -22,38 +22,46 @@ import java.util.concurrent.ThreadPoolExecutor; public class AlertEmailGeneratorBuilder { private AlertEmailGenerator generator; - private AlertEmailGeneratorBuilder(){ + + private AlertEmailGeneratorBuilder() { generator = new AlertEmailGenerator(); } - public static AlertEmailGeneratorBuilder newBuilder(){ + + public static AlertEmailGeneratorBuilder newBuilder() { return new AlertEmailGeneratorBuilder(); } - public AlertEmailGeneratorBuilder withSubject(String subject){ + + public AlertEmailGeneratorBuilder withSubject(String subject) { generator.setSubject(subject); return this; } - public AlertEmailGeneratorBuilder withSender(String sender){ + + public AlertEmailGeneratorBuilder withSender(String sender) { generator.setSender(sender); return this; } - public AlertEmailGeneratorBuilder withRecipients(String recipients){ + + public AlertEmailGeneratorBuilder withRecipients(String recipients) { generator.setRecipients(recipients); return this; } - public AlertEmailGeneratorBuilder withTplFile(String tplFile){ + + public AlertEmailGeneratorBuilder withTplFile(String tplFile) { generator.setTplFile(tplFile); return this; } + public AlertEmailGeneratorBuilder withMailProps(Map mailProps) { generator.setProperties(mailProps); return this; } + public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) { generator.setExecutorPool(threadPoolExecutor); return this; } - public AlertEmailGenerator build(){ + public AlertEmailGenerator build() { return this.generator; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java index 4091457..344b3aa 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java @@ -17,18 +17,18 @@ */ package org.apache.eagle.alert.engine.publisher.email; -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.alert.utils.DateTimeUtil; import org.apache.velocity.VelocityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + public class AlertEmailSender implements Runnable { protected final List> alertContexts = new ArrayList>(); @@ -40,25 +40,26 @@ public class AlertEmailSender implements Runnable { protected final String origin; protected boolean sentSuccessfully = false; - private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class); - private final static int MAX_RETRY_COUNT = 3; - + private static final Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class); + private static final int MAX_RETRY_COUNT = 3; private Map mailProps; private String threadName; + /** - * Derived class may have some additional context properties to add + * Derived class may have some additional context properties to add. + * * @param context velocity context - * @param env environment + * @param env environment */ protected void additionalContext(VelocityContext context, String env) { // By default there's no additional context added } - public AlertEmailSender(AlertEmailContext alertEmail){ + public AlertEmailSender(AlertEmailContext alertEmail) { this.recipients = alertEmail.getRecipients(); this.configFileName = alertEmail.getVelocityTplFile(); this.subject = alertEmail.getSubject(); @@ -69,16 +70,18 @@ public class AlertEmailSender implements Runnable { String tmp = ManagementFactory.getRuntimeMXBean().getName(); this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")"; threadName = Thread.currentThread().getName(); - LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipients +", velocity TPL file: " + this.configFileName); + LOG.info("Initialized " + threadName + ": origin is : " + this.origin + ", recipient of the email: " + this.recipients + ", velocity TPL file: " + this.configFileName); } - public AlertEmailSender(AlertEmailContext alertEmail, Map mailProps){ + public AlertEmailSender(AlertEmailContext alertEmail, Map mailProps) { this(alertEmail); this.mailProps = mailProps; } private Properties parseMailClientConfig(Map mailProps) { - if (mailProps == null) return null; + if (mailProps == null) { + return null; + } Properties props = new Properties(); String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_HOST); String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT); @@ -103,7 +106,7 @@ public class AlertEmailSender implements Runnable { if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) { props.put("mail.smtp.socketFactory.port", "465"); props.put("mail.smtp.socketFactory.class", - "javax.net.ssl.SSLSocketFactory"); + "javax.net.ssl.SSLSocketFactory"); } props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false")); return props; @@ -113,15 +116,14 @@ public class AlertEmailSender implements Runnable { public void run() { int count = 0; boolean success = false; - while(count++ < MAX_RETRY_COUNT && !success){ - LOG.info("Sending email, tried: " + count+", max: " + MAX_RETRY_COUNT); + while (count++ < MAX_RETRY_COUNT && !success) { + LOG.info("Sending email, tried: " + count + ", max: " + MAX_RETRY_COUNT); try { final EagleMailClient client; if (mailProps != null) { Properties props = parseMailClientConfig(mailProps); client = new EagleMailClient(props); - } - else { + } else { client = new EagleMailClient(); } @@ -137,19 +139,18 @@ public class AlertEmailSender implements Runnable { success = client.send(sender, recipients, cc, title, configFileName, context, null); LOG.info("Success of sending email: " + success); - if(!success && count < MAX_RETRY_COUNT) { + if (!success && count < MAX_RETRY_COUNT) { LOG.info("Sleep for a while before retrying"); Thread.sleep(10 * 1000); } - } - catch (Exception e){ + } catch (Exception e) { LOG.warn("Sending mail exception", e); } } if (success) { sentSuccessfully = true; LOG.info(String.format("Successfully send email, thread: %s", threadName)); - } else{ + } else { LOG.warn(String.format("Fail sending email after tries %s times, thread: %s", MAX_RETRY_COUNT, threadName)); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java index 21fe354..7147894 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java @@ -17,219 +17,214 @@ */ package org.apache.eagle.alert.engine.publisher.email; +import org.apache.velocity.Template; +import org.apache.velocity.VelocityContext; +import org.apache.velocity.app.VelocityEngine; +import org.apache.velocity.exception.ResourceNotFoundException; +import org.apache.velocity.runtime.RuntimeConstants; +import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; - import javax.activation.DataHandler; import javax.activation.DataSource; import javax.activation.FileDataSource; -import javax.mail.Authenticator; -import javax.mail.Message; -import javax.mail.MessagingException; -import javax.mail.Multipart; -import javax.mail.PasswordAuthentication; -import javax.mail.Session; -import javax.mail.Transport; -import javax.mail.internet.AddressException; -import javax.mail.internet.InternetAddress; -import javax.mail.internet.MimeBodyPart; -import javax.mail.internet.MimeMessage; -import javax.mail.internet.MimeMultipart; - -import org.apache.velocity.Template; -import org.apache.velocity.VelocityContext; -import org.apache.velocity.app.VelocityEngine; -import org.apache.velocity.exception.ResourceNotFoundException; -import org.apache.velocity.runtime.RuntimeConstants; -import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import javax.mail.*; +import javax.mail.internet.*; public class EagleMailClient { - private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class); - private static final String BASE_PATH = "templates/"; - - private VelocityEngine velocityEngine; - private Session session; - - public EagleMailClient() { - this(new Properties()); - } - - public EagleMailClient(final Properties config) { - try { - velocityEngine = new VelocityEngine(); - velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); - velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); - velocityEngine.init(); - - config.put("mail.transport.protocol", "smtp"); - if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){ - session = Session.getInstance(config, new Authenticator() { - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication( - config.getProperty(AlertEmailConstants.CONF_AUTH_USER), - config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD) - ); - } - }); - } else { - session = Session.getInstance(config, new Authenticator() {}); - } - - final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false"); - final boolean debug = Boolean.parseBoolean(debugMode); + private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class); + private static final String BASE_PATH = "templates/"; + + private VelocityEngine velocityEngine; + private Session session; + + public EagleMailClient() { + this(new Properties()); + } + + public EagleMailClient(final Properties config) { + try { + velocityEngine = new VelocityEngine(); + velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); + velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); + velocityEngine.init(); + + config.put("mail.transport.protocol", "smtp"); + if (Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))) { + session = Session.getInstance(config, new Authenticator() { + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication( + config.getProperty(AlertEmailConstants.CONF_AUTH_USER), + config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD) + ); + } + }); + } else { + session = Session.getInstance(config, new Authenticator() { + }); + } + + final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false"); + final boolean debug = Boolean.parseBoolean(debugMode); LOG.info("Set email debug mode: " + debugMode); - session.setDebug(debug); - } catch (Exception e) { + session.setDebug(debug); + } catch (Exception e) { LOG.error("Failed to connect to smtp server", e); - } - } - - private boolean _send(String from, String to, String cc, String title, String content) { - Message msg = new MimeMessage(session); - try { - msg.setFrom(new InternetAddress(from)); - msg.setSubject(title); - if (to != null) { - msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to)); - } - if (cc != null) { - msg.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc)); - } - //msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS)); - msg.setContent(content, "text/html;charset=utf-8"); - LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title)); - Transport.send(msg); - return true; - } catch (AddressException e) { - LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); - return false; - } catch (MessagingException e) { - LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); - return false; - } - } - - private boolean _send(String from,String to,String cc,String title,String content,List attachments){ - MimeMessage mail = new MimeMessage(session); - try { - mail.setFrom(new InternetAddress(from)); - mail.setSubject(title); - if (to != null) { - mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to)); - } - if (cc != null) { - mail.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc)); - } - - //mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS)); - - MimeBodyPart mimeBodyPart = new MimeBodyPart(); - mimeBodyPart.setContent(content,"text/html;charset=utf-8"); - - Multipart multipart = new MimeMultipart(); - multipart.addBodyPart(mimeBodyPart); - - for(MimeBodyPart attachment:attachments){ - multipart.addBodyPart(attachment); - } - - mail.setContent(multipart); -// mail.setContent(content, "text/html;charset=utf-8"); - LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title)); - Transport.send(mail); - return true; - } catch (AddressException e) { - LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); - return false; - } catch (MessagingException e) { - LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); - return false; - } - } - - public boolean send(String from, String to, String cc, String title, - String content) { - return this._send(from, to, cc, title, content); - } - - public boolean send(String from, String to, String cc, String title, - String templatePath, VelocityContext context) { - Template t = null; - try { - t = velocityEngine.getTemplate(BASE_PATH + templatePath); - } catch (ResourceNotFoundException ex) { - } - if (t == null) { - try { - t = velocityEngine.getTemplate(templatePath); - } catch (ResourceNotFoundException e) { - t = velocityEngine.getTemplate("/" + templatePath); - } - } - final StringWriter writer = new StringWriter(); - t.merge(context, writer); - if(LOG.isDebugEnabled()) LOG.debug(writer.toString()); - - return this.send(from, to, cc, title, writer.toString()); - } - - public boolean send(String from, String to, String cc, String title, - String templatePath, VelocityContext context, Map attachments) { - if (attachments == null || attachments.isEmpty()) { - return send(from, to, cc, title, templatePath, context); - } - Template t = null; - - List mimeBodyParts = new ArrayList(); - - for (Map.Entry entry : attachments.entrySet()) { - final String attachment = entry.getKey(); - final File attachmentFile = entry.getValue(); - final MimeBodyPart mimeBodyPart = new MimeBodyPart(); - if(attachmentFile !=null && attachmentFile.exists()){ - DataSource source = new FileDataSource(attachmentFile); - try { - mimeBodyPart.setDataHandler(new DataHandler(source)); - mimeBodyPart.setFileName(attachment); - mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT); - mimeBodyPart.setContentID(attachment); - mimeBodyParts.add(mimeBodyPart); - } catch (MessagingException e) { - LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e); - } - }else{ - LOG.error("Attachment: " + attachment + " is null or not exists"); - } - } - - try { - t = velocityEngine.getTemplate(BASE_PATH + templatePath); - } catch (ResourceNotFoundException ex) { -// LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex); - } - - if (t == null) { - try { - t = velocityEngine.getTemplate(templatePath); - } catch (ResourceNotFoundException e) { - try { - t = velocityEngine.getTemplate("/" + templatePath); - } catch (Exception ex) { - LOG.error("Template not found:"+ "/" + templatePath, ex); - } - } - } - - final StringWriter writer = new StringWriter(); - t.merge(context, writer); - if(LOG.isDebugEnabled()) LOG.debug(writer.toString()); - - return this._send(from, to, cc, title, writer.toString(), mimeBodyParts); - } + } + } + + private boolean sendInternal(String from, String to, String cc, String title, String content) { + Message msg = new MimeMessage(session); + try { + msg.setFrom(new InternetAddress(from)); + msg.setSubject(title); + if (to != null) { + msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to)); + } + if (cc != null) { + msg.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc)); + } + //msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS)); + msg.setContent(content, "text/html;charset=utf-8"); + LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title)); + Transport.send(msg); + return true; + } catch (AddressException e) { + LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); + return false; + } catch (MessagingException e) { + LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); + return false; + } + } + + private boolean sendInternal(String from, String to, String cc, String title, String content, List attachments) { + MimeMessage mail = new MimeMessage(session); + try { + mail.setFrom(new InternetAddress(from)); + mail.setSubject(title); + if (to != null) { + mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to)); + } + if (cc != null) { + mail.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc)); + } + + //mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS)); + + MimeBodyPart mimeBodyPart = new MimeBodyPart(); + mimeBodyPart.setContent(content, "text/html;charset=utf-8"); + + Multipart multipart = new MimeMultipart(); + multipart.addBodyPart(mimeBodyPart); + + for (MimeBodyPart attachment : attachments) { + multipart.addBodyPart(attachment); + } + + mail.setContent(multipart); + // mail.setContent(content, "text/html;charset=utf-8"); + LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title)); + Transport.send(mail); + return true; + } catch (AddressException e) { + LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); + return false; + } catch (MessagingException e) { + LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e); + return false; + } + } + + public boolean send(String from, String to, String cc, String title, + String content) { + return this.sendInternal(from, to, cc, title, content); + } + + public boolean send(String from, String to, String cc, String title, + String templatePath, VelocityContext context) { + Template t = null; + try { + t = velocityEngine.getTemplate(BASE_PATH + templatePath); + } catch (ResourceNotFoundException ex) { + // ignored + } + if (t == null) { + try { + t = velocityEngine.getTemplate(templatePath); + } catch (ResourceNotFoundException e) { + t = velocityEngine.getTemplate("/" + templatePath); + } + } + final StringWriter writer = new StringWriter(); + t.merge(context, writer); + if (LOG.isDebugEnabled()) { + LOG.debug(writer.toString()); + } + + return this.send(from, to, cc, title, writer.toString()); + } + + public boolean send(String from, String to, String cc, String title, + String templatePath, VelocityContext context, Map attachments) { + if (attachments == null || attachments.isEmpty()) { + return send(from, to, cc, title, templatePath, context); + } + Template t = null; + + List mimeBodyParts = new ArrayList(); + + for (Map.Entry entry : attachments.entrySet()) { + final String attachment = entry.getKey(); + final File attachmentFile = entry.getValue(); + final MimeBodyPart mimeBodyPart = new MimeBodyPart(); + if (attachmentFile != null && attachmentFile.exists()) { + DataSource source = new FileDataSource(attachmentFile); + try { + mimeBodyPart.setDataHandler(new DataHandler(source)); + mimeBodyPart.setFileName(attachment); + mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT); + mimeBodyPart.setContentID(attachment); + mimeBodyParts.add(mimeBodyPart); + } catch (MessagingException e) { + LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e); + } + } else { + LOG.error("Attachment: " + attachment + " is null or not exists"); + } + } + + try { + t = velocityEngine.getTemplate(BASE_PATH + templatePath); + } catch (ResourceNotFoundException ex) { + // ignored + } + + if (t == null) { + try { + t = velocityEngine.getTemplate(templatePath); + } catch (ResourceNotFoundException e) { + try { + t = velocityEngine.getTemplate("/" + templatePath); + } catch (Exception ex) { + LOG.error("Template not found:" + "/" + templatePath, ex); + } + } + } + + final StringWriter writer = new StringWriter(); + t.merge(context, writer); + if (LOG.isDebugEnabled()) { + LOG.debug(writer.toString()); + } + + return this.sendInternal(from, to, cc, title, writer.toString(), mimeBodyParts); + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index 7a67011..31110ef 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -16,19 +16,18 @@ */ package org.apache.eagle.alert.engine.publisher.impl; -import com.typesafe.config.Config; import org.apache.eagle.alert.engine.codec.IEventSerializer; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; +import com.typesafe.config.Config; import org.slf4j.Logger; import java.util.Map; /** - * @since Jun 3, 2016 - * + * @since Jun 3, 2016. */ public abstract class AbstractPublishPlugin implements AlertPublishPlugin { @@ -47,9 +46,9 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin { Object obj = Class.forName(serializerClz).getConstructor(Map.class).newInstance(conf); if (!(obj instanceof IEventSerializer)) { throw new Exception(String.format("serializer %s of publishment %s is not subclass to %s!", - publishment.getSerializer(), - publishment.getName(), - IEventSerializer.class.getName())); + publishment.getSerializer(), + publishment.getName(), + IEventSerializer.class.getName())); } serializer = (IEventSerializer) obj; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java index a87fc7d..65837dd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java @@ -18,13 +18,13 @@ package org.apache.eagle.alert.engine.publisher.impl; -import com.typesafe.config.Config; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator; import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder; +import com.typesafe.config.Config; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +37,9 @@ import java.util.concurrent.TimeUnit; public class AlertEmailPublisher extends AbstractPublishPlugin { private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class); - private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4; - private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8; - private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute + private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 4; + private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 8; + private static final long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute private AlertEmailGenerator emailGenerator; private Map emailConfig; @@ -60,18 +60,18 @@ public class AlertEmailPublisher extends AbstractPublishPlugin { @Override public void onAlert(AlertStreamEvent event) throws Exception { - if(emailGenerator == null) { + if (emailGenerator == null) { LOG.warn("emailGenerator is null due to the incorrect configurations"); return; } event = dedup(event); - if(event == null) { + if (event == null) { return; } boolean isSuccess = emailGenerator.sendAlertEmail(event); PublishStatus status = new PublishStatus(); - if(!isSuccess) { + if (!isSuccess) { status.errorMessage = "Failed to send email"; status.successful = false; } else { @@ -85,7 +85,7 @@ public class AlertEmailPublisher extends AbstractPublishPlugin { public void update(String dedupIntervalMin, Map pluginProperties) { super.update(dedupIntervalMin, pluginProperties); - if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) { + if (pluginProperties != null && !emailConfig.equals(pluginProperties)) { emailConfig = new HashMap<>(pluginProperties); emailGenerator = createEmailGenerator(pluginProperties); } @@ -96,10 +96,6 @@ public class AlertEmailPublisher extends AbstractPublishPlugin { this.executorPool.shutdown(); } - /** - * @param notificationConfig - * @return - */ private AlertEmailGenerator createEmailGenerator(Map notificationConfig) { String tplFileName = notificationConfig.get(PublishConstants.TEMPLATE); if (tplFileName == null || tplFileName.equals("")) { @@ -111,31 +107,33 @@ public class AlertEmailPublisher extends AbstractPublishPlugin { } String sender = notificationConfig.get(PublishConstants.SENDER); String recipients = notificationConfig.get(PublishConstants.RECIPIENTS); - if(sender == null || recipients == null) { + if (sender == null || recipients == null) { LOG.warn("email sender or recipients is null"); return null; } - AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder(). - withMailProps(notificationConfig). - withSubject(subject). - withSender(sender). - withRecipients(recipients). - withTplFile(tplFileName). - withExecutorPool(this.executorPool).build(); + AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder() + .withMailProps(notificationConfig) + .withSubject(subject) + .withSender(sender) + .withRecipients(recipients) + .withTplFile(tplFileName) + .withExecutorPool(this.executorPool).build(); return gen; } @Override - public int hashCode(){ + public int hashCode() { return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode(); } @Override - public boolean equals(Object o){ - if(o == this) + public boolean equals(Object o) { + if (o == this) { return true; - if(!(o instanceof AlertEmailPublisher)) + } + if (!(o instanceof AlertEmailPublisher)) { return false; + } return true; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java index ca46aeb..048424c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java @@ -18,21 +18,20 @@ package org.apache.eagle.alert.engine.publisher.impl; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.PublishConstants; +import com.typesafe.config.Config; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.typesafe.config.Config; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class AlertKafkaPublisher extends AbstractPublishPlugin { @@ -57,7 +56,7 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { } } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings( {"unchecked", "rawtypes"}) @Override public void onAlert(AlertStreamEvent event) throws Exception { if (producer == null) { @@ -65,7 +64,7 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { return; } event = dedup(event); - if(event == null) { + if (event == null) { return; } PublishStatus status = new PublishStatus(); @@ -86,7 +85,7 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { status.successful = false; status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e); LOG.error(status.errorMessage, e); - } catch (Exception ex ) { + } catch (Exception ex) { LOG.error("fail writing alert to Kafka bus", ex); status.successful = false; status.errorMessage = ex.getMessage(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java index 82be5a0..957d356 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java @@ -18,17 +18,16 @@ package org.apache.eagle.alert.engine.publisher.impl; -import java.util.Map; - import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; +import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.typesafe.config.Config; +import java.util.Map; /** - * @Since on 5/11/16. + * @since on 5/11/16. */ public class AlertPublishPluginsFactory { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index f9306dc..fe1438e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -17,25 +17,24 @@ package org.apache.eagle.alert.engine.publisher.impl; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.collections.ListUtils; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.AlertPublisher; +import com.typesafe.config.Config; +import org.apache.commons.collections.ListUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.typesafe.config.Config; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; @SuppressWarnings("rawtypes") public class AlertPublisherImpl implements AlertPublisher { private static final long serialVersionUID = 4809983246198138865L; - private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class); private final String name; private volatile Map> policyPublishPluginMapping = new ConcurrentHashMap<>(1); @@ -60,21 +59,24 @@ public class AlertPublisherImpl implements AlertPublisher { @Override public void nextEvent(AlertStreamEvent event) { - if(LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug(event.toString()); + } notifyAlert(event); } private void notifyAlert(AlertStreamEvent event) { String policyId = event.getPolicyId(); - if(policyId == null || !policyPublishPluginMapping.containsKey(policyId)) { + if (policyId == null || !policyPublishPluginMapping.containsKey(policyId)) { LOG.warn("Policy {} does NOT subscribe any publishments", policyId); return; } - for(String id: policyPublishPluginMapping.get(policyId)) { + for (String id : policyPublishPluginMapping.get(policyId)) { AlertPublishPlugin plugin = publishPluginMapping.get(id); try { - if(LOG.isDebugEnabled()) LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Execute alert publisher " + plugin.getClass().getCanonicalName()); + } plugin.onAlert(event); } catch (Exception ex) { LOG.error("Fail invoking publisher's onAlert, continue ", ex); @@ -93,10 +95,18 @@ public class AlertPublisherImpl implements AlertPublisher { List removed, List afterModified, List beforeModified) { - if (added == null) added = new ArrayList<>(); - if (removed == null) removed = new ArrayList<>(); - if (afterModified == null) afterModified = new ArrayList<>(); - if (beforeModified == null) beforeModified = new ArrayList<>(); + if (added == null) { + added = new ArrayList<>(); + } + if (removed == null) { + removed = new ArrayList<>(); + } + if (afterModified == null) { + afterModified = new ArrayList<>(); + } + if (beforeModified == null) { + beforeModified = new ArrayList<>(); + } if (afterModified.size() != beforeModified.size()) { LOG.warn("beforeModified size != afterModified size"); @@ -109,7 +119,7 @@ public class AlertPublisherImpl implements AlertPublisher { } AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); - if(plugin != null) { + if (plugin != null) { publishPluginMapping.put(publishment.getName(), plugin); onPolicyAdded(publishment.getPolicyIds(), publishment.getName()); } else { @@ -127,7 +137,7 @@ public class AlertPublisherImpl implements AlertPublisher { List newPolicies = afterModified.get(i).getPolicyIds(); List oldPolicies = beforeModified.get(i).getPolicyIds(); - if (! newPolicies.equals(oldPolicies)) { + if (!newPolicies.equals(oldPolicies)) { List deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies); onPolicyDeleted(deletedPolicies, pubName); List addedPolicies = ListUtils.subtract(newPolicies, oldPolicies); @@ -139,7 +149,9 @@ public class AlertPublisherImpl implements AlertPublisher { } private synchronized void onPolicyAdded(List addedPolicyIds, String pubName) { - if (addedPolicyIds == null || pubName == null) return; + if (addedPolicyIds == null || pubName == null) { + return; + } for (String policyId : addedPolicyIds) { if (policyPublishPluginMapping.get(policyId) == null) { @@ -151,7 +163,9 @@ public class AlertPublisherImpl implements AlertPublisher { } private synchronized void onPolicyDeleted(List deletedPolicyIds, String pubName) { - if (deletedPolicyIds == null || pubName == null) return; + if (deletedPolicyIds == null || pubName == null) { + return; + } for (String policyId : deletedPolicyIds) { List publishIds = policyPublishPluginMapping.get(policyId); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java index 43be2e1..258f613 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java @@ -17,10 +17,10 @@ */ package org.apache.eagle.alert.engine.publisher.impl; -import org.apache.commons.lang.time.DateUtils; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; +import org.apache.commons.lang.time.DateUtils; import org.joda.time.Period; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,121 +32,118 @@ import java.util.Map; import java.util.Map.Entry; public class DefaultDeduplicator implements AlertDeduplicator { - private long dedupIntervalMin; - private List customDedupFields = new ArrayList<>(); - private volatile Map events = new HashMap<>(); - private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); - - public enum AlertDeduplicationStatus{ - NEW, - DUPLICATED, - IGNORED - } - - public DefaultDeduplicator() { - this.dedupIntervalMin = 0; - } - - public DefaultDeduplicator(String intervalMin) { - setDedupIntervalMin(intervalMin); - } - - public DefaultDeduplicator(long intervalMin) { - this.dedupIntervalMin = intervalMin; - } - - public DefaultDeduplicator(String intervalMin, List customDedupFields) { - setDedupIntervalMin(intervalMin); - if (customDedupFields != null){ - this.customDedupFields = customDedupFields; - } - } - - public void clearOldCache() { - List removedkeys = new ArrayList<>(); - for (Entry entry : events.entrySet()) { - EventUniq entity = entry.getKey(); - if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) { - removedkeys.add(entry.getKey()); - } - } - for (EventUniq alertKey : removedkeys) { - events.remove(alertKey); - } - } - - /*** - * - * @param key - * @return - */ - public AlertDeduplicationStatus checkDedup(EventUniq key) { - long current = key.timestamp; - if(!events.containsKey(key)) { - events.put(key, current); - return AlertDeduplicationStatus.NEW; - } - - long last = events.get(key); - if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) { - events.put(key, current); - return AlertDeduplicationStatus.IGNORED; - } - - return AlertDeduplicationStatus.DUPLICATED; - } - - public AlertStreamEvent dedup(AlertStreamEvent event) { - if (event == null) return null; - clearOldCache(); - AlertStreamEvent result = null; - - // check custom field, and get the field values - StreamDefinition streamDefinition = event.getSchema(); - HashMap customFieldValues = new HashMap<>(); - for (int i = 0; i < event.getData().length; i++){ - if (i > streamDefinition.getColumns().size()) { - if (LOG.isWarnEnabled()) { - LOG.warn("output column does not found for event data, this indicate code error!"); - } - continue; - } - String colName = streamDefinition.getColumns().get(i).getName(); - - for (String field : customDedupFields){ - if (colName.equals(field)){ - customFieldValues.put(field, event.getData()[i].toString()); - break; - } - } - } - - AlertDeduplicationStatus status = checkDedup( - new EventUniq(event.getStreamId(), - event.getPolicyId(), - event.getCreatedTime(), - customFieldValues)); - if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) { - result = event; - } else if(LOG.isDebugEnabled()){ - LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString()); - } - return result; - } - - @Override - public void setDedupIntervalMin(String newDedupIntervalMin) { - if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) { - dedupIntervalMin = 0; + private long dedupIntervalMin; + private List customDedupFields = new ArrayList<>(); + private volatile Map events = new HashMap<>(); + private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); + + public enum AlertDeduplicationStatus { + NEW, + DUPLICATED, + IGNORED + } + + public DefaultDeduplicator() { + this.dedupIntervalMin = 0; + } + + public DefaultDeduplicator(String intervalMin) { + setDedupIntervalMin(intervalMin); + } + + public DefaultDeduplicator(long intervalMin) { + this.dedupIntervalMin = intervalMin; + } + + public DefaultDeduplicator(String intervalMin, List customDedupFields) { + setDedupIntervalMin(intervalMin); + if (customDedupFields != null) { + this.customDedupFields = customDedupFields; + } + } + + public void clearOldCache() { + List removedkeys = new ArrayList<>(); + for (Entry entry : events.entrySet()) { + EventUniq entity = entry.getKey(); + if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) { + removedkeys.add(entry.getKey()); + } + } + for (EventUniq alertKey : removedkeys) { + events.remove(alertKey); + } + } + + public AlertDeduplicationStatus checkDedup(EventUniq key) { + long current = key.timestamp; + if (!events.containsKey(key)) { + events.put(key, current); + return AlertDeduplicationStatus.NEW; + } + + long last = events.get(key); + if (current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE) { + events.put(key, current); + return AlertDeduplicationStatus.IGNORED; + } + + return AlertDeduplicationStatus.DUPLICATED; + } + + public AlertStreamEvent dedup(AlertStreamEvent event) { + if (event == null) { + return null; + } + clearOldCache(); + AlertStreamEvent result = null; + + // check custom field, and get the field values + StreamDefinition streamDefinition = event.getSchema(); + HashMap customFieldValues = new HashMap<>(); + for (int i = 0; i < event.getData().length; i++) { + if (i > streamDefinition.getColumns().size()) { + if (LOG.isWarnEnabled()) { + LOG.warn("output column does not found for event data, this indicate code error!"); + } + continue; + } + String colName = streamDefinition.getColumns().get(i).getName(); + + for (String field : customDedupFields) { + if (colName.equals(field)) { + customFieldValues.put(field, event.getData()[i].toString()); + break; + } + } + } + + AlertDeduplicationStatus status = checkDedup( + new EventUniq(event.getStreamId(), + event.getPolicyId(), + event.getCreatedTime(), + customFieldValues)); + if (!status.equals(AlertDeduplicationStatus.DUPLICATED)) { + result = event; + } else if (LOG.isDebugEnabled()) { + LOG.debug("Alert event is skipped because it's duplicated: {}", event.toString()); + } + return result; + } + + @Override + public void setDedupIntervalMin(String newDedupIntervalMin) { + if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) { + dedupIntervalMin = 0; return; - } - try { - Period period = Period.parse(newDedupIntervalMin); - this.dedupIntervalMin = period.toStandardMinutes().getMinutes(); - } catch (Exception e) { - LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e); - this.dedupIntervalMin = 0; - } - } + } + try { + Period period = Period.parse(newDedupIntervalMin); + this.dedupIntervalMin = period.toStandardMinutes().getMinutes(); + } catch (Exception e) { + LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e); + this.dedupIntervalMin = 0; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java index 79fa6cc..1b90833 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java @@ -16,7 +16,7 @@ * */ /** - * + * */ package org.apache.eagle.alert.engine.publisher.impl; @@ -25,50 +25,50 @@ import org.apache.commons.lang3.builder.HashCodeBuilder; import java.util.HashMap; /** - * @since Mar 19, 2015 + * @since Mar 19, 2015. */ public class EventUniq { - public String streamId; - public String policyId; - public Long timestamp; // event's createTimestamp - public long createdTime; // created time, for cache removal; - public HashMap customFieldValues; + public String streamId; + public String policyId; + public Long timestamp; // event's createTimestamp + public long createdTime; // created time, for cache removal; + public HashMap customFieldValues; - public EventUniq(String streamId, String policyId, long timestamp) { - this.streamId = streamId; - this.timestamp = timestamp; - this.policyId = policyId; - this.createdTime = System.currentTimeMillis(); - } + public EventUniq(String streamId, String policyId, long timestamp) { + this.streamId = streamId; + this.timestamp = timestamp; + this.policyId = policyId; + this.createdTime = System.currentTimeMillis(); + } - public EventUniq(String streamId, String policyId, long timestamp, HashMap customFieldValues) { - this.streamId = streamId; - this.timestamp = timestamp; - this.policyId = policyId; - this.createdTime = System.currentTimeMillis(); - this.customFieldValues = customFieldValues; - } + public EventUniq(String streamId, String policyId, long timestamp, HashMap customFieldValues) { + this.streamId = streamId; + this.timestamp = timestamp; + this.policyId = policyId; + this.createdTime = System.currentTimeMillis(); + this.customFieldValues = customFieldValues; + } - @Override - public boolean equals(Object obj) { - if (obj instanceof EventUniq) { - EventUniq au = (EventUniq) obj; - boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId); - if (this.customFieldValues != null && au.customFieldValues != null) { - result = result & this.customFieldValues.equals(au.customFieldValues); - } - return result; - } - return false; - } + @Override + public boolean equals(Object obj) { + if (obj instanceof EventUniq) { + EventUniq au = (EventUniq) obj; + boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId); + if (this.customFieldValues != null && au.customFieldValues != null) { + result = result & this.customFieldValues.equals(au.customFieldValues); + } + return result; + } + return false; + } - @Override - public int hashCode() { - HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId); + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId); - if (customFieldValues != null){ - builder.append(customFieldValues); - } - return builder.build(); - } + if (customFieldValues != null) { + builder.append(customFieldValues); + } + return builder.build(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java index a448703..aa22884 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/JsonEventSerializer.java @@ -28,8 +28,7 @@ import java.util.List; import java.util.Map; /** - * @since Jul 9, 2016 - * + * @since Jul 9, 2016. */ public class JsonEventSerializer implements IEventSerializer { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java index fb038ba..423017b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java @@ -17,13 +17,13 @@ */ package org.apache.eagle.alert.engine.publisher.impl; -import java.util.Map; -import java.util.Properties; - import org.apache.kafka.clients.producer.KafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Properties; + /** * The producer is thread safe and sharing a single producer instance across threads will generally be faster than * having multiple instances. @@ -33,16 +33,16 @@ public class KafkaProducerManager { private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - + private static final String VALUE_DESERIALIZER = "value.deserializer"; private static final String VALUE_DESERIALIZER_UNDERSCORE = "value_deserializer"; - + private static final String VALUE_SERIALIZER = "value.serializer"; private static final String VALUE_SERIALIZER_UNDERSCORE = "value_serializer"; - + private static final String KEY_DESERIALIZER = "key.deserializer"; private static final String KEY_DESERIALIZER_UNDERSCORE = "key_deserializer"; - + private static final String KEY_SERIALIZER = "key.serializer"; private static final String KEY_SERIALIZER_UNDERSCORE = "key_serializer"; @@ -73,7 +73,7 @@ public class KafkaProducerManager { configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER); } configMap.put("request.required.acks", "1"); - + // value deserializer if (kafkaConfig.containsKey(VALUE_DESERIALIZER_UNDERSCORE)) { configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER_UNDERSCORE)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java index c165686..44f902c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/PublishStatus.java @@ -19,9 +19,9 @@ package org.apache.eagle.alert.engine.publisher.impl; /** - * Object that holds the status of Notification Posted to Notification Plugin + * Object that holds the status of Notification Posted to Notification Plugin. */ public class PublishStatus { - public boolean successful; - public String errorMessage; + public boolean successful; + public String errorMessage; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java index 012ebaa..5eb444d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java @@ -16,17 +16,16 @@ */ package org.apache.eagle.alert.engine.publisher.impl; -import java.util.Map; - import org.apache.eagle.alert.engine.codec.IEventSerializer; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import java.util.Map; + /** - * @since Jun 3, 2016 - * + * @since Jun 3, 2016. */ public class StringEventSerializer implements IEventSerializer { - + @SuppressWarnings("rawtypes") public StringEventSerializer(Map stormConf) { } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java index e1f3e9c..84345dd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/AlertBoltSpecListener.java @@ -19,11 +19,11 @@ package org.apache.eagle.alert.engine.router; -import java.util.Map; - import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import java.util.Map; + /** * Since 5/2/16. */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java index aa40dc5..598ce18 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/SpoutSpecListener.java @@ -19,11 +19,11 @@ package org.apache.eagle.alert.engine.router; -import java.util.Map; - import org.apache.eagle.alert.coordination.model.SpoutSpec; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import java.util.Map; + /** * Since 5/3/16. */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java index 07518d9..049e852 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoute.java @@ -1,42 +1,43 @@ -package org.apache.eagle.alert.engine.router; - -import java.io.Serializable; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.eagle.alert.engine.router; + +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.io.Serializable; + +/** * * 1. Group by SingleStream[stream_1.col1] * - * Shuffle(stream_1,[col1]) + *

Shuffle(stream_1,[col1])

* * * 2. Group by SingleStream[stream_1.col1,stream_1.col2] * - * Shuffle(stream_1,[col1,col2]) + *

Shuffle(stream_1,[col1,col2])

* * * 3. Group by JoinedStream[stream_1.col1,stream_1.col2,stream_2.col3] - * - * Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3) + *

Shuffle(stream_1.col1,stream_1.col2) + Global(stream_2.col3)

*/ -public class StreamRoute implements Serializable{ +public class StreamRoute implements Serializable { private static final long serialVersionUID = 4649184902196034940L; private String targetComponentId; @@ -51,7 +52,7 @@ public class StreamRoute implements Serializable{ this.targetComponentId = targetComponentId; } - public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type){ + public StreamRoute(String targetComponentId, int partitionKey, StreamPartition.Type type) { this.setTargetComponentId(targetComponentId); this.setPartitionKey(partitionKey); this.setPartitionType(type); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java index 6dcd312..0d397e4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitionFactory.java @@ -16,15 +16,15 @@ */ package org.apache.eagle.alert.engine.router; -import java.util.List; - import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.router.impl.BasicStreamRoutePartitioner; +import java.util.List; + public class StreamRoutePartitionFactory { /** - * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner + * TODO: Decouple different StreamRoutePartitioner implementation from BasicStreamRoutePartitioner. * * @param outputComponentIds * @param streamDefinition http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java index 213abef..5b5632d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRoutePartitioner.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,14 +16,10 @@ */ package org.apache.eagle.alert.engine.router; -import java.util.List; - import org.apache.eagle.alert.engine.model.StreamEvent; +import java.util.List; + public interface StreamRoutePartitioner { - /** - * @param event - * @return - */ List partition(StreamEvent event); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java index abc465d..dfd2cc4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamRouteSpecListener.java @@ -1,12 +1,4 @@ -package org.apache.eagle.alert.engine.router; - -import java.util.Collection; -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -22,6 +14,14 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.engine.router; + +import org.apache.eagle.alert.coordination.model.StreamRouterSpec; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +import java.util.Collection; +import java.util.Map; + public interface StreamRouteSpecListener { void onStreamRouterSpecChange(Collection added, Collection removed,