falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [2/2] falcon git commit: FALCON-965 Open up life cycle stage implementation within Falcon for extension. Contributed by Ajay Yadava.
Date Mon, 28 Sep 2015 19:26:45 GMT
FALCON-965 Open up life cycle stage implementation within Falcon for extension. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f7ad3f48
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f7ad3f48
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f7ad3f48

Branch: refs/heads/master
Commit: f7ad3f487966e2c1a13a8f61c203fc11cc7c73c8
Parents: c462f3e
Author: Ajay Yadava <ajaynsit@gmail.com>
Authored: Tue Sep 29 00:24:36 2015 +0530
Committer: Ajay Yadava <ajaynsit@gmail.com>
Committed: Tue Sep 29 00:24:36 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 client/src/main/resources/feed-0.1.xsd          |  35 ++
 .../org/apache/falcon/entity/FeedHelper.java    |  68 ++-
 .../falcon/entity/parser/FeedEntityParser.java  |  26 +-
 .../lifecycle/AbstractPolicyBuilderFactory.java |  30 +
 .../falcon/lifecycle/FeedLifecycleStage.java    |  37 ++
 .../falcon/lifecycle/LifecyclePolicy.java       |  63 +++
 .../apache/falcon/lifecycle/PolicyBuilder.java  |  37 ++
 .../lifecycle/retention/AgeBasedDelete.java     | 114 ++++
 .../lifecycle/retention/RetentionPolicy.java    |  54 ++
 .../falcon/service/LifecyclePolicyMap.java      |  81 +++
 .../falcon/workflow/WorkflowEngineFactory.java  |   7 +
 common/src/main/resources/startup.properties    |   5 +
 .../apache/falcon/entity/AbstractTestBase.java  |   2 +
 .../apache/falcon/entity/FeedHelperTest.java    | 143 +++++
 .../entity/parser/FeedEntityParserTest.java     |  24 +-
 .../src/test/resources/config/feed/feed-0.3.xml |  83 +++
 .../src/test/resources/config/feed/feed-0.4.xml |  74 +++
 docs/src/site/twiki/EntitySpecification.twiki   |  40 ++
 lifecycle/pom.xml                               | 208 +++++++
 .../engine/oozie/OoziePolicyBuilderFactory.java |  59 ++
 .../retention/AgeBasedCoordinatorBuilder.java   | 112 ++++
 .../oozie/retention/AgeBasedDeleteBuilder.java  |  56 ++
 .../retention/AgeBasedWorkflowBuilder.java      | 153 +++++
 .../engine/oozie/utils/OozieBuilderUtils.java   | 556 +++++++++++++++++++
 .../resources/action/feed/eviction-action.xml   |  59 ++
 .../src/main/resources/binding/jaxb-binding.xjb |  26 +
 .../lifecycle/retention/AgeBasedDeleteTest.java | 108 ++++
 oozie/pom.xml                                   |   7 +
 .../falcon/oozie/feed/FeedBundleBuilder.java    |  39 +-
 .../feed/OozieFeedWorkflowBuilderTest.java      |  34 ++
 .../test/resources/feed/fs-retention-feed.xml   |  50 ++
 .../feed/fs-retention-lifecycle-feed.xml        |  60 ++
 pom.xml                                         |   7 +
 src/conf/startup.properties                     |   7 +
 35 files changed, 2448 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a4edebc..d18d5aa 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Trunk (Unreleased)
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao) 
 
   NEW FEATURES
+    FALCON-965 Open up life cycle stage implementation within Falcon for extension(Ajay Yadava)
+
     FALCON-1437 Change DR recipes notification with Falcon notification(Peeyush Bishnoi via Sowmya Ramesh)
 
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 2af28d2..77b8f4b 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -125,6 +125,7 @@
             <xs:element type="ACL" name="ACL"/>
             <xs:element type="schema" name="schema"/>
             <xs:element type="properties" name="properties" minOccurs="0"/>
+            <xs:element type="lifecycle" name="lifecycle" minOccurs="0" />
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="xs:string" name="description"/>
@@ -160,6 +161,7 @@
                 <xs:element type="locations" name="locations" minOccurs="0"/>
                 <xs:element type="catalog-table" name="table"/>
             </xs:choice>
+            <xs:element type="lifecycle" name="lifecycle" minOccurs="0" />
         </xs:sequence>
         <xs:attribute type="IDENTIFIER" name="name" use="required"/>
         <xs:attribute type="cluster-type" name="type" use="optional"/>
@@ -357,6 +359,20 @@
             <xs:enumeration value="chmod"/>
         </xs:restriction>
     </xs:simpleType>
+    <xs:complexType name="lifecycle">
+        <xs:annotation>
+            <xs:documentation>
+                Lifecycle of the feed consists of various stages. For example, typical stages of a feed are import,
+                replication, archival, retention and export. All these stages together are called the lifecycle of a feed.
+            </xs:documentation>
+        </xs:annotation>
+
+        <xs:all>
+            <xs:element type="retention-stage" name="retention-stage" minOccurs="0"></xs:element>
+        </xs:all>
+
+    </xs:complexType>
+
     <xs:simpleType name="cluster-type">
         <xs:annotation>
             <xs:documentation>
@@ -435,4 +451,23 @@
             <xs:minLength value="1"/>
         </xs:restriction>
     </xs:simpleType>
