falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-2170 Umbrella jira for bugs in EntitySLAMonitoring and Backlog…
Date Mon, 24 Oct 2016 06:52:46 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 4746e039a -> aa522a548


FALCON-2170 Umbrella jira for bugs in EntitySLAMonitoring and BacklogEmitter service

Author: Praveen Adlakha <adlakha.praveen@gmail.com>

Reviewers: @pallavi-rao

Closes #289 from PraveenAdlakha/enity_fixes and squashes the following commits:

6b90812 [Praveen Adlakha] comments addressed
d8d1571 [Praveen Adlakha] FALCON-2170 Umbrella jira for bugs in EntitySLAMonitoring and BacklogEmitter service


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aa522a54
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aa522a54
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aa522a54

Branch: refs/heads/master
Commit: aa522a5489bed408573b375f897b8385d06e3cce
Parents: 4746e03
Author: Praveen Adlakha <adlakha.praveen@gmail.com>
Authored: Mon Oct 24 12:22:34 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Mon Oct 24 12:22:34 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/EntityUtil.java    |  89 +++++++++-
 .../metrics/MetricNotificationService.java      |  11 ++
 .../falcon/persistence/EntitySLAAlertBean.java  |  10 +-
 .../falcon/persistence/MonitoredEntityBean.java |  39 +++--
 .../falcon/persistence/PendingInstanceBean.java |  12 +-
 .../persistence/PersistenceConstants.java       |  13 +-
 common/src/main/resources/startup.properties    |  19 ++-
 .../apache/falcon/entity/EntityUtilTest.java    |  30 ++++
 .../twiki/BacklogMetricEmitterService.twiki     |   2 +
 .../falcon/messaging/JMSMessageConsumer.java    |   3 +-
 .../org/apache/falcon/logging/JobLogMover.java  |   1 -
 .../apache/falcon/service/LogMoverService.java  |  18 ++-
 .../workflow/engine/OozieWorkflowEngine.java    |   5 +-
 .../apache/falcon/jdbc/BacklogMetricStore.java  |  12 ++
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 123 ++++++--------
 .../plugin/GraphiteNotificationPlugin.java      |   1 -
 .../service/BacklogMetricEmitterService.java    | 115 ++++++++++---
 .../falcon/service/EntitySLAAlertService.java   |   2 +-
 .../service/EntitySLAMonitoringService.java     | 161 +++++++++----------
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  31 +++-
 .../service/EntitySLAAlertServiceTest.java      |   2 +-
 src/conf/startup.properties                     |  15 +-
 22 files changed, 466 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 8fe316c..f3d5d28 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -1024,18 +1024,69 @@ public final class EntityUtil {
      */
     public static List<Date> getEntityInstanceTimes(Entity entity, String clusterName, Date startRange, Date endRange) {
         Date start = null;
+        Date end = null;
+
         switch (entity.getEntityType()) {
 
         case FEED:
             Feed feed = (Feed) entity;
-            start = FeedHelper.getCluster(feed, clusterName).getValidity().getStart();
+            org.apache.falcon.entity.v0.feed.Validity feedValidity =
+                    FeedHelper.getCluster(feed, clusterName).getValidity();
+            start = feedValidity.getStart();
+            end = feedValidity.getEnd().before(endRange) ? feedValidity.getEnd() : endRange;
             return getInstanceTimes(start, feed.getFrequency(), feed.getTimezone(),
-                    startRange, endRange);
+                    startRange, end);
 
         case PROCESS:
+
             Process process = (Process) entity;
-            start = ProcessHelper.getCluster(process, clusterName).getValidity().getStart();
+            org.apache.falcon.entity.v0.process.Validity processValidity =
+                    ProcessHelper.getCluster(process, clusterName).getValidity();
+            start = processValidity.getStart();
+            end = processValidity.getEnd().before(endRange) ? processValidity.getEnd() : endRange;
             return getInstanceTimes(start, process.getFrequency(),
+                    process.getTimezone(), startRange, end);
+
+        default:
+            throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+        }
+    }
+
+    /**
+     * Find the entity instance times in between the given time range.
+     * <p/>
+     * Both start and end Date are inclusive.
+     *
+     * @param entity      feed or process entity whose instance times are to be found
+     * @param clusterName name of the cluster
+     * @param startRange  start time for the input range
+     * @param endRange    end time for the input range
+     * @return List of instance times in between the given time range
+     */
+    public static List<Date> getEntityInstanceTimesInBetween(Entity entity, String clusterName, Date startRange,
+                                                             Date endRange) {
+        Date start = null;
+        Date end = null;
+
+
+        switch (entity.getEntityType()) {
+        case FEED:
+            Feed feed = (Feed) entity;
+            org.apache.falcon.entity.v0.feed.Validity feedValidity =
+                    FeedHelper.getCluster(feed, clusterName).getValidity();
+            start = feedValidity.getStart();
+            end = feedValidity.getEnd();
+            return getInstancesInBetween(start, end, feed.getFrequency(), feed.getTimezone(),
+                    startRange, endRange);
+
+        case PROCESS:
+            Process process = (Process) entity;
+            org.apache.falcon.entity.v0.process.Validity processValidity =
+                    ProcessHelper.getCluster(process, clusterName).getValidity();
+            start = processValidity.getStart();
+            end = processValidity.getEnd();
+
+            return getInstancesInBetween(start, end, process.getFrequency(),
                     process.getTimezone(), startRange, endRange);
 
         default:
@@ -1066,13 +1117,37 @@ public final class EntityUtil {
 
         Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
         while (true) {
-            Date nextStartTime = getNextStartTime(startTime, frequency, timeZone, current);
-            if (nextStartTime.after(endRange)){
+            Date nextInstanceTime = getNextStartTime(startTime, frequency, timeZone, current);
+            if (nextInstanceTime.after(endRange)){
                 break;
             }
-            result.add(nextStartTime);
+            result.add(nextInstanceTime);
             // this is required because getNextStartTime returns greater than or equal to referenceTime
-            current = new Date(nextStartTime.getTime() + ONE_MS); // 1 milli seconds later
+            current = new Date(nextInstanceTime.getTime() + ONE_MS); // 1 milli seconds later
+        }
+        return result;
+    }
+
+
+    public static List<Date> getInstancesInBetween(Date startTime, Date endTime, Frequency frequency, TimeZone timeZone,
+                                                  Date startRange, Date endRange) {
+        List<Date> result = new LinkedList<>();
+        if (endRange.before(startRange)) {
+            return result;
+        }
+        if (timeZone == null) {
+            timeZone = TimeZone.getTimeZone("UTC");
+        }
+        Date current = getPreviousInstanceTime(startTime, frequency, timeZone, startRange);
+        while (true) {
+            if (!current.before(startRange) && !current.after(endRange)
+                    && current.before(endTime) && !current.before(startTime)) {
+                result.add(current);
+            }
+            current = getNextInstanceTime(current, frequency, timeZone, 1);
+            if (current.after(endRange)){
+                break;
+            }
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
index 30e6bb6..90fbfa9 100644
--- a/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/metrics/MetricNotificationService.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -94,6 +95,16 @@ public class MetricNotificationService implements FalconService {
         }
     }
 
+    public void deleteMetric(String metricName){
+        synchronized (this){
+            SortedMap<String, Gauge> gaugeMap = metricRegistry.getGauges();
+            if (gaugeMap.get(metricName) != null){
+                metricRegistry.remove(metricName);
+                metricMap.remove(metricName);
+            }
+        }
+    }
+
     private static class MetricGauge implements Gauge<Long> {
 
         private Long value=0L;

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
index 186c5e0..6482e8c 100644
--- a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
@@ -35,7 +35,7 @@ import javax.validation.constraints.NotNull;
 
 //SUSPEND CHECKSTYLE CHECK LineLengthCheck
 /**
- * Entity SLA monitoring.
+ * Feed SLA monitoring.
  * */
 @Entity
 @NamedQueries({
@@ -148,12 +148,12 @@ public class EntitySLAAlertBean {
         this.isSLAHighMissed = isSLAHighMissed;
     }
 
-    public static final String ENTITYNAME = "entityName";
+    public static final String ENTITY_NAME = "entityName";
 
-    public static final String CLUSTERNAME = "clusterName";
+    public static final String CLUSTER_NAME = "clusterName";
 
-    public static final String ENTITYTYPE = "entityType";
+    public static final String ENTITY_TYPE = "entityType";
 
-    public static final String NOMINALTIME = "nominalTime";
+    public static final String NOMINAL_TIME = "nominalTime";
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
index c620e45..5181cf5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -29,22 +29,26 @@ import javax.persistence.Id;
 import javax.persistence.Column;
 import javax.persistence.Basic;
 import javax.validation.constraints.NotNull;
+import java.util.Date;
 
 //SUSPEND CHECKSTYLE CHECK LineLengthCheck
 /**
-* The Feeds that are to be monitered will be stored in the db.
+* The Entities that are to be monitored will be stored in MONITORED_ENTITY table.
 * */
 
 @Entity
 @NamedQueries({
-        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+        @NamedQuery(name = PersistenceConstants.GET_MONITORED_ENTITY, query = "select OBJECT(a) from "
                 + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"),
-        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean "
+        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_ENTITIES, query = "delete from MonitoredEntityBean "
                 + "a where a.entityName = :entityName and a.entityType = :entityType"),
-        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE, query = "select OBJECT(a) "
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITIES_FOR_TYPE, query = "select OBJECT(a) "
                 + "from MonitoredEntityBean a where a.entityType = :entityType"),
         @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_ENTITY, query = "select OBJECT(a) "
-                + "from MonitoredEntityBean a")
+                + "from MonitoredEntityBean a"),
+        @NamedQuery(name = PersistenceConstants.UPDATE_LAST_MONITORED_TIME, query = "update MonitoredEntityBean a "
+                + "set a.lastMonitoredTime = :lastMonitoredTime where a.entityName = :entityName and a.entityType = "
+                + ":entityType")
 })
 @Table(name="MONITORED_ENTITY")
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
@@ -73,12 +77,25 @@ public class MonitoredEntityBean {
     @Column(name = "entity_type")
     private String entityType;
 
-    public String getFeedName() {
+    public String getEntityName() {
         return entityName;
     }
 
-    public void setEntityName(String feedName) {
-        this.entityName = feedName;
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    @Basic
+    @NotNull
+    @Column(name = "last_monitored_time")
+    private Date lastMonitoredTime;
+
+    public Date getLastMonitoredTime() {
+        return lastMonitoredTime;
+    }
+
+    public void setLastMonitoredTime(Date lastMonitoredTime) {
+        this.lastMonitoredTime = lastMonitoredTime;
     }
 
     public String getId() {
@@ -89,8 +106,10 @@ public class MonitoredEntityBean {
         this.id = id;
     }
 
-    public static final String ENTITYNAME = "entityName";
+    public static final String ENTITY_NAME = "entityName";
+
+    public static final String ENTITY_TYPE = "entityType";
 
-    public static final String ENTITYTYPE = "entityType";
+    public static final String LAST_MONITORED_TIME = "lastMonitoredTime";
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 43b6b8e..05c5ea3 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -40,9 +40,9 @@ import java.util.Date;
     @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
     @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
     @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
-    @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
+    @NamedQuery(name = PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
     @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
-    @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a "),
+    @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a  order by a.nominalTime asc"),
     @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select  OBJECT(a) from PendingInstanceBean a  where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
 })
 @Table(name = "PENDING_INSTANCES")
@@ -114,12 +114,12 @@ public class PendingInstanceBean {
         this.entityName = entityName;
     }
 
-    public static final String ENTITYNAME = "entityName";
+    public static final String ENTITY_NAME = "entityName";
 
-    public static final String CLUSTERNAME = "clusterName";
+    public static final String CLUSTER_NAME = "clusterName";
 
-    public static final String NOMINALTIME = "nominalTime";
+    public static final String NOMINAL_TIME = "nominalTime";
 
-    public static final String ENTITYTYPE = "entityType";
+    public static final String ENTITY_TYPE = "entityType";
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 5c3de51..8be0eb5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -24,13 +24,16 @@ public final class PersistenceConstants {
     private PersistenceConstants(){
 
     }
-    public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
-    public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
-    public static final String GET_ALL_MONITORING_ENTITY_FOR_TYPE = "GET_ALL_MONITORING_ENTITY_FOR_TYPE";
+    public static final String GET_MONITORED_ENTITY = "GET_MONITORED_ENTITY";
+    public static final String DELETE_MONITORED_ENTITIES = "DELETE_MONITORED_ENTITIES";
+    public static final String GET_ALL_MONITORING_ENTITIES_FOR_TYPE = "GET_ALL_MONITORING_ENTITIES_FOR_TYPE";
+    public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
+    public static final String UPDATE_LAST_MONITORED_TIME = "UPDATE_LAST_MONITORED_TIME";
+
     public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
     public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
     public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
-    public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY";
+    public static final String DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY = "DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY";
     public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
     public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
     public static final String GET_ENTITY = "GET_ENTITY";
@@ -63,5 +66,5 @@ public final class PersistenceConstants {
     public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
     public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE";
     public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES";
-    public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
+    public static final String DELETE_ALL_BACKLOG_ENTITY_INSTANCES ="DELETE_ALL_BACKLOG_ENTITY_INSTANCES";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 3beab62..9fb1c0a 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -69,6 +69,10 @@
                         org.apache.falcon.entity.store.FeedLocationStore,\
                         org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
+
+## If you wish to use the Backlog Metric Emitter service, add BacklogMetricEmitterService as a configstore listener. ##
+#                       org.apache.falcon.service.BacklogMetricEmitterService
+
 ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
 #                       org.apache.falcon.state.store.jdbc.JdbcStateStore
 
@@ -83,7 +87,9 @@
 
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=
+#org.apache.falcon.handler.SLAMonitoringHandler
 #org.apache.falcon.service.LogMoverService
+#org.apache.falcon.service.BacklogMetricEmitterService
 
 ######### Implementation classes #########
 
@@ -244,15 +250,6 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
 
-# CSRF filter enabled flag: false (default) | true
-*.falcon.security.csrf.enabled=false
-
-# Custom header for CSRF filter
-*.falcon.security.csrf.header=FALCON-CSRF-FILTER
-
-# Browser user agents to be filtered
-*.falcon.security.csrf.browser=^Mozilla.*,^Opera.*
-
 # The name of the group of super-users
 *.falcon.security.authorization.superusergroup=falcon
 
@@ -351,5 +348,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 *.falcon.postprocessing.enable=true
 
 ### LogMoveService Properties
-*.falcon.logMoveService.threadCount=200
+*.falcon.logMoveService.max.threadCount=200
 *.falcon.logMoveService.blockingQueue.length=50
+## Note: min threadCount should always be smaller than max threadCount.
+*.falcon.logMoveService.min.threadCount=20

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index 42ae3e6..28a9270 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -44,6 +44,7 @@ import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
@@ -81,6 +82,35 @@ public class EntityUtilTest extends AbstractTestBase {
         view = EntityUtil.getClusterView(feed, "backupCluster");
         Assert.assertEquals(view.getClusters().getClusters().size(), 2);
     }
+    @Test
+    public void  testGetInstancesInBetween(){
+        Date startTime = SchemaHelper.parseDateUTC("2016-09-30T15:24Z");
+        Date endTime = SchemaHelper.parseDateUTC("2016-09-30T17:04Z");
+        Frequency frequency = new Frequency("minutes(5)");
+        Date startRange = SchemaHelper.parseDateUTC("2016-09-30T15:25Z");
+        Date endRange = SchemaHelper.parseDateUTC("2016-09-30T15:30Z");
+        List<Date> instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange,
+                endRange);
+        startRange = SchemaHelper.parseDateUTC("2016-09-30T15:18Z");
+        endRange = SchemaHelper.parseDateUTC("2016-09-30T15:24Z");
+        instances.addAll(EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange));
+        Assert.assertEquals(instances.size(), 2);
+        startRange = SchemaHelper.parseDateUTC("2016-09-30T15:24Z");
+        endRange = SchemaHelper.parseDateUTC("2016-09-30T15:25Z");
+        instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange);
+        Assert.assertEquals(instances.size(), 1);
+
+        frequency = new Frequency("minutes(2)");
+        startRange = SchemaHelper.parseDateUTC("2016-09-30T16:32Z");
+        endRange = SchemaHelper.parseDateUTC("2016-09-30T17:02Z");
+        instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange);
+        Assert.assertEquals(instances.size(), 16);
+        startRange = SchemaHelper.parseDateUTC("2016-09-30T15:24Z");
+        endRange = SchemaHelper.parseDateUTC("2016-09-30T17:05Z");
+        instances = EntityUtil.getInstancesInBetween(startTime, endTime, frequency, tz, startRange, endRange);
+        Assert.assertEquals(instances.size(), 50);
+
+    }
 
     @Test
     public void testEquals() throws Exception {

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/docs/src/site/twiki/BacklogMetricEmitterService.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/BacklogMetricEmitterService.twiki b/docs/src/site/twiki/BacklogMetricEmitterService.twiki
index 2b10f6c..f92b594 100644
--- a/docs/src/site/twiki/BacklogMetricEmitterService.twiki
+++ b/docs/src/site/twiki/BacklogMetricEmitterService.twiki
@@ -41,6 +41,8 @@ Following services and listeners should be enabled for Backlog Emitter Service i
 #                        org.apache.falcon.service.BacklogMetricEmitterService
 
 *.entityAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService
+
+*.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler
 </verbatim>
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 8b48e93..5383e7f 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -157,7 +157,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
             wfProperties.put(WorkflowExecutionArgs.ENTITY_TYPE, entityTypePair.second.name());
             wfProperties.put(WorkflowExecutionArgs.WORKFLOW_USER, message.getStringProperty("user"));
             wfProperties.put(WorkflowExecutionArgs.OPERATION, getOperation(appName).name());
-
+            wfProperties.put(WorkflowExecutionArgs.USER_SUBFLOW_ID,
+                    json.getString("id").concat("@user-action"));
             String appType = message.getStringProperty("appType");
             return WorkflowExecutionContext.create(wfProperties, WorkflowExecutionContext.Type.valueOf(appType));
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 6ec2a20..72c3dc5 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -92,7 +92,6 @@ public class JobLogMover {
                 LOG.error("Error getting jobinfo for: {}", context.getUserSubflowId(), e);
                 return 0;
             }
-
             //Assumption is - Each wf run will have a directory
             //the corresponding job logs are stored within the respective dir
             Path path = new Path(context.getLogDir() + "/"

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
index 7d1425a..7e4640e 100644
--- a/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/LogMoverService.java
@@ -46,14 +46,24 @@ public class LogMoverService implements WorkflowExecutionListener  {
 
     private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(Integer.parseInt(
             StartupProperties.get().getProperty("falcon.logMoveService.blockingQueue.length", "50")));
-    private ExecutorService executorService = new ThreadPoolExecutor(20, getThreadCount(), 120,
+    private ExecutorService executorService = new ThreadPoolExecutor(getCorePoolSize(), getThreadCount(), 120,
             TimeUnit.SECONDS, blockingQueue);
+
+    public int getCorePoolSize(){
+        try{
+            return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.min.threadCount", "20"));
+        } catch (NumberFormatException  e){
+            LOG.error("Exception in LogMoverService", e);
+            return 20;
+        }
+    }
     public int getThreadCount() {
         try{
-            return Integer.parseInt(StartupProperties.get().getProperty("falcon.logMoveService.threadCount", "200"));
+            return Integer.parseInt(StartupProperties.get()
+                    .getProperty("falcon.logMoveService.max.threadCount", "200"));
         } catch (NumberFormatException  e){
             LOG.error("Exception in LogMoverService", e);
-            return 50;
+            return 200;
         }
     }
 
@@ -86,7 +96,7 @@ public class LogMoverService implements WorkflowExecutionListener  {
         if (Boolean.parseBoolean(ENABLE_POSTPROCESSING)) {
             return;
         }
-        while(0<blockingQueue.remainingCapacity()){
+        while(blockingQueue.remainingCapacity()<=0){
             try {
                 LOG.trace("Sleeping, no capacity in threadpool....");
                 TimeUnit.MILLISECONDS.sleep(500);

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 6964200..06e4cb2 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -589,10 +589,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public InstancesResult ignoreInstances(Entity entity, Date start, Date end, Properties props,
-                                              List<LifeCycle> lifeCycles) throws FalconException {
+                                           List<LifeCycle> lifeCycles) throws FalconException {
         return doJobAction(JobAction.IGNORE, entity, start, end, props, lifeCycles);
     }
-
     @Override
     public InstancesResult reRunInstances(Entity entity, Date start, Date end,
                                           Properties props, List<LifeCycle> lifeCycles,
@@ -1111,7 +1110,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         if (CoordinatorAction.Status.READY.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.READY.name();
         } else if (CoordinatorAction.Status.WAITING.toString().equals(status)
-            || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
+                || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.WAITING.name();
         } else if (CoordinatorAction.Status.KILLED.toString().equals(status)) {
             return InstancesResult.WorkflowStatus.KILLED.name();

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
index ef9a396..621974d 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -77,6 +77,18 @@ public class BacklogMetricStore {
         }
     }
 
+    public void deleteEntityInstance(String entityName){
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
+        q.setParameter("entityName", entityName);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
 
     private void beginTransaction(EntityManager entityManager) {
         entityManager.getTransaction().begin();

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index c479940..552ebde 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -44,11 +44,12 @@ public class MonitoringJdbcStateStore {
     }
 
 
-    public void putMonitoredEntity(String entityName, String entityType) throws FalconException{
+    public void putMonitoredEntity(String entityName, String entityType, Date lastMonitoredTime) throws FalconException{
 
         MonitoredEntityBean monitoredEntityBean = new MonitoredEntityBean();
         monitoredEntityBean.setEntityName(entityName);
         monitoredEntityBean.setEntityType(entityType);
+        monitoredEntityBean.setLastMonitoredTime(lastMonitoredTime);
         EntityManager entityManager = getEntityManager();
         try {
             beginTransaction(entityManager);
@@ -58,11 +59,25 @@ public class MonitoringJdbcStateStore {
         }
     }
 
+    public void updateLastMonitoredTime(String entityName, String entityType, Date lastCheckedTime) {
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_LAST_MONITORED_TIME);
+        q.setParameter(MonitoredEntityBean.ENTITY_NAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType.toLowerCase());
+        q.setParameter(MonitoredEntityBean.LAST_MONITORED_TIME, lastCheckedTime);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
     public MonitoredEntityBean getMonitoredEntity(String entityName, String entityType){
         EntityManager entityManager = getEntityManager();
-        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
-        q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
-        q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase());
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITORED_ENTITY);
+        q.setParameter(MonitoredEntityBean.ENTITY_NAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType.toLowerCase());
         List result = q.getResultList();
         try {
             if (result.isEmpty()) {
@@ -77,9 +92,9 @@ public class MonitoringJdbcStateStore {
     public void deleteMonitoringEntity(String entityName, String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
-        q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
-        q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType.toLowerCase());
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_ENTITIES);
+        q.setParameter(MonitoredEntityBean.ENTITY_NAME, entityName);
+        q.setParameter(MonitoredEntityBean.ENTITY_TYPE, entityType.toLowerCase());
         try{
             q.executeUpdate();
         } finally {
@@ -87,7 +102,7 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-    public List<MonitoredEntityBean> getAllMonitoredEntity() throws ResultNotFoundException {
+    public List<MonitoredEntityBean> getAllMonitoredEntities() throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY);
         List result = q.getResultList();
@@ -95,10 +110,10 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
-    public List<MonitoredEntityBean> getAllMonitoredEntityForEntity(String entityType) throws ResultNotFoundException {
+    public List<MonitoredEntityBean> getAllMonitoredEntities(String entityType) throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
-        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITY_FOR_TYPE);
-        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_ENTITIES_FOR_TYPE);
+        q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
         List result = q.getResultList();
         entityManager.close();
         return result;
@@ -107,8 +122,8 @@ public class MonitoringJdbcStateStore {
     public Date getLastInstanceTime(String entityName , String entityType) throws ResultNotFoundException {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class);
-        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
-        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
+        q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName);
+        q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
         Date result = (Date)q.getSingleResult();
         entityManager.close();
         return result;
@@ -118,10 +133,10 @@ public class MonitoringJdbcStateStore {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
-        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
-        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
-        q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
-        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
+        q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName);
+        q.setParameter(PendingInstanceBean.NOMINAL_TIME, nominalTime);
+        q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
         try{
             q.executeUpdate();
         } finally {
@@ -132,10 +147,10 @@ public class MonitoringJdbcStateStore {
     public void deletePendingInstances(String entityName, String clusterName, String entityType){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY);
-        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
-        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
-        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY);
+        q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
         try{
             q.executeUpdate();
         } finally {
@@ -160,9 +175,9 @@ public class MonitoringJdbcStateStore {
     public List<Date> getNominalInstances(String entityName, String clusterName, String entityType) {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
-        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
-        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
-        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
+        q.setParameter(PendingInstanceBean.ENTITY_NAME, entityName);
+        q.setParameter(PendingInstanceBean.CLUSTER_NAME, clusterName);
+        q.setParameter(PendingInstanceBean.ENTITY_TYPE, entityType.toLowerCase());
         List result = q.getResultList();
         entityManager.close();
         return result;
@@ -188,34 +203,16 @@ public class MonitoringJdbcStateStore {
         entityManager.close();
     }
 
-    public PendingInstanceBean getPendingInstance(String entityName, String clusterName, Date nominalTime,
-                                                  String entityType) {
-        EntityManager entityManager = getEntityManager();
-        beginTransaction(entityManager);
-        TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE,
-                            PendingInstanceBean.class);
-        q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
-
-        q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
-        q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
-        q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType.toLowerCase());
-        try {
-            return q.getSingleResult();
-        } finally {
-            commitAndCloseTransaction(entityManager);
-        }
-    }
-
     public EntitySLAAlertBean getEntityAlertInstance(String entityName, String clusterName, Date nominalTime,
                                                      String entityType) {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         TypedQuery<EntitySLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.
                 GET_ENTITY_ALERT_INSTANCE, EntitySLAAlertBean.class);
-        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
-        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
-        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
-        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase());
+        q.setParameter(EntitySLAAlertBean.ENTITY_NAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTER_NAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINAL_TIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITY_TYPE, entityType.toLowerCase());
         try {
             return q.getSingleResult();
         } finally {
@@ -245,10 +242,10 @@ public class MonitoringJdbcStateStore {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH);
-        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
-        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
-        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
-        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase());
+        q.setParameter(EntitySLAAlertBean.ENTITY_NAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTER_NAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINAL_TIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITY_TYPE, entityType.toLowerCase());
         try{
             q.executeUpdate();
         } finally {
@@ -260,10 +257,10 @@ public class MonitoringJdbcStateStore {
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
         Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE);
-        q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
-        q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
-        q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
-        q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType.toLowerCase());
+        q.setParameter(EntitySLAAlertBean.ENTITY_NAME, entityName);
+        q.setParameter(EntitySLAAlertBean.CLUSTER_NAME, clusterName);
+        q.setParameter(EntitySLAAlertBean.NOMINAL_TIME, nominalTime);
+        q.setParameter(EntitySLAAlertBean.ENTITY_TYPE, entityType.toLowerCase());
         try{
             q.executeUpdate();
         } finally {
@@ -271,24 +268,6 @@ public class MonitoringJdbcStateStore {
         }
     }
 
-
-    public List<EntitySLAAlertBean> getSLAHighCandidates() {
-        EntityManager entityManager = getEntityManager();
-        beginTransaction(entityManager);
-        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES);
-        List result = q.getResultList();
-
-        try {
-            if (CollectionUtils.isEmpty(result)) {
-                return null;
-            }
-        } finally{
-            entityManager.close();
-        }
-        return result;
-    }
-
-
     private void beginTransaction(EntityManager entityManager) {
         entityManager.getTransaction().begin();
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
index 881f8ce..56df23b 100644
--- a/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
+++ b/prism/src/main/java/org/apache/falcon/plugin/GraphiteNotificationPlugin.java
@@ -50,7 +50,6 @@ public class GraphiteNotificationPlugin implements MonitoringPlugin {
             String entityName = StringUtils.isNotBlank(message.getDimensions().get("entityName"))
                     ? message.getDimensions().get("entityName") :message.getDimensions().get("entity-name");
             String prefix = StartupProperties.get().getProperty("falcon.graphite.prefix");
-            String separator = ".";
             LOG.debug("message:" + message.getAction());
             if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())
                     && ConfigurationStore.get().get(EntityType.PROCESS, entityName) != null) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
index d9ac386..3aa2155 100644
--- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.jdbc.BacklogMetricStore;
 import org.apache.falcon.metrics.MetricNotificationService;
@@ -60,9 +61,9 @@ import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine
  * Backlog Metric Emitter Service to publish metrics to Graphite.
  */
 public final class BacklogMetricEmitterService implements FalconService,
-        EntitySLAListener, WorkflowExecutionListener {
+        EntitySLAListener, WorkflowExecutionListener, ConfigurationChangeListener {
 
-    private static final String METRIC_PREFIX = "falcon";
+    private static final String METRIC_PREFIX = StartupProperties.get().getProperty("falcon.graphite.prefix");
     private static final String METRIC_SEPARATOR = ".";
     private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs";
     private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice."
@@ -101,9 +102,63 @@ public final class BacklogMetricEmitterService implements FalconService,
     private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>();
 
     @Override
-    public void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime)
-        throws FalconException {
+    public void onAdd(Entity entity) throws FalconException{
+        //DO Nothing
+    }
 
+    @Override
+    public void onRemove(Entity entity) throws FalconException{
+        if (entity.getEntityType() != EntityType.PROCESS){
+            return;
+        }
+        backlogMetricStore.deleteEntityInstance(entity.getName());
+        entityBacklogs.remove(entity);
+        Process process = EntityUtil.getEntity(entity.getEntityType(), entity.getName());
+        for(Cluster cluster : process.getClusters().getClusters()){
+            dropMetric(cluster.getName(), process);
+        }
+    }
+
+    public void dropMetric(String clusterName, Process process){
+        String pipelinesStr = process.getPipelines();
+        String metricName;
+
+        if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
+            String[] pipelines = pipelinesStr.split(",");
+            for (String pipeline : pipelines) {
+                metricName = getMetricName(clusterName, process.getName(), pipeline);
+                metricNotificationService.deleteMetric(metricName);
+            }
+        } else {
+            metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE);
+            metricNotificationService.deleteMetric(metricName);
+        }
+    }
+
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException{
+        if (oldEntity.getEntityType() != EntityType.PROCESS){
+            return;
+        }
+        Process newProcess = (Process) newEntity;
+        if (newProcess.getSla() == null || newProcess.getSla().getShouldEndIn() == null){
+            backlogMetricStore.deleteEntityInstance(newProcess.getName());
+            entityBacklogs.remove(newProcess);
+            Process process = EntityUtil.getEntity(oldEntity.getEntityType(), oldEntity.getName());
+            for(Cluster cluster : process.getClusters().getClusters()){
+                dropMetric(cluster.getName(), process);
+            }
+        }
+    }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException{
+        // Do Nothing
+    }
+
+    @Override
+    public void highSLAMissed(String entityName, String clusterName, EntityType entityType,
+                              Date nominalTime) throws FalconException {
         if (entityType != EntityType.PROCESS) {
             return;
         }
@@ -146,7 +201,7 @@ public final class BacklogMetricEmitterService implements FalconService,
                 List<MetricInfo> metricsInDB = entry.getValue();
                 List<MetricInfo> metricInfoList = Collections.synchronizedList(metricsInDB);
                 entityBacklogs.put(entry.getKey(), metricInfoList);
-                LOG.debug("Backlog of entity " + entry.getKey().getName() + " for instances " + metricInfoList);
+                LOG.debug("Initializing backlog for entity " + entry.getKey().getName());
             }
         }
     }
@@ -172,6 +227,7 @@ public final class BacklogMetricEmitterService implements FalconService,
                 metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), context.getClusterName()));
                 if (metrics.isEmpty()) {
                     entityBacklogs.remove(entity);
+                    publishBacklog((Process) entity, context.getClusterName(), 0L);
                 }
             }
         }
@@ -205,7 +261,7 @@ public final class BacklogMetricEmitterService implements FalconService,
 
         @Override
         public void run() {
-            LOG.debug("Starting periodic check for backlog");
+            LOG.debug("BacklogMetricEmitter running for entities");
             executor = new ScheduledThreadPoolExecutor(10);
             List<Future> futures = new ArrayList<>();
             try {
@@ -271,31 +327,38 @@ public final class BacklogMetricEmitterService implements FalconService,
             if (backLogsCluster != null && !backLogsCluster.isEmpty()) {
                 for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) {
                     String clusterName = entry.getKey();
-                    String pipelinesStr = process.getPipelines();
-                    String metricName;
                     Long backlog = entry.getValue() / (60 * 1000L); // Converting to minutes
-                    if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
-                        String[] pipelines = pipelinesStr.split(",");
-                        for (String pipeline : pipelines) {
-                            metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
-                                    + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
-                                    + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
-                                    + "backlogInMins";
-                            metricNotificationService.publish(metricName, backlog);
-                        }
-                    } else {
-                        metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
-                                + DEFAULT_PIPELINE + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
-                                + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
-                                + "backlogInMins";
-                        metricNotificationService.publish(metricName, backlog);
-                    }
+                    publishBacklog(process, clusterName, backlog);
                 }
             }
         }
     }
 
 
+    public static void publishBacklog(Process process, String clusterName, Long backlog){
+        String pipelinesStr = process.getPipelines();
+        String metricName;
+
+        if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
+            String[] pipelines = pipelinesStr.split(",");
+            for (String pipeline : pipelines) {
+                metricName = getMetricName(clusterName, process.getName(), pipeline);
+                metricNotificationService.publish(metricName, backlog);
+            }
+        } else {
+            metricName = getMetricName(clusterName, process.getName(), DEFAULT_PIPELINE);
+            metricNotificationService.publish(metricName, backlog);
+        }
+    }
+
+    public static String getMetricName(String clusterName, String processName, String pipeline){
+        String metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+                + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+                + METRIC_SEPARATOR + processName + METRIC_SEPARATOR
+                + "backlogInMins";
+        return metricName;
+    }
+
     /**
      * Service runs periodically and removes succeeded instances from backlog list.
      */
@@ -303,7 +366,7 @@ public final class BacklogMetricEmitterService implements FalconService,
 
         @Override
         public void run() {
-            LOG.debug("BacklogCheckService running for entities");
+            LOG.trace("BacklogCheckService running for entities");
             try {
                 AbstractWorkflowEngine wfEngine = getWorkflowEngine();
                 for (Entity entity : entityBacklogs.keySet()) {
@@ -331,7 +394,7 @@ public final class BacklogMetricEmitterService implements FalconService,
                                     if (status.getInstances().length > 0
                                             && status.getInstances()[0].status == InstancesResult.
                                             WorkflowStatus.SUCCEEDED) {
-                                        LOG.debug("Instance of nominaltime {} of entity {} was succeeded, removing "
+                                        LOG.debug("Instance of nominaltime {} of entity {} has succeeded, removing "
                                                 + "from backlog entries", nominalTimeStr, entity.getName());
                                         backlogMetricStore.deleteMetricInstance(entity.getName(),
                                                 metricInfo.getCluster(), nominalTime, entity.getEntityType());

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index 09c6695..c4069dd 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -142,7 +142,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
                     LOG.info("Entity : {} Cluster : {} Nominal Time : {} missed SLALow", entityName, entityType,
                             clusterName, nominalTime);
                 } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){
-                    if (entityType.equals(EntityType.PROCESS.name())){
+                    if (entityType.equalsIgnoreCase(EntityType.PROCESS.name())){
                         store.putSLAAlertInstance(entityName, clusterName, entityType,
                                 nominalTime, true, false);
                     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
index 816846d..7ff9309 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -17,23 +17,22 @@
  */
 package org.apache.falcon.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.text.ParseException;
-import java.util.HashSet;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
-import org.apache.falcon.entity.FeedInstanceStatus;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.FeedInstanceStatus;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -42,6 +41,7 @@ import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Clusters;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
@@ -60,12 +60,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.falcon.entity.v0.process.Process;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.falcon.entity.EntityUtil.getStartTime;
+import static org.apache.falcon.util.DateUtil.now;
 
 /**
  * Service to monitor Feed SLAs.
@@ -81,6 +80,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
 
     public static final String TAG_CRITICAL = "Missed-SLA-High";
     public static final String TAG_WARN = "Missed-SLA-Low";
+    private static final long MINUTE_DELAY = 60000L;
 
     private EntitySLAMonitoringService() {
 
@@ -128,30 +128,36 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
     @Override
     public void onAdd(Entity entity) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        Set<String> clustersDefined = EntityUtil.getClustersDefined(entity);
         if (entity.getEntityType() == EntityType.FEED) {
             Feed feed = (Feed) entity;
             // currently sla service is enabled only for fileSystemStorage
             if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) {
-                for (Cluster cluster : feed.getClusters().getClusters()) {
-                    if (currentClusters.contains(cluster.getName())) {
+                for (String cluster : clustersDefined) {
+                    if (currentClusters.contains(cluster)) {
                         if (FeedHelper.getSLA(cluster, feed) != null) {
                             LOG.debug("Adding feed:{} for monitoring", feed.getName());
-                            MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString());
-                            break;
+                            MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString(),
+                                    new Date(now().getTime() + MINUTE_DELAY));
+                            List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity, cluster,
+                                    getStartTime(entity, cluster), now());
+                            addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, cluster,
+                                    instances);
                         }
                     }
                 }
             }
-        }
-        if (entity.getEntityType() == EntityType.PROCESS){
+        } else if (entity.getEntityType() == EntityType.PROCESS) {
             Process process = (Process) entity;
-            if (process.getSla() != null || checkProcessClusterSLA(process)){
-                for (org.apache.falcon.entity.v0.process.Cluster  cluster : process.getClusters().getClusters()) {
-                    if (currentClusters.contains(cluster.getName())) {
+            if (process.getSla() != null || checkProcessClusterSLA(process)) {
+                for (String cluster : clustersDefined) {
+                    if (currentClusters.contains(cluster)) {
                         LOG.debug("Adding process:{} for monitoring", process.getName());
                         MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
-                                EntityType.PROCESS.toString());
-                        break;
+                                EntityType.PROCESS.toString(), new Date(now().getTime() + MINUTE_DELAY));
+                        List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity, cluster,
+                                getStartTime(entity, cluster), now());
+                        addPendingInstances(entity.getEntityType().name().toLowerCase(), entity, cluster, instances);
                     }
                 }
             }
@@ -186,7 +192,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         if (entity.getEntityType() == EntityType.FEED) {
             Feed feed = (Feed) entity;
             // currently sla service is enabled only for fileSystemStorage
-            if (feed.getLocations() != null) {
+            if (feed.getSla() != null && feed.getLocations() != null) {
                 for (Cluster cluster : feed.getClusters().getClusters()) {
                     if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
                         MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(), EntityType.FEED.toString());
@@ -257,9 +263,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         if (newEntity.getEntityType() == EntityType.PROCESS) {
             Process oldProcess = (Process) oldEntity;
             Process newProcess = (Process) newEntity;
-            if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){
-                onRemove(newProcess);
-            } else if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){
+            if (!isSLAMonitoringEnabledInCurrentColo(newProcess)){
+                onRemove(oldProcess);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)){
                 onAdd(newProcess);
             } else {
                 List<String> slaRemovedClusters = new ArrayList<>();
@@ -283,7 +289,6 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
 
     @Override
     public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
     }
 
     @Override
@@ -305,6 +310,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         lookAheadWindowMillis = Integer.parseInt(freq);
         LOG.info("Initializing EntitySLAMonitoringService from ", filePath.toString());
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        addPendingEntityInstances(EntityType.FEED.name(), null, now());
+        addPendingEntityInstances(EntityType.PROCESS.name(), null, now());
         executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
     }
 
@@ -345,15 +352,14 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         @Override
         public void run() {
             try {
-                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntity().size() > 0) {
+                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredEntities().size() > 0) {
                     checkPendingInstanceAvailability(EntityType.FEED.toString());
                     checkPendingInstanceAvailability(EntityType.PROCESS.toString());
 
                     // add Instances from last checked time to 10 minutes from now(some buffer for status check)
-                    Date now = new Date();
-                    Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
-                    addNewPendingEntityInstances(newCheckPoint, EntityType.FEED.toString());
-                    addNewPendingEntityInstances(newCheckPoint, EntityType.PROCESS.toString());
+                    Date newCheckPointTime = new Date(now().getTime() + lookAheadWindowMillis);
+                    addPendingEntityInstances(EntityType.FEED.toString(), null, newCheckPointTime);
+                    addPendingEntityInstances(EntityType.PROCESS.toString(), null, newCheckPointTime);
                 }
             } catch (Throwable e) {
                 LOG.error("Feed SLA monitoring failed: ", e);
@@ -361,55 +367,42 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         }
     }
 
+    private void addPendingInstances(String entityType, Entity entity,
+                                     String clusterName,
+                                     List<Date> instances) throws FalconException {
+        if (instances != null && !instances.isEmpty()) {
+            for (Date date : instances) {
+                LOG.debug("Adding pending instance ={} for entity= {} in cluster>={} and entityType={}", date,
+                        entity.getName(), clusterName, entityType);
+                MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), clusterName, date,
+                        entityType);
+            }
+        }
+    }
 
-    void addNewPendingEntityInstances(Date to, String entityType) throws FalconException {
+    void addPendingEntityInstances(String entityType, Date startTime, Date endTime) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
         List<MonitoredEntityBean> entityBeanList = MONITORING_JDBC_STATE_STORE.
-                getAllMonitoredEntityForEntity(entityType);
+                getAllMonitoredEntities(entityType);
         for(MonitoredEntityBean monitoredEntityBean : entityBeanList) {
-            String entityName = monitoredEntityBean.getFeedName();
+            String entityName = monitoredEntityBean.getEntityName();
+            Date lastMonitoredInstanceTime = (startTime != null) ? startTime
+                    : monitoredEntityBean.getLastMonitoredTime();
+            Date newCheckPointTime = endTime != null ? endTime : now();
             Entity entity = EntityUtil.getEntity(entityType, entityName);
-            Set<String> clusters =  EntityUtil.getClustersDefined(entity);
-            List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList();
-            for(String string : clusters){
-                cluster.add(ClusterHelper.getCluster(string));
+            Set<String> clustersDefined =  EntityUtil.getClustersDefined(entity);
+            List<org.apache.falcon.entity.v0.cluster.Cluster> clusters = new ArrayList();
+            for(String cluster : clustersDefined){
+                clusters.add(ClusterHelper.getCluster(cluster));
             }
-            for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : cluster) {
+            for (org.apache.falcon.entity.v0.cluster.Cluster entityCluster : clusters) {
                 if (currentClusters.contains(entityCluster.getName())) {
-                    // get start of instances from the database
-                    Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName,
-                            entityType);
-                    Pair<String, String> key = new Pair<>(entity.getName(), entityCluster.getName());
-                    if (nextInstanceTime == null) {
-                        nextInstanceTime = getInitialStartTime(entity, entityCluster.getName(), entityType);
-                    } else {
-                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
-                    }
-
-                    Set<Date> instances = new HashSet<>();
-                    org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
-                            EntityUtil.getEntity(EntityType.CLUSTER, entityCluster.getName());
-                    nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
-                    LOG.trace("nextInstanceTime:"+ nextInstanceTime + "entityName:"+entityName);
-                    Date endDate;
-                    if (entityType.equals(EntityType.FEED.toString())){
-                        endDate =  FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd();
-                    }else {
-                        endDate =  ProcessHelper.getClusterValidity((Process) entity,
-                                currentCluster.getName()).getEnd();
-                    }
-                    while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
-                        LOG.trace("Adding pending instance={} for <entity,cluster>={}; entityType={}",
-                                nextInstanceTime, key, entityType);
-                        instances.add(nextInstanceTime);
-                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
-                        nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
-                    }
-
-                    for(Date date:instances){
-                        MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), entityCluster.getName(), date,
-                                entityType);
-                    }
+                    List<Date> instances = EntityUtil.getEntityInstanceTimesInBetween(entity, entityCluster.getName(),
+                            lastMonitoredInstanceTime, newCheckPointTime);
+                    addPendingInstances(entityType, entity, entityCluster.getName(), instances);
+                    // update last monitored time with the new checkpoint time
+                    MONITORING_JDBC_STATE_STORE.updateLastMonitoredTime(entityName, entityType,
+                            new Date(newCheckPointTime.getTime() + MINUTE_DELAY));
                 }
             }
         }
@@ -421,17 +414,17 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
      */
     private void checkPendingInstanceAvailability(String entityType) throws FalconException {
         if (MONITORING_JDBC_STATE_STORE.getAllPendingInstances() == null){
-            LOG.info("Returning as size of pending instance is zero");
+            LOG.info("No pending instances to be checked");
             return;
         }
         for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){
-            for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
+            for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
                     pendingInstanceBean.getClusterName(), entityType)) {
                 boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(),
-                        pendingInstanceBean.getClusterName(), date, entityType);
+                        pendingInstanceBean.getClusterName(), instanceTime, entityType);
                 if (status) {
                     MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getEntityName(),
-                            pendingInstanceBean.getClusterName(), date, EntityType.FEED.toString());
+                            pendingInstanceBean.getClusterName(), instanceTime, EntityType.FEED.toString());
                 }
             }
         }
@@ -441,7 +434,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
     private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime,
                                                     String entityType) throws FalconException {
         Entity entity = EntityUtil.getEntity(entityType, entityName);
-        authenticateUser(entity);
+        authenticateUser();
         try {
             if (entityType.equals(EntityType.PROCESS.toString())){
                 LOG.trace("Checking instance availability status for entity:{}, cluster:{}, "
@@ -479,8 +472,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
 
 
     /**
-     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} and {@link org.apache.falcon.entity.v0.process.Process}
-     * instances between given time range which have missed slaLow or slaHigh.
+     * Returns all the instances between given time range which have missed slaLow or slaHigh for given entity.
      *
      * Only entities which have defined sla in their definition are considered.
      * Only the entity instances between the given time range are considered.
@@ -534,13 +526,13 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
     }
 
     /**
-     * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range
+     * Returns all the instances of a given entity between the given time range
      * which missed sla.Only those instances are included which have missed either slaLow or slaHigh.
      * @param entityName name of the feed
      * @param clusterName cluster name
      * @param start start time, inclusive
      * @param end end time, inclusive
-     * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
+     * @return Pending instances of the given entity which belong to the given time range and have missed SLA.
      * @throws FalconException
      */
     public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
@@ -634,7 +626,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
             }
             Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName);
             Frequency slaLow = sla.getSlaLow();
-            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+            Date slaTime = new Date(now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
             return startTime.before(slaTime) ? startTime : slaTime;
         } else{
             org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process)entity);
@@ -644,7 +636,7 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
             }
             Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName);
             Frequency slaLow = sla.getShouldEndIn();
-            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+            Date slaTime = new Date(now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
             return startTime.before(slaTime) ? startTime : slaTime;
         }
     }
@@ -666,10 +658,9 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList
         }
     }
 
-    private void authenticateUser(Entity entity){
-        if (StringUtils.isNotBlank(entity.getACL().getOwner())) {
-            CurrentUser.authenticate(entity.getACL().getOwner());
-        } else {
+    // Authenticate user only if not already authenticated.
+    private void authenticateUser(){
+        if (!CurrentUser.isAuthenticated()) {
             CurrentUser.authenticate(System.getProperty("user.name"));
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index 018c562..a64b654 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -39,6 +39,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.falcon.util.DateUtil.now;
+
 /**
 *Unit test for MonitoringJdbcStateStore.
  * */
@@ -86,11 +88,11 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
 
     @Test
     public void testInsertRetrieveAndUpdate() throws Exception {
-        monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString());
-        monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString());
+        monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString(), now());
+        monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString(), now());
         Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1",
-                EntityType.FEED.toString()).getFeedName());
-        Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntity().size(), 2);
+                EntityType.FEED.toString()).getEntityName());
+        Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredEntities().size(), 2);
 
         monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString());
         monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString());
