falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject falcon git commit: FALCON-1644 Retention : Some feed instances are never deleted by retention jobs. Contributed by Balu Vellanki.
Date Tue, 15 Dec 2015 17:54:52 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 4591ffb61 -> 4b523130f


FALCON-1644 Retention : Some feed instances are never deleted by retention jobs. Contributed
by Balu Vellanki.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4b523130
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4b523130
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4b523130

Branch: refs/heads/master
Commit: 4b523130fbbcaff9e2ff6e577ceb52e3572aaa3c
Parents: 4591ffb
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Dec 15 21:54:56 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Dec 15 21:54:56 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../org/apache/falcon/entity/FeedHelper.java    | 18 +++++++++++
 common/src/main/resources/runtime.properties    |  4 +++
 docs/src/site/twiki/FalconDocumentation.twiki   |  6 +++-
 .../retention/AgeBasedCoordinatorBuilder.java   | 10 +++++-
 .../feed/FeedRetentionCoordinatorBuilder.java   |  8 +++++
 .../feed/OozieFeedWorkflowBuilderTest.java      | 33 +++++++++++++++-----
 src/conf/runtime.properties                     |  4 +++
 webapp/src/test/resources/runtime.properties    |  4 +++
 9 files changed, 80 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce346a8..37c5d67 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -74,6 +74,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1644 Retention : Some feed instances are never deleted by retention jobs(Balu
Vellanki via Ajay Yadava)
+
     FALCON-1641 Triage on an invalid feed instance throws IndexOutOfBoundException(Karishma
Gulati via Ajay Yadava)
 
     FALCON-1572 Only one instance is running in a process when run using Native Scheduler(Pallavi
Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 138a43f..18d5152 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -38,6 +38,7 @@ import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.Locations;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.MergeType;
+import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
@@ -1030,4 +1031,21 @@ public final class FeedHelper {
         }
         return retentionFrequency;
     }
+
+    public static int getRetentionLimitInSeconds(Feed feed, String clusterName) throws FalconException
{
+        Frequency retentionLimit = new Frequency("minutes(0)");
+        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
+        if (retentionStage != null) {
+            for (Property property : retentionStage.getProperties().getProperties()) {
+                if (property.getName().equalsIgnoreCase("retention.policy.agebaseddelete.limit"))
{
+                    retentionLimit = new Frequency(property.getValue());
+                    break;
+                }
+            }
+        } else {
+            retentionLimit = getCluster(feed, clusterName).getRetention().getLimit();
+        }
+        Long freqInMillis = DateUtil.getFrequencyInMillis(retentionLimit);
+        return (int) (freqInMillis/1000);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index f499dd9..643559e 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -25,6 +25,10 @@
 *.falcon.replication.workflow.mapbandwidth=100
 *.webservices.default.results.per.page=10
 
+# If true, do not run retention past feedCluster validity end time.
+# This will retain recent instances beyond feedCluster validity end time.
+*.falcon.retention.keep.instances.beyond.validity=true
+
 # Default configs to handle replication for late arriving feeds.
 *.feed.late.allowed=true
 *.feed.late.frequency=hours(3)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/docs/src/site/twiki/FalconDocumentation.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki
index 95f388a..21b22ff 100644
--- a/docs/src/site/twiki/FalconDocumentation.twiki
+++ b/docs/src/site/twiki/FalconDocumentation.twiki
@@ -257,7 +257,11 @@ feed/data xml in the following manner for each cluster the feed can belong
to :
 
 The 'limit' attribute can be specified in units of minutes/hours/days/months, and a corresponding
numeric value can
 be attached to it. It essentially instructs the system to retain data till the time specified
-in the attribute spanning backwards in time, from now. Any data older than that is erased
from the system.
+in the attribute spanning backwards in time, from now. Any data older than that is erased
from the system. By default,
+Falcon runs retention jobs up to the cluster validity end time. This causes the instances
created between
+"endTime - retentionLimit" and endTime to be retained forever. If users do not want to retain
any instances of the
+feed past the cluster validity end time, they should set the property "falcon.retention.keep.instances.beyond.validity"
+to false in runtime.properties.
 
 With the integration of Hive, Falcon also provides retention for tables in Hive catalog.
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
index 47ab2fc..7a25d86 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.lifecycle.engine.oozie.retention;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.entity.EntityUtil;
@@ -33,6 +34,7 @@ import org.apache.falcon.oozie.coordinator.ACTION;
 import org.apache.falcon.oozie.coordinator.CONTROLS;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -73,7 +75,13 @@ public final class AgeBasedCoordinatorBuilder {
         COORDINATORAPP coord = new COORDINATORAPP();
         String coordName = EntityUtil.getWorkflowName(LifeCycle.EVICTION.getTag(), feed).toString();
         coord.setName(coordName);
-        coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+        Date endDate = feedCluster.getValidity().getEnd();
+        if (RuntimeProperties.get().getProperty(
+                "falcon.retention.keep.instances.beyond.validity", "true").equalsIgnoreCase("false"))
{
+            int retentionLimitinSecs = FeedHelper.getRetentionLimitInSeconds(feed, cluster.getName());
+            endDate = DateUtils.addSeconds(endDate, retentionLimitinSecs);
+        }
+        coord.setEnd(SchemaHelper.formatDateUTC(endDate));
         coord.setStart(SchemaHelper.formatDateUTC(new Date()));
         coord.setTimezone(feed.getTimezone().getID());
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 69ca2c3..9f05a09 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -35,6 +35,7 @@ import org.apache.falcon.oozie.coordinator.ACTION;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.WORKFLOW;
 import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.util.RuntimeProperties;
 import org.apache.hadoop.fs.Path;
 
 import java.util.Arrays;
@@ -59,8 +60,15 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
         COORDINATORAPP coord = new COORDINATORAPP();
         String coordName = getEntityName();
         coord.setName(coordName);
+
         Date endDate = feedCluster.getValidity().getEnd();
+        if (RuntimeProperties.get().getProperty(
+                "falcon.retention.keep.instances.beyond.validity", "true").equalsIgnoreCase("false"))
{
+            int retentionLimitinSecs = FeedHelper.getRetentionLimitInSeconds(entity, cluster.getName());
+            endDate = DateUtils.addSeconds(endDate, retentionLimitinSecs);
+        }
         coord.setEnd(SchemaHelper.formatDateUTC(endDate));
+
         if (feedCluster.getValidity().getEnd().before(new Date())) {
             Date startDate = DateUtils.addMinutes(endDate, -1);
             coord.setStart(SchemaHelper.formatDateUTC(startDate));

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index d034b1a..9388c68 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.oozie.feed;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
@@ -51,6 +52,7 @@ import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.service.LifecyclePolicyMap;
+import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -161,21 +163,30 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         trgMiniDFS.shutdown();
     }
 
-    @Test
-    public void testRetentionWithLifecycle() throws Exception {
+    @DataProvider(name = "keepInstancesPostValidity")
+    private Object[][] keepInstancesPostValidity() {
+        return new Object[][] {
+            {"false", "2099-01-01T02:00Z"},
+            {"true", "2099-01-01T00:00Z"},
+        };
+    }
+
+    @Test(dataProvider = "keepInstancesPostValidity")
+    public void testRetentionWithLifecycle(String keepInstancesPostValidity, String endTime)
throws Exception {
+        RuntimeProperties.get().setProperty("falcon.retention.keep.instances.beyond.validity",
+                keepInstancesPostValidity);
         OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleRetentionFeed);
         Path bundlePath = new Path("/projects/falcon/");
         builder.build(trgCluster, bundlePath);
 
         BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
         List<COORDINATOR> coords = bundle.getCoordinator();
-
         COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
         assertLibExtensions(coord, "retention");
         HashMap<String, String> props = getCoordProperties(coord);
         Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION");
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(17)}");
-        Assert.assertEquals(coord.getEnd(), "2099-01-01T00:00Z");
+        Assert.assertEquals(coord.getEnd(), endTime);
         Assert.assertEquals(coord.getTimezone(), "UTC");
 
         HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(),
coord);
@@ -187,6 +198,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(wfProps.get("jobPriority"), "LOW");
     }
 