+
+    <xs:complexType name="retention-stage">
+        <xs:annotation>
+            <xs:documentation>
+                Retention stage is the new way to define retention for a feed using the feed lifecycle feature. Retention
+                has a configurable policy which does the validation and the real execution through the workflow engine.
+                This way of specifying retention gives you more control, such as using a different queue name, priority
+                and execution-order for retention than for other lifecycle stages of the feed like replication.
+            </xs:documentation>
+        </xs:annotation>
+        <xs:all>
+            <xs:element type="non-empty-string" name="policy" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="frequency-type" name="frequency" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="xs:string" name="queue" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="xs:string" name="priority" minOccurs="0" maxOccurs="1"></xs:element>
+            <xs:element type="properties" name="properties" minOccurs="0" maxOccurs="1"></xs:element>
+        </xs:all>
+    </xs:complexType>
+
 </xs:schema>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 572923b..79f1959 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.entity;
 
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
@@ -31,14 +30,17 @@ import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.FeedInstanceResult;
@@ -372,6 +374,49 @@ public final class FeedHelper {
         return feedProperties;
     }
 
+    public static Lifecycle getLifecycle(Feed feed, String clusterName) throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterName);
+        if (feedCluster == null) {
+            throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
+        }
+        return feedCluster.getLifecycle() == null ? feed.getLifecycle() : feedCluster.getLifecycle(); // cluster wins
+    }
+
+    /** Returns the retention stage of the cluster's lifecycle, else of the feed's lifecycle, else null. */
+    public static RetentionStage getRetentionStage(Feed feed, String clusterName) throws FalconException {
+        if (!isLifecycleEnabled(feed, clusterName)) {
+            return null;
+        }
+        // a retention stage declared on the cluster takes precedence over the feed-level one
+        Lifecycle clusterLifecycle = getCluster(feed, clusterName).getLifecycle();
+        if (clusterLifecycle != null && clusterLifecycle.getRetentionStage() != null) {
+            return clusterLifecycle.getRetentionStage();
+        }
+        Lifecycle globalLifecycle = feed.getLifecycle();
+        return globalLifecycle == null ? null : globalLifecycle.getRetentionStage();
+    }
+
+    /**
+     * Returns the names of the lifecycle policies applicable to a feed on the given cluster.
+     *
+     * @param feed feed whose lifecycle configuration is inspected.
+     * @param clusterName name of the feed cluster to be used as context.
+     * @return list of names of lifecycle policies for the given feed, empty list if there are none.
+     * @throws FalconException if the feed has no cluster named clusterName.
+     */
+    public static List<String> getPolicies(Feed feed, String clusterName) throws FalconException {
+        if (getCluster(feed, clusterName) == null) {
+            throw new FalconException("Cluster: " + clusterName + " isn't valid for feed: " + feed.getName());
+        }
+        List<String> result = new ArrayList<>();
+        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
+        if (retentionStage != null) { // null when lifecycle is disabled or has no retention stage
+            String policy = retentionStage.getPolicy();
+            result.add(StringUtils.isBlank(policy) ? FeedLifecycleStage.RETENTION.getDefaultPolicyName() : policy);
+        }
+        return result;
+    }
+
     /**
      *  Extracts date from the actual data path e.g., /path/2014/05/06 maps to 2014-05-06T00:00Z.
      * @param instancePath - actual data path
@@ -748,4 +793,25 @@ public final class FeedHelper {
         return storage.getInstanceAvailabilityStatus(feed, clusterName, LocationType.DATA, instanceTime);
     }
 
+    public static boolean isLifecycleEnabled(Feed feed, String clusterName) {
+        Cluster feedCluster = getCluster(feed, clusterName); // null when the feed has no such cluster
+        return !(feedCluster == null || (feed.getLifecycle() == null && feedCluster.getLifecycle() == null));
+    }
+
+    public static Frequency getRetentionFrequency(Feed feed, String clusterName) throws FalconException {
+        Frequency retentionFrequency;
+        RetentionStage retentionStage = getRetentionStage(feed, clusterName);
+        if (retentionStage != null && retentionStage.getFrequency() != null) {
+            retentionFrequency = retentionStage.getFrequency();
+        } else {
+            Frequency.TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
+            if (timeUnit == Frequency.TimeUnit.hours || timeUnit == Frequency.TimeUnit.minutes) {
+                retentionFrequency = new Frequency("hours(6)");
+            } else {
+                retentionFrequency = new Frequency("days(1)");
+            }
+        }
+        return  retentionFrequency;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 4f5599e..c73cc78 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -32,14 +32,14 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityGraph;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Properties;
-import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.ACL;
 import org.apache.falcon.entity.v0.feed.Cluster;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Properties;
+import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
@@ -48,6 +48,7 @@ import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
 import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -55,9 +56,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.List;
 
 /**
  * Parser that parses feed entity definition.
@@ -80,6 +81,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
             throw new ValidationException("Feed should have at least one cluster");
         }
 
+        validateLifecycle(feed);
         validateACL(feed);
         for (Cluster cluster : feed.getClusters().getClusters()) {
             validateEntityExists(EntityType.CLUSTER, cluster.getName());
@@ -100,7 +102,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
         validateFeedPartitionExpression(feed);
         validateFeedGroups(feed);
         validateFeedSLA(feed);
-        validateACL(feed);
         validateProperties(feed);
 
         // Seems like a good enough entity object for a new one
@@ -124,6 +125,21 @@ public class FeedEntityParser extends EntityParser<Feed> {
         ensureValidityFor(feed, processes);
     }
 
+    private void validateLifecycle(Feed feed) throws FalconException {
+        LifecyclePolicyMap map = LifecyclePolicyMap.get();
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) { // feed or cluster declares a lifecycle
+                if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
+                    throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
+                            + cluster.getName());
+                }
+                for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
+                    map.get(policyName).validate(feed, cluster.getName()); // NOTE(review): null for unknown policy?
+                }
+            }
+        }
+    }
+
     private Set<Process> findProcesses(Set<Entity> referenced) {
         Set<Process> processes = new HashSet<Process>();
         for (Entity entity : referenced) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
new file mode 100644
index 0000000..5bcc2f8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Abstract factory class for feed lifecycle policy builders.
+ */
+public abstract class AbstractPolicyBuilderFactory {
+    /** Returns the {@link PolicyBuilder} for the policy with the given name. */
+    public abstract PolicyBuilder getPolicyBuilder(String policyName) throws FalconException;
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
new file mode 100644
index 0000000..833ad04
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.lifecycle;
+
+/**
+ * Enum for valid lifecycle stages for the feed, each carrying the name of its default policy.
+ */
+public enum FeedLifecycleStage {
+
+    RETENTION("AgeBasedDelete");
+    /** Name of the default policy of the stage; fixed per constant, hence final. */
+    private final String defaultPolicyName;
+
+    FeedLifecycleStage(String defaultPolicyName) {
+        this.defaultPolicyName = defaultPolicyName;
+    }
+    /** Returns the name of the default policy of this stage. */
+    public String getDefaultPolicyName() {
+        return defaultPolicyName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
new file mode 100644
index 0000000..be4e68c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Interface for all policies in the feed lifecycle; every policy belongs to a stage and is identified by its name.
+ */
+public interface LifecyclePolicy {
+
+    /**
+     * Returns the name of the policy. Name of policy must be unique as it is used as an identifier.
+     * @return name of the policy
+     */
+    String getName();
+
+    /**
+     * Returns the stage to which the policy belongs.
+     * @return stage to which the policy belongs.
+     */
+    FeedLifecycleStage getStage();
+
+    /**
+     * Validates the configurations as per this policy.
+     * @param feed Parent feed for which the policy is configured.
+     * @param clusterName name of the cluster to be used as context for validation.
+     * @throws FalconException if the configuration is not valid for this policy.
+     */
+    void validate(Feed feed, String clusterName) throws FalconException;
+
+    /**
+     * Builds workflow engine artifacts.
+     * @param cluster cluster entity to be used as context.
+     * @param buildPath base path to be used for storing the artifacts.
+     * @param feed Parent feed.
+     * @return Properties to be passed to the caller e.g. bundle in case of oozie workflow engine.
+     * @throws FalconException if the artifacts could not be built.
+     */
+    Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
new file mode 100644
index 0000000..5e5055b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Interface to be implemented by all policy builders for a lifecycle policy.
+ * A Builder builds workflow engine specific artifacts for a policy.
+ */
+public interface PolicyBuilder {
+    /** Builds the workflow engine artifacts for the feed; see LifecyclePolicy#build for the parameters. */
+    Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
+    /** Returns the policy name of this builder (NOTE(review): presumably equals LifecyclePolicy#getName; confirm). */
+    String getPolicyName();
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
new file mode 100644
index 0000000..0a1810e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.retention;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.expression.ExpressionHelper;
+
+import java.util.Date;
+
+/**
+ * Retention policy which deletes all feed instances whose instance time is older than the configured limit.
+ * It will create the workflow and coordinators for this policy.
+ */
+public class AgeBasedDelete extends RetentionPolicy {
+
+    public static final String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit";
+
+    @Override
+    public void validate(Feed feed, String clusterName) throws FalconException {
+        Cluster cluster = FeedHelper.getCluster(feed, clusterName);
+        // throws when the cluster has no retention stage or the limit property is missing / not a frequency
+        Frequency retentionLimit = getRetentionLimit(feed, clusterName);
+        if (cluster != null) {
+            validateLimitWithSla(feed, cluster, retentionLimit.toString());
+            validateLimitWithLateData(feed, cluster, retentionLimit.toString());
+        }
+    }
+
+    private void validateLimitWithLateData(Feed feed, Cluster cluster, String retention) throws FalconException {
+        ExpressionHelper evaluator = ExpressionHelper.get();
+        long retentionPeriod = evaluator.evaluate(retention, Long.class);
+
+        if (feed.getLateArrival() != null) {
+            String feedCutoff = feed.getLateArrival().getCutOff().toString();
+            long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
+            if (retentionPeriod < feedCutOffPeriod) {
+                throw new ValidationException("Feed's retention limit: " + retention + " of referenced cluster "
+                        + cluster.getName() + " should be more than feed's late arrival cut-off period: "
+                        + feedCutoff + " for feed: " + feed.getName());
+            }
+        }
+    }
+
+    private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException {
+        // test that slaHigh is less than retention
+        Sla clusterSla = FeedHelper.getSLAs(cluster, feed);
+        if (clusterSla != null) {
+            ExpressionHelper evaluator = ExpressionHelper.get();
+            ExpressionHelper.setReferenceDate(new Date());
+
+            Frequency slaHighExpression = clusterSla.getSlaHigh();
+            Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));
+
+            Date retention = new Date(evaluator.evaluate(retentionExpression, Long.class));
+            if (slaHigh.after(retention)) {
+                throw new ValidationException("slaHigh of Feed: " + slaHighExpression
+                        + " is greater than retention of the feed: " + retentionExpression
+                        + " for cluster: " + cluster.getName());
+            }
+        }
+    }
+
+    /** Returns the limit configured via LIMIT_PROPERTY_NAME on the retention stage of the given cluster. */
+    public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconException {
+        RetentionStage retention = FeedHelper.getRetentionStage(feed, clusterName);
+        if (retention == null) {
+            throw new FalconException("Cluster " + clusterName + " doesn't contain retention stage");
+        }
+        String limit = null;
+        if (retention.getProperties() != null) { // properties are optional in a retention stage
+            for (Property property : retention.getProperties().getProperties()) {
+                if (StringUtils.equals(property.getName(), LIMIT_PROPERTY_NAME)) {
+                    limit = property.getValue();
+                }
+            }
+        }
+        if (limit == null) {
+            throw new FalconException("Property: " + LIMIT_PROPERTY_NAME + " is required for "
+                    + getName() + " policy.");
+        }
+        try {
+            return new Frequency(limit);
+        } catch (IllegalArgumentException e) {
+            throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
+                    + "frequency e.g. hours(2)", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
new file mode 100644
index 0000000..7fd6175
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Base class for retention policies; all retention policies must extend this class.
+ */
+public abstract class RetentionPolicy implements LifecyclePolicy {
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public FeedLifecycleStage getStage() {
+        return FeedLifecycleStage.RETENTION;
+    }
+
+    @Override
+    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
+        PolicyBuilder builder = factory.getPolicyBuilder(getName());
+        return builder.build(cluster, buildPath, feed);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java b/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
new file mode 100644
index 0000000..b8c979e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/LifecyclePolicyMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.lifecycle.FeedLifecycleStage;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Stores all internal and external feed lifecycle policies.
+ */
+public final class LifecyclePolicyMap implements FalconService {
+    private static final Logger LOG = LoggerFactory.getLogger(LifecyclePolicyMap.class);
+    private static final LifecyclePolicyMap STORE = new LifecyclePolicyMap();
+
+    private final Map<String, LifecyclePolicy> policyMap = new HashMap<>();
+
+    private LifecyclePolicyMap() {}
+
+    public static LifecyclePolicyMap get() {
+        return STORE;
+    }
+
+    public LifecyclePolicy get(String policyName) {
+        return policyMap.get(policyName);
+    }
+
+    @Override
+    public String getName() {
+        return getClass().getSimpleName();
+    }
+
+    @Override
+    public void init() throws FalconException {
+        String[] policyNames = StartupProperties.get().getProperty("falcon.feed.lifecycle.policies").split(",");
+        for (String name : policyNames) {
+            LifecyclePolicy policy = ReflectionUtils.getInstanceByClassName(name);
+            LOG.debug("Loaded policy : {} for stage : {}", policy.getName(), policy.getStage());
+            policyMap.put(policy.getName(), policy);
+        }
+        validate();
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        policyMap.clear();
+    }
+
+    // validate that default policy for each stage is available
+    private void validate() throws FalconException {
+        for (FeedLifecycleStage stage : FeedLifecycleStage.values()) {
+            if (!policyMap.containsKey(stage.getDefaultPolicyName())) {
+                throw new FalconException("Default Policy: " + stage.getDefaultPolicyName()
+                        + " for stage: " + stage.name() + "was not found.");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
index 756c6b8..49592ac 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.workflow;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 
@@ -30,6 +31,8 @@ public final class WorkflowEngineFactory {
 
     private static final String WORKFLOW_ENGINE = "workflow.engine.impl";
 
+    private static final String LIFECYCLE_ENGINE = "lifecycle.engine.impl";
+
     private WorkflowEngineFactory() {
     }
 
@@ -37,4 +40,8 @@ public final class WorkflowEngineFactory {
         return ReflectionUtils.getInstance(WORKFLOW_ENGINE);
     }
 
+    public static AbstractPolicyBuilderFactory getLifecycleEngine() throws FalconException {
+        return ReflectionUtils.getInstance(LIFECYCLE_ENGINE);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 9db460c..357b90c 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -22,6 +22,7 @@
 ## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
 
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
@@ -41,6 +42,10 @@
                         org.apache.falcon.service.GroupsService,\
                         org.apache.falcon.service.ProxyUserService
 
+# List of Lifecycle policies configured.
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+# List of builders for the policies.
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index e9946c4..aab9cee 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -55,6 +55,8 @@ public class AbstractTestBase {
 
     protected static final String PROCESS_XML = "/config/process/process-0.1.xml";
     protected static final String FEED_XML = "/config/feed/feed-0.1.xml";
+    protected static final String FEED3_XML = "/config/feed/feed-0.3.xml";
+    protected static final String FEED4_XML = "/config/feed/feed-0.4.xml";
     protected static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
     protected EmbeddedCluster dfsCluster;
     protected Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index c70cfcc..4020d36 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -19,6 +19,8 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.FeedEntityParser;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -28,9 +30,11 @@ import org.apache.falcon.entity.v0.cluster.Properties;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.entity.v0.feed.Clusters;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Inputs;
@@ -38,6 +42,7 @@ import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -50,6 +55,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.TimeZone;
 
@@ -63,6 +69,7 @@ public class FeedHelperTest extends AbstractTestBase {
     @BeforeClass
     public void init() throws Exception {
         initConfigStore();
+        LifecyclePolicyMap.get().init();
     }
 
     @BeforeMethod
@@ -232,6 +239,16 @@ public class FeedHelperTest extends AbstractTestBase {
     }
 
     @Test
+    public void testGetPolicies() throws Exception {
+        FeedEntityParser parser = (FeedEntityParser) EntityParserFactory
+                .getParser(EntityType.FEED);
+        Feed feed = parser.parse(this.getClass().getResourceAsStream(FEED3_XML));
+        List<String> policies = FeedHelper.getPolicies(feed, "testCluster");
+        Assert.assertEquals(policies.size(), 1);
+        Assert.assertEquals(policies.get(0), "AgeBasedDelete");
+    }
+
+    @Test
     public void testFeedWithNoDependencies() throws Exception {
         Cluster cluster = publishCluster();
         Feed feed = publishFeed(cluster, "minutes(5)", "2011-02-28 10:00 UTC", "2016-02-28 10:00 UTC");
@@ -706,6 +723,132 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(result, expected);
     }
 
+    @Test
+    public void testIsLifeCycleEnabled() throws Exception {
+        Feed feed = new Feed();
+
+        // lifecycle is not defined
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+        Assert.assertFalse(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+
+        // lifecycle is defined at global level
+        Lifecycle globalLifecycle = new Lifecycle();
+        RetentionStage retentionStage = new RetentionStage();
+        retentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(retentionStage);
+        feed.setLifecycle(globalLifecycle);
+        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+
+        // lifecycle is defined at both global and cluster level
+        Lifecycle clusterLifecycle = new Lifecycle();
+        retentionStage = new RetentionStage();
+        retentionStage.setFrequency(new Frequency("hours(4)"));
+        clusterLifecycle.setRetentionStage(retentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+
+        // lifecycle is defined only at cluster level
+        feed.setLifecycle(null);
+        Assert.assertTrue(FeedHelper.isLifecycleEnabled(feed, cluster.getName()));
+    }
+
+    @Test
+    public void testGetRetentionStage() throws Exception {
+        Feed feed = new Feed();
+        feed.setFrequency(new Frequency("days(1)"));
+
+        // lifecycle is not defined
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+        Assert.assertNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(1)"));
+
+        // lifecycle is defined at global level
+        Lifecycle globalLifecycle = new Lifecycle();
+        RetentionStage globalRetentionStage = new RetentionStage();
+        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(globalRetentionStage);
+        feed.setLifecycle(globalLifecycle);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle is defined at both global and cluster level
+        Lifecycle clusterLifecycle = new Lifecycle();
+        RetentionStage clusterRetentionStage = new RetentionStage();
+        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
+        clusterLifecycle.setRetentionStage(clusterRetentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention only at cluster level.
+        feed.getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle at both level - retention only at global level.
+        feed.getLifecycle().setRetentionStage(globalRetentionStage);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(null);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                feed.getLifecycle().getRetentionStage().getFrequency());
+
+        // lifecycle is defined only at cluster level
+        feed.setLifecycle(null);
+        feed.getClusters().getClusters().get(0).getLifecycle().setRetentionStage(clusterRetentionStage);
+        Assert.assertNotNull(FeedHelper.getRetentionStage(feed, cluster.getName()));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()),
+                cluster.getLifecycle().getRetentionStage().getFrequency());
+    }
+
+    @Test
+    public void testGetRetentionFrequency() throws Exception {
+        Feed feed = new Feed();
+        feed.setFrequency(new Frequency("days(10)"));
+
+        // no lifecycle defined - test daily, hourly and minute-level feeds
+        Clusters clusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster cluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        cluster.setName("cluster1");
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("days(1)"));
+
+        feed.setFrequency(new Frequency("hours(1)"));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)"));
+
+        feed.setFrequency(new Frequency("minutes(5)"));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)"));
+
+        // lifecycle at both level - retention only at global level.
+        Lifecycle globalLifecycle = new Lifecycle();
+        RetentionStage globalRetentionStage = new RetentionStage();
+        globalRetentionStage.setFrequency(new Frequency("hours(2)"));
+        globalLifecycle.setRetentionStage(globalRetentionStage);
+        feed.setLifecycle(globalLifecycle);
+
+        Lifecycle clusterLifecycle = new Lifecycle();
+        RetentionStage clusterRetentionStage = new RetentionStage();
+        clusterLifecycle.setRetentionStage(clusterRetentionStage);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(6)"));
+
+        // lifecycle at both level - retention only at cluster level.
+        feed.getLifecycle().getRetentionStage().setFrequency(null);
+        clusterRetentionStage.setFrequency(new Frequency("hours(4)"));
+        Assert.assertEquals(FeedHelper.getRetentionFrequency(feed, cluster.getName()), new Frequency("hours(4)"));
+    }
+
     private Validity getFeedValidity(String start, String end) throws ParseException {
         Validity validity = new Validity();
         validity.setStart(getDate(start));

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index b6fdb13..1c43800 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -42,6 +42,7 @@ import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.group.FeedGroupMapTest;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.falcon.util.FalconTestUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.Path;
@@ -88,9 +89,9 @@ public class FeedEntityParserTest extends AbstractTestBase {
         cluster.setName("backupCluster");
         store.publish(EntityType.CLUSTER, cluster);
 
+        LifecyclePolicyMap.get().init();
         CurrentUser.authenticate(FalconTestUtil.TEST_USER_2);
-        modifiableFeed = parser.parseAndValidate(this.getClass()
-                .getResourceAsStream(FEED_XML));
+        modifiableFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
     }
 
     @Test(expectedExceptions = ValidationException.class)
@@ -163,6 +164,25 @@ public class FeedEntityParserTest extends AbstractTestBase {
         System.out.println(stringWriter.toString());
     }
 
+    @Test
+    public void testLifecycleParse() throws Exception {
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+        assertEquals("hours(17)", feed.getLifecycle().getRetentionStage().getFrequency().toString());
+        assertEquals("AgeBasedDelete", FeedHelper.getPolicies(feed, "testCluster").get(0));
+        assertEquals("reports", feed.getLifecycle().getRetentionStage().getQueue());
+        assertEquals("NORMAL", feed.getLifecycle().getRetentionStage().getPriority());
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+            expectedExceptionsMessageRegExp = ".*Retention is a mandatory stage.*")
+    public void testMandatoryRetention() throws Exception {
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+        feed.getLifecycle().setRetentionStage(null);
+        parser.validate(feed);
+    }
+
     @Test(expectedExceptions = ValidationException.class)
     public void applyValidationInvalidFeed() throws Exception {
         Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/resources/config/feed/feed-0.3.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-0.3.xml b/common/src/test/resources/config/feed/feed-0.3.xml
new file mode 100644
index 0000000..fc3ea06
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-0.3.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+        >
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+            <lifecycle>
+                <retention-stage>
+                    <frequency>hours(10)</frequency>
+                    <queue>reports</queue>
+                    <priority>NORMAL</priority>
+                    <properties>
+                        <property name="retention.policy.agebaseddelete.limit" value="hours(9)"></property>
+                    </properties>
+                </retention-stage>
+            </lifecycle>
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser-ut-user" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+    <lifecycle>
+        <retention-stage>
+            <frequency>hours(17)</frequency>
+            <queue>reports</queue>
+            <priority>NORMAL</priority>
+            <properties>
+                <property name="retention.policy.agebaseddelete.limit" value="hours(7)"></property>
+            </properties>
+        </retention-stage>
+    </lifecycle>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/common/src/test/resources/config/feed/feed-0.4.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-0.4.xml b/common/src/test/resources/config/feed/feed-0.4.xml
new file mode 100644
index 0000000..3983c59
--- /dev/null
+++ b/common/src/test/resources/config/feed/feed-0.4.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1"
+        >
+    <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
+    <partitions>
+        <partition name="fraud"/>
+        <partition name="good"/>
+    </partitions>
+
+    <groups>online,bi</groups>
+    <availabilityFlag>_SUCCESS</availabilityFlag>
+
+    <frequency>hours(1)</frequency>
+    <sla slaLow="hours(2)" slaHigh="hours(3)"/>
+    <timezone>UTC</timezone>
+
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="testCluster" type="source">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(48)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <sla slaLow="hours(3)" slaHigh="hours(4)"/>
+            <locations>
+                <location type="data" path="/projects/falcon/clicks"/>
+                <location type="stats" path="/projects/falcon/clicksStats"/>
+                <location type="meta" path="/projects/falcon/clicksMetaData"/>
+            </locations>
+        </cluster>
+        <cluster name="backupCluster" type="target">
+            <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+            <retention limit="hours(6)" action="archive"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/projects/falcon/clicks"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="testuser-ut-user" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+    <lifecycle>
+        <retention-stage>
+            <frequency>hours(17)</frequency>
+            <queue>reports</queue>
+            <priority>NORMAL</priority>
+            <properties>
+                <property name="retention.policy.agebaseddelete.limit" value="hours(7)"></property>
+            </properties>
+        </retention-stage>
+    </lifecycle>
+
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index d4f4140..e07fe12 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -316,6 +316,46 @@ destination but not in source during replication. "preserveBlockSize" represents
 replication. "preserveReplicationNumber" represents preserving replication number during replication.
 "preservePermission" represents preserving permission during
 
+
+---+++ Lifecycle
+<verbatim>
+
+<lifecycle>
+    <retention-stage>
+        <frequency>hours(10)</frequency>
+        <queue>reports</queue>
+        <priority>NORMAL</priority>
+        <properties>
+            <property name="retention.policy.agebaseddelete.limit" value="hours(9)"></property>
+        </properties>
+    </retention-stage>
+</lifecycle>
+
+</verbatim>
+
+The lifecycle tag is the new way to define the various stages of a feed's lifecycle. In the example above we have defined a
+retention-stage using the lifecycle tag. You may define a lifecycle at the global level, at the cluster level, or both. The
+cluster-level configuration takes precedence, and Falcon falls back to the global definition if it is missing.
+
+
+---++++ Retention Stage
+As of now there are two ways to specify retention. One is through the <retention> tag in the cluster, and the other is
+the new way, through the <retention-stage> tag inside the <lifecycle> tag. If both are defined for a feed, the lifecycle
+tag takes effect and Falcon ignores the <retention> tag in the cluster. If the retention-stage configuration in the
+lifecycle tag is invalid, Falcon will *NOT* fall back to the <retention> tag, even if it is defined, and will instead
+throw a validation error.
+
+In this new method of defining retention you can specify the frequency at which retention should occur; you can
+also define the queue and priority parameters for retention jobs. The default behavior of retention-stage is the same as
+the existing one, which is to delete all instances corresponding to an instance-time earlier than the duration provided in
+"retention.policy.agebaseddelete.limit".
+
+The property "retention.policy.agebaseddelete.limit" is mandatory and must contain a valid duration, e.g. "hours(1)".
+
+In the future, we will allow more customisation through this method, such as customising how the instances to be deleted are chosen.
+
+
+
 ---++ Process Specification
 A process defines configuration for a workflow. A workflow is a directed acyclic graph(DAG) which defines the job for the workflow engine. A process definition defines  the configurations required to run the workflow job. For example, process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how the workflow failures should be handled, how the late inputs should be handled and so on.  
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/pom.xml
----------------------------------------------------------------------
diff --git a/lifecycle/pom.xml b/lifecycle/pom.xml
new file mode 100644
index 0000000..ddb9550
--- /dev/null
+++ b/lifecycle/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.falcon</groupId>
+        <artifactId>falcon-main</artifactId>
+        <version>0.8-SNAPSHOT</version>
+    </parent>
+    <artifactId>falcon-feed-lifecycle</artifactId>
+    <description>Apache Falcon Lifecycle Module</description>
+    <name>Apache Falcon Lifecycle Module</name>
+    <packaging>jar</packaging>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                 <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-client</artifactId>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>unpack</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.oozie</groupId>
+                                    <artifactId>oozie-client</artifactId>
+                                    <outputDirectory>${project.build.directory}/oozie-schemas</outputDirectory>
+                                    <includes>**/*.xsd</includes>
+                                    <excludes>**/*.class</excludes>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jvnet.jaxb2.maven2</groupId>
+                <artifactId>maven-jaxb2-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>coord-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.coordinator</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>oozie-coordinator-0.3.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>wf-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <!-- Uses workflow xsd from unpacked oozie client jar to customize jaxb binding.
+                            jaxb binding is required to avoid 'Property "Any" is already defined' error-->
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.workflow</generatePackage>
+                            <bindingDirectory>src/main/resources/binding</bindingDirectory>
+                            <schemaDirectory>${project.build.directory}/oozie-schemas</schemaDirectory>
+                            <schemaIncludes>
+                                <include>oozie-workflow-0.3.xsd</include>
+                            </schemaIncludes>
+                            <debug>true</debug>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>hive-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.hive</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>hive-action-0.2.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>bundle-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.bundle</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>oozie-bundle-0.1.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/log4j.xml</exclude>
+                    </excludes>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java
new file mode 100644
index 0000000..7464b4c
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/OoziePolicyBuilderFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Supplies feed lifecycle policy builders for the Oozie workflow engine.
+ *
+ * <p>The builders are instantiated once, when this class is loaded, from the comma-separated list of
+ * class names configured under the startup property {@code falcon.feed.lifecycle.policy.builders}.
+ * Each builder is registered under the policy name it reports via {@link PolicyBuilder#getPolicyName()}.
+ */
+public class OoziePolicyBuilderFactory extends AbstractPolicyBuilderFactory {
+
+    /** Policy builders keyed by policy name; filled by the static initializer below. */
+    private static final Map<String, PolicyBuilder> REGISTRY = new HashMap<>();
+
+    static {
+        String builders = StartupProperties.get().getProperty("falcon.feed.lifecycle.policy.builders", "");
+        if (StringUtils.isNotBlank(builders)) {
+            for (String builder : builders.split(",")) {
+                // Allow whitespace around the separators and tolerate stray commas in the configured list.
+                String builderClassName = builder.trim();
+                if (builderClassName.isEmpty()) {
+                    continue;
+                }
+                try {
+                    PolicyBuilder policyBuilder = ReflectionUtils.getInstanceByClassName(builderClassName);
+                    REGISTRY.put(policyBuilder.getPolicyName(), policyBuilder);
+                } catch (FalconException e) {
+                    // Name the configured class in the message so the failing entry can be identified.
+                    throw new RuntimeException("Couldn't load builder for " + builderClassName, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Looks up the builder registered for the given policy.
+     *
+     * @param policyName name of the lifecycle policy, as reported by the builder's {@code getPolicyName()}.
+     * @return the builder registered under {@code policyName}.
+     * @throws FalconException if no builder is registered under {@code policyName}.
+     */
+    @Override
+    public PolicyBuilder getPolicyBuilder(String policyName) throws FalconException {
+        PolicyBuilder policyBuilder = REGISTRY.get(policyName);
+        if (policyBuilder == null) {
+            throw new FalconException("Couldn't find builder for policy " + policyName);
+        }
+        return policyBuilder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
new file mode 100644
index 0000000..4601070
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedCoordinatorBuilder.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.ExecutionType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Properties;
+
+/**
+ * Static helper that assembles and marshals the Oozie coordinator for the AgeBasedDelete policy.
+ */
+public final class AgeBasedCoordinatorBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AgeBasedCoordinatorBuilder.class);
+
+    private AgeBasedCoordinatorBuilder() {
+        // Not instantiable: exposes static helpers only.
+    }
+
+    /**
+     * Creates the eviction coordinator for a feed on one cluster and marshals it under the build path.
+     *
+     * @param cluster cluster on which retention is scheduled.
+     * @param basePath base path from which the coordinator build path is derived.
+     * @param feed feed whose retention is being scheduled.
+     * @param wfProp properties produced by the workflow build; the entity path is read from it.
+     * @return properties describing the marshalled coordinator, or null when the feed's validity end on
+     *         the cluster is already in the past.
+     * @throws FalconException propagated from the helper calls used to build and marshal the coordinator.
+     */
+    public static Properties build(Cluster cluster, Path basePath, Feed feed, Properties wfProp)
+        throws FalconException {
+        final String clusterName = cluster.getName();
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
+        final Date validityEnd = feedCluster.getValidity().getEnd();
+        if (validityEnd.before(new Date())) {
+            // Nothing to schedule once the feed is no longer valid on this cluster.
+            LOG.warn("Feed Retention is not applicable as Feed's end time for cluster {} is not in the future",
+                    clusterName);
+            return null;
+        }
+
+        final String coordName = EntityUtil.getWorkflowName(LifeCycle.EVICTION.getTag(), feed).toString();
+        COORDINATORAPP coord = new COORDINATORAPP();
+        coord.setName(coordName);
+        coord.setEnd(SchemaHelper.formatDateUTC(validityEnd));
+        coord.setStart(SchemaHelper.formatDateUTC(new Date()));
+        coord.setTimezone(feed.getTimezone().getID());
+
+        Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName);
+        coord.setControls(lastOnlyControls(retentionFrequency));
+        coord.setFrequency("${coord:" + retentionFrequency.toString() + "}");
+
+        Path buildPath = OozieBuilderUtils.getBuildPath(basePath, LifeCycle.EVICTION.getTag());
+        Properties coordProps = OozieBuilderUtils.createCoordDefaultConfiguration(coordName, feed);
+        coordProps.putAll(OozieBuilderUtils.getProperties(buildPath, coordName));
+
+        coord.setAction(workflowAction(wfProp, coordProps));
+
+        Path marshalPath = OozieBuilderUtils.marshalCoordinator(cluster, coord, buildPath);
+        return OozieBuilderUtils.getProperties(marshalPath, coordName);
+    }
+
+    /**
+     * Builds coordinator controls with LAST_ONLY execution and a timeout of one retention period in minutes.
+     */
+    private static CONTROLS lastOnlyControls(Frequency retentionFrequency) throws FalconException {
+        long frequencyInMillis = ExpressionHelper.get().evaluate(retentionFrequency.toString(), Long.class);
+        long timeoutInMinutes = frequencyInMillis / (1000 * 60);
+
+        CONTROLS controls = new CONTROLS();
+        controls.setExecution(ExecutionType.LAST_ONLY.value());
+        controls.setTimeout(Long.toString(timeoutInMinutes));
+        return controls;
+    }
+
+    /**
+     * Wraps the workflow located via the entity path in {@code wfProp} into a coordinator action.
+     */
+    private static ACTION workflowAction(Properties wfProp, Properties coordProps) throws FalconException {
+        WORKFLOW workflow = new WORKFLOW();
+        String entityPath = wfProp.getProperty(OozieBuilderUtils.ENTITY_PATH);
+        workflow.setAppPath(OozieBuilderUtils.getStoragePath(entityPath));
+        workflow.setConfiguration(OozieBuilderUtils.getCoordinatorConfig(coordProps));
+
+        ACTION action = new ACTION();
+        action.setWorkflow(workflow);
+        return action;
+    }
+
+    /**
+     * @return the entity operation associated with this builder, which is DELETE.
+     */
+    protected static WorkflowExecutionContext.EntityOperations getOperation() {
+        return WorkflowExecutionContext.EntityOperations.DELETE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java
new file mode 100644
index 0000000..edb4f8e
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedDeleteBuilder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.lifecycle.PolicyBuilder;
+import org.apache.falcon.lifecycle.retention.AgeBasedDelete;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * {@link PolicyBuilder} for the AgeBasedDelete policy: builds the workflow first, then the coordinator.
+ */
+public class AgeBasedDeleteBuilder implements PolicyBuilder {
+
+    /** Policy name, taken from the AgeBasedDelete policy itself. */
+    private static final String NAME = new AgeBasedDelete().getName();
+
+    @Override
+    public String getPolicyName() {
+        return NAME;
+    }
+
+    /**
+     * Builds the workflow and passes the resulting properties on to the coordinator build.
+     *
+     * @return whatever {@link #buildCoordinator} returns for the given cluster, path and feed.
+     */
+    @Override
+    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        return buildCoordinator(cluster, buildPath, feed, buildWorkflow(cluster, buildPath, feed));
+    }
+
+    /** Delegates workflow construction to {@link AgeBasedWorkflowBuilder}. */
+    public Properties buildWorkflow(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
+        return AgeBasedWorkflowBuilder.build(cluster, buildPath, feed);
+    }
+
+    /** Delegates coordinator construction to {@link AgeBasedCoordinatorBuilder}. */
+    public Properties buildCoordinator(Cluster cluster, Path buildPath, Feed feed, Properties wfProps)
+        throws FalconException {
+        return AgeBasedCoordinatorBuilder.build(cluster, buildPath, feed, wfProps);
+    }
+}


Mime
View raw message