@@ -109,10 +111,27 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
     }
 
     @Test
+    public void testUpdateAndGetLastMonitoredTime() throws Exception {
+        Date expectedLastMonitoredTime = now();
+        monitoringJdbcStateStore.putMonitoredEntity("test-process", EntityType.PROCESS.toString(),
+                expectedLastMonitoredTime);
+        Date actualLastMonitoredTime = monitoringJdbcStateStore.getMonitoredEntity("test-process",
+                EntityType.PROCESS.toString()).getLastMonitoredTime();
+        Assert.assertEquals(actualLastMonitoredTime, expectedLastMonitoredTime);
+
+        Date updatedLastMonitoredTime = new Date(now().getTime() + 600000L);
+        monitoringJdbcStateStore.updateLastMonitoredTime("test-process", EntityType.PROCESS.toString(),
+                updatedLastMonitoredTime);
+        actualLastMonitoredTime = monitoringJdbcStateStore.getMonitoredEntity("test-process",
+                EntityType.PROCESS.toString()).getLastMonitoredTime();
+        Assert.assertEquals(actualLastMonitoredTime, updatedLastMonitoredTime);
+    }
+
+    @Test
     public void testEmptyLatestInstance() throws Exception {
         MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
-        store.putMonitoredEntity("test-feed1", EntityType.FEED.toString());
-        store.putMonitoredEntity("test-feed2", EntityType.FEED.toString());
+        store.putMonitoredEntity("test-feed1", EntityType.FEED.toString(), now());
+        store.putMonitoredEntity("test-feed2", EntityType.FEED.toString(), now());
         Assert.assertNull(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString()));
 
         Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