+
     @Test
     public void testRetentionFrequency() throws Exception {
         feed.setFrequency(new Frequency("minutes(36000)"));
@@ -675,9 +687,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed,
                 srcCluster.getName());
-        final Calendar instance = Calendar.getInstance();
-        instance.roll(Calendar.YEAR, 1);
-        cluster.getValidity().setEnd(instance.getTime());
+        Calendar startCal = Calendar.getInstance();
+        Calendar endCal = Calendar.getInstance();
+        endCal.roll(Calendar.DATE, 1);
+        cluster.getValidity().setEnd(endCal.getTime());
+        RuntimeProperties.get().setProperty("falcon.retention.keep.instances.beyond.validity",
"false");
 
         OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION);
         List<Properties> coords = builder.buildCoords(
@@ -689,6 +703,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
+        Assert.assertEquals(coord.getStart(), DateUtil.getDateFormatFromTime(startCal.getTimeInMillis()));
+        Date endDate = DateUtils.addSeconds(endCal.getTime(),
+                FeedHelper.getRetentionLimitInSeconds(feed, srcCluster.getName()));
+        Assert.assertEquals(coord.getEnd(), DateUtil.getDateFormatFromTime(endDate.getTime()));
+
         HashMap<String, String> props = getCoordProperties(coord);
 
         HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 4bc1bc0..f535b0f 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -41,6 +41,10 @@ falcon.current.colo=local
 *.feed.late.frequency=hours(3)
 *.feed.late.policy=exp-backoff
 
+# If true, do not run retention past feedCluster validity end time.
+# This will retain recent instances beyond feedCluster validity end time.
+*.falcon.retention.keep.instances.beyond.validity=true
+
 # If true, Falcon skips oozie dryrun while scheduling entities.
 *.falcon.skip.dryrun=false
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4b523130/webapp/src/test/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/runtime.properties b/webapp/src/test/resources/runtime.properties
index fec9e44..7dec191 100644
--- a/webapp/src/test/resources/runtime.properties
+++ b/webapp/src/test/resources/runtime.properties
@@ -25,6 +25,10 @@
 *.falcon.replication.workflow.mapbandwidth=100
 *.webservices.default.results.per.page=10
 
+# If true, do not run retention past feedCluster validity end time.
+# This will retain recent instances beyond feedCluster validity end time.
+*.falcon.retention.keep.instances.beyond.validity=true
+
 # Default configs to handle replication for late arriving feeds.
 *.feed.late.allowed=true
 *.feed.late.frequency=hours(3)


Mime
View raw message