eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-475 Fix generic email publisher and publish emails for absence alert. Fix generic email publisher and publish emails for absence alert.
Date Thu, 18 Aug 2016 21:20:54 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 6fc103338 -> 5bb91c4bd


EAGLE-475 Fix generic email publisher and publish emails for absence alert.
Fix generic email publisher and publish emails for absence alert.

https://issues.apache.org/jira/browse/EAGLE-475

Author: @pkuwm <ihuizhi.lu@gmail.com>
Reviewer: @yonzhang <yonzhang2012@apache.org>

Closes: 359


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/076f3a49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/076f3a49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/076f3a49

Branch: refs/heads/develop
Commit: 076f3a491ae656bb9e938643ea4c0d5672bc5e04
Parents: 6fc1033
Author: yonzhang <yonzhang2012@gmail.com>
Authored: Thu Aug 18 14:01:06 2016 -0700
Committer: yonzhang <yonzhang2012@gmail.com>
Committed: Thu Aug 18 14:01:06 2016 -0700

----------------------------------------------------------------------
 .../evaluator/absence/AbsenceAlertDriver.java   |  16 +-
 .../evaluator/absence/AbsencePolicyHandler.java |  16 +-
 .../publisher/email/AlertEmailConstants.java    |  19 +-
 .../publisher/email/AlertEmailSender.java       |   8 +-
 .../engine/publisher/email/EagleMailClient.java |  55 ++--
 .../eagle/alert/engine/runner/AlertBolt.java    |   9 +-
 .../alert/engine/utils/AlertStreamUtils.java    |  42 +++
 .../alert/engine/utils/MetadataSerDeser.java    |  15 +-
 .../eagle/common/email/EagleMailClient.java     | 253 -------------------
 9 files changed, 117 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
