falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1886 Feed sla monitoring does not work across restarts.
Date Wed, 06 Apr 2016 10:26:06 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 08399d038 -> 14b1bb8f4


FALCON-1886 Feed sla monitoring does not work across restarts.

Removed redundant fields.
Added unit tests.

Author: Ajay Yadava <ajaynsit@gmail.com>

Reviewers: Praveen Adlakha

Closes #86 from ajayyadava/1886


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/14b1bb8f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/14b1bb8f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/14b1bb8f

Branch: refs/heads/master
Commit: 14b1bb8f4dc6cf5c85957ea0981cbaff0b490ff3
Parents: 08399d0
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Wed Apr 6 15:55:18 2016 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Wed Apr 6 15:55:18 2016 +0530

----------------------------------------------------------------------
 .../falcon/persistence/PendingInstanceBean.java |  11 +-
 .../persistence/PersistenceConstants.java       |   1 +
 .../falcon/jdbc/MonitoringJdbcStateStore.java   |  20 ++--
 .../service/FeedSLAMonitoringService.java       | 113 +++++++------------
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  55 +++++++--
 .../falcon/service/FeedSLAMonitoringTest.java   |   6 +-
 6 files changed, 113 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 038244a..108001d 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -35,11 +35,12 @@ import java.util.Date;
 * */
 @Entity
 @NamedQueries({
-        @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a)
from PendingInstanceBean a where a.feedName = :feedName"),
-        @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query
= "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName
and a.nominalTime = :nominalTime"),
-        @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete
from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
-        @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query =
"select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName
= :clusterName"),
-        @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select
 OBJECT(a) from PendingInstanceBean a ")
+    @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime)
from PendingInstanceBean a where a.feedName = :feedName"),
+    @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a)
from PendingInstanceBean a where a.feedName = :feedName"),
+    @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete
from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and
a.nominalTime = :nominalTime"),
+    @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete
from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+    @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select
a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName =
:clusterName"),
+    @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a)
from PendingInstanceBean a ")
 })
 @Table(name = "PENDING_INSTANCES")
 //RESUME CHECKSTYLE CHECK  LineLengthCheck

http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index e554581..44edc7c 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -53,4 +53,5 @@ public final class PersistenceConstants {
     public static final String GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER";
     public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE";
     public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
+    public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 39e2562..6345d44 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -94,6 +94,15 @@ public class MonitoringJdbcStateStore {
         return result;
     }
 
+    public Date getLastInstanceTime(String feedName) throws ResultNotFoundException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME,
Date.class);
+        q.setParameter("feedName", feedName);
+        Date result = (Date)q.getSingleResult();
+        entityManager.close();
+        return result;
+    }
+
     public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);
@@ -133,21 +142,16 @@ public class MonitoringJdbcStateStore {
         commitAndCloseTransaction(entityManager);
     }
 
-    public List<Date> getNominalInstances(String feedName, String clusterName) throws
ResultNotFoundException{
+    public List<Date> getNominalInstances(String feedName, String clusterName) {
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
         q.setParameter("feedName", feedName);
         q.setParameter("clusterName", clusterName);
         List result = q.getResultList();
-        try{
-            if (CollectionUtils.isEmpty(result)) {
-                throw new ResultNotFoundException(feedName + " with " + clusterName + "Not
Found");
-            }
-        } finally {
-            entityManager.close();
-        }
+        entityManager.close();
         return result;
     }
+
     public List<PendingInstanceBean> getAllInstances(){
         EntityManager entityManager = getEntityManager();
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);

http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index b5a2569..9de4463 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -17,6 +17,14 @@
  */
 package org.apache.falcon.service;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
@@ -35,24 +43,18 @@ import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
 import org.apache.falcon.persistence.MonitoredFeedsBean;
 import org.apache.falcon.persistence.PendingInstanceBean;
 import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Service to monitor Feed SLAs.
@@ -62,8 +64,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
 
     private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
 
-    private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
-
     private static final int ONE_MS = 1;
 
     private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService();
@@ -76,17 +76,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return SERVICE;
     }
 
-    protected int queueSize;
-
     /**
      * Permissions for storePath.
      */
     private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE,
FsAction.NONE);
 