index 8b51354..347e39a 100644
--- a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
@@ -45,7 +45,7 @@ import java.io.File;
 import java.util.Date;
 
 /**
- * Test for EntitySLAMonitoringService.
+ * Test for SLA Alerts.
  */
 public class EntitySLAAlertServiceTest extends AbstractTestBase {
     private static final String DB_BASE_DIR = "target/test-data/persistancedb";

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa522a54/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 6d82516..8eb58b9 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -107,6 +107,10 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
                         org.apache.falcon.entity.store.FeedLocationStore,\
                         org.apache.falcon.service.EntitySLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
+
+## If you wish to use BacklogMetricEmitterService, add it as a configstore listener. ##
+#                       org.apache.falcon.service.BacklogMetricEmitterService
+
 ## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
 #                       org.apache.falcon.state.store.jdbc.JDBCStateStore
 
@@ -124,6 +128,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 ##### Workflow Job Execution Completion listeners #####
 *.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler
 #org.apache.falcon.service.LogMoverService
+#org.apache.falcon.service.BacklogMetricEmitterService
 
 ######### Implementation classes #########
 
@@ -281,6 +286,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Authorization Enabled flag: false (default)|true
 *.falcon.security.authorization.enabled=false
 
+# The name of the group of super-users
+*.falcon.security.authorization.superusergroup=falcon
+
 # CSRF filter enabled flag: false (default) | true
 *.falcon.security.csrf.enabled=false
 
@@ -290,9 +298,6 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Browser user agents to be filtered
 *.falcon.security.csrf.browser=^Mozilla.*,^Opera.*
 
-# The name of the group of super-users
-*.falcon.security.authorization.superusergroup=falcon
-
 # Admin Users, comma separated users
 *.falcon.security.authorization.admin.users=falcon,ambari-qa
 
@@ -368,5 +373,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 *.falcon.postprocessing.enable=true
 
 ### LogMoveService Properties
-*.falcon.logMoveService.threadCount=200
+*.falcon.logMoveService.max.threadCount=200
 *.falcon.logMoveService.blockingQueue.length=50
+## Note: min threadCount should always be smaller than max threadCount.
+*.falcon.logMoveService.min.threadCount=20


Mime
View raw message