falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1473 Feed SLA Miss Alerts through REST API. Contributed by Ajay Yadava.
Date Tue, 29 Sep 2015 09:04:07 GMT
Repository: falcon
Updated Branches:
  refs/heads/master f7ad3f487 -> 879204671


FALCON-1473 Feed SLA Miss Alerts through REST API. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/87920467
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/87920467
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/87920467

Branch: refs/heads/master
Commit: 87920467185d895c71c29b4b13445e5593fc2f42
Parents: f7ad3f4
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Sep 29 13:58:15 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Sep 29 13:58:15 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/falcon/ResponseHelper.java  |  14 ++
 .../java/org/apache/falcon/cli/FalconCLI.java   |  15 +-
 .../org/apache/falcon/client/FalconClient.java  |  21 +++
 .../apache/falcon/entity/v0/SchemaHelper.java   |   2 +-
 .../resource/SchedulableEntityInstance.java     |  22 ++-
 .../SchedulableEntityInstanceResult.java        |  86 ++++++++++
 .../org/apache/falcon/entity/FeedHelper.java    |   2 +-
 .../falcon/entity/parser/FeedEntityParser.java  |   2 +-
 .../lifecycle/retention/AgeBasedDelete.java     |   2 +-
 common/src/main/resources/startup.properties    |  14 ++
 docs/src/site/twiki/FalconCLI.twiki             |  48 ++++++
 docs/src/site/twiki/restapi/FeedSLA.twiki       |  56 +++++++
 docs/src/site/twiki/restapi/ResourceList.twiki  |   1 +
 .../falcon/resource/AbstractEntityManager.java  |   4 +-
 .../AbstractSchedulableEntityManager.java       |  70 +++++++++
 .../proxy/SchedulableEntityManagerProxy.java    |  37 ++++-
 .../service/FeedSLAMonitoringService.java       | 157 ++++++++++++++++---
 .../falcon/service/FeedSLAMonitoringTest.java   | 106 +++++++++++++
 src/conf/startup.properties                     |  10 ++
 .../resource/SchedulableEntityManager.java      |  20 +++
 21 files changed, 655 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d18d5aa..0c03cf2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) 
 
   NEW FEATURES
+    FALCON-1473 Feed SLA Miss Alerts through REST API(Ajay Yadava)
+
     FALCON-965 Open up life cycle stage implementation within Falcon for extension(Ajay Yadava)
 
     FALCON-1437 Change DR recipes notification with Falcon notification(Peeyush Bishnoi via Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/ResponseHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/ResponseHelper.java b/client/src/main/java/org/apache/falcon/ResponseHelper.java
index a13682b..8f22af7 100644
--- a/client/src/main/java/org/apache/falcon/ResponseHelper.java
+++ b/client/src/main/java/org/apache/falcon/ResponseHelper.java
@@ -26,6 +26,7 @@ import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 import org.apache.falcon.resource.TriageResult;
 
 import java.util.Date;
@@ -287,4 +288,17 @@ public final class ResponseHelper {
         sb.append("\nRequest Id: ").append(dependencyResult.getRequestId());
         return sb.toString();
     }
+
+    public static String getString(SchedulableEntityInstanceResult instances) {
+        StringBuilder sb = new StringBuilder();
+        String results = instances.toString();
+        if (StringUtils.isEmpty(results)) {
+            sb.append("No sla miss found!");
+        } else {
+            sb.append(results);
+        }
+        sb.append("\n\nResponse: ").append(instances.getMessage());
+        sb.append("\nRequest Id: ").append(instances.getRequestId());
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index c914649..1574585 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -37,6 +37,7 @@ import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -86,6 +87,7 @@ public class FalconCLI {
     public static final String SUMMARY_OPT = "summary";
     public static final String DEFINITION_OPT = "definition";
     public static final String DEPENDENCY_OPT = "dependency";
+    public static final String SLA_MISS_ALERT_OPT = "slaAlert";
     public static final String LOOKUP_OPT = "lookup";
     public static final String PATH_OPT = "path";
     public static final String LIST_OPT = "list";
@@ -420,6 +422,7 @@ public class FalconCLI {
         String entityName = commandLine.getOptionValue(ENTITY_NAME_OPT);
         String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
         String colo = commandLine.getOptionValue(COLO_OPT);
+        colo = getColo(colo);
         String cluster = commandLine.getOptionValue(CLUSTER_OPT);
         String start = commandLine.getOptionValue(START_OPT);
         String end = commandLine.getOptionValue(END_OPT);
@@ -462,7 +465,15 @@ public class FalconCLI {
         validateSortOrder(sortOrder);
         String entityAction = "entity";
 
-        if (optionsList.contains(SUBMIT_OPT)) {
+        if (optionsList.contains(SLA_MISS_ALERT_OPT)) {
+            validateNotEmpty(entityType, ENTITY_TYPE_OPT);
+            validateNotEmpty(start, START_OPT);
+            parseDateString(start);
+            parseDateString(end);
+            SchedulableEntityInstanceResult response = client.getFeedSlaMissPendingAlerts(entityType,
+                entityName, start, end, colo);
+            result = ResponseHelper.getString(response);
+        } else if (optionsList.contains(SUBMIT_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
             result = client.submit(entityType, filePath, doAsUser).getMessage();
@@ -710,6 +721,7 @@ public class FalconCLI {
         Option list = new Option(LIST_OPT, false,
                 "List entities registered for a type");
         Option lookup = new Option(LOOKUP_OPT, false, "Lookup a feed given its instance's path");
+        Option slaAlert = new Option(SLA_MISS_ALERT_OPT, false, "Get missing feed instances which missed SLA");
         Option entitySummary = new Option(SUMMARY_OPT, false,
                 "Get summary of instances for list of entities");
         Option touch = new Option(TOUCH_OPT, false,
@@ -729,6 +741,7 @@ public class FalconCLI {
         group.addOption(dependency);
         group.addOption(list);
         group.addOption(lookup);
+        group.addOption(slaAlert);
         group.addOption(entitySummary);
         group.addOption(touch);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 981559b..20f6447 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -42,6 +42,7 @@ import org.apache.falcon.resource.InstanceDependencyResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.LineageGraphResult;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 import org.apache.falcon.resource.TriageResult;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
@@ -177,6 +178,7 @@ public class FalconClient extends AbstractFalconClient {
         return currentToken;
     }
 
+
     /**
      * Methods allowed on Entity Resources.
      */
@@ -195,6 +197,7 @@ public class FalconClient extends AbstractFalconClient {
         SUMMARY("api/entities/summary", HttpMethod.GET, MediaType.APPLICATION_JSON),
         LOOKUP("api/entities/lookup/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         DEPENDENCY("api/entities/dependencies/", HttpMethod.GET, MediaType.TEXT_XML),
+        SLA("api/entities/sla-alert", HttpMethod.GET, MediaType.APPLICATION_JSON),
         TOUCH("api/entities/touch", HttpMethod.POST, MediaType.TEXT_XML);
 
         private String path;
@@ -381,6 +384,24 @@ public class FalconClient extends AbstractFalconClient {
 
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
 
+    public SchedulableEntityInstanceResult getFeedSlaMissPendingAlerts(String entityType, String entityName,
+                                           String startTime, String endTime, String colo) throws FalconCLIException {
+
+        WebResource resource = service.path(Entities.SLA.path).path(entityType).queryParam("start", startTime)
+                .queryParam("colo", colo);
+        if (endTime != null) {
+            resource = resource.queryParam("end", endTime);
+        }
+        if (entityName != null) {
+            resource = resource.queryParam("name", entityName);
+        }
+        ClientResponse clientResponse = resource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(Entities.SLA.mimeType).type(MediaType.APPLICATION_JSON)
+                .method(Entities.SLA.method, ClientResponse.class);
+        checkIfSuccessful(clientResponse);
+        return clientResponse.getEntity(SchedulableEntityInstanceResult.class);
+    }
+
     public TriageResult triage(String entityType, String entityName, String instanceTime, String colo)
         throws FalconCLIException {
         ClientResponse clientResponse = service

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java b/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
index 62b810c..1c02f37 100644
--- a/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
+++ b/client/src/main/java/org/apache/falcon/entity/v0/SchemaHelper.java
@@ -54,7 +54,7 @@ public final class SchemaHelper {
         try {
             return getDateFormat().parse(dateStr);
         } catch (ParseException e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Unable to parse date: " + dateStr, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
index f5be63d..0968734 100644
--- a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
+++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstance.java
@@ -27,7 +27,7 @@ import java.util.Date;
 /**
  * Instance of a Schedulable Entity (Feed/Process).
  */
-public class SchedulableEntityInstance {
+public class SchedulableEntityInstance implements Comparable<SchedulableEntityInstance> {
 
     public static final String INPUT = "Input";
     public static final String OUTPUT = "Output";
@@ -152,4 +152,24 @@ public class SchedulableEntityInstance {
         }
         return result;
     }
+
+    @Override
+    public int compareTo(SchedulableEntityInstance o) {
+        int result = this.cluster.compareTo(o.cluster);
+        if (result != 0) {
+            return result;
+        }
+
+        result = this.entityType.compareTo(o.entityType);
+        if (result != 0) {
+            return result;
+        }
+
+        result = this.entityName.compareToIgnoreCase(o.entityName);
+        if (result != 0) {
+            return result;
+        }
+
+        return this.instanceTime.compareTo(o.instanceTime);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java
new file mode 100644
index 0000000..752c48d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/SchedulableEntityInstanceResult.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Arrays;
+
+/**
+ * Instances list used for marshalling / unmarshalling with REST calls.
+ */
+@XmlRootElement(name = "instances")
+@XmlAccessorType(XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class SchedulableEntityInstanceResult extends APIResult {
+
+    @XmlElement(name = "instances")
+    private SchedulableEntityInstance[] instances;
+
+    //For JAXB
+    private SchedulableEntityInstanceResult() {
+        super();
+    }
+
+    public SchedulableEntityInstanceResult(Status status, String message) {
+        super(status, message);
+    }
+
+    public SchedulableEntityInstance[] getInstances() {
+        return instances;
+    }
+
+    public void setInstances(SchedulableEntityInstance[] instances) {
+        this.instances = instances;
+    }
+
+
+    @Override
+    public Object[] getCollection() {
+        return getInstances();
+    }
+
+    @Override
+    public void setCollection(Object[] items) {
+        if (items == null) {
+            setInstances(new SchedulableEntityInstance[0]);
+        } else {
+            SchedulableEntityInstance[] newInstances = new SchedulableEntityInstance[items.length];
+            for (int index = 0; index < items.length; index++) {
+                newInstances[index] = (SchedulableEntityInstance)items[index];
+            }
+            setInstances(newInstances);
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        if (instances != null) {
+            Arrays.sort(instances);
+            for (SchedulableEntityInstance element : instances) {
+                buffer.append(element.toString());
+                buffer.append("\n");
+            }
+        }
+        return buffer.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 79f1959..5c252a8 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -264,7 +264,7 @@ public final class FeedHelper {
         return null;
     }
 
-    public static Sla getSLAs(Cluster cluster, Feed feed) {
+    public static Sla getSLA(Cluster cluster, Feed feed) {
         final Sla clusterSla = cluster.getSla();
         if (clusterSla != null) {
             return clusterSla;

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index c73cc78..6be2495 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -152,7 +152,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
 
     private void validateFeedSLA(Feed feed) throws FalconException {
         for (Cluster cluster : feed.getClusters().getClusters()) {
-            Sla clusterSla = FeedHelper.getSLAs(cluster, feed);
+            Sla clusterSla = FeedHelper.getSLA(cluster, feed);
             if (clusterSla != null) {
                 Frequency slaLowExpression = clusterSla.getSlaLow();
                 ExpressionHelper evaluator = ExpressionHelper.get();

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
index 0a1810e..a4ae780 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -69,7 +69,7 @@ public class AgeBasedDelete extends RetentionPolicy {
 
     private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException {
         // test that slaHigh is less than retention
-        Sla clusterSla = FeedHelper.getSLAs(cluster, feed);
+        Sla clusterSla = FeedHelper.getSLA(cluster, feed);
         if (clusterSla != null) {
             ExpressionHelper evaluator = ExpressionHelper.get();
             ExpressionHelper.setReferenceDate(new Date());

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 357b90c..3383129 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -98,6 +98,20 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 
 *.falcon.cleanup.service.frequency=minutes(5)
 
+######### Properties for Feed SLA Monitoring #########
+# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
+*.feed.sla.serialization.frequency.millis=3600000
+
+# Do not change unless really sure
+# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
+*.feed.sla.statusCheck.frequency.seconds=600
+
+# Do not change unless really sure
+# Time Duration (in milliseconds) in future for generating pending feed instances.
+# In every cycle pending feed instances are added for monitoring, till this time in future.
+# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
+*.feed.sla.lookAheadWindow.millis=900000
+
 
 ######### Properties for configuring JMS provider - activemq #########
 # Default Active MQ url

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 4f72bf8..22003d3 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -145,6 +145,54 @@ $FALCON_HOME/bin/falcon entity -type feed -lookup -path /data/projects/my-hourly
 If you have multiple feeds with location as /data/projects/my-hourly/${YEAR}/${MONTH}/${DAY}/${HOUR} then this command will return all of them.
 
 
+---+++SLAAlert
+<verbatim>
+Since: 0.8
+</verbatim>
+
+This command lists all the feed instances which have missed SLA and are still not available. If a feed instance missed
+SLA but is now available, then it will not be reported in results. The purpose of this API is alerting and hence it
+doesn't return feed instances which missed SLA but are available, as they don't require any action.
+
+* Currently sla monitoring is supported only for feeds.
+
+* Option end is optional and will default to current time if missing.
+
+* Option name is optional, if provided only instances of that feed will be considered.
+
+Usage:
+
+*Example 1*
+
+*$FALCON_HOME/bin/falcon entity -type feed -start 2014-09-05T00:00Z -slaAlert  -end 2016-05-03T00:00Z -colo local*
+
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T11:59Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:00Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:01Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:02Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:03Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:04Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:05Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:06Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:07Z, tags: Missed SLA High
+name: out, type: FEED, cluster: local, instanceTime: 2015-09-26T12:08Z, tags: Missed SLA Low
+
+
+Response: default/Success!
+
+Request Id: default/216978070@qtp-830047511-4 - f5a6c129-ab42-4feb-a2bf-c3baed356248
+
+*Example 2*
+
+*$FALCON_HOME/bin/falcon entity -type feed -start 2014-09-05T00:00Z -slaAlert  -end 2016-05-03T00:00Z -colo local -name in*
+
+name: in, type: FEED, cluster: local, instanceTime: 2015-09-26T06:00Z, tags: Missed SLA High
+
+Response: default/Success!
+
+Request Id: default/1580107885@qtp-830047511-7 - f16cbc51-5070-4551-ad25-28f75e5e4cf2
+
+
 ---++Instance Management Options
 
 ---+++Kill

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/docs/src/site/twiki/restapi/FeedSLA.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/FeedSLA.twiki b/docs/src/site/twiki/restapi/FeedSLA.twiki
new file mode 100644
index 0000000..5fd984e
--- /dev/null
+++ b/docs/src/site/twiki/restapi/FeedSLA.twiki
@@ -0,0 +1,56 @@
+---++ GET /api/entities/sla-alert/:entity-type
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+<verbatim>
+Since: 0.8
+</verbatim>
+This command lists all the feed instances which have missed SLA and are still not available. If a feed instance missed
+SLA but is now available, then it will not be reported in results. The purpose of this API is alerting and hence it
+doesn't return feed instances which missed SLA but are available, as they don't require any action.
+
+---++ Parameters
+   * :entity-type Only valid option is feed.
+   * name <optional param> parameter to restrict results for a particular feed using feed's name.
+   * start <mandatory param> start of the time window for nominal instances, inclusive.
+   * end <optional param> end of the time window for nominal instances to be considered, defaults to the current time if not specified.
+   * colo <optional param> name of the colo
+
+
+---++ Results
+Pending feed instances which missed SLA.
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/entities/sla-alert/feed?colo=*&start=2012-04-03T07:00Z
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "status":"SUCCEEDED",
+    "message":"default/Success!\n",
+    "requestId":"default/885720178@qtp-495452957-6 - f6e82e9b-d23f-466b-82df-4fb8293ce9cf\n",
+    "instances":[
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:33:00+05:30","tags":"Missed SLA High"},
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:29:00+05:30","tags":"Missed SLA High"},
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:35:00+05:30","tags":"Missed SLA Low"},
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:30:00+05:30","tags":"Missed SLA High"},
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:34:00+05:30","tags":"Missed SLA High"},
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:31:00+05:30","tags":"Missed SLA High"},
+            {"cluster":"local","entityName":"out","entityType":"FEED","instanceTime":"2015-09-26T17:32:00+05:30","tags":"Missed SLA High"}
+    ]
+}
+</verbatim>
+
+In case there are no pending instances which have missed sla the response will be like below:
+<verbatim>
+{
+    "status":"SUCCEEDED",
+    "message":"default/Success!\n",
+    "requestId":"default/979808239@qtp-1243851750-3 - 8c7396c0-efe2-43e9-9aea-7ae6afea5fd6\n"
+}
+</verbatim>

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index ea3e3b6..34c2c6f 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -54,6 +54,7 @@ The current version of the rest api's documentation is also hosted on the Falcon
 | GET         | [[EntityList][api/entities/list/:entity-type]]                              | Get the list of entities           |
 | GET         | [[EntitySummary][api/entities/summary/:entity-type/:cluster]]               | Get instance summary of all entities |
 | GET         | [[EntityDependencies][api/entities/dependencies/:entity-type/:entity-name]] | Get the dependencies of the entity |
+| GET         | [[FeedSLA][api/entities/sla-alert/:entity-type]]                            | Get pending feed instances which missed sla |
 | GET         | [[FeedLookup][api/entities/lookup/feed/]]                                   | Get feed for given path            |
 
 ---++ REST Call on Feed and Process Instances

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 2682257..a6a0e4c 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -101,7 +101,7 @@ public abstract class AbstractEntityManager {
         return result;
     }
 
-    protected void checkColo(String colo) {
+    protected static void checkColo(String colo) {
         if (DeploymentUtil.isEmbeddedMode()) {
             return;
         }
@@ -146,7 +146,7 @@ public abstract class AbstractEntityManager {
                 return DeploymentUtil.getDefaultColos();
             }
 
-            if (EntityType.getEnum(type) == EntityType.CLUSTER) {
+            if (EntityType.getEnum(type) == EntityType.CLUSTER || name == null) {
                 return getAllColos();
             }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 3280789..63e0647 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -24,12 +24,15 @@ import org.apache.falcon.FalconWebException;
 import org.apache.falcon.Pair;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.lock.MemoryLocks;
+import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.UnschedulableEntityException;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.util.DeploymentUtil;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +106,73 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
         }
     }
 
+    public static void validateSlaParams(String entityType, String entityName, String start, String end,
+                                  String colo) throws FalconException {
+        EntityType type = EntityType.getEnum(entityType);
+        if (type != EntityType.FEED) {
+            throw new ValidationException("SLA monitoring is not supported for: " + type);
+        }
+
+        // validate valid feed name.
+        if (StringUtils.isNotBlank(entityName)) {
+            EntityUtil.getEntity(EntityType.FEED, entityName);
+        }
+
+        Date startTime, endTime;
+        // validate mandatory start date
+        if (StringUtils.isBlank(start)) {
+            throw new ValidationException("'start' is mandatory and can not be blank.");
+        } else {
+            startTime = SchemaHelper.parseDateUTC(start);
+        }
+
+        // validate optional end date
+        if (StringUtils.isBlank(end)) {
+            endTime = new Date();
+        } else {
+            endTime = SchemaHelper.parseDateUTC(end);
+        }
+
+        if (startTime.after(endTime)) {
+            throw new ValidationException("start can not be after end");
+        }
+
+        checkColo(colo);
+    }
+
+    /**
+     * Returns the feed instances which are not yet available and have missed either slaLow or slaHigh.
+     * This api doesn't return the feeds which missed SLA but are now available. Purpose of this api is to show feed
+     * instances which you need to attend to.
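+     * @param startStr start of the time range, as a UTC date string
+     * @param endStr end of the time range, as a UTC date string; the current time is used when null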
+     */
+    public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(String feedName, String startStr, String endStr,
+                                                                       String colo) {
+
+        Set<SchedulableEntityInstance> instances = new HashSet<>();
+        try {
+            checkColo(colo);
+            Date start = EntityUtil.parseDateUTC(startStr);
+            Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);
+
+            if (StringUtils.isBlank(feedName)) {
+                instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(start, end));
+            } else {
+                for (String clusterName : DeploymentUtil.getCurrentClusters()) {
+                    instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(feedName,
+                            clusterName, start, end));
+                }
+            }
+        } catch (FalconException e) {
+            throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+        }
+        SchedulableEntityInstanceResult result = new SchedulableEntityInstanceResult(APIResult.Status.SUCCEEDED,
+                "Success!");
+        result.setCollection(instances.toArray());
+        return result;
+    }
+
     /**
      * Submits a new entity and schedules it immediately.
      *

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 47038e5..9d13d74 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -34,19 +34,20 @@ import org.apache.falcon.resource.AbstractSchedulableEntityManager;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.FeedLookupResult;
+import org.apache.falcon.resource.SchedulableEntityInstanceResult;
 import org.apache.falcon.resource.channel.Channel;
 import org.apache.falcon.resource.channel.ChannelFactory;
-
 import org.apache.falcon.util.DeploymentUtil;
+
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
 import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
-import javax.ws.rs.POST;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
@@ -110,6 +111,36 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         return new BufferedRequest(request);
     }
 
+    @GET
+    @Path("sla-alert/{type}")
+    @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
+    @Monitored(event = "feed-sla-misses")
+    public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(
+            @Dimension("entityType") @PathParam("type") final String entityType,
+            @Dimension("entityName") @QueryParam("name") final String entityName,
+            @Dimension("start") @QueryParam("start") final String start,
+            @Dimension("end") @QueryParam("end") final String end,
+            @Dimension("colo") @QueryParam("colo") final String colo) {
+        try {
+            validateSlaParams(entityType, entityName, start, end, colo);
+        } catch (Exception e) {
+            throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+        }
+        return new EntityProxy<SchedulableEntityInstanceResult>(entityType, entityName,
+            SchedulableEntityInstanceResult.class) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return getApplicableColos(entityType, entityName);
+            }
+
+            @Override
+            protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException {
+                return getEntityManager(colo).invoke("getFeedSLAMissPendingAlerts", entityType, entityName,
+                        start, end, colo);
+            }
+        }.execute();
+    }
+
     /**
      * Submit the given entity.
      * @param request Servlet Request

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 8bf43b8..193aa64 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -25,15 +25,20 @@ import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.FeedInstanceStatus;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,19 +52,30 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Service to monitor Feed SLAs.
  */
-public class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
+public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
     private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
 
     private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
 
     private static final int ONE_MS = 1;
 
+    private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService();
+
+    private FeedSLAMonitoringService() {
+
+    }
+
+    public static FeedSLAMonitoringService get() {
+        return SERVICE;
+    }
+
     /**
      * Permissions for storePath.
      */
@@ -68,31 +84,31 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
     /**
      * Feeds to be monitored.
      */
-    private static Set<String> monitoredFeeds;
+    private Set<String> monitoredFeeds;
 
 
     /**
      * Map<Pair<feedName, clusterName>, Set<instanceTime> to store
      * each missing instance of a feed.
      */
-    private static Map<Pair<String, String>, Set<Date>> pendingInstances;
+    private Map<Pair<String, String>, Set<Date>> pendingInstances;
 
 
     /**
      * Used to store the last time when pending instances were checked for SLA.
      */
-    private static Date lastCheckedAt;
+    private Date lastCheckedAt;
 
 
     /**
      * Used to store last time when the state was serialized to the store.
      */
-    private static Date lastSerializedAt;
+    private Date lastSerializedAt;
 
     /**
      * Frequency in seconds of "status check" for pending feed instances.
      */
-    private static final int STATUS_CHECK_FREQUENCY_SECS = 10 * 60; // 10 minutes
+    private int statusCheckFrequencySeconds; // configurable; default is 10 minutes
 
 
     /**
@@ -100,7 +116,7 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
      *
      * In every cycle pending feed instances are added for monitoring, till this time in future.
      */
-    private static final int LOOKAHEAD_WINDOW_MILLIS = 15 * 60 * 1000; // 15 MINUTES
+    private int lookAheadWindowMillis; // configurable; default is 15 minutes
 
 
     /**
@@ -128,12 +144,14 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
         if (entity.getEntityType() == EntityType.FEED) {
             Feed feed = (Feed) entity;
             // currently sla service is enabled only for fileSystemStorage
-            if (feed.getLocations() != null && feed.getSla() != null) {
+            if (feed.getLocations() != null) {
                 Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
                 for (Cluster cluster : feed.getClusters().getClusters()) {
                     if (currentClusters.contains(cluster.getName())) {
-                        LOG.debug("Adding feed:{} for monitoring", feed.getName());
-                        monitoredFeeds.add(feed.getName());
+                        if (FeedHelper.getSLA(cluster, feed) != null) {
+                            LOG.debug("Adding feed:{} for monitoring", feed.getName());
+                            monitoredFeeds.add(feed.getName());
+                        }
                     }
                 }
             }
@@ -180,13 +198,14 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
         fileSystem = initializeFileSystem();
 
         String freq = StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis", ONE_HOUR);
-        try {
-            serializationFrequencyMillis = Integer.valueOf(freq);
-        } catch (NumberFormatException e) {
-            LOG.error("Invalid value : {} found in startup.properties for the property "
-                    + "feed.sla.serialization.frequency.millis Should be an integer", freq);
-            throw new FalconException("Invalid integer value for property ", e);
-        }
+        serializationFrequencyMillis = Integer.valueOf(freq);
+
+        freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+        statusCheckFrequencySeconds = Integer.valueOf(freq);
+
+        freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
+        lookAheadWindowMillis = Integer.valueOf(freq);
+
         try {
             if (fileSystem.exists(filePath)) {
                 deserialize(filePath);
@@ -198,7 +217,7 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
             throw new FalconException("Couldn't check the existence of " + filePath, e);
         }
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-        executor.scheduleWithFixedDelay(new Monitor(), 0, STATUS_CHECK_FREQUENCY_SECS, TimeUnit.SECONDS);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
     }
 
     @Override
@@ -231,7 +250,7 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
 
                     // add Instances from last checked time to 10 minutes from now(some buffer for status check)
                     Date now = new Date();
-                    Date newCheckPoint = new Date(now.getTime() + LOOKAHEAD_WINDOW_MILLIS);
+                    Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
                     addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
                     lastCheckedAt = newCheckPoint;
 
@@ -339,10 +358,10 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
     private void deserialize(Path path) throws FalconException {
         try {
             Map<String, Object> state = deserializeInternal(path);
-            pendingInstances = (Map<Pair<String, String>, Set<Date>>) state.get("pendingInstances");
+            pendingInstances = (ConcurrentHashMap<Pair<String, String>, Set<Date>>) state.get("pendingInstances");
             lastCheckedAt = new Date((Long) state.get("lastCheckedAt"));
             lastSerializedAt = new Date((Long) state.get("lastSerializedAt"));
-            monitoredFeeds = new HashSet<>(); // will be populated on the onLoad of entities.
+            monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities.
             LOG.debug("Restored the service from old state.");
         } catch (IOException | ClassNotFoundException e) {
             throw new FalconException("Couldn't deserialize the old state", e);
@@ -350,10 +369,10 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
     }
 
     private void initializeService() {
-        pendingInstances = new HashMap<>();
+        pendingInstances = new ConcurrentHashMap<>();
         lastCheckedAt = new Date();
         lastSerializedAt = new Date();
-        monitoredFeeds = new HashSet<>();
+        monitoredFeeds = new ConcurrentHashSet<>();
     }
 
     @SuppressWarnings("unchecked")
@@ -364,10 +383,98 @@ public class FeedSLAMonitoringService implements ConfigurationChangeListener, Fa
         try {
             state = (Map<String, Object>) ois.readObject();
         } finally {
-            ois.close();
+            IOUtils.closeQuietly(ois);
         }
         return state;
     }
 
-}
+    /**
+     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
+     * slaLow or slaHigh.
+     *
+     * Only feeds which have defined sla in their definition are considered.
+     * Only the feed instances between the given time range are considered.
+     * Start time and end time are both inclusive.
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @return pending feed instances in the range that missed slaLow or slaHigh, tagged with the SLA missed
+     * @throws FalconException
+     */
+    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
+        throws FalconException {
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+        for (Map.Entry<Pair<String, String>, Set<Date>> feedInstances : pendingInstances.entrySet()) {
+            Pair<String, String> feedClusterPair = feedInstances.getKey();
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
+            Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
+            Sla sla = FeedHelper.getSLA(cluster, feed);
+            if (sla != null) {
+                Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, feedInstances.getValue());
+                for (Pair<Date, String> status : slaStatus){
+                    SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
+                            feedClusterPair.second, status.first, EntityType.FEED);
+                    instance.setTags(status.second);
+                    result.add(instance);
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range
+     * which missed sla. Only those instances are included which have missed either slaLow or slaHigh.
+     * @param feedName name of the feed
+     * @param clusterName cluster name
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @return pending instances of the feed on the cluster that missed slaLow or slaHigh, tagged accordingly
+     * @throws FalconException
+     */
+    public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName,
+                                                                 Date start, Date end) throws FalconException {
+
+        Set<SchedulableEntityInstance> result = new HashSet<>();
+        Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
+        Set<Date> missingInstances = pendingInstances.get(feedClusterPair);
+        Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
+        Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
+        Sla sla = FeedHelper.getSLA(cluster, feed);
+        if (missingInstances != null && sla != null) {
+            Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, missingInstances);
+            for (Pair<Date, String> status : slaStatus){
+                SchedulableEntityInstance instance = new SchedulableEntityInstance(feedName, clusterName, status.first,
+                        EntityType.FEED);
+                instance.setTags(status.second);
+                result.add(instance);
+            }
+        }
+        return result;
+    }
 
+    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, Set<Date> missingInstances)
+        throws FalconException {
+        String tagCritical = "Missed SLA High";
+        String tagWarn = "Missed SLA Low";
+        Date now = new Date();
+        Frequency slaLow = sla.getSlaLow();
+        Frequency slaHigh = sla.getSlaHigh();
+        Set<Pair<Date, String>> result = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            if (!nominalTime.before(start) && !nominalTime.after(end)) {
+                ExpressionHelper.setReferenceDate(nominalTime);
+                ExpressionHelper evaluator = ExpressionHelper.get();
+                Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
+                Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class);
+                Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
+                Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration);
+                if (slaCriticalTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, tagCritical));
+                } else if (slaWarnTime.before(now)) {
+                    result.add(new Pair<>(nominalTime, tagWarn));
+                }
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
new file mode 100644
index 0000000..bc03cb5
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.EntityNotRegisteredException;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests for FeedSLAMonitoring Service.
+ */
+public class FeedSLAMonitoringTest {
+
+    @Test
+    public void testSLAStatus() throws FalconException {
+        // sla, start, end, missingInstances
+        Sla sla = new Sla();
+        sla.setSlaLow(new Frequency("days(1)"));
+        sla.setSlaHigh(new Frequency("days(2)"));
+
+        Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
+        Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
+
+        Set<Date> missingInstances = new HashSet<>();
+        missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start time
+        missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to start time
+        missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between
+        missingInstances.add(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"));
+        missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time
+        missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time
+
+        Set<Pair<Date, String>> result = FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances);
+        Set<Pair<Date, String>> expected = new HashSet<>();
+        expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), "Missed SLA High"));
+        expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), "Missed SLA High"));
+        expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-07T00:00Z"), "Missed SLA High"));
+        expected.add(new Pair<>(SchemaHelper.parseDateUTC("2015-05-05T00:00Z"), "Missed SLA High"));
+        Assert.assertEquals(result, expected);
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+            expectedExceptionsMessageRegExp = "SLA monitoring is not supported for: PROCESS")
+    public void testInvalidType() throws FalconException {
+        AbstractSchedulableEntityManager.validateSlaParams("process",
+                "in", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+    }
+
+    @Test(expectedExceptions = EntityNotRegisteredException.class,
+            expectedExceptionsMessageRegExp = ".*\\(FEED\\) not found.*")
+    public void testInvalidName() throws FalconException {
+        AbstractSchedulableEntityManager.validateSlaParams("feed",
+                "non-existent", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "2015-05-00T00:00Z is not a valid UTC string")
+    public void testInvalidStart() throws FalconException {
+        AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-00T00:00Z", "2015-05-05T00:00Z", "*");
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+            expectedExceptionsMessageRegExp = "start can not be after end")
+    public void testInvalidRange() throws FalconException {
+        AbstractSchedulableEntityManager.validateSlaParams("feed",
+                null, "2015-05-05T00:00Z", "2014-05-05T00:00Z", "*");
+    }
+
+    @Test
+    public void testOptionalName() throws FalconException {
+        AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+        AbstractSchedulableEntityManager.validateSlaParams("feed", "", "2015-05-05T00:00Z", "2015-05-05T00:00Z", "*");
+    }
+
+    @Test
+    public void testOptionalEnd() throws FalconException {
+        AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", "", "*");
+        AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 9c6aef7..8e4ce97 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -96,9 +96,19 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 
 *.falcon.cleanup.service.frequency=days(1)
 
+######### Properties for Feed SLA Monitoring #########
 # frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
 *.feed.sla.serialization.frequency.millis=3600000
 
+# Do not change unless really sure
+# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
+*.feed.sla.statusCheck.frequency.seconds=600
+
+# Do not change unless really sure
+# Time Duration (in milliseconds) in future for generating pending feed instances.
+# In every cycle pending feed instances are added for monitoring, till this time in future.
+# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
+*.feed.sla.lookAheadWindow.millis=900000
 
 ######### Properties for configuring JMS provider - activemq #########
 # Default Active MQ url

http://git-wip-us.apache.org/repos/asf/falcon/blob/87920467/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 1c0fc74..4a20e41 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.resource;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconWebException;
 import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 
@@ -33,6 +34,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 /**
  * Entity management operations as REST API for feed and process.
@@ -52,6 +54,24 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     }
 
     @GET
+    @Path("sla-alert/{type}")
+    @Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_XML})
+    @Monitored(event = "feed-sla-misses")
+    public SchedulableEntityInstanceResult getFeedSLAMissPendingAlerts(
+            @Dimension("entityType") @PathParam("type") String entityType,
+            @Dimension("entityName") @QueryParam("name") String entityName,
+            @Dimension("start") @QueryParam("start") String start,
+            @Dimension("end") @QueryParam("end") String end,
+            @Dimension("colo") @QueryParam("colo") final String colo) {
+        try {
+            validateSlaParams(entityType, entityName, start, end, colo);
+        } catch (Exception e) {
+            throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
+        }
+        return super.getFeedSLAMissPendingAlerts(entityName, start, end, colo);
+    }
+
+    @GET
     @Path("dependencies/{type}/{entity}")
     @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
     @Monitored(event = "dependencies")


Mime
View raw message