falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [3/3] falcon git commit: FALCON-2052 Process SLA monitoring
Date Wed, 06 Jul 2016 03:21:24 GMT
FALCON-2052 Process SLA monitoring

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: Ajay Yadava <ajayyadava@apache.org>, Pallavi Rao

Closes #202 from PraveenAdlakha/2052


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/60e2f68b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/60e2f68b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/60e2f68b

Branch: refs/heads/master
Commit: 60e2f68b867476f600ba43dc4dafb97971a78b2e
Parents: bd32b61
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Wed Jul 6 08:51:03 2016 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Wed Jul 6 08:51:03 2016 +0530

----------------------------------------------------------------------
 client/src/main/resources/process-0.1.xsd       |   4 +-
 .../org/apache/falcon/entity/ProcessHelper.java |  36 ++
 .../falcon/persistence/EntitySLAAlertBean.java  | 168 +++++
 .../falcon/persistence/FeedSLAAlertBean.java    | 134 ----
 .../falcon/persistence/MonitoredEntityBean.java | 103 +++
 .../falcon/persistence/MonitoredFeedsBean.java  |  73 ---
 .../falcon/persistence/PendingInstanceBean.java |  58 +-
 .../persistence/PersistenceConstants.java       |  10 +-
 .../falcon/tools/FalconStateStoreDBCLI.java     |   4 +-
 .../src/main/resources/META-INF/persistence.xml |  18 +-
 common/src/main/resources/startup.properties    |   4 +-
 .../apache/falcon/entity/AbstractTestBase.java  |   2 +-
 .../entity/store/FeedLocationStoreTest.java     |   2 +-
 docs/src/site/twiki/FalconNativeScheduler.twiki |   2 +-
 docs/src/site/twiki/FeedSLAMonitoring.twiki     |   2 +-
 .../falcon/handler/SLAMonitoringHandler.java    |  13 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 134 ++--
 .../AbstractSchedulableEntityManager.java       |   8 +-
 .../proxy/SchedulableEntityManagerProxy.java    |   2 +-
 .../falcon/service/EntitySLAAlertService.java   | 168 +++++
 .../falcon/service/EntitySLAListener.java       |   3 +-
 .../service/EntitySLAMonitoringService.java     | 644 +++++++++++++++++++
 .../falcon/service/FeedSLAAlertService.java     | 163 -----
 .../service/FeedSLAMonitoringService.java       | 450 -------------
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  77 ++-
 .../service/EntitySLAAlertServiceTest.java      | 213 ++++++
 .../falcon/service/FeedSLAAlertServiceTest.java | 162 -----
 .../falcon/service/FeedSLAMonitoringTest.java   |   5 +-
 src/build/findbugs-exclude.xml                  |   4 +-
 src/conf/startup.properties                     |   6 +-
 30 files changed, 1544 insertions(+), 1128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 0d01e33..7ed8474 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -204,8 +204,7 @@
             </xs:documentation>
         </xs:annotation>
         <xs:sequence>
-            <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1">
-            </xs:element>
+            <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1"/>
         </xs:sequence>
     </xs:complexType>
 
@@ -218,6 +217,7 @@
         </xs:annotation>
         <xs:sequence>
             <xs:element type="validity" name="validity"/>
+            <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/>
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:int" name="version" use="optional" default="0"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index bbfca68..e563d18 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -26,6 +26,8 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Validity;
+import  org.apache.falcon.entity.v0.process.Sla;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.resource.SchedulableEntityInstance;
@@ -185,4 +187,38 @@ public final class ProcessHelper {
         }
         return result;
     }
