falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [1/4] git commit: FALCON-666 Add Alerts for unrecoverable failures. Contributed by Venkatesh Seetharam
Date Wed, 17 Sep 2014 23:56:55 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master df6f1d8eb -> 33b420b5d


FALCON-666 Add Alerts for unrecoverable failures. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a94cb7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a94cb7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a94cb7a8

Branch: refs/heads/master
Commit: a94cb7a8b7c14f1819785f0ec5e51d8e2478d610
Parents: caa2284
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Wed Sep 17 14:37:36 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 common/src/main/resources/log4j.xml             |  9 +++
 .../falcon/aspect/AbstractFalconAspect.java     | 50 ++++++++++------
 .../org/apache/falcon/aspect/AlertMessage.java  | 56 ++++++++++++++++++
 .../org/apache/falcon/aspect/GenericAlert.java  |  7 +--
 .../java/org/apache/falcon/monitors/Alert.java  | 37 ++++++++++++
 .../apache/falcon/plugin/AlertingPlugin.java    | 30 ++++++++++
 .../org/apache/falcon/plugin/LoggingPlugin.java |  9 ++-
 .../falcon/util/ResourcesReflectionUtil.java    | 31 +++++-----
 .../apache/falcon/aspect/AlertMessageTest.java  | 53 +++++++++++++++++
 .../plugin/ChainableMonitoringPlugin.java       | 51 +++++++++++++++--
 .../plugin/ChainableMonitoringPluginTest.java   | 60 ++++++++++++++++++++
 src/conf/log4j.xml                              |  9 +++
 webapp/src/main/resources/log4j.xml             |  9 +++
 14 files changed, 369 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 824bddd..523b218 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,8 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-666 Add Alerts for unrecoverable failures (Venkatesh Seetharam)