-    /**
-     * Used to store the last time when pending instances were checked for SLA.
-     */
-    private Date lastCheckedAt;
 
     /**
      * Frequency in seconds of "status check" for pending feed instances.
@@ -103,11 +97,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
 
 
     /**
-     * Frequency in milliseconds of serializing(for backup) monitoring service's state.
-     */
-    private int serializationFrequencyMillis;
-
-    /**
      * Filesystem used for serializing and deserializing.
      */
     private FileSystem fileSystem;
@@ -212,21 +201,12 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         filePath = new Path(storePath, "feedSLAMonitoringService");
         fileSystem = initializeFileSystem();
 
-        String freq = StartupProperties.get().getProperty("feed.sla.serialization.frequency.millis",
ONE_HOUR);
-        serializationFrequencyMillis = Integer.parseInt(freq);
-
-        freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds",
"600");
+        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds",
"600");
         statusCheckFrequencySeconds = Integer.parseInt(freq);
 
         freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
         lookAheadWindowMillis = Integer.parseInt(freq);
-
-        String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288");
-        queueSize = Integer.parseInt(size);
-
         LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
-        initializeService();
-
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
         executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
     }
@@ -272,8 +252,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                     // add Instances from last checked time to 10 minutes from now(some buffer
for status check)
                     Date now = new Date();
                     Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
-                    addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
-                    lastCheckedAt = newCheckPoint;
+                    addNewPendingFeedInstances(newCheckPoint);
                 }
             } catch (Throwable e) {
                 LOG.error("Feed SLA monitoring failed: ", e);
@@ -282,7 +261,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     }
 
 
-    void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
+    void addNewPendingFeedInstances(Date to) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
         List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
         for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) {
@@ -290,41 +269,27 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
             Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
             for (Cluster feedCluster : feed.getClusters().getClusters()) {
                 if (currentClusters.contains(feedCluster.getName())) {
-                    Date nextInstanceTime = from;
+                    // get start of instances from the database
+                    Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName);
                     Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
-                    BlockingQueue<Date> instances = new LinkedBlockingQueue<>(
-                            MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName()));
-                    if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName,
-                            feedCluster.getName()))) {
-                        instances = new LinkedBlockingQueue<>(queueSize);
-                        Date feedStartTime = feedCluster.getValidity().getStart();
-                        Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed,
feedCluster);
-                        ExpressionHelper evaluator = ExpressionHelper.get();
-                        ExpressionHelper.setReferenceDate(new Date());
-                        Date retention = new Date(evaluator.evaluate(retentionFrequency.toString(),
Long.class));
-                        if (feedStartTime.before(retention)) {
-                            feedStartTime = retention;
-                        }
-                        nextInstanceTime = feedStartTime;
+                    if (nextInstanceTime == null) {
+                        nextInstanceTime = getInitialStartTime(feed, feedCluster.getName());
+                    } else {
+                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
                     }
-                    Set<Date> exists = new HashSet<>(instances);
+
+                    Set<Date> instances = new HashSet<>();
                     org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
                             EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
                     nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster,
nextInstanceTime);
                     Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd();
                     while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate))
{
-                        if (instances.size() >= queueSize) { // if no space, first make
some space
-                            LOG.debug("Removing instance={} for <feed,cluster>={}",
instances.peek(), key);
-                            exists.remove(instances.peek());
-                            instances.remove();
-                        }
                         LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime,
key);
-                        if (exists.add(nextInstanceTime)) {
-                            instances.add(nextInstanceTime);
-                        }
+                        instances.add(nextInstanceTime);
                         nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
                         nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster,
nextInstanceTime);
                     }
+
                     for(Date date:instances){
                         MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(),
date);
                     }