index bf142cd..3b4aba8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsenceAlertDriver.java
@@ -31,12 +31,12 @@ public class AbsenceAlertDriver {
     private AbsenceWindowProcessor processor;
     private AbsenceWindowGenerator windowGenerator;
 
-    public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator){
+    public AbsenceAlertDriver(List<Object> expectedAttrs, AbsenceWindowGenerator windowGenerator) {
         this.expectedAttrs = expectedAttrs;
         this.windowGenerator = windowGenerator;
     }
 
-    public void process(List<Object> appearAttrs, long occurTime){
+    public boolean process(List<Object> appearAttrs, long occurTime) {
         // initialize window
         if(processor == null){
             processor = nextProcessor(occurTime);
@@ -45,15 +45,21 @@ public class AbsenceAlertDriver {
         processor.process(appearAttrs, occurTime);
         AbsenceWindowProcessor.OccurStatus status = processor.checkStatus();
         boolean expired = processor.checkExpired();
-        if(expired){
+        boolean isAbsenceAlert = false;
+        if (expired){
             if(status == AbsenceWindowProcessor.OccurStatus.absent){
                 // send alert
-                LOG.info("this is an alert");
+                LOG.info("===================");
+                LOG.info("|| Absence Alert ||");
+                LOG.info("===================");
+                isAbsenceAlert = true;
                 // figure out next window and set the new window
             }
             processor = nextProcessor(occurTime);
             LOG.info("created a new window {}", processor);
         }
+
+        return isAbsenceAlert;
     }
 
     /**
@@ -61,7 +67,7 @@ public class AbsenceAlertDriver {
      * @param currTime milliseconds
      * @return
      */
-    private AbsenceWindowProcessor nextProcessor(long currTime){
+    private AbsenceWindowProcessor nextProcessor(long currTime) {
         AbsenceWindow window = windowGenerator.nextWindow(currTime);
         return new AbsenceWindowProcessor(expectedAttrs, window);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
index 0a07a27..826a69d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/absence/AbsencePolicyHandler.java
@@ -23,6 +23,7 @@ import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.utils.AlertStreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,15 +81,16 @@ public class AbsencePolicyHandler implements PolicyStreamHandler {
             throw new IllegalArgumentException("policy inputStream size has to be 1 for absence alert");
         // validate outputStream has to contain only one stream
         if(policyDef.getOutputStreams().size() != 1)
-            throw new IllegalArgumentException("policy outputStream size has to be 1 for absense alert");
+            throw new IllegalArgumentException("policy outputStream size has to be 1 for absence alert");
 
         String is = inputStreams.get(0);
         StreamDefinition sd = sds.get(is);
 
         String policyValue = policyDef.getDefinition().getValue();
 
-        // assume that absence alert policy value consists of "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset}
-        String[] segments = policyValue.split(",");
+        // Assume that absence alert policy value consists of
+        // "numOfFields, f1_name, f2_name, f1_value, f2_value, absence_window_rule_type, startTimeOffset, endTimeOffset"
+        String[] segments = policyValue.split(",\\s*");
         int offset = 0;
         // populate wisb field names
         int numOfFields = Integer.parseInt(segments[offset++]);
@@ -124,7 +126,13 @@ public class AbsencePolicyHandler implements PolicyStreamHandler {
             columnValues.add(o.toString());
         }
 
-        driver.process(columnValues, event.getTimestamp());
+        boolean isAbsenceAlert = driver.process(columnValues, event.getTimestamp());
+
+        // Publishing alerts.
+        if (isAbsenceAlert) {
+            AlertStreamEvent alertEvent = AlertStreamUtils.createAlertEvent(event, context, sds);
+            collector.emit(alertEvent);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
index 7d31bb5..9010c93 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
@@ -20,20 +20,15 @@ package org.apache.eagle.alert.engine.publisher.email;
 
 public class AlertEmailConstants {
 
-    public static final String MAIL_AUTH = "mail.smtp.auth";
-    public static final String MAIL_HOST = "mail.smtp.host";
-    public static final String MAIL_PORT = "mail.smtp.port";
-
     public static final String CONN_PLAINTEXT = "plaintext";
     public static final String CONN_TLS = "tls";
     public static final String CONN_SSL = "ssl";
 
-    public static final String CONF_MAIL_SERVER = "smtp.server";
-    public static final String CONF_MAIL_PORT = "smtp.port";
-    public static final String CONF_MAIL_CONN = "connection";
-    public static final String CONF_MAIL_DEBUG = "mailDebug";
-    public static final String CONF_MAIL_AUTH = "smtp.auth.enable";
-    public static final String CONF_AUTH_USER = "auth.username";
-    public static final String CONF_AUTH_PASSWORD = "auth.password";
-
+    public static final String CONF_MAIL_HOST = "mail.smtp.host";
+    public static final String CONF_MAIL_PORT = "mail.smtp.port";
+    public static final String CONF_MAIL_AUTH = "mail.smtp.auth";
+    public static final String CONF_AUTH_USER = "mail.username";
+    public static final String CONF_AUTH_PASSWORD = "mail.password";
+    public static final String CONF_MAIL_CONN = "mail.connection";
+    public static final String CONF_MAIL_DEBUG = "mail.debug";
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
index d0e5cf6..4091457 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
@@ -80,17 +80,17 @@ public class AlertEmailSender implements Runnable {
     private Properties parseMailClientConfig(Map<String, String> mailProps) {
         if (mailProps == null) return null;
         Properties props = new Properties();
-        String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_SERVER);
+        String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_HOST);
         String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
         if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
             LOG.warn("SMTP server is unset, will exit");
             return null;
         }
-        props.put(AlertEmailConstants.MAIL_HOST, mailHost);
-        props.put(AlertEmailConstants.MAIL_PORT, mailPort);
+        props.put(AlertEmailConstants.CONF_MAIL_HOST, mailHost);
+        props.put(AlertEmailConstants.CONF_MAIL_PORT, mailPort);
 
         String smtpAuth = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
-        props.put(AlertEmailConstants.MAIL_AUTH, smtpAuth);
+        props.put(AlertEmailConstants.CONF_MAIL_AUTH, smtpAuth);
         if (Boolean.parseBoolean(smtpAuth)) {
             props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
             props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
index 61194a2..21fe354 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
@@ -20,7 +20,6 @@ package org.apache.eagle.alert.engine.publisher.email;
 import java.io.File;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,12 +50,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class EagleMailClient {
-//	private static final String CONFIG_FILE = "config.properties";
+	private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
 	private static final String BASE_PATH = "templates/";
 
 	private VelocityEngine velocityEngine;
 	private Session session;
-	private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
 
 	public EagleMailClient() {
 		this(new Properties());
@@ -71,34 +69,37 @@ public class EagleMailClient {
 
 			config.put("mail.transport.protocol", "smtp");
 			if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){
-				session = Session.getDefaultInstance(config, new Authenticator() {
+				session = Session.getInstance(config, new Authenticator() {
 					protected PasswordAuthentication getPasswordAuthentication() {
-						return new PasswordAuthentication(config.getProperty(AlertEmailConstants.CONF_AUTH_USER), config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD));
+						return new PasswordAuthentication(
+								config.getProperty(AlertEmailConstants.CONF_AUTH_USER),
+								config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD)
+						);
 					}
 				});
+			} else {
+				session = Session.getInstance(config, new Authenticator() {});
 			}
-			else session = Session.getDefaultInstance(config, new Authenticator() {});
-			final String debugMode =  config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
-			final boolean debug =  Boolean.parseBoolean(debugMode);
+
+			final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
+			final boolean debug = Boolean.parseBoolean(debugMode);
+            LOG.info("Set email debug mode: " + debugMode);
 			session.setDebug(debug);
 		} catch (Exception e) {
-            LOG.error("Failed connect to smtp server",e);
+            LOG.error("Failed to connect to smtp server", e);
 		}
 	}
 
-	private boolean _send(String from, String to, String cc, String title,
-			String content) {
+	private boolean _send(String from, String to, String cc, String title, String content) {
 		Message msg = new MimeMessage(session);
 		try {
 			msg.setFrom(new InternetAddress(from));
 			msg.setSubject(title);
 			if (to != null) {
-				msg.setRecipients(Message.RecipientType.TO,
-						InternetAddress.parse(to));
+				msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
 			}
 			if (cc != null) {
-				msg.setRecipients(Message.RecipientType.CC,
-						InternetAddress.parse(cc));
+				msg.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
 			}
 			//msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
 			msg.setContent(content, "text/html;charset=utf-8");
@@ -106,10 +107,10 @@ public class EagleMailClient {
 			Transport.send(msg);
 			return true;
 		} catch (AddressException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
 			return false;
 		} catch (MessagingException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
 			return false;
 		}
 	}
@@ -120,12 +121,10 @@ public class EagleMailClient {
 			mail.setFrom(new InternetAddress(from));
 			mail.setSubject(title);
 			if (to != null) {
-				mail.setRecipients(Message.RecipientType.TO,
-						InternetAddress.parse(to));
+				mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
 			}
 			if (cc != null) {
-				mail.setRecipients(Message.RecipientType.CC,
-						InternetAddress.parse(cc));
+				mail.setRecipients(Message.RecipientType.CC, InternetAddress.parse(cc));
 			}
 			
 			//mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
@@ -146,10 +145,10 @@ public class EagleMailClient {
 			Transport.send(mail);
 			return true;
 		} catch (AddressException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
 			return false;
 		} catch (MessagingException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
+			LOG.info("Failed to send mail, got an AddressException: " + e.getMessage(), e);
 			return false;
 		}
 	}
@@ -176,6 +175,7 @@ public class EagleMailClient {
 		final StringWriter writer = new StringWriter();
 		t.merge(context, writer);
 		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+
 		return this.send(from, to, cc, title, writer.toString());
 	}
 
@@ -187,7 +187,6 @@ public class EagleMailClient {
 		Template t = null;
 
 		List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
-		Map<String,String> cid = new HashMap<String,String>();
 
 		for (Map.Entry<String,File> entry : attachments.entrySet()) {
 			final String attachment = entry.getKey();
@@ -200,7 +199,6 @@ public class EagleMailClient {
 					mimeBodyPart.setFileName(attachment);
 					mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
 					mimeBodyPart.setContentID(attachment);
-					cid.put(attachment,mimeBodyPart.getContentID());
 					mimeBodyParts.add(mimeBodyPart);
 				} catch (MessagingException e) {
 					LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
@@ -209,9 +207,6 @@ public class EagleMailClient {
 				LOG.error("Attachment: " + attachment + " is null or not exists");
 			}
 		}
-		//TODO remove cid, because not used at all
-		if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
-		context.put("cid", cid);
 
 		try {
 			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
@@ -225,8 +220,7 @@ public class EagleMailClient {
 			} catch (ResourceNotFoundException e) {
 				try {
 					t = velocityEngine.getTemplate("/" + templatePath);
-				}
-				catch (Exception ex) {
+				} catch (Exception ex) {
 					LOG.error("Template not found:"+ "/" + templatePath, ex);
 				}
 			}
@@ -235,6 +229,7 @@ public class EagleMailClient {
 		final StringWriter writer = new StringWriter();
 		t.merge(context, writer);
 		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
+
 		return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index f653c9c..86c8b3d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -111,9 +111,12 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
                     pe.getEvent().setMetaVersion(specVersion);
                 }
                 // check if specVersion is older than stream_event_version
-                else if (specVersion != null && stream_event_version != null && specVersion.contains("spec_version_") && specVersion.contains("spec_version_")){
-                    Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
-                    Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
+                else if (specVersion != null && stream_event_version != null &&
+                        specVersion.contains("spec_version_") && stream_event_version.contains("spec_version_")){
+//                    Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]);
+//                    Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]);
+                    Long timestamp_of_specVersion = Long.valueOf(specVersion.substring(13));
+                    Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.substring(13));
                     specVersionOutofdate = timestamp_of_specVersion < timestamp_of_streamEventVersion;
                     if (!specVersionOutofdate){
                         pe.getEvent().setMetaVersion(specVersion);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
new file mode 100644
index 0000000..7e2941f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/AlertStreamUtils.java
@@ -0,0 +1,42 @@
+package org.apache.eagle.alert.engine.utils;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.Map;
+
+/**
+ * Created on 8/16/16.
+ */
+public class AlertStreamUtils {
+
+    /**
+     * Create alert stream event for publisher.
+     */
+    public static AlertStreamEvent createAlertEvent(StreamEvent event,
+                                                    PolicyHandlerContext context,
+                                                    Map<String, StreamDefinition> sds) {
+        PolicyDefinition policyDef = context.getPolicyDefinition();
+        AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
+
+        alertStreamEvent.setTimestamp(event.getTimestamp());
+        alertStreamEvent.setData(event.getData());
+        alertStreamEvent.setStreamId(policyDef.getOutputStreams().get(0));
+        alertStreamEvent.setPolicy(policyDef);
+
+        if (context.getPolicyEvaluator() != null) {
+            alertStreamEvent.setCreatedBy(context.getPolicyEvaluator().getName());
+        }
+
+        alertStreamEvent.setCreatedTime(System.currentTimeMillis());
+
+        String is = policyDef.getInputStreams().get(0);
+        StreamDefinition sd = sds.get(is);
+        alertStreamEvent.setSchema(sd);
+
+        return alertStreamEvent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
index a576404..1060d32 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
@@ -40,7 +40,8 @@ public class MetadataSerDeser {
             K spec = mapper.readValue(is, typeRef);
             return spec;
         }catch(Exception ex){
-            LOG.error("error in deserializing metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
+            LOG.error("error in deserializing metadata of type {} from input stream",
+                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
         }
         return null;
     }
@@ -52,7 +53,8 @@ public class MetadataSerDeser {
             K spec = mapper.readValue(is, cls);
             return spec;
         }catch(Exception ex){
-            LOG.error("Got error to deserialize metadata of type {} from input stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
+            LOG.error("Got error to deserialize metadata of type {} from input stream",
+                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
         }
         return null;
     }
@@ -64,7 +66,8 @@ public class MetadataSerDeser {
             K spec = mapper.readValue(json, typeRef);
             return spec;
         }catch(Exception ex){
-            LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex);
+            LOG.error("error in deserializing metadata of type {} from {}",
+                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), json, ex);
         }
         return null;
     }
@@ -75,7 +78,8 @@ public class MetadataSerDeser {
             K spec = mapper.readValue(json, cls);
             return spec;
         }catch(Exception ex){
-            LOG.error("error in deserializing metadata of type {} from {}", new TypeReference<K>(){}.getType().getTypeName(), json, ex);
+            LOG.error("error in deserializing metadata of type {} from {}",
+                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), json, ex);
         }
         return null;
     }
@@ -86,7 +90,8 @@ public class MetadataSerDeser {
             String json = mapper.writeValueAsString(spec);
             return json;
         }catch(Exception ex){
-            LOG.error("error in serializing object {} with type {}", spec, new TypeReference<K>(){}.getType().getTypeName(), ex);
+            LOG.error("error in serializing object {} with type {}", spec,
+                    new TypeReference<K>(){}.getType().getClass().getCanonicalName(), ex);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/076f3a49/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java
deleted file mode 100755
index 6edac0a..0000000
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/email/EagleMailClient.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.common.email;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.activation.DataHandler;
-import javax.activation.DataSource;
-import javax.activation.FileDataSource;
-import javax.mail.Authenticator;
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Multipart;
-import javax.mail.PasswordAuthentication;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.AddressException;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
-import javax.mail.internet.MimeMultipart;
-
-import org.apache.commons.configuration.AbstractConfiguration;
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.exception.ResourceNotFoundException;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.netflix.config.ConcurrentMapConfiguration;
-
-public class EagleMailClient {
-//	private static final String CONFIG_FILE = "config.properties";
-	private static final String BASE_PATH = "templates/";
-	private static final String AUTH_CONFIG = "mail.smtp.auth";
-	private static final String DEBUG_CONFIG = "mail.debug";
-	private static final String USER_CONFIG = "mail.user";
-	private static final String PASSWORD_CONFIG = "mail.password";
-
-	private VelocityEngine velocityEngine;
-	private Session session;
-	private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class);
-
-	public EagleMailClient() {
-		this(new ConcurrentMapConfiguration());
-	}
-
-	public EagleMailClient(AbstractConfiguration configuration) {
-		try {
-			ConcurrentMapConfiguration con = (ConcurrentMapConfiguration)configuration;
-			velocityEngine = new VelocityEngine();
-			velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
-			velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
-			velocityEngine.init();
-
-			con.setProperty("mail.transport.protocol", "smtp");
-			final Properties config = con.getProperties();
-			if(Boolean.parseBoolean(config.getProperty(AUTH_CONFIG))){
-				session = Session.getDefaultInstance(config, new Authenticator() {
-					protected PasswordAuthentication getPasswordAuthentication() {
-						return new PasswordAuthentication(config.getProperty(USER_CONFIG), config.getProperty(PASSWORD_CONFIG));
-					}
-				});
-			}
-			else session = Session.getDefaultInstance(config, new Authenticator() {});
-			final String debugMode = config.getProperty(DEBUG_CONFIG, "false");
-			final boolean debug = Boolean.parseBoolean(debugMode);
-			session.setDebug(debug);
-		} catch (Exception e) {
-            LOG.error("Failed connect to smtp server",e);
-		}
-	}
-
-	private boolean _send(String from, String to, String cc, String title,
-			String content) {
-		Message msg = new MimeMessage(session);
-		try {
-			msg.setFrom(new InternetAddress(from));
-			msg.setSubject(title);
-			if (to != null) {
-				msg.setRecipients(Message.RecipientType.TO,
-						InternetAddress.parse(to));
-			}
-			if (cc != null) {
-				msg.setRecipients(Message.RecipientType.CC,
-						InternetAddress.parse(cc));
-			}
-			//msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-			msg.setContent(content, "text/html;charset=utf-8");
-			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
-
-			Transport.send(msg);
-
-			return true;
-		} catch (AddressException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
-			return false;
-		} catch (MessagingException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
-			return false;
-		}
-	}
-
-	private boolean _send(String from,String to,String cc,String title,String content,List<MimeBodyPart> attachments){
-		MimeMessage mail = new MimeMessage(session);
-		try {
-			mail.setFrom(new InternetAddress(from));
-			mail.setSubject(title);
-			if (to != null) {
-				mail.setRecipients(Message.RecipientType.TO,
-						InternetAddress.parse(to));
-			}
-			if (cc != null) {
-				mail.setRecipients(Message.RecipientType.CC,
-						InternetAddress.parse(cc));
-			}
-
-			//mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-
-			MimeBodyPart mimeBodyPart = new MimeBodyPart();
-			mimeBodyPart.setContent(content,"text/html;charset=utf-8");
-
-			Multipart multipart = new MimeMultipart();
-			multipart.addBodyPart(mimeBodyPart);
-
-			for(MimeBodyPart attachment:attachments){
-				multipart.addBodyPart(attachment);
-			}
-
-			mail.setContent(multipart);
-//			mail.setContent(content, "text/html;charset=utf-8");
-			LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title));
-
-			Transport.send(mail);
-
-			return true;
-		} catch (AddressException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
-			return false;
-		} catch (MessagingException e) {
-			LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e);
-			return false;
-		}
-	}
-
-	public boolean send(String from, String to, String cc, String title,
-			String content) {
-		return this._send(from, to, cc, title, content);
-	}
-
-	public boolean send(String from, String to, String cc, String title,
-			String templatePath, VelocityContext context) {
-		Template t = null;
-		try {
-			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
-		} catch (ResourceNotFoundException ex) {
-
-		}
-		if (t == null) {
-			try {
-				t = velocityEngine.getTemplate(templatePath);
-			} catch (ResourceNotFoundException e) {
-				t = velocityEngine.getTemplate("/" + templatePath);
-			}
-		}
-		final StringWriter writer = new StringWriter();
-		t.merge(context, writer);
-		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
-		return this.send(from, to, cc, title, writer.toString());
-	}
-
-	public boolean send(String from, String to, String cc, String title,
-	                    String templatePath, VelocityContext context, Map<String,File> attachments) {
-		if (attachments == null || attachments.isEmpty()) {
-			return send(from, to, cc, title, templatePath, context);
-		}
-		Template t = null;
-
-		List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>();
-		Map<String,String> cid = new HashMap<String,String>();
-
-		for (Map.Entry<String,File> entry : attachments.entrySet()) {
-			final String attachment = entry.getKey();
-			final File attachmentFile  = entry.getValue();
-			final MimeBodyPart mimeBodyPart = new MimeBodyPart();
-			if(attachmentFile !=null && attachmentFile.exists()){
-				DataSource source = new FileDataSource(attachmentFile);
-				try {
-					mimeBodyPart.setDataHandler(new DataHandler(source));
-					mimeBodyPart.setFileName(attachment);
-					mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
-					mimeBodyPart.setContentID(attachment);
-					cid.put(attachment,mimeBodyPart.getContentID());
-					mimeBodyParts.add(mimeBodyPart);
-				} catch (MessagingException e) {
-					LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e);
-				}
-			}else{
-				LOG.error("Attachment: " + attachment + " is null or not exists");
-			}
-		}
-		//TODO remove cid, because not used at all
-		if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
-		context.put("cid", cid);
-
-		try {
-			t = velocityEngine.getTemplate(BASE_PATH + templatePath);
-		} catch (ResourceNotFoundException ex) {
-//			LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex);
-		}
-
-		if (t == null) {
-			try {
-				t = velocityEngine.getTemplate(templatePath);
-			} catch (ResourceNotFoundException e) {
-				try {
-					t = velocityEngine.getTemplate("/" + templatePath);
-				}
-				catch (Exception ex) {
-					LOG.error("Template not found:"+ "/" + templatePath, ex);
-				}
-			}
-		}
-
-		final StringWriter writer = new StringWriter();
-		t.merge(context, writer);
-		if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
-		return this._send(from, to, cc, title, writer.toString(), mimeBodyParts);
-	}
-}



Mime
View raw message