+
    FALCON-665 Handle message consumption failures in JMSMessageConsumer
    (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
index dd1ef70..75c8267 100644
--- a/common/src/main/resources/log4j.xml
+++ b/common/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
         </layout>
     </appender>
 
+    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <logger name="org.apache.falcon" additivity="false">
         <level value="debug"/>
         <appender-ref ref="FILE"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java b/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
index 29c77ce..06bda3f 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
@@ -38,11 +38,11 @@ public abstract class AbstractFalconAspect {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFalconAspect.class);
 
     @Around("@annotation(org.apache.falcon.monitors.Monitored)")
-    public Object logAround(ProceedingJoinPoint joinPoint) throws Throwable {
+    public Object logAroundMonitored(ProceedingJoinPoint joinPoint) throws Throwable {
 
         String methodName = joinPoint.getSignature().getName();
         Object[] args = joinPoint.getArgs();
-        Object result = null;
+        Object result;
         ResourceMessage.Status status;
 
         long startTime = System.nanoTime();
@@ -52,8 +52,8 @@ public abstract class AbstractFalconAspect {
         } catch (Exception e) {
             endTime = System.nanoTime();
             status = ResourceMessage.Status.FAILED;
-            publishMessage(getResourceMessage(joinPoint.getSignature()
-                    .getDeclaringType().getSimpleName()
+            publishMessage(getResourceMessage(
+                    joinPoint.getSignature().getDeclaringType().getSimpleName()
                     + "." + methodName, args, status, endTime - startTime));
             throw e;
         }
@@ -65,13 +65,11 @@ public abstract class AbstractFalconAspect {
         return result;
     }
 
-    private ResourceMessage getResourceMessage(String methodName,
-                                               Object[] args, ResourceMessage.Status status, long executionTime) {
-        String action = ResourcesReflectionUtil
-                .getResourceMonitorName(methodName);
+    private ResourceMessage getResourceMessage(String methodName, Object[] args,
+                                               ResourceMessage.Status status, long executionTime) {
+        String action = ResourcesReflectionUtil.getResourceMonitorName(methodName);
 
-        assert action != null : "Method :" + methodName
-                + " not parsed by reflection util";
+        assert action != null : "Method :" + methodName + " not parsed by reflection util";
         Map<String, String> dimensions = new HashMap<String, String>();
 
         if (ResourcesReflectionUtil.getResourceDimensionsName(methodName) == null) {
@@ -79,17 +77,37 @@ public abstract class AbstractFalconAspect {
         } else {
             for (Map.Entry<Integer, String> param : ResourcesReflectionUtil
                     .getResourceDimensionsName(methodName).entrySet()) {
-                dimensions.put(
-                        param.getValue(),
-                        args[param.getKey()] == null ? "NULL" : args[param
-                                .getKey()].toString());
+                dimensions.put(param.getValue(),
+                        args[param.getKey()] == null ? "NULL" : args[param.getKey()].toString());
             }
         }
-        Integer timeTakenArg = ResourcesReflectionUtil
-                .getResourceTimeTakenName(methodName);
+
+        Integer timeTakenArg = ResourcesReflectionUtil.getResourceTimeTakenName(methodName);
         return timeTakenArg == null ? new ResourceMessage(action, dimensions, status, executionTime)
             : new ResourceMessage(action, dimensions, status, Long.valueOf(args[timeTakenArg].toString()));
     }
 
     public abstract void publishMessage(ResourceMessage message);
+
+    @Around("@annotation(org.apache.falcon.monitors.Alert)")
+    public Object logAroundAlert(ProceedingJoinPoint joinPoint) throws Throwable {
+
+        String methodName = joinPoint.getSignature().getName();
+        String event = ResourcesReflectionUtil.getResourceMonitorName(
+                joinPoint.getSignature().getDeclaringType().getSimpleName() + "." + methodName);
+        Object[] args = joinPoint.getArgs();
+        Object result;
+
+        try {
+            result = joinPoint.proceed();
+        } finally {
+            AlertMessage alertMessage = new AlertMessage(
+                    event, args[0].toString(), args[1].toString());
+            publishAlert(alertMessage);
+        }
+
+        return result;
+    }
+
+    public abstract void publishAlert(AlertMessage alertMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java b/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
new file mode 100644
index 0000000..0f38e34
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.aspect;
+
+/**
+ * Message to be sent to the alerting system.
+ */
+public class AlertMessage {
+
+    private final String event;
+    private final String alert;
+    private final String error;
+
+    public AlertMessage(String event, String alert, String error) {
+        this.event = event;
+        this.alert = alert;
+        this.error = error;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public String getAlert() {
+        return alert;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    @Override
+    public String toString() {
+        return "AlertMessage{"
+                + "event='" + event + '\''
+                + ", alert='" + alert + '\''
+                + ", error='" + error + '\''
+                + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d8efc37..c7a86d9 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.aspect;
 
+import org.apache.falcon.monitors.Alert;
 import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.monitors.TimeTaken;
@@ -71,7 +72,6 @@ public final class GenericAlert {
             @Dimension(value = "error-message") String errorMessage,
             @Dimension(value = "message") String message,
             @TimeTaken long timeTaken) {
-
         return "IGNORE";
     }
 
@@ -87,7 +87,6 @@ public final class GenericAlert {
             @Dimension(value = "operation") String operation,
             @Dimension(value = "start-time") String startTime,
             @TimeTaken long timeTaken) {
-
         return "IGNORE";
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
@@ -99,14 +98,14 @@ public final class GenericAlert {
         return "IGNORE";
     }
 
-    @Monitored(event = "log-cleanup-service-failed")
+    @Alert(event = "log-cleanup-service-failed")
     public static String alertLogCleanupServiceFailed(
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {
         return "IGNORE";
     }
 
-    @Monitored(event = "jms-message-consumer-failed")
+    @Alert(event = "jms-message-consumer-failed")
     public static String alertJMSMessageConsumerFailed(
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/monitors/Alert.java b/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
new file mode 100644
index 0000000..03bd387
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.monitors;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Alert annotation for monitoring.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Alert {
+
+    /**
+     * @return Event name associated with this alert
+     */
+    String event();
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
new file mode 100644
index 0000000..8539ea3
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.falcon.aspect.AlertMessage;
+
+/**
+ * Generic interface to receiving alerts.
+ */
+public interface AlertingPlugin {
+
+    void alert(AlertMessage message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
index 77d0d30..c959fc2 100644
--- a/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
+++ b/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.plugin;
 
+import org.apache.falcon.aspect.AlertMessage;
 import org.apache.falcon.aspect.ResourceMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,11 +26,17 @@ import org.slf4j.LoggerFactory;
 /**
  * Plugin for logging metrics using log4j.
  */
-public class LoggingPlugin implements MonitoringPlugin {
+public class LoggingPlugin implements MonitoringPlugin, AlertingPlugin {
     private static final Logger METRIC = LoggerFactory.getLogger("METRIC");
+    private static final Logger ALERT = LoggerFactory.getLogger("ALERT");
 
     @Override
     public void monitor(ResourceMessage message) {
         METRIC.info("{}", message);
     }
+
+    @Override
+    public void alert(AlertMessage message) {
+        ALERT.info("{}", message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
index 6e80db8..3bafaee 100644
--- a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
+++ b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.util;
 
+import org.apache.falcon.monitors.Alert;
 import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.monitors.TimeTaken;
 
@@ -48,7 +49,8 @@ public final class ResourcesReflectionUtil {
     }
 
     public static Map<Integer, String> getResourceDimensionsName(String methodName) {
-        return METHODS.get(methodName) != null ? Collections.unmodifiableMap(METHODS.get(methodName).params) : null;
+        return METHODS.get(methodName) != null
+                ? Collections.unmodifiableMap(METHODS.get(methodName).params) : null;
     }
 
     public static String getResourceMonitorName(String methodName) {
@@ -56,8 +58,7 @@ public final class ResourcesReflectionUtil {
     }
 
     public static Integer getResourceTimeTakenName(String methodName) {
-        return METHODS.get(methodName) != null ? METHODS.get(methodName).timeTakenArgIndex
-                : null;
+        return METHODS.get(methodName) != null ? METHODS.get(methodName).timeTakenArgIndex : null;
     }
 
     /**
@@ -81,11 +82,11 @@ public final class ResourcesReflectionUtil {
     private static void buildAnnotationsMapForClass(String className) {
         Class clazz;
         try {
-            clazz = ResourcesReflectionUtil.class.
-                    getClassLoader().loadClass(className);
+            clazz = ResourcesReflectionUtil.class.getClassLoader().loadClass(className);
         } catch (ClassNotFoundException e) {
             throw new RuntimeException("Unable to load class " + className, e);
         }
+
         Method[] declMethods = clazz.getMethods();
 
         // scan every method
@@ -93,20 +94,17 @@ public final class ResourcesReflectionUtil {
             Annotation[] methodAnnots = declMethod.getDeclaredAnnotations();
             // scan every annotation on method
             for (Annotation methodAnnot : methodAnnots) {
-                if (methodAnnot.annotationType().getSimpleName()
-                        .equals(Monitored.class.getSimpleName())) {
+                final String simpleName = methodAnnot.annotationType().getSimpleName();
+                if (simpleName.equals(Monitored.class.getSimpleName())
+                        || simpleName.equals(Alert.class.getSimpleName())) {
                     MethodAnnotation annotation = new MethodAnnotation();
-                    annotation.monitoredName = getAnnotationValue(
-                            methodAnnot, "event");
-                    Annotation[][] paramAnnots = declMethod
-                            .getParameterAnnotations();
+                    annotation.monitoredName = getAnnotationValue(methodAnnot, "event");
+                    Annotation[][] paramAnnots = declMethod.getParameterAnnotations();
+
                     // scan every param
                     annotation.params = getDeclaredParamAnnots(paramAnnots, annotation);
-                    METHODS.put(
-                            clazz.getSimpleName() + "."
-                                    + declMethod.getName(), annotation);
+                    METHODS.put(clazz.getSimpleName() + "." + declMethod.getName(), annotation);
                 }
-
             }
         }
     }
@@ -126,8 +124,8 @@ public final class ResourcesReflectionUtil {
                 }
             }
         }
-        return params;
 
+        return params;
     }
 
     private static String getAnnotationValue(Annotation annotation,
@@ -145,5 +143,4 @@ public final class ResourcesReflectionUtil {
 
         return value;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
----------------------------------------------------------------------
diff --git a/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java b/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
new file mode 100644
index 0000000..d53a234
--- /dev/null
+++ b/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.aspect;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Test Message to be sent to the alerting system.
+ */
+public class AlertMessageTest {
+
+    private final AlertMessage alertMessage = new AlertMessage(
+            "event", "alert", "error"
+    );
+
+    @Test
+    public void testGetEvent() throws Exception {
+        Assert.assertEquals(alertMessage.getEvent(), "event");
+    }
+
+    @Test
+    public void testGetAlert() throws Exception {
+        Assert.assertEquals(alertMessage.getAlert(), "alert");
+    }
+
+    @Test
+    public void testGetError() throws Exception {
+        Assert.assertEquals(alertMessage.getError(), "error");
+    }
+
+    @Test
+    public void testToString() throws Exception {
+        Assert.assertEquals(alertMessage.toString(),
+                "AlertMessage{event='event', alert='alert', error='error'}");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
index c695bb7..1a5a331 100644
--- a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
+++ b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
@@ -20,6 +20,7 @@ package org.apache.falcon.plugin;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.AbstractFalconAspect;
+import org.apache.falcon.aspect.AlertMessage;
 import org.apache.falcon.aspect.ResourceMessage;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
@@ -36,29 +37,51 @@ import java.util.List;
  * of {@link MonitoringPlugin}. {@link LoggingPlugin} is the default.
  */
 @Aspect
-public class ChainableMonitoringPlugin extends AbstractFalconAspect implements MonitoringPlugin {
+public class ChainableMonitoringPlugin extends AbstractFalconAspect
+        implements MonitoringPlugin, AlertingPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(ChainableMonitoringPlugin.class);
 
-    private List<MonitoringPlugin> plugins = new ArrayList<MonitoringPlugin>();
+    private List<MonitoringPlugin> monitoringPlugins = new ArrayList<MonitoringPlugin>();
+    private List<AlertingPlugin> alertingPlugins = new ArrayList<AlertingPlugin>();
 
     public ChainableMonitoringPlugin() {
+        initializeMonitoringPlugins();
+        initializeAlertingPlugins();
+    }
+
+    private void initializeMonitoringPlugins() {
         String pluginClasses = StartupProperties.get().
                 getProperty("monitoring.plugins", LoggingPlugin.class.getName());
         try {
             for (String pluginClass : pluginClasses.split(",")) {
                 MonitoringPlugin plugin = ReflectionUtils.getInstanceByClassName(pluginClass.trim());
-                plugins.add(plugin);
+                monitoringPlugins.add(plugin);
                 LOG.info("Registered Monitoring Plugin {}", pluginClass);
             }
         } catch (FalconException e) {
-            plugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
-            LOG.error("Unable to initialize monitoring plugins: {}", pluginClasses, e);
+            monitoringPlugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
+            LOG.error("Unable to initialize monitoring.plugins: {}", pluginClasses, e);
+        }
+    }
+
+    private void initializeAlertingPlugins() {
+        String pluginClasses = StartupProperties.get().
+                getProperty("alerting.plugins", LoggingPlugin.class.getName());
+        try {
+            for (String pluginClass : pluginClasses.split(",")) {
+                AlertingPlugin plugin = ReflectionUtils.getInstanceByClassName(pluginClass.trim());
+                alertingPlugins.add(plugin);
+                LOG.info("Registered Alerting Plugin {}", pluginClass);
+            }
+        } catch (FalconException e) {
+            alertingPlugins = Arrays.asList((AlertingPlugin) new LoggingPlugin());
+            LOG.error("Unable to initialize alerting.plugins: {}", pluginClasses, e);
         }
     }
 
     @Override
     public void monitor(ResourceMessage message) {
-        for (MonitoringPlugin plugin : plugins) {
+        for (MonitoringPlugin plugin : monitoringPlugins) {
             try {
                 plugin.monitor(message);
             } catch (Exception e) {
@@ -71,4 +94,20 @@ public class ChainableMonitoringPlugin extends AbstractFalconAspect implements M
     public void publishMessage(ResourceMessage message) {
         monitor(message);
     }
+
+    @Override
+    public void publishAlert(AlertMessage alertMessage) {
+        alert(alertMessage);
+    }
+
+    @Override
+    public void alert(AlertMessage alertMessage) {
+        for (AlertingPlugin plugin : alertingPlugins) {
+            try {
+                plugin.alert(alertMessage);
+            } catch (Exception e) {
+                LOG.debug("Unable to publish message to {}", plugin.getClass(), e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java b/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
new file mode 100644
index 0000000..9a386c6
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.falcon.aspect.AlertMessage;
+import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.aspect.ResourceMessage;
+import org.apache.falcon.util.StartupProperties;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test for ChainableMonitoringPlugin.
+ */
+public class ChainableMonitoringPluginTest implements MonitoringPlugin, AlertingPlugin {
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        StartupProperties.get().
+                setProperty("monitoring.plugins", this.getClass().getName());
+        StartupProperties.get().
+                setProperty("alerting.plugins", this.getClass().getName());
+    }
+
+    @Test
+    public void testPlugin() throws Exception {
+        GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df",
+                "ef-id", "wf-user", "1", "DELETE", "now", "error", "none", 1242);
+        GenericAlert.alertJMSMessageConsumerFailed("test-alert", new Exception("test"));
+    }
+
+    @Override
+    public void monitor(ResourceMessage message) {
+        Assert.assertNotNull(message);
+        Assert.assertEquals(message.getAction(), "wf-instance-failed");
+    }
+
+    @Override
+    public void alert(AlertMessage alertMessage) {
+        Assert.assertNotNull(alertMessage);
+        Assert.assertEquals(alertMessage.getEvent(), "jms-message-consumer-failed");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 90abe26..1341c6e 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -51,6 +51,15 @@
         </layout>
     </appender>
 
+    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${falcon.log.dir}/${falcon.app.type}.security.audit.log"/>
         <param name="Append" value="true"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index 5ba6f16..3f9a191 100644
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
         </layout>
     </appender>
 
+    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${user.dir}/target/logs/security.audit.log"/>
         <param name="Append" value="true"/>


Mime
View raw message