+
+    /**
+     * Resolves the validity configured for a process on one of its clusters.
+     *
+     * @param process process whose clusters are searched via getCluster
+     * @param clusterName name of the cluster to look up
+     * @return the validity of the matching cluster
+     * @throws FalconException if getCluster returns null for the given name
+     */
+    public static Validity getClusterValidity(Process process, String clusterName) throws FalconException {
+        final Cluster matched = getCluster(process, clusterName);
+        if (matched != null) {
+            return matched.getValidity();
+        }
+        throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName());
+    }
+
+    /**
+     * Returns the SLA that applies to a process on the named cluster.
+     *
+     * @param clusterName name of the cluster to look up on the process
+     * @param process process whose clusters are searched via getCluster
+     * @return the result of getSLA(Cluster, Process) for the matching cluster
+     * @throws FalconException if getCluster returns null for the given name
+     */
+    public static Sla getSLA(String clusterName, Process process) throws FalconException {
+        final Cluster matched = getCluster(process, clusterName);
+        if (matched != null) {
+            return getSLA(matched, process);
+        }
+        throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName());
+    }
+
+    /**
+     * Picks the SLA for a process on a cluster: the cluster's own SLA when it has one,
+     * otherwise the SLA declared on the process.
+     *
+     * @param cluster cluster entry of the process
+     * @param process process supplying the fallback SLA
+     * @return the cluster SLA if non-null, else process.getSla() (which may itself be null)
+     */
+    public static Sla getSLA(Cluster cluster, Process process) {
+        final Sla override = cluster.getSla();
+        return override != null ? override : process.getSla();
+    }
+
+    /**
+     * Returns the start of the validity window of a process on the given cluster.
+     *
+     * @param process process whose clusters are searched via getCluster
+     * @param clusterName name of the cluster to look up
+     * @return the start date of the matching cluster's validity
+     * @throws FalconException if getCluster returns null for the given name
+     */
+    public static Date getProcessValidityStart(Process process, String clusterName) throws FalconException {
+        Cluster processCluster = getCluster(process, clusterName);
+        if (processCluster == null) {
+            // The leading space before "found" keeps the cluster name from fusing with the next word.
+            throw new FalconException("No matching cluster " + clusterName
+                    + " found for process " + process.getName());
+        }
+        return processCluster.getValidity().getStart();
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
new file mode 100644
index 0000000..e2096fe
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
+import java.util.Date;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * SLA alert record for a monitored entity (feed or process).
+ * */
+@Entity
+@NamedQueries({
+    @NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERTS, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.entityType = :entityType"),
+    // Needs an explicit select clause and must read this bean (not PendingInstanceBean) to return alerts.
+    @NamedQuery(name = PersistenceConstants.GET_ALL_ENTITY_ALERTS, query = "select OBJECT(a) from EntitySLAAlertBean a "),
+    @NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from EntitySLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "),
+    @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update EntitySLAAlertBean a set a.isSLAHighMissed = true where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERT_INSTANCE, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE, query = "delete from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+})
+@Table(name = "ENTITY_SLA_ALERTS")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class EntitySLAAlertBean {
+    /** Generated primary key. */
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    /** Name of the entity the alert belongs to (column entity_name). */
+    @Basic
+    @NotNull
+    @Column(name = "entity_name")
+    private String entityName;
+
+    /** Name of the cluster the instance ran on (column cluster_name). */
+    @Basic
+    @NotNull
+    @Column(name = "cluster_name")
+    private String clusterName;
+
+    /** Returns the stored entity type string. */
+    public String getEntityType() {
+        return entityType;
+    }
+
+    /**
+     * Stores the entity type after validating it with checkEntityType.
+     *
+     * @param entityType type name to store
+     * @throws FalconException if checkEntityType rejects the value
+     */
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    /** Entity type string accepted by checkEntityType (column entity_type). */
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
+    /** Nominal time of the instance the alert refers to (column nominal_time). */
+    @Basic
+    @NotNull
+    @Column(name = "nominal_time")
+    private Date nominalTime;
+
+    /** Whether the low SLA was missed; defaults to false. */
+    @Basic
+    @Column(name = "sla_low_missed")
+    private Boolean isSLALowMissed = false;
+
+    /** Whether the high SLA was missed; defaults to false. */
+    @Basic
+    @Column(name = "sla_high_missed")
+    private Boolean isSLAHighMissed = false;
+
+    /** Mapped to column sla_low_alert_sent; this class defines no accessor for it. */
+    @Basic
+    @Column(name = "sla_low_alert_sent")
+    private Boolean slaLowAlertSent;
+
+
+    /** Mapped to column sla_high_alert_sent; this class defines no accessor for it. */
+    @Basic
+    @Column(name = "sla_high_alert_sent")
+    private Boolean slaHighAlertSent;
+
+    /** Returns a copy of the nominal time so callers cannot mutate the stored Date. */
+    public Date getNominalTime() {
+        return new Date(nominalTime.getTime());
+    }
+
+    /** Stores a copy of the given nominal time so later changes to the argument are not shared. */
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = new Date(nominalTime.getTime());
+    }
+
+    /** Returns the primary key. */
+    public String getId() {
+        return id;
+    }
+
+    /** Sets the primary key. */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** Returns the cluster name. */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    /** Sets the cluster name. */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /** Returns the entity name. */
+    public String getEntityName() {
+        return entityName;
+    }
+
+    /** Sets the entity name. */
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    /** Returns the low-SLA-missed flag. */
+    public Boolean getIsSLALowMissed() {
+        return isSLALowMissed;
+    }
+
+    /** Sets the low-SLA-missed flag. */
+    public void setIsSLALowMissed(Boolean isSLALowMissed) {
+        this.isSLALowMissed = isSLALowMissed;
+    }
+
+    /** Returns the high-SLA-missed flag. */
+    public Boolean getIsSLAHighMissed() {
+        return isSLAHighMissed;
+    }
+
+    /** Sets the high-SLA-missed flag. */
+    public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
+        this.isSLAHighMissed = isSLAHighMissed;
+    }
+
+    /** Matches the entityName field and the :entityName named-query parameter. */
+    public static final String ENTITYNAME = "entityName";
+
+    /** Matches the clusterName field and the :clusterName named-query parameter. */
+    public static final String CLUSTERNAME = "clusterName";
+
+    /** Matches the entityType field and the :entityType named-query parameter. */
+    public static final String ENTITYTYPE = "entityType";
+
+    /** Matches the nominalTime field and the :nominalTime named-query parameter. */
+    public static final String NOMINALTIME = "nominalTime";
+
+    /**
+     * Validates that the given type name is one this bean accepts.
+     *
+     * @param entityType candidate value; must equal EntityType.PROCESS.toString() or EntityType.FEED.toString()
+     * @throws FalconException if the value is null or matches neither type
+     */
+    void checkEntityType(String entityType) throws FalconException {
+        // Constant-first equals: a null argument is rejected with FalconException instead of an NPE.
+        if (EntityType.PROCESS.toString().equals(entityType) || EntityType.FEED.toString().equals(entityType)) {
+            return;
+        }
+        throw new FalconException("EntityType " + entityType
+                + " is not valid,Feed and Process are the valid input type.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
deleted file mode 100644
index 4ea3454..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.persistence;
-
-import java.util.Date;
-
-import javax.persistence.Basic;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Feed SLA monitoring.
- * */
-@Entity
-@NamedQueries({
-@NamedQuery(name = PersistenceConstants.GET_FEED_ALERTS, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName"),
-@NamedQuery(name = PersistenceConstants.GET_ALL_FEED_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "),
-@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from FeedSLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "),
-    @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update FeedSLAAlertBean a set a.isSLAHighMissed = true where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
-@NamedQuery(name = PersistenceConstants.GET_FEED_ALERT_INSTANCE, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime "),
- @NamedQuery(name = PersistenceConstants.DELETE_FEED_ALERT_INSTANCE, query = "delete from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime")
-})
-@Table(name = "FEED_SLA_ALERTS")
-//RESUME CHECKSTYLE CHECK  LineLengthCheck
-public class FeedSLAAlertBean {
-    @NotNull
-    @GeneratedValue(strategy = GenerationType.AUTO)
-    @Id
-    private String id;
-
-    @Basic
-    @NotNull
-    @Column(name = "feed_name")
-    private String feedName;
-
-    @Basic
-    @NotNull
-    @Column(name = "cluster_name")
-    private String clusterName;
-
-    @Basic
-    @NotNull
-    @Column(name = "nominal_time")
-    private Date nominalTime;
-
-    @Basic
-    @Column(name = "sla_low_missed")
-    private Boolean isSLALowMissed = false;
-
-    @Basic
-    @Column(name = "sla_high_missed")
-    private Boolean isSLAHighMissed = false;
-
-    @Basic
-    @Column(name = "sla_low_alert_sent")
-    private Boolean slaLowAlertSent;
-
-
-    @Basic
-    @Column(name = "sla_high_alert_sent")
-    private Boolean slaHighAlertSent;
-
-    public Date getNominalTime() {
-        return new Date(nominalTime.getTime());
-    }
-
-    public void setNominalTime(Date nominalTime) {
-        this.nominalTime = new Date(nominalTime.getTime());
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    public void setFeedName(String feedName) {
-        this.feedName = feedName;
-    }
-
-    public Boolean getIsSLALowMissed() {
-        return isSLALowMissed;
-    }
-
-    public void setIsSLALowMissed(Boolean isSLALowMissed) {
-        this.isSLALowMissed = isSLALowMissed;
-    }
-
-    public Boolean getIsSLAHighMissed() {
-        return isSLAHighMissed;
-    }
-
-    public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
-        this.isSLAHighMissed = isSLAHighMissed;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
new file mode 100644
index 0000000..20ce537
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The entities (feeds and processes) that are to be monitored are stored in the db.
+* */
+
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+                + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean "
+                + "a where a.entityName = :entityName and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+                + "from MonitoredEntityBean a")
+})
+@Table(name="MONITORED_ENTITY")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class MonitoredEntityBean {
+    /** Generated primary key. */
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    /** Name of the monitored entity (column entity_name). */
+    @Basic
+    @NotNull
+    @Column(name = "entity_name")
+    private String entityName;
+
+    /** Returns the stored entity type string. */
+    public String getEntityType() {
+        return entityType;
+    }
+
+    /**
+     * Stores the entity type after validating it with checkEntityType.
+     *
+     * @param entityType type name to store
+     * @throws FalconException if checkEntityType rejects the value
+     */
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    /** Entity type string accepted by checkEntityType (column entity_type). */
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
+    /** Returns the monitored entity's name. */
+    public String getEntityName() {
+        return entityName;
+    }
+
+    /** Older accessor name, kept so existing callers keep compiling; same value as getEntityName(). */
+    public String getFeedName() {
+        return entityName;
+    }
+
+    /** Sets the monitored entity's name. */
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    /** Returns the primary key. */
+    public String getId() {
+        return id;
+    }
+
+    /** Sets the primary key. */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** Matches the entityName field and the :entityName named-query parameter. */
+    public static final String ENTITYNAME = "entityName";
+
+    /** Matches the entityType field and the :entityType named-query parameter. */
+    public static final String ENTITYTYPE = "entityType";
+
+    /**
+     * Validates that the given type name is one this bean accepts.
+     *
+     * @param entityType candidate value; must equal EntityType.PROCESS.toString() or EntityType.FEED.toString()
+     * @throws FalconException if the value is null or matches neither type
+     */
+    void checkEntityType(String entityType) throws FalconException {
+        // Constant-first equals: a null argument is rejected with FalconException instead of an NPE.
+        if (EntityType.PROCESS.toString().equals(entityType) || EntityType.FEED.toString().equals(entityType)) {
+            return;
+        }
+        throw new FalconException("EntityType " + entityType
+                + " is not valid,Feed and Process are the valid input type.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
deleted file mode 100644
index 2b48569..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.persistence;
-
-import javax.persistence.Entity;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Column;
-import javax.persistence.Basic;
-import javax.validation.constraints.NotNull;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
-* The Feeds that are to be monitered will be stored in the db.
-* */
-
-@Entity
-@NamedQueries({
-        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
-                + "MonitoredFeedsBean a where a.feedName = :feedName"),
-        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
-                + "a where a.feedName = :feedName"),
-        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
-                + "from MonitoredFeedsBean a")
-})
-@Table(name="MONITORED_FEEDS")
-//RESUME CHECKSTYLE CHECK  LineLengthCheck
-public class MonitoredFeedsBean {
-    @NotNull
-    @GeneratedValue(strategy = GenerationType.AUTO)
-    @Id
-    private String id;
-
-    @Basic
-    @NotNull
-    @Column(name = "feed_name")
-    private String feedName;
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    public void setFeedName(String feedName) {
-        this.feedName = feedName;
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 41eb048..863abdc 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -17,6 +17,9 @@
  */
 package org.apache.falcon.persistence;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
 import javax.persistence.Entity;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
@@ -35,13 +38,13 @@ import java.util.Date;
 * */
 @Entity
 @NamedQueries({
-    @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.feedName = :feedName"),
-    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
-    @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
-    @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
-    @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+    @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
     @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a "),
-    @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime")
+    @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
 })
 @Table(name = "PENDING_INSTANCES")
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
@@ -53,8 +56,8 @@ public class PendingInstanceBean {
 
     @Basic
     @NotNull
-    @Column(name = "feed_name")
-    private String feedName;
+    @Column(name = "entity_name")
+    private String entityName;
 
     @Basic
     @NotNull
@@ -66,6 +69,20 @@ public class PendingInstanceBean {
     @Column(name = "nominal_time")
     private Date nominalTime;
 
+    /** Returns the entity type string stored on this pending instance. */
+    public String getEntityType() {
+        return entityType;
+    }
+
+    /**
+     * Stores the entity type after validating it with checkEntityType.
+     *
+     * @param entityType type name to store
+     * @throws FalconException if checkEntityType rejects the value
+     */
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
     public Date getNominalTime() {
         return nominalTime;
     }
@@ -90,11 +107,28 @@ public class PendingInstanceBean {
         this.clusterName = clusterName;
     }
 
-    public String getFeedName() {
-        return feedName;
+    /** Returns the name of the entity this pending instance belongs to. */
+    public String getEntityName() {
+        return entityName;
+    }
+
+    /** Sets the name of the entity this pending instance belongs to. */
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
     }
 
-    public void setFeedName(String feedName) {
-        this.feedName = feedName;
+    public static final String ENTITYNAME = "entityName";
+
+    public static final String CLUSTERNAME = "clusterName";
+
+    public static final String NOMINALTIME = "nominalTime";
+
+    public static final String ENTITYTYPE = "entityType";
+
+    /**
+     * Validates that the given type name is one this bean accepts.
+     *
+     * @param entityType candidate value; must equal EntityType.PROCESS.toString() or EntityType.FEED.toString()
+     * @throws FalconException if the value is null or matches neither type
+     */
+    void checkEntityType(String entityType) throws FalconException {
+        // Constant-first equals: a null argument is rejected with FalconException instead of an NPE.
+        if (EntityType.PROCESS.toString().equals(entityType) || EntityType.FEED.toString().equals(entityType)) {
+            return;
+        }
+        throw new FalconException("EntityType " + entityType
+                + " is not valid,Feed and Process are the valid input type.");
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 72c382e..f9aa1f5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -30,7 +30,7 @@ public final class PersistenceConstants {
     public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
     public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
     public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
-    public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+    public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY";
     public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
     public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
     public static final String GET_ENTITY = "GET_ENTITY";
@@ -55,10 +55,10 @@ public final class PersistenceConstants {
     public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE";
     public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
     public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
-    public static final String GET_FEED_ALERTS = "GET_FEED_ALERTS";
-    public static final String GET_ALL_FEED_ALERTS = "GET_ALL_FEED_ALERTS";
+    public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
+    public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
     public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
     public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
-    public static final String GET_FEED_ALERT_INSTANCE = "GET_FEED_ALERT_INSTANCE";
-    public static final String DELETE_FEED_ALERT_INSTANCE = "DELETE_FEED_ALERT_INSTANCE";
+    public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
+    public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 1bdfc25..102b986 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -244,8 +244,8 @@ public class FalconStateStoreDBCLI {
         args.add("org.apache.falcon.persistence.EntityBean");
         args.add("org.apache.falcon.persistence.InstanceBean");
         args.add("org.apache.falcon.persistence.PendingInstanceBean");
-        args.add("org.apache.falcon.persistence.MonitoredFeedsBean");
-        args.add("org.apache.falcon.persistence.FeedSLAAlertBean");
+        args.add("org.apache.falcon.persistence.MonitoredEntityBean");
+        args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
         return args.toArray(new String[args.size()]);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index c9b444d..ac2f397 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -26,8 +26,8 @@
         <class>org.apache.falcon.persistence.EntityBean</class>
         <class>org.apache.falcon.persistence.InstanceBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
-        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
-        <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+        <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
+        <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -37,7 +37,7 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
 
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
@@ -58,8 +58,8 @@
         <class>org.apache.falcon.persistence.EntityBean</class>
         <class>org.apache.falcon.persistence.InstanceBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
-        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
-        <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+        <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
+        <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -69,7 +69,7 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
 
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>
@@ -88,9 +88,9 @@
 
         <class>org.apache.falcon.persistence.EntityBean</class>
         <class>org.apache.falcon.persistence.InstanceBean</class>
-        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+        <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
         <class>org.apache.falcon.persistence.PendingInstanceBean</class>
-        <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+        <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
 
         <properties>
             <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -100,7 +100,7 @@
             <property name="openjpa.MetaDataFactory"
                       value="jpa(Types=org.apache.falcon.persistence.EntityBean;
                 org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
-                org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+                org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
 
             <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
             <property name="openjpa.LockManager" value="pessimistic"/>

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 374ff17..de24621 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -35,7 +35,7 @@
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.extensions.ExtensionService,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
@@ -67,7 +67,7 @@
                         org.apache.falcon.entity.ColoClusterRelation,\
                         org.apache.falcon.group.FeedGroupMap,\
                         org.apache.falcon.entity.store.FeedLocationStore,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
 ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
 #                       org.apache.falcon.state.store.jdbc.JdbcStateStore

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 3817056..afd9307 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -79,7 +79,7 @@ public class AbstractTestBase {
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
         listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "");
-        listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", "");
+        listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", "");
         StartupProperties.get().setProperty("configstore.listeners", listeners);
         store = ConfigurationStore.get();
         store.init();

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
index 40c077e..d13769e 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
@@ -63,7 +63,7 @@ public class FeedLocationStoreTest extends AbstractTestBase {
         cleanupStore();
         String listeners = StartupProperties.get().getProperty("configstore.listeners");
         listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "");
-        listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", "");
+        listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", "");
         StartupProperties.get().setProperty("configstore.listeners", listeners);
         store = ConfigurationStore.get();
         store.init();

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 1f51739..b15fd5b 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -27,7 +27,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
-                        org.apache.falcon.service.FeedSLAMonitoringService,\
+                        org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FeedSLAMonitoring.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FeedSLAMonitoring.twiki b/docs/src/site/twiki/FeedSLAMonitoring.twiki
index 88132ce..469c0aa 100644
--- a/docs/src/site/twiki/FeedSLAMonitoring.twiki
+++ b/docs/src/site/twiki/FeedSLAMonitoring.twiki
@@ -6,7 +6,7 @@ Feed SLA monitoring service requires FalconJPAService to be up.Following are the
 In startup.properties :
 
 *.application.services= org.apache.falcon.state.store.service.FalconJPAService,
-                        org.apache.falcon.service.FeedSLAMonitoringService
+                        org.apache.falcon.service.EntitySLAMonitoringService
 
 These properties are required for FalconJPAService in statestore.properties:
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
index df2a1e0..56376fc 100644
--- a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
+++ b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
@@ -26,7 +26,7 @@ import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.service.EntitySLAMonitoringService;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowExecutionListener;
 import org.apache.hadoop.fs.Path;
@@ -45,8 +45,14 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
     @Override
     public void onSuccess(WorkflowExecutionContext context) throws FalconException {
         if (context.hasWorkflowSucceeded()) {
-            updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(),
+            if (context.getEntityType().toString().equals(EntityType.FEED.name())){
+                updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(),
                     context.getOutputFeedInstancePathsList());
+            }
+            if (context.getEntityType().toString().equals(EntityType.PROCESS.name())){
+                EntitySLAMonitoringService.get().makeProcessInstanceAvailable(context.getClusterName(),
+                        context.getEntityName(), context.getNominalTimeAsISO8601(), context.getEntityType());
+            }
         }
     }
 
@@ -60,7 +66,8 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
                 String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
                 Date date = FeedHelper.getDate(templatePath, new Path(outputFeedInstancePathsList[index]),
                     EntityUtil.getTimeZone(feed));
-                FeedSLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], clusterName, date);
+                EntitySLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index],
+                        clusterName, date);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 4fd1b53..c1f818a 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -18,11 +18,13 @@
 package org.apache.falcon.jdbc;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.falcon.persistence.MonitoredFeedsBean;
-import org.apache.falcon.persistence.FeedSLAAlertBean;
-import org.apache.falcon.persistence.PersistenceConstants;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.MonitoredEntityBean;
 import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
 import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.persistence.EntitySLAAlertBean;
 import org.apache.falcon.service.FalconJPAService;
 
 import javax.persistence.EntityManager;
@@ -32,7 +34,7 @@ import java.util.Date;
 import java.util.List;
 
 /**
-* StateStore for MonitoringFeeds and PendingFeedInstances.
+* StateStore for MonitoringEntity and PendingEntityInstances.
 */
 
 public class MonitoringJdbcStateStore {
@@ -42,23 +44,25 @@ public class MonitoringJdbcStateStore {
     }
 
 
-    public void putMonitoredFeed(String feedName){
+    public void putMonitoredEntity(String entityName, String entityType) throws FalconException{
 
-        MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
-        monitoredFeedsBean.setFeedName(feedName);
+        MonitoredEntityBean monitoredEntityBean = new MonitoredEntityBean();
+        monitoredEntityBean.setEntityName(entityName);
+        monitoredEntityBean.setEntityType(entityType);
         EntityManager entityManager = getEntityManager();
         try {
             beginTransaction(entityManager);
-            entityManager.persist(monitoredFeedsBean);
+            entityManager.persist(monitoredEntityBean);
         } finally {
             commitAndCloseTransaction(entityManager);
         }
     }
 
-    public MonitoredFeedsBean getMonitoredFeed(String feedName){
+    public MonitoredEntityBean getMonitoredEntity(String entityName, String entityType){
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
-        q.setParameter("feedName", feedName);
+        q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
         List result = q.getResultList();
         try {
             if (result.isEmpty()) {
@@ -67,14 +71,15 @@ public class MonitoringJdbcStateStore {
         } finally {
             entityManager.close();
         }
-        return ((MonitoredFeedsBean)result.get(0));
+        return ((MonitoredEntityBean)result.get(0));
     }
 
-    public void deleteMonitoringFeed(String feedName) {
+    public void deleteMonitoringEntity(String entityName, String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
-        q.setParameter("feedName", feedName);
+        q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -82,7 +87,7 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException {
+    public List<MonitoredEntityBean> getAllMonitoredFeed() throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
         List result = q.getResultList();
@@ -90,22 +95,24 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
-    public Date getLastInstanceTime(String feedName) throws ResultNotFoundException {
+    public Date getLastInstanceTime(String entityName , String entityType) throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class);
-        q.setParameter("feedName", feedName);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         Date result = (Date)q.getSingleResult();
         entityManager.close();
         return result;
     }
 
-    public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+    public void deletePendingInstance(String entityName, String clusterName , Date nominalTime, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -113,12 +120,13 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void deletePendingInstances(String feedName, String clusterName){
+    public void deletePendingInstances(String entityName, String clusterName, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -126,23 +134,26 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+    public void putPendingInstances(String entity, String clusterName, Date nominalTime, String entityType)
+        throws FalconException{
         EntityManager entityManager = getEntityManager();
         PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
-        pendingInstanceBean.setFeedName(feed);
+        pendingInstanceBean.setEntityName(entity);
         pendingInstanceBean.setClusterName(clusterName);
         pendingInstanceBean.setNominalTime(nominalTime);
+        pendingInstanceBean.setEntityType(entityType);
 
         beginTransaction(entityManager);
         entityManager.persist(pendingInstanceBean);
         commitAndCloseTransaction(entityManager);
     }
 
-    public List<Date> getNominalInstances(String feedName, String clusterName) {
+    public List<Date> getNominalInstances(String entityName, String clusterName, String entityType) {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         List result = q.getResultList();
         entityManager.close();
         return result;
@@ -168,15 +179,17 @@ public class MonitoringJdbcStateStore {
         entityManager.close();
     }
 
-    public PendingInstanceBean getPendingInstance(String feedName, String clusterName, Date nominalTime) {
+    public PendingInstanceBean getPendingInstance(String entityName, String clusterName, Date nominalTime,
+                                                  String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE,
                             PendingInstanceBean.class);
-        q.setParameter("feedName", feedName);
+        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
 
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+        q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
         try {
             return q.getSingleResult();
         } finally {
@@ -184,14 +197,16 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public FeedSLAAlertBean getFeedAlertInstance(String feedName, String clusterName, Date nominalTime) {
+    public EntitySLAAlertBean getEntityAlertInstance(String entityName, String clusterName, Date nominalTime,
+                                                     String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        TypedQuery<FeedSLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_FEED_ALERT_INSTANCE,
-                FeedSLAAlertBean.class);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        TypedQuery<EntitySLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.
+                GET_ENTITY_ALERT_INSTANCE, EntitySLAAlertBean.class);
+        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
         try {
             return q.getSingleResult();
         } finally {
@@ -199,30 +214,32 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void putSLAAlertInstance(String feedName, String cluster, Date nominalTime, Boolean isSLALowMissed,
-                                    Boolean isSLAHighMissed) {
+    public void putSLAAlertInstance(String entityName, String cluster, String entityType, Date nominalTime,
+                                    Boolean isSLALowMissed, Boolean isSLAHighMissed) throws FalconException{
         EntityManager entityManager = getEntityManager();
-        FeedSLAAlertBean feedSLAAlertBean = new FeedSLAAlertBean();
-        feedSLAAlertBean.setFeedName(feedName);
-        feedSLAAlertBean.setClusterName(cluster);
-        feedSLAAlertBean.setNominalTime(nominalTime);
-        feedSLAAlertBean.setIsSLALowMissed(isSLALowMissed);
-        feedSLAAlertBean.setIsSLAHighMissed(isSLAHighMissed);
+        EntitySLAAlertBean entitySLAAlertBean = new EntitySLAAlertBean();
+        entitySLAAlertBean.setEntityName(entityName);
+        entitySLAAlertBean.setClusterName(cluster);
+        entitySLAAlertBean.setNominalTime(nominalTime);
+        entitySLAAlertBean.setIsSLALowMissed(isSLALowMissed);
+        entitySLAAlertBean.setIsSLAHighMissed(isSLAHighMissed);
+        entitySLAAlertBean.setEntityType(entityType);
         try {
             beginTransaction(entityManager);
-            entityManager.persist(feedSLAAlertBean);
+            entityManager.persist(entitySLAAlertBean);
         } finally {
             commitAndCloseTransaction(entityManager);
         }
     }
 
-    public void updateSLAAlertInstance(String feedName, String clusterName, Date nominalTime) {
+    public void updateSLAAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -230,13 +247,14 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public void deleteFeedAlertInstance(String feedName, String clusterName, Date nominalTime){
+    public void deleteEntityAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_FEED_ALERT_INSTANCE);
-        q.setParameter("feedName", feedName);
-        q.setParameter("clusterName", clusterName);
-        q.setParameter("nominalTime", nominalTime);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE);
+        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
         try{
             q.executeUpdate();
         } finally {
@@ -245,7 +263,7 @@ public class MonitoringJdbcStateStore {
     }
 
 
-    public List<FeedSLAAlertBean> getSLAHighCandidates() {
+    public List<EntitySLAAlertBean> getSLAHighCandidates() {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES);

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index c6903a4..895f8b2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -31,7 +31,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.UnschedulableEntityException;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.monitors.Dimension;
-import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.service.EntitySLAMonitoringService;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -162,11 +162,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
             Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);
 
             if (StringUtils.isBlank(feedName)) {
-                instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(start, end));
+                instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end));
             } else {
                 for (String clusterName : DeploymentUtil.getCurrentClusters()) {
-                    instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(feedName,
-                            clusterName, start, end));
+                    instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(feedName,
+                            clusterName, start, end, EntityType.FEED.toString()));
                 }
             }
         } catch (FalconException e) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 53a9de1..249c273 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -134,7 +134,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
             @Override
             protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException {
-                return getEntityManager(colo).invoke("getFeedSLAMissPendingAlerts", entityType, entityName,
+                return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName,
                         start, end, colo);
             }
         }.execute();

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
new file mode 100644
index 0000000..f023c35
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that periodically inspects the pending entity instances held in the monitoring store,
+ * records SLA misses and notifies the registered {@link EntitySLAListener}s when the high
+ * (critical) SLA is missed.
+ */
+public final class EntitySLAAlertService implements FalconService, EntitySLAListener {
+
+    private static final String NAME = "EntitySLAAlertService";
+
+    private static final Logger LOG = LoggerFactory.getLogger(EntitySLAAlertService.class);
+
+    private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+
+    private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>();
+
+    // Kept as a field so that destroy() can stop the periodic monitor started by init().
+    private ScheduledThreadPoolExecutor executor;
+
+    private static final EntitySLAAlertService SERVICE = new EntitySLAAlertService();
+
+    public static EntitySLAAlertService get() {
+        return SERVICE;
+    }
+
+    private EntitySLAAlertService(){}
+
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    /**
+     * Registers the listeners named in the "feedAlert.listeners" startup property and schedules
+     * the periodic SLA check.
+     *
+     * @throws FalconException if thrown while instantiating a configured listener
+     */
+    @Override
+    public void init() throws FalconException {
+        // The property may be absent; treat that as "no listeners" rather than failing with an NPE.
+        String listenerClassNames = StartupProperties.get().getProperty("feedAlert.listeners");
+        if (listenerClassNames != null) {
+            for (String listenerClassName : listenerClassNames.split(",")) {
+                listenerClassName = listenerClassName.trim();
+                if (listenerClassName.isEmpty()) {
+                    continue;
+                }
+                EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
+                registerListener(listener);
+            }
+        }
+
+        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+        int statusCheckFrequencySeconds = Integer.parseInt(freq);
+
+        executor = new ScheduledThreadPoolExecutor(1);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS);
+    }
+
+    public void registerListener(EntitySLAListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stops the periodic SLA check if it was started.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        if (executor != null) {
+            executor.shutdown();
+        }
+    }
+
+
+    private class Monitor implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                processSLACandidates();
+            } catch (Throwable t) {
+                // An exception escaping run() would suppress every later scheduled execution.
+                LOG.error("Error while processing SLA candidates", t);
+            }
+        }
+    }
+
+    /**
+     * Checks every pending instance in the store for SLA misses. A failure on one instance is
+     * logged and does not stop the remaining instances from being processed.
+     */
+    void processSLACandidates(){
+        //Get all entity instances to be monitored
+        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
+        if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
+            return;
+        }
+
+        LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
+        for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
+            try {
+                processSLACandidate(pendingInstanceBean);
+            } catch (FalconException e) {
+                LOG.error("Exception in EntitySLAAlertService for entity: " + pendingInstanceBean.getEntityName()
+                        + " cluster: " + pendingInstanceBean.getClusterName(), e);
+            }
+        }
+    }
+
+    /**
+     * Handles one pending instance: removes its alert record when no SLA is missed any more,
+     * records a low (warn) SLA miss, or records a high (critical) SLA miss and notifies listeners.
+     */
+    private void processSLACandidate(PendingInstanceBean pendingInstanceBean) throws FalconException {
+        String entityName = pendingInstanceBean.getEntityName();
+        String clusterName = pendingInstanceBean.getClusterName();
+        Date nominalTime = pendingInstanceBean.getNominalTime();
+        String entityType = pendingInstanceBean.getEntityType();
+
+        org.apache.falcon.entity.v0.cluster.Cluster cluster = ClusterHelper.getCluster(clusterName);
+
+        Set<SchedulableEntityInstance> schedulableEntityInstances = EntitySLAMonitoringService.get().
+                getEntitySLAMissPendingAlerts(entityName, cluster.getName(), nominalTime, nominalTime,
+                        entityType);
+        if (schedulableEntityInstances.isEmpty()){
+            // No SLA miss pending for this instance: delete its alert record and finish this instance only.
+            store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime, entityType);
+            return;
+        }
+        List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
+        SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);
+
+        if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_WARN)) {
+            //Mark in DB as SLA missed
+            store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false);
+            LOG.info("Entity: " + entityName + " Cluster: " + clusterName + " Nominal Time: " + nominalTime
+                    + " EntityType: " + entityType + " missed SLALow");
+        } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){
+            if (entityType.equals(EntityType.PROCESS.name())){
+                store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false);
+            }
+            store.updateSLAAlertInstance(entityName, clusterName, nominalTime, entityType);
+            LOG.info("Entity: " + entityName + " Cluster: " + clusterName + " Nominal Time: " + nominalTime
+                    + " EntityType: " + entityType + " missed SLAHigh");
+            highSLAMissed(entityName, clusterName, entityType, nominalTime);
+        }
+    }
+
+    /**
+     * Forwards a high SLA miss to every registered listener, deleting the alert record after each one.
+     */
+    @Override
+    public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
+                              ) throws FalconException {
+        // NOTE(review): the alert record is deleted once per listener and not at all when no
+        // listener is registered -- confirm this is intended.
+        for (EntitySLAListener listener : listeners) {
+            listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
+            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
index 991052f..421ea38 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
@@ -18,7 +18,6 @@
 package org.apache.falcon.service;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
 
 import java.util.Date;
 
@@ -26,6 +25,6 @@ import java.util.Date;
  * Interface for FeedSLAAlert to be used by Listeners.
  */
 public interface EntitySLAListener {
-    void highSLAMissed(String enityName, EntityType entityType, String clusterName, Date nominalTime)
+    void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime)
         throws FalconException;
 }


Mime
View raw message