falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ajayyad...@apache.org
Subject [1/2] falcon git commit: FALCON-965 Open up life cycle stage implementation within Falcon for extension. Contributed by Ajay Yadava.
Date Mon, 28 Sep 2015 19:26:44 GMT
Repository: falcon
Updated Branches:
  refs/heads/master c462f3e05 -> f7ad3f487


http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
new file mode 100644
index 0000000..0a87213
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.HiveUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.lifecycle.engine.oozie.utils.OozieBuilderUtils;
+import org.apache.falcon.lifecycle.retention.AgeBasedDelete;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Oozie workflow builder for the {@link AgeBasedDelete} retention policy.
+ *
+ * <p>Builds a workflow made of a single "eviction" action (unmarshalled from a
+ * bundled template) wired to success/failure post-processing actions, then
+ * marshals the workflow definition (workflow.xml) and its default configuration
+ * (config-default.xml) to the EVICTION build path on the target cluster.
+ */
+public final class AgeBasedWorkflowBuilder {
+    // Classpath resource holding the eviction <action> XML template.
+    private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml";
+    // Name of the eviction action; also the workflow's start node.
+    private static final String EVICTION_ACTION_NAME = "eviction";
+
+    // Utility class — static methods only, never instantiated.
+    private AgeBasedWorkflowBuilder(){
+
+    }
+
+    /**
+     * Builds the retention (eviction) workflow for the given feed on the given cluster.
+     *
+     * <p>Writes workflow.xml and config-default.xml under the EVICTION sub-path of
+     * {@code basePath}, adds libext jars to the workflow actions, and — for
+     * table (hive) storage — wires in hive-site.xml and HCatalog credentials.
+     *
+     * @param cluster  cluster on which the retention workflow will run
+     * @param basePath base staging path; artifacts go under its EVICTION tag sub-directory
+     * @param feed     feed whose instances are to be evicted
+     * @return the properties that were marshalled into config-default.xml
+     * @throws FalconException if unmarshalling the template, copying lib extensions,
+     *                         or marshalling the workflow/config fails
+     */
+    public static Properties build(Cluster cluster, Path basePath, Feed feed) throws FalconException {
+        Path buildPath = OozieBuilderUtils.getBuildPath(basePath, LifeCycle.EVICTION.getTag());
+        WORKFLOWAPP workflow = new WORKFLOWAPP();
+        String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+
+        //Add eviction action
+        ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE);
+        OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
+                OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(eviction);
+
+        //Add post-processing actions
+        ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
+        OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(success);
+
+        // Failure post-processing transitions to the kill node on both paths.
+        ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
+        OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(fail);
+
+        OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
+        OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED);
+
+        // Prepare and marshal properties to config-default.xml
+        Properties props = OozieBuilderUtils.getProperties(buildPath, wfName);
+        props.putAll(getWorkflowProperties(feed, cluster));
+        props.putAll(OozieBuilderUtils.createDefaultConfiguration(cluster, feed,
+                WorkflowExecutionContext.EntityOperations.DELETE));
+        props.putAll(FeedHelper.getUserWorkflowProperties(LifeCycle.EVICTION));
+        // override the queueName and priority
+        // NOTE(review): Properties.put throws NPE on a null value — this assumes the
+        // retention stage always has queue and priority set; confirm upstream validation.
+        RetentionStage retentionStage = FeedHelper.getRetentionStage(feed, cluster.getName());
+        props.put(OozieBuilderUtils.MR_QUEUE_NAME, retentionStage.getQueue());
+        props.put(OozieBuilderUtils.MR_JOB_PRIORITY, retentionStage.getPriority());
+
+        if (EntityUtil.isTableStorageType(cluster, feed)) {
+            setupHiveCredentials(cluster, buildPath, workflow);
+            // TODO: kludge carried over from feed retention — also include the source
+            // hcat credentials so the coordinator dependency check passes.
+            props.putAll(HiveUtil.getHiveCredentials(cluster));
+        }
+
+        // Write out the config to config-default.xml
+        OozieBuilderUtils.marshalDefaultConfig(cluster, workflow, props, buildPath);
+
+        // write out the workflow.xml
+        OozieBuilderUtils.marshalWokflow(cluster, workflow, buildPath);
+        return props;
+    }
+
+    /**
+     * Assembles the eviction-specific workflow properties (retention limit, feed
+     * storage type/paths, timezone, frequency) consumed by the FeedEvictor action
+     * and post-processing.
+     *
+     * @throws FalconException if the feed's storage or retention limit cannot be resolved
+     */
+    private static Properties getWorkflowProperties(Feed feed, Cluster cluster) throws FalconException {
+        final Storage storage = FeedHelper.createStorage(cluster, feed);
+        Properties props = new Properties();
+        // "NA" placeholders: retention has no source cluster or availability flag.
+        props.setProperty("srcClusterName", "NA");
+        props.setProperty("availabilityFlag", "NA");
+        props.put("timeZone", feed.getTimezone().getID());
+        props.put("frequency", feed.getFrequency().getTimeUnit().name());
+        props.put("falconFeedStorageType", storage.getType().name());
+        props.put("limit", new AgeBasedDelete().getRetentionLimit(feed, cluster.getName()).toString());
+        props.put("falconInputFeeds", feed.getName());
+        props.put("falconInPaths", OozieBuilderUtils.IGNORE);
+
+        // Rewrite ${...} EL markers to ?{...} so Oozie does not resolve them itself.
+        String feedDataPath = storage.getUriTemplate();
+        props.put("feedDataPath",
+                feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName());
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE);
+
+        return props;
+    }
+
+    /**
+     * For table (hive) storage: stages hive-site.xml under the workflow's conf dir,
+     * points the eviction action's job-xml at it, and — in secure mode — attaches
+     * HCatalog credentials to the workflow and the eviction action.
+     *
+     * @param cluster     cluster entity providing hive/registry endpoints
+     * @param wfPath      workflow staging path under which conf/hive-site.xml is written
+     * @param workflowApp workflow whose eviction action is decorated
+     * @throws FalconException if the hive configuration cannot be written
+     */
+    private static void setupHiveCredentials(Cluster cluster, Path wfPath, WORKFLOWAPP workflowApp)
+        throws FalconException {
+        if (SecurityUtil.isSecurityEnabled()) {
+            // add hcatalog credentials for secure mode and add a reference to each action
+            OozieBuilderUtils.addHCatalogCredentials(workflowApp, cluster, OozieBuilderUtils.HIVE_CREDENTIAL_NAME);
+        }
+
+        // create hive-site.xml file so actions can use it in the classpath
+        OozieBuilderUtils.createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance
+
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                continue;
+            }
+
+            org.apache.falcon.oozie.workflow.ACTION action =
+                    (org.apache.falcon.oozie.workflow.ACTION) object;
+            String actionName = action.getName();
+            if (EVICTION_ACTION_NAME.equals(actionName)) {
+                // add reference to hive-site conf to each action
+                action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+                if (SecurityUtil.isSecurityEnabled()) {
+                    // add a reference to credential in the action
+                    action.setCred(OozieBuilderUtils.HIVE_CREDENTIAL_NAME);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
new file mode 100644
index 0000000..be9175e
--- /dev/null
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.engine.oozie.utils;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
+import org.apache.falcon.entity.HiveUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CREDENTIAL;
+import org.apache.falcon.oozie.workflow.CREDENTIALS;
+import org.apache.falcon.oozie.workflow.END;
+import org.apache.falcon.oozie.workflow.KILL;
+import org.apache.falcon.oozie.workflow.START;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.falcon.workflow.util.OozieConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.namespace.QName;
+import javax.xml.transform.stream.StreamSource;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class to build oozie artifacts: workflow and coordinator XML,
+ * default configuration files, credentials, and lib-extension wiring.
+ * All methods are static; the class is never instantiated.
+ */
+public final class OozieBuilderUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(OozieBuilderUtils.class);
+
+    // Classpath resource holding the post-processing <action> template.
+    private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
+
+    public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+    private static final String USER_JMS_NOTIFICATION_ENABLED = "userJMSNotificationEnabled";
+    public static final String MR_QUEUE_NAME = "queueName";
+    public static final String MR_JOB_PRIORITY = "jobPriority";
+    private static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
+    private static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
+    // Default broker message TTL: 3 days expressed in minutes.
+    private static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+
+    // JAXB contexts are expensive to create, so they are built once per class-load.
+    private static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+    private static final JAXBContext ACTION_JAXB_CONTEXT;
+    private static final JAXBContext COORD_JAXB_CONTEXT;
+    private static final JAXBContext CONFIG_JAXB_CONTEXT;
+
+
+    // Well-known workflow node names used when wiring action transitions.
+    public static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
+    public static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
+    public static final String OK_ACTION_NAME = "end";
+    public static final String FAIL_ACTION_NAME = "fail";
+
+
+    // Keys of the bookkeeping properties produced by getProperties().
+    public static final String ENTITY_PATH = "ENTITY_PATH";
+    public static final String ENTITY_NAME = "ENTITY_NAME";
+    public static final String IGNORE = "IGNORE";
+
+
+    static {
+        try {
+            WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+            ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
+            COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
+            CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.CONFIGURATION.class);
+        } catch (JAXBException e) {
+            // JAXB context failure is unrecoverable misconfiguration — fail class-load.
+            throw new RuntimeException("Unable to create JAXB context", e);
+        }
+    }
+
+    private OozieBuilderUtils() {
+
+    }
+
+    /**
+     * Sets the ok/error transitions on the given action, in place.
+     *
+     * @param action action to modify
+     * @param ok     node name for the ok transition
+     * @param fail   node name for the error transition
+     * @return the same (mutated) action, for call chaining
+     */
+    public static ACTION addTransition(ACTION action, String ok, String fail) {
+        // XTODOS : why return when it is changing the same object?
+        action.getOk().setTo(ok);
+        action.getError().setTo(fail);
+        return action;
+    }
+
+
+    /**
+     * Finalizes a workflow: sets its name, a start node pointing at
+     * {@code startAction}, an end node named {@link #OK_ACTION_NAME}, and a
+     * kill node named {@link #FAIL_ACTION_NAME} that reports the last error.
+     */
+    public static void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction) {
+        wf.setName(name);
+        wf.setStart(new START());
+        wf.getStart().setTo(startAction);
+
+        wf.setEnd(new END());
+        wf.getEnd().setName(OK_ACTION_NAME);
+
+        KILL kill = new KILL();
+        kill.setName(FAIL_ACTION_NAME);
+        kill.setMessage("Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
+        wf.getDecisionOrForkOrJoin().add(kill);
+    }
+
+    /**
+     * Returns the success post-processing action (with oozie retries configured).
+     * The action name is taken from the template as-is — presumably
+     * {@link #SUCCESS_POSTPROCESS_ACTION_NAME}; TODO confirm against the template.
+     */
+    public static ACTION getSuccessPostProcessAction() throws FalconException {
+        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        return action;
+    }
+
+    /**
+     * Returns the failure post-processing action (same template as the success
+     * action, renamed to {@link #FAIL_POSTPROCESS_ACTION_NAME}).
+     */
+    public static ACTION getFailPostProcessAction() throws FalconException {
+        ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+        decorateWithOozieRetries(action);
+        action.setName(FAIL_POSTPROCESS_ACTION_NAME);
+        return action;
+    }
+
+    /**
+     * Marshals a JAXB element to {@code outPath} on the cluster's file system
+     * (via a proxied FileSystem). Logs the full XML at debug level.
+     *
+     * @return the path that was written
+     * @throws FalconException wrapping any marshalling or file-system failure
+     */
+    private static Path marshal(Cluster cluster, JAXBElement<?> jaxbElement,
+                           JAXBContext jaxbContext, Path outPath) throws FalconException {
+        try {
+            Marshaller marshaller = jaxbContext.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+
+            if (LOG.isDebugEnabled()) {
+                StringWriter writer = new StringWriter();
+                marshaller.marshal(jaxbElement, writer);
+                LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
+                LOG.debug(writer.getBuffer().toString());
+            }
+
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                    outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+            OutputStream out = fs.create(outPath);
+            try {
+                marshaller.marshal(jaxbElement, out);
+            } finally {
+                out.close();
+            }
+
+            LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
+            return outPath;
+        } catch (Exception e) {
+            throw new FalconException("Unable to marshall app object", e);
+        }
+    }
+
+    /** Writes the coordinator definition as coordinator.xml under {@code outPath}. */
+    public static Path marshalCoordinator(Cluster cluster, COORDINATORAPP coord, Path outPath) throws FalconException {
+        return marshal(cluster, new org.apache.falcon.oozie.coordinator.ObjectFactory().createCoordinatorApp(coord),
+                COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
+    }
+
+
+    /**
+     * Writes the given properties as config-default.xml under {@code outPath},
+     * using the workflow's qualified name to build the configuration element's QName.
+     */
+    public static Path marshalDefaultConfig(Cluster cluster, WORKFLOWAPP workflowapp,
+               Properties properties, Path outPath) throws FalconException {
+        QName workflowQName = new org.apache.falcon.oozie.workflow.ObjectFactory()
+                .createWorkflowApp(workflowapp).getName();
+        org.apache.falcon.oozie.workflow.CONFIGURATION config = getWorkflowConfig(properties);
+        // NOTE(review): raw JAXBElement construction — unchecked warning; works but
+        // could be parameterized.
+        JAXBElement<org.apache.falcon.oozie.workflow.CONFIGURATION> configJaxbElement =
+                new JAXBElement(new QName(workflowQName.getNamespaceURI(), "configuration", workflowQName.getPrefix()),
+                        org.apache.falcon.oozie.workflow.CONFIGURATION.class, config);
+
+        Path defaultConfigPath = new Path(outPath, "config-default.xml");
+        return marshal(cluster, configJaxbElement, CONFIG_JAXB_CONTEXT, defaultConfigPath);
+    }
+
+
+    /**
+     * Writes the workflow definition as workflow.xml under {@code outPath}.
+     * NOTE(review): method name has a typo ("Wokflow") — public API, renaming
+     * would break callers; left as-is.
+     */
+    public static Path marshalWokflow(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+        return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+                WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+    }
+
+    /**
+     * Unmarshals a classpath XML template into an instance of {@code cls}.
+     *
+     * @param template classpath resource path of the template
+     * @param context  JAXB context to unmarshal with
+     * @param cls      expected root element type
+     * @throws FalconException if unmarshalling fails
+     */
+    public static <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
+        InputStream resourceAsStream = null;
+        try {
+            resourceAsStream = OozieBuilderUtils.class.getResourceAsStream(template);
+            Unmarshaller unmarshaller = context.createUnmarshaller();
+            JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
+            return jaxbElement.getValue();
+        } catch (JAXBException e) {
+            throw new FalconException("Failed to unmarshal " + template, e);
+        } finally {
+            IOUtils.closeQuietly(resourceAsStream);
+        }
+    }
+
+    /** Convenience overload: unmarshals a workflow ACTION template. */
+    public static ACTION unmarshalAction(String template) throws FalconException {
+        return unmarshal(template, ACTION_JAXB_CONTEXT, ACTION.class);
+    }
+
+    // XTODOS Should we make them more specific to feeds??
+    /**
+     * Adds jars from the cluster's working-dir libext tree (libext,
+     * libext/&lt;TYPE&gt;, libext/&lt;TYPE&gt;/&lt;tag&gt;) to every action of the workflow.
+     */
+    public static void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag, EntityType type)
+        throws FalconException {
+        String libext = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/libext";
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
+                ClusterHelper.getConfiguration(cluster));
+        try {
+            addExtensionJars(fs, new Path(libext), wf);
+            addExtensionJars(fs, new Path(libext, type.name()), wf);
+            if (tag != null) {
+                addExtensionJars(fs, new Path(libext, type.name() + "/" + tag.name().toLowerCase()),
+                        wf);
+            }
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /**
+     * Builds the bookkeeping properties identifying a built entity.
+     *
+     * @param path build path of the entity's artifacts; if null, returns null
+     * @param name entity (workflow) name
+     * @return properties with {@link #ENTITY_PATH} and {@link #ENTITY_NAME} set,
+     *         or null when {@code path} is null
+     */
+    public static Properties getProperties(Path path, String name) {
+        if (path == null) {
+            return null;
+        }
+        Properties prop = new Properties();
+        prop.setProperty(ENTITY_PATH, path.toString());
+        prop.setProperty(ENTITY_NAME, name);
+        return prop;
+    }
+
+
+    /**
+     * Adds every regular file under {@code path} (a libext directory such as
+     * libext, libext/FEED, libext/FEED/RETENTION) as a &lt;file&gt; entry on each
+     * java/pig/map-reduce action of the workflow. A missing directory is
+     * silently ignored (libext is optional).
+     *
+     * @param fs   file system holding the libext directory
+     * @param path directory whose jars are added
+     * @param wf   workflow whose actions receive the file entries
+     * @throws IOException on listing failures other than a missing directory
+     */
+    public static void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+        FileStatus[] libs;
+        try {
+            libs = fs.listStatus(path);
+        } catch (FileNotFoundException ignore) {
+            //Ok if the libext is not configured
+            return;
+        }
+
+        for (FileStatus lib : libs) {
+            if (lib.isDirectory()) {
+                continue;
+            }
+
+            for (Object obj : wf.getDecisionOrForkOrJoin()) {
+                if (!(obj instanceof ACTION)) {
+                    continue;
+                }
+                ACTION action = (ACTION) obj;
+                List<String> files = null;
+                if (action.getJava() != null) {
+                    files = action.getJava().getFile();
+                } else if (action.getPig() != null) {
+                    files = action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Applies the configured oozie retry policy (max attempts and interval,
+     * from runtime properties) to the given action.
+     */
+    public static void decorateWithOozieRetries(ACTION action) {
+        Properties props = RuntimeProperties.get();
+        action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+        action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+    }
+
+    // creates the default configuration which is written in config-default.xml
+    /**
+     * Builds the default workflow configuration for an entity: identity
+     * properties, log dir, engine URL, late-data and broker properties, and
+     * queue/priority defaults — with entity-declared properties overriding.
+     */
+    public static Properties createDefaultConfiguration(Cluster cluster, Entity entity,
+                WorkflowExecutionContext.EntityOperations operation)  throws FalconException {
+        Properties props = new Properties();
+        props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+        props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+        props.put("falconDataOperation", operation.name());
+
+        props.put(WorkflowExecutionArgs.LOG_DIR.getName(),
+                getStoragePath(EntityUtil.getLogPath(cluster, entity)));
+        props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
+
+        addLateDataProperties(props, entity);
+        addBrokerProperties(cluster, props);
+
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+
+        //properties provided in entity override the default generated properties
+        props.putAll(EntityUtil.getEntityProperties(entity));
+        props.putAll(createAppProperties(cluster));
+        return props;
+    }
+
+
+    // gets the cluster specific properties to be populated in config-default.xml
+    private static Properties createAppProperties(Cluster cluster) throws FalconException {
+        Properties properties = EntityUtil.getEntityProperties(cluster);
+        properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
+        properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
+        properties.setProperty("colo.name", cluster.getColo());
+        // The system libpath flag is skipped for a local (in-process) oozie.
+        final String endpoint = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint();
+        if (!OozieConstants.LOCAL_OOZIE.equals(endpoint)) {
+            properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
+        }
+        properties.setProperty("falcon.libpath",
+                ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath()  + "/lib");
+
+        return properties;
+    }
+
+    // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
+    public static void createHiveConfiguration(Cluster cluster, Path workflowPath,
+                                           String prefix) throws FalconException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+
+        try {
+            Configuration conf = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+
+            // create hive conf to stagingDir
+            Path confPath = new Path(workflowPath + "/conf");
+
+            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+        } catch (IOException e) {
+            throw new FalconException("Unable to create create hive site", e);
+        }
+    }
+
+    // Writes the hive configuration as <prefix>hive-site.xml under confPath.
+    private static void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
+        OutputStream out = null;
+        try {
+            out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+            hiveConf.writeXml(out);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    /**
+     * Adds an HCatalog credential to the workflow's credentials section,
+     * creating the section if absent. Only needed for table storage in secure mode.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster     cluster entity
+     * @param credentialName name under which the credential is registered
+     */
+    public static void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName) {
+        CREDENTIALS credentials = workflowApp.getCredentials();
+        if (credentials == null) {
+            credentials = new CREDENTIALS();
+        }
+
+        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
+
+        // add credential for workflow
+        workflowApp.setCredentials(credentials);
+    }
+
+
+    /**
+     * Builds an "hcat" credential pointing at the cluster's registry (metastore)
+     * endpoint and its kerberos principal. Only needed for table storage in
+     * secure mode.
+     *
+     * @param cluster        cluster entity
+     * @param credentialName credential name
+     * @return CREDENTIAL object
+     */
+    public static CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
+        final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+
+        CREDENTIAL credential = new CREDENTIAL();
+        credential.setName(credentialName);
+        credential.setType("hcat");
+
+        credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
+        credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
+                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));
+
+        return credential;
+    }
+
+    /** Builds a single name/value CREDENTIAL property. */
+    public static CREDENTIAL.Property createProperty(String name, String value) {
+        CREDENTIAL.Property property = new CREDENTIAL.Property();
+        property.setName(name);
+        property.setValue(value);
+        return property;
+    }
+
+    /**
+     * Builds the hive connection properties for the cluster's registry endpoint;
+     * adds kerberos/SASL settings in secure mode.
+     *
+     * @throws IllegalStateException if the cluster declares no registry interface
+     */
+    private static Properties getHiveCredentials(Cluster cluster) {
+        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+        if (metaStoreUrl == null) {
+            throw new IllegalStateException("Registry interface is not defined in cluster: " + cluster.getName());
+        }
+
+        Properties hiveCredentials = new Properties();
+        hiveCredentials.put(HiveUtil.METASTOREURIS, metaStoreUrl);
+        hiveCredentials.put(HiveUtil.METASTORE_UGI, "true");
+        hiveCredentials.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
+        hiveCredentials.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            String principal = ClusterHelper
+                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
+            hiveCredentials.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
+            hiveCredentials.put(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL, principal);
+            hiveCredentials.put(SecurityUtil.METASTORE_USE_THRIFT_SASL, "true");
+        }
+
+        return hiveCredentials;
+    }
+
+    // Converts the hive credential properties into a Hadoop Configuration
+    // (without loading default resources).
+    private static Configuration getHiveCredentialsAsConf(Cluster cluster) {
+        Properties hiveCredentials = getHiveCredentials(cluster);
+
+        Configuration hiveConf = new Configuration(false);
+        for (Map.Entry<Object, Object> entry : hiveCredentials.entrySet()) {
+            hiveConf.set((String)entry.getKey(), (String)entry.getValue());
+        }
+
+        return hiveConf;
+    }
+
+    /** Returns the tag-specific sub-path under which a lifecycle stage is staged. */
+    public static Path getBuildPath(Path buildPath, Tag tag) {
+        return new Path(buildPath, tag.name());
+    }
+
+    // Null-safe wrapper over getStoragePath(String).
+    protected static String getStoragePath(Path path) {
+        if (path != null) {
+            return getStoragePath(path.toString());
+        }
+        return null;
+    }
+
+    /**
+     * Prefixes a scheme-less path with the ${nameNode} EL variable so oozie
+     * resolves it against the cluster's storage; already-qualified paths and
+     * empty/null input are returned unchanged.
+     */
+    public static String getStoragePath(String path) {
+        if (StringUtils.isNotEmpty(path)) {
+            if (new Path(path).toUri().getScheme() == null && !path.startsWith("${nameNode}")) {
+                path = "${nameNode}" + path;
+            }
+        }
+        return path;
+    }
+
+    // default configuration for coordinator
+    /**
+     * Builds the default coordinator configuration: nominal/actual time ELs,
+     * the external id, and the JMS notification flag — with entity-declared
+     * properties overriding.
+     */
+    public static Properties createCoordDefaultConfiguration(String coordName, Entity entity)
+        throws FalconException {
+
+        Properties props = new Properties();
+        props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
+        props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
+        props.put(OozieClient.EXTERNAL_ID,
+                new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+                        "${coord:nominalTime()}").getId());
+        props.put(USER_JMS_NOTIFICATION_ENABLED, "true");
+        //props in entity override the set props.
+        props.putAll(EntityUtil.getEntityProperties(entity));
+        return props;
+    }
+
+    // Sets shouldRecord=true only when the entity declares late inputs.
+    private static void addLateDataProperties(Properties props, Entity entity) throws FalconException {
+        if (EntityUtil.getLateProcess(entity) == null
+                || EntityUtil.getLateProcess(entity).getLateInputs() == null
+                || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+            props.put("shouldRecord", "false");
+        } else {
+            props.put("shouldRecord", "true");
+        }
+    }
+
+    // Populates user and falcon JMS broker settings (URL, impl class, TTL)
+    // from the cluster definition and startup properties.
+    private static void addBrokerProperties(Cluster cluster, Properties props) {
+        props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
+                ClusterHelper.getMessageBrokerUrl(cluster));
+        props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
+                ClusterHelper.getMessageBrokerImplClass(cluster));
+
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                "broker.url", "tcp://localhost:61616?daemon=true");
+        props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+        props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+
+        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+                DEFAULT_BROKER_MSG_TTL.toString());
+        props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
+    }
+
+
+    // Converts Properties into the workflow-namespace CONFIGURATION element.
+    private static org.apache.falcon.oozie.workflow.CONFIGURATION getWorkflowConfig(Properties props) {
+        org.apache.falcon.oozie.workflow.CONFIGURATION conf = new org.apache.falcon.oozie.workflow.CONFIGURATION();
+        for (Map.Entry<Object, Object> prop : props.entrySet()) {
+            org.apache.falcon.oozie.workflow.CONFIGURATION.Property confProp =
+                    new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+            confProp.setName((String) prop.getKey());
+            confProp.setValue((String) prop.getValue());
+            conf.getProperty().add(confProp);
+        }
+        return conf;
+    }
+
+    // Converts Properties into the coordinator-namespace CONFIGURATION element.
+    public static CONFIGURATION getCoordinatorConfig(Properties props) {
+        CONFIGURATION conf = new CONFIGURATION();
+        for (Map.Entry<Object, Object> prop : props.entrySet()) {
+            CONFIGURATION.Property confProp = new CONFIGURATION.Property();
+            confProp.setName((String) prop.getKey());
+            confProp.setValue((String) prop.getValue());
+            conf.getProperty().add(confProp);
+        }
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/resources/action/feed/eviction-action.xml
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml
new file mode 100644
index 0000000..4ab67d2
--- /dev/null
+++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml
@@ -0,0 +1,59 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<action name="eviction" xmlns="uri:oozie:workflow:0.3">
+    <java>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapred.job.queue.name</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.priority</name>
+                <value>${jobPriority}</value>
+            </property>
+            <!-- HCatalog jars -->
+            <property>
+                <name>oozie.action.sharelib.for.java</name>
+                <value>hcatalog</value>
+            </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
+        </configuration>
+        <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
+        <arg>-feedBasePath</arg>
+        <arg>${feedDataPath}</arg>
+        <arg>-falconFeedStorageType</arg>
+        <arg>${falconFeedStorageType}</arg>
+        <arg>-retentionType</arg>
+        <arg>instance</arg>
+        <arg>-retentionLimit</arg>
+        <arg>${limit}</arg>
+        <arg>-timeZone</arg>
+        <arg>${timeZone}</arg>
+        <arg>-frequency</arg>
+        <arg>${frequency}</arg>
+        <arg>-logFile</arg>
+        <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg>
+    </java>
+    <ok to="succeeded-post-processing"/>
+    <error to="failed-post-processing"/>
+</action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/main/resources/binding/jaxb-binding.xjb
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/resources/binding/jaxb-binding.xjb b/lifecycle/src/main/resources/binding/jaxb-binding.xjb
new file mode 100644
index 0000000..1a43660
--- /dev/null
+++ b/lifecycle/src/main/resources/binding/jaxb-binding.xjb
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<jaxb:bindings xmlns:xs="http://www.w3.org/2001/XMLSchema"
+               xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" version="2.1">
+
+    <jaxb:bindings schemaLocation="../../../../target/oozie-schemas/oozie-workflow-0.3.xsd"
+                   node="//xs:complexType[@name='ACTION']/xs:sequence/xs:any[@namespace='uri:oozie:sla:0.1']">
+        <jaxb:property name="anySLA"/>
+    </jaxb:bindings>
+</jaxb:bindings>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java b/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java
new file mode 100644
index 0000000..cf90f04
--- /dev/null
+++ b/lifecycle/src/test/java/org/apache/falcon/lifecycle/retention/AgeBasedDeleteTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle.retention;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LateArrival;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.entity.v0.feed.Properties;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for AgeBasedDelete Policy validations.
+ */
+public class AgeBasedDeleteTest {
+    private static Feed feed;
+    private static String clusterName = "testCluster";
+
+    @BeforeMethod
+    private void init() {
+        feed = new Feed();
+        Cluster cluster = new Cluster();
+        cluster.setName(clusterName);
+
+        Property property = new Property();
+        property.setName(AgeBasedDelete.LIMIT_PROPERTY_NAME);
+        property.setValue("hours(3)");
+
+        Properties properties = new Properties();
+        properties.getProperties().add(property);
+
+        RetentionStage retentionStage = new RetentionStage();
+        retentionStage.setProperties(properties);
+
+        Lifecycle lifecycle = new Lifecycle();
+        lifecycle.setRetentionStage(retentionStage);
+
+        cluster.setLifecycle(lifecycle);
+
+        Clusters clusters = new Clusters();
+        clusters.getClusters().add(cluster);
+        feed.setClusters(clusters);
+
+        //set sla
+        Sla sla = new Sla();
+        sla.setSlaLow(new Frequency("hours(3)"));
+        sla.setSlaHigh(new Frequency("hours(3)"));
+        feed.setSla(sla);
+
+        // set late data arrival
+        LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("hours(3)"));
+        feed.setLateArrival(lateArrival);
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+        expectedExceptionsMessageRegExp = ".*slaHigh of Feed:.*")
+    public void testSlaValidation() throws FalconException {
+        feed.getSla().setSlaHigh(new Frequency("hours(4)"));
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+    expectedExceptionsMessageRegExp = ".*Feed's retention limit:.*")
+    public void testLateDataValidation() throws FalconException {
+        feed.getLateArrival().setCutOff(new Frequency("hours(4)"));
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+
+    @Test(expectedExceptions = FalconException.class,
+        expectedExceptionsMessageRegExp = ".*Invalid value for property.*")
+    public void testValidateLimit() throws FalconException {
+        feed.getClusters().getClusters().get(0).getLifecycle().getRetentionStage().getProperties().getProperties()
+                .get(0).setValue("invalid");
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+
+    @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = ".*limit is required.*")
+    public void testStageValidity() throws Exception {
+        feed.getClusters().getClusters().get(0).getLifecycle().getRetentionStage().getProperties().getProperties()
+                .clear();
+        new AgeBasedDelete().validate(feed, clusterName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 157edf9..40c0a3e 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -90,6 +90,13 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-feed-lifecycle</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index b819dee..9e55edf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -20,10 +20,14 @@ package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.lifecycle.LifecyclePolicy;
 import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
@@ -38,16 +42,32 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         super(entity);
     }
 
-    @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
-        List<Properties> props = new ArrayList<Properties>();
-        List<Properties> evictionProps =
-            OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
-        if (evictionProps != null) {
-            props.addAll(evictionProps);
+    @Override
+    protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+        // if feed has lifecycle defined - then use it to create coordinator and wf else fall back
+        List<Properties> props = new ArrayList<>();
+        Lifecycle lifecycle = this.entity.getLifecycle();
+        if (lifecycle != null) {
+            for (String name : FeedHelper.getPolicies(this.entity, cluster.getName())) {
+                LifecyclePolicy policy = LifecyclePolicyMap.get().get(name);
+                if (policy == null) {
+                    LOG.error("Couldn't find lifecycle policy for name:{}", name);
+                    throw new FalconException("Invalid policy name " + name);
+                }
+                Properties appProps = policy.build(cluster, buildPath, this.entity);
+                if (appProps != null) {
+                    props.add(appProps);
+                }
+            }
+        } else {
+            List<Properties> evictionProps =
+                    OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
+            if (evictionProps != null) {
+                props.addAll(evictionProps);
+            }
         }
-
-        List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION).buildCoords(cluster,
-            buildPath);
+        List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION)
+                .buildCoords(cluster, buildPath);
         if (replicationProps != null) {
             props.addAll(replicationProps);
         }
@@ -55,7 +75,6 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         if (!props.isEmpty()) {
             copySharedLibs(cluster, new Path(getLibPath(buildPath)));
         }
-
         return props;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 7d0174a..cfce1ae 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -49,6 +49,7 @@ import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.LifecyclePolicyMap;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -88,12 +89,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     private Feed feed;
     private Feed tableFeed;
     private Feed fsReplFeed;
+    private Feed lifecycleRetentionFeed;
+    private Feed retentionFeed;
 
     private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml";
     private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml";
     private static final String FEED = "/feed/feed.xml";
     private static final String TABLE_FEED = "/feed/table-replication-feed.xml";
     private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml";
+    private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml";
+    private static final String FS_RETENTION_ORIG_FEED = "/feed/fs-retention-feed.xml";
 
     @BeforeClass
     public void setUpDFS() throws Exception {
@@ -105,6 +110,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
         String trgHdfsUrl = trgMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
 
+        LifecyclePolicyMap.get().init();
         cleanupStore();
 
         org.apache.falcon.entity.v0.cluster.Property property =
@@ -124,6 +130,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         feed = (Feed) storeEntity(EntityType.FEED, FEED);
         fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED);
         tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED);
+        lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED);
+        retentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_ORIG_FEED);
     }
 
     private Entity storeEntity(EntityType type, String resource) throws Exception {
@@ -150,6 +158,32 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
     }
 
     @Test
