falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [1/3] falcon git commit: FALCON-1678 SLA Monitoring does not honour entity end date. Contributed by Ajay Yadava.
Date Mon, 11 Jan 2016 09:54:24 GMT
Repository: falcon
Updated Branches:
  refs/heads/0.9 f451d548a -> eb598ee99


FALCON-1678 SLA Monitoring does not honour entity end date. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3451f2c8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3451f2c8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3451f2c8

Branch: refs/heads/0.9
Commit: 3451f2c8eebcbeebc874430250f8c3a5e820e2f4
Parents: f451d54
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Mon Jan 11 14:42:45 2016 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Mon Jan 11 15:23:29 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/FeedHelper.java    |  9 +++
 .../falcon/util/DeploymentProperties.java       |  2 +-
 .../apache/falcon/entity/FeedHelperTest.java    | 16 ++++
 .../service/FeedSLAMonitoringService.java       |  7 +-
 .../falcon/service/FeedSLAMonitoringTest.java   | 85 ++++++++++++++++++--
 6 files changed, 110 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 437dc45..94010c3 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1678 SLA Monitoring does not honour entity end date(Ajay Yadava)
+
     FALCON-1708  params API does not take start as a mandatory option(Praveen Adlakha via
Ajay Yadava)
 
     FALCON-1725 Falcon API shows results in ascending order in native scheduler (Pallavi
Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 150e0bd..575ceb3 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -42,6 +42,7 @@ import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -1128,6 +1129,14 @@ public final class FeedHelper {
         return argsMap;
     }
 
+    public static Validity getClusterValidity(Feed feed, String clusterName) throws FalconException
{
+        Cluster cluster = getCluster(feed, clusterName);
+        if (cluster == null) {
+            throw new FalconException("Invalid cluster: " + clusterName + " for feed: " +
feed.getName());
+        }
+        return cluster.getValidity();
+    }
+
     public static Frequency getOldRetentionFrequency(Feed feed) {
         Frequency feedFrequency = feed.getFrequency();
         Frequency defaultFrequency = new Frequency("hours(24)");

http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
index 715b7ba..5879f30 100644
--- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
@@ -31,7 +31,7 @@ public final class DeploymentProperties extends ApplicationProperties {
     private static final String PROPERTY_FILE = "deploy.properties";
 
     private static final AtomicReference<DeploymentProperties> INSTANCE =
-            new AtomicReference<DeploymentProperties>();
+            new AtomicReference<>();
 
     private DeploymentProperties() throws FalconException {
         super();

http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 9841083..d565f94 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -929,6 +929,22 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart());
     }
 
+    @Test
+    public void testGetFeedClusterValidity() throws  Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)",  "2012-02-07 00:00 UTC", "2020-02-25
00:00 UTC");
+        Validity validity = FeedHelper.getClusterValidity(feed, cluster.getName());
+        Assert.assertEquals(validity.getStart(), getDate("2012-02-07 00:00 UTC"));
+        Assert.assertEquals(validity.getEnd(), getDate("2020-02-25 00:00 UTC"));
+    }
+
+    @Test(expectedExceptions = FalconException.class)
+    public void testGetClusterValidityInvalidCluster() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)",  "2012-02-07 00:00 UTC", "2020-02-25
00:00 UTC");
+        FeedHelper.getClusterValidity(feed, "abracadabra");
+    }
+
     private Validity getFeedValidity(String start, String end) throws ParseException {
         Validity validity = new Validity();
         validity.setStart(getDate(start));

http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 1cd571e..8ffecd8 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -80,7 +80,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return SERVICE;
     }
 
-    private int queueSize;
+    protected int queueSize;
 
     /**
      * Permissions for storePath.
@@ -90,7 +90,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     /**
      * Feeds to be monitored.
      */
-    private Set<String> monitoredFeeds;
+    protected Set<String> monitoredFeeds;
 
 
     /**
@@ -340,7 +340,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                     org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
                             EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
                     nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster,
nextInstanceTime);
-                    while (nextInstanceTime.before(to)) {
+                    Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd();
+                    while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate))
{
                         if (instances.size() >= queueSize) { // if no space, first make
some space
                             LOG.debug("Removing instance={} for <feed,cluster>={}",
instances.peek(), key);
                             exists.remove(instances.peek());

http://git-wip-us.apache.org/repos/asf/falcon/blob/3451f2c8/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index e3dd5cc..90eec4d 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -18,27 +18,42 @@
 
 package org.apache.falcon.service;
 
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
+import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * Tests for FeedSLAMonitoring Service.
  */
-public class FeedSLAMonitoringTest {
+public class FeedSLAMonitoringTest extends AbstractTestBase {
+    private static final String CLUSTER_NAME = "testCluster";
+    private static final String FEED_NAME = "testFeed";
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
     @Test
     public void testSLAStatus() throws FalconException {
@@ -122,4 +137,60 @@ public class FeedSLAMonitoringTest {
 
         Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(),
1);
     }
+
+    @Test
+    public void testEndDateCheck() throws Exception {
+        Cluster cluster = publishCluster();
+        publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC");
+        Pair<String, String> feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME);
+
+        FeedSLAMonitoringService service = FeedSLAMonitoringService.get();
+        service.initializeService();
+        service.queueSize = 100;
+        service.monitoredFeeds.add(FEED_NAME);
+        Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
+        Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z");
+        service.addNewPendingFeedInstances(from, to);
+        // check that instances after feed's end date are not added.
+        Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5);
+    }
+
+    private Cluster publishCluster() throws FalconException {
+        Cluster cluster = new Cluster();
+        cluster.setName(CLUSTER_NAME);
+        cluster.setColo("default");
+        getStore().publish(EntityType.CLUSTER, cluster);
+        return cluster;
+
+    }
+
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+        Feed feed = new Feed();
+        feed.setName(FEED_NAME);
+        Frequency f = new Frequency(frequency);
+        feed.setFrequency(f);
+        feed.setTimezone(UTC);
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setType(ClusterType.SOURCE);
+        fCluster.setName(cluster.getName());
+        fCluster.setValidity(getFeedValidity(start, end));
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+        getStore().publish(EntityType.FEED, feed);
+        return feed;
+    }
+
+    private Validity getFeedValidity(String start, String end) throws ParseException {
+        Validity validity = new Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    private Date getDate(String dateString) throws ParseException {
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+        return format.parse(dateString);
+    }
 }


Mime
View raw message