@@ -376,10 +341,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     }
 
 
-    protected void initializeService() {
-        lastCheckedAt = new Date();
-    }
-
     /**
      * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given
time range which have missed
      * slaLow or slaHigh.
@@ -403,11 +364,11 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
             Sla sla = FeedHelper.getSLA(cluster, feed);
             if (sla != null) {
                 Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start,
end,
-                        new LinkedBlockingQueue<Date>(MONITORING_JDBC_STATE_STORE.getNominalInstances(
-                                pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName())));
-                for (Pair<Date, String> status : slaStatus){
+                    MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
+                        pendingInstanceBean.getClusterName()));
+                for (Pair<Date, String> status : slaStatus) {
                     SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
-                            feedClusterPair.second, status.first, EntityType.FEED);
+                        feedClusterPair.second, status.first, EntityType.FEED);
                     instance.setTags(status.second);
                     result.add(instance);
                 }
@@ -431,8 +392,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
 
         Set<SchedulableEntityInstance> result = new HashSet<>();
         Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
-        BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE.
-                getNominalInstances(feedName, clusterName));
+        List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName,
clusterName);
         Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
         Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
         Sla sla = FeedHelper.getSLA(cluster, feed);
@@ -448,7 +408,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return result;
     }
 
-    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, BlockingQueue<Date>
missingInstances)
+    Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date>
missingInstances)
         throws FalconException {
         String tagCritical = "Missed SLA High";
         String tagWarn = "Missed SLA Low";
@@ -473,4 +433,17 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         }
         return result;
     }
+
+    @VisibleForTesting
+    Date getInitialStartTime(Feed feed, String clusterName) throws FalconException {
+        Sla sla = FeedHelper.getSLA(clusterName, feed);
+        if (sla == null) {
+            throw new IllegalStateException("InitialStartTime can not be determined as the
feed: "
+                + feed.getName() + " and cluster: " + clusterName + " does not have any sla");
+        }
+        Date startTime = FeedHelper.getFeedValidityStart(feed, clusterName);
+        Frequency slaLow = sla.getSlaLow();
+        Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+        return startTime.before(slaTime) ? startTime : slaTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index aa32167..b43025d 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.falcon.jdbc;
 
+import java.io.File;
+import java.util.Date;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -26,14 +32,12 @@ import org.apache.falcon.util.StateStoreProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.util.Date;
-import java.util.Random;
-
 /**
 *Unit test for MonitoringJdbcStateStore.
  * */
@@ -45,7 +49,7 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
     protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
     protected LocalFileSystem fs = new LocalFileSystem();
 
-    private static Random randomValGenerator = new Random();
+    private static MonitoringJdbcStateStore monitoringJdbcStateStore;
     private static FalconJPAService falconJPAService = FalconJPAService.get();
 
     protected int execDBCLICommands(String[] args) {
@@ -71,12 +75,16 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
         falconJPAService.init();
         this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
         this.conf = dfsCluster.getConf();
+        monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+    }
+
+    @BeforeMethod
+    public void init() {
+        clear();
     }
 
     @Test
     public void testInsertRetrieveAndUpdate() throws Exception {
-
-        MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore();
         monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
         monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
         Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
@@ -94,4 +102,37 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
         Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(),
1);
         monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster");
     }
+
+    @Test
+    public void testEmptyLatestInstance() throws Exception {
+        MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+        store.putMonitoredFeed("test-feed1");
+        store.putMonitoredFeed("test-feed2");
+        Assert.assertNull(store.getLastInstanceTime("test-feed1"));
+
+        Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
+        Date dateTwo =  SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
+
+        store.putPendingInstances("test-feed1", "test_cluster", dateTwo);
+        store.putPendingInstances("test-feed1", "test_cluster", dateOne);
+        store.putPendingInstances("test-feed2", "test_cluster", dateOne);
+
+        Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1")));
+        Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2")));
+
+    }
+
+    private void clear() {
+        EntityManager em = FalconJPAService.get().getEntityManager();
+        em.getTransaction().begin();
+        try {
+            Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
+            query.executeUpdate();
+            query = em.createNativeQuery("delete from PENDING_INSTANCES");
+            query.executeUpdate();
+        } finally {
+            em.getTransaction().commit();
+            em.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/14b1bb8f/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index b739037..dbe0cf4 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -21,12 +21,12 @@ package org.apache.falcon.service;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
@@ -65,7 +65,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
         Date start = SchemaHelper.parseDateUTC("2014-05-05T00:00Z");
         Date end = SchemaHelper.parseDateUTC("2015-05-05T00:00Z");
 
-        BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>();
+        List<Date> missingInstances = new ArrayList<>();
         missingInstances.add(SchemaHelper.parseDateUTC("2013-05-05T00:00Z")); // before start
time
         missingInstances.add(SchemaHelper.parseDateUTC("2014-05-05T00:00Z")); // equal to
start time
         missingInstances.add(SchemaHelper.parseDateUTC("2014-05-06T00:00Z")); // in between


Mime
View raw message