+    public void testRetentionWithLifecycle() throws Exception {
+        OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleRetentionFeed);
+        Path bundlePath = new Path("/projects/falcon/");
+        builder.build(trgCluster, bundlePath);
+
+        BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
+        List<COORDINATOR> coords = bundle.getCoordinator();
+
+        COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
+        assertLibExtensions(coord, "retention");
+        HashMap<String, String> props = getCoordProperties(coord);
+        Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION");
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(17)}");
+        Assert.assertEquals(coord.getEnd(), "2099-01-01T00:00Z");
+        Assert.assertEquals(coord.getTimezone(), "UTC");
+
+        HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
+        Assert.assertEquals(wfProps.get("feedNames"), lifecycleRetentionFeed.getName());
+        Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name()));
+        Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
+        Assert.assertEquals(wfProps.get("queueName"), "retention");
+        Assert.assertEquals(wfProps.get("limit"), "hours(2)");
+        Assert.assertEquals(wfProps.get("jobPriority"), "LOW");
+    }
+
+    @Test
     public void testReplicationCoordsForFSStorage() throws Exception {
         OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
         Path bundlePath = new Path("/projects/falcon/");

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/resources/feed/fs-retention-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-retention-feed.xml b/oozie/src/test/resources/feed/fs-retention-feed.xml
new file mode 100644
index 0000000..7eb85fa
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-retention-feed.xml
@@ -0,0 +1,50 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="lifecycle original retention feed" name="retention-test" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp1">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location path="/data/lifecycle/" type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata" type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+        <property name="mapBandwidth" value="2" />
+    </properties>
+
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
new file mode 100644
index 0000000..2cadfe0
--- /dev/null
+++ b/oozie/src/test/resources/feed/fs-retention-lifecycle-feed.xml
@@ -0,0 +1,60 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="lifecycle retention feed" name="retention-lifecycle-test" xmlns="uri:falcon:feed:0.1">
+    <partitions>
+        <partition name="colo"/>
+        <partition name="eventTime"/>
+        <partition name="impressionHour"/>
+        <partition name="pricingModel"/>
+    </partitions>
+
+    <frequency>minutes(5)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="minutes(1)"/>
+
+    <clusters>
+        <cluster partition="${cluster.colo}" type="source" name="corp2">
+            <validity end="2099-01-01T00:00Z" start="2012-10-01T12:00Z"/>
+            <retention action="delete" limit="days(10000)"/>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location path="/data/lifecycle/" type="data"/>
+        <location path="/data/regression/fetlrc/billing/stats" type="stats"/>
+        <location path="/data/regression/fetlrc/billing/metadata" type="meta"/>
+    </locations>
+
+    <ACL permission="0x755" group="group" owner="fetl"/>
+    <schema provider="protobuf" location="/databus/streams_local/click_rr/schema/"/>
+    <properties>
+        <property name="maxMaps" value="33" />
+        <property name="mapBandwidth" value="2" />
+    </properties>
+
+    <lifecycle>
+        <retention-stage>
+            <frequency>hours(17)</frequency>
+            <queue>retention</queue>
+            <priority>LOW</priority>
+            <properties>
+                <property name="retention.policy.agebaseddelete.limit" value="hours(2)"></property>
+            </properties>
+        </retention-stage>
+    </lifecycle>
+</feed>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8127b46..9cdfc87 100644
--- a/pom.xml
+++ b/pom.xml
@@ -434,6 +434,7 @@
         <module>rerun</module>
         <module>prism</module>
         <module>unit</module>
+        <module>lifecycle</module>
         <module>webapp</module>
         <module>docs</module>
     </modules>
@@ -844,6 +845,12 @@
 
             <dependency>
                 <groupId>org.apache.falcon</groupId>
+                <artifactId>falcon-feed-lifecycle</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.falcon</groupId>
                 <artifactId>falcon-process</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f7ad3f48/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 8f3bc35..9c6aef7 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -26,6 +26,7 @@
 ## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
 
 *.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
 *.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
 *.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
 *.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
@@ -49,6 +50,12 @@
 ##### Prism Services #####
 prism.application.services=org.apache.falcon.entity.store.ConfigurationStore
 
+
+# List of Lifecycle policies configured.
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+# List of builders for the policies.
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\


Mime
View raw message