falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [2/8] git commit: FALCON-107 Adding extensions. Contributed by Shwetha GS
Date Mon, 23 Sep 2013 23:04:50 GMT
FALCON-107 Adding extensions. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/0dc6c6c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/0dc6c6c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/0dc6c6c2

Branch: refs/heads/FALCON-85
Commit: 0dc6c6c2851f1438067418deae41d47402baa08e
Parents: d422dba
Author: Shwetha GS <shwethags@gmail.com>
Authored: Tue Sep 17 15:32:07 2013 +0530
Committer: Shwetha GS <shwethags@gmail.com>
Committed: Tue Sep 17 15:32:07 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/EntityUtil.java    |  7 ++
 .../entity/parser/ClusterEntityParser.java      |  7 +-
 .../entity/parser/ProcessEntityParser.java      | 10 +--
 common/src/main/resources/startup.properties    |  3 +
 docs/src/site/twiki/InstallationSteps.twiki     | 27 ++++--
 .../falcon/converter/OozieFeedMapper.java       | 89 +++++++++++---------
 .../falcon/converter/OozieFeedMapperTest.java   | 62 ++++++++++++--
 feed/src/test/resources/feed.xml                |  4 +-
 .../falcon/hadoop/FileSystemExtension.java      | 28 ++++++
 hadoop-webapp/pom.xml                           |  6 ++
 hadoop-webapp/src/main/resources/core-site.xml  |  5 ++
 .../converter/AbstractOozieEntityMapper.java    | 53 ++++++++++--
 .../service/SharedLibraryHostingService.java    | 75 +++++++++++------
 pom.xml                                         |  2 +-
 .../falcon/converter/OozieProcessMapper.java    |  5 ++
 .../converter/OozieProcessMapperTest.java       | 31 +++++++
 rerun/pom.xml                                   | 35 ++++++++
 .../apache/falcon/retention/FeedEvictor.java    |  9 +-
 src/bin/service-start.sh                        |  6 +-
 webapp/pom.xml                                  |  4 +
 .../falcon/resource/EntityManagerJerseyIT.java  | 41 +++++++--
 .../org/apache/falcon/resource/TestContext.java |  4 +-
 webapp/src/test/resources/feed-template1.xml    |  2 +-
 24 files changed, 400 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fbf99d3..9970301 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-107 Adding extensions. (Shwetha GS)
+
     FALCON-62 Falcon compilation with hadoop 2.0 libs. (Shwetha GS
     via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 4897985..fc4a467 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -597,4 +597,11 @@ public final class EntityUtil {
         return uriFormat.format(utcDate);
     }
 
+    public static boolean responsibleFor(String colo) {
+        if (DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
+                && colo.equals(DeploymentUtil.getCurrentColo()))) {
+            return true;
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index b4e4a95..b1cf8f3 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -24,11 +24,11 @@ import javax.jms.ConnectionFactory;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.StoreAccessException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,10 +57,7 @@ public class ClusterEntityParser extends EntityParser<Cluster> {
         validateScheme(cluster, Interfacetype.WORKFLOW);
         validateScheme(cluster, Interfacetype.MESSAGING);
 
-        // No interface validations in prism or other falcon servers.
-        // Only the falcon server for which the cluster belongs to should validate interfaces
-        if (DeploymentUtil.isPrism() || !cluster.getColo().equals(DeploymentUtil.getCurrentColo())) {
-            LOG.info("No interface validations in prism or falcon servers not applicable.");
+        if (!EntityUtil.responsibleFor(cluster.getColo())) {
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 75d7374..e4a9cf0 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -28,6 +28,7 @@ import java.util.TimeZone;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -38,7 +39,6 @@ import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.util.DeploymentUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -97,13 +97,13 @@ public class ProcessEntityParser extends EntityParser<Process> {
     }
 
     private void validateHDFSpaths(Process process, String clusterName) throws FalconException {
-        //No filesystem checks in prism
-        if (DeploymentUtil.isPrism()) {
+        org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
+                clusterName);
+
+        if (!EntityUtil.responsibleFor(cluster.getColo())) {
             return;
         }
 
-        org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
-                clusterName);
         String workflowPath = process.getWorkflow().getPath();
         String libPath = process.getWorkflow().getLib();
         String nameNode = getNameNode(cluster, clusterName);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 1b4e470..fea2a31 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -49,6 +49,9 @@ debug.config.oozie.conf.uri=${user.dir}/target/oozie
 debug.system.lib.location=${system.lib.location}
 debug.broker.url=vm://localhost
 debug.retry.recorder.path=${user.dir}/target/retry
+debug.libext.feed.retention.paths=${falcon.libext}
+debug.libext.feed.replication.paths=${falcon.libext}
+debug.libext.process.paths=${falcon.libext}
 
 *.falcon.cleanup.service.frequency=days(1)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index cf55697..c459fb0 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -90,12 +90,23 @@ bin/falcon-start [-port <port>]
 </verbatim>
 
 By default, 
-- falcon server starts at port 15000. To change the port, use -port option
-- falcon server starts embedded active mq. To control this behaviour, set the following system properties using -D option in environment variable FALCON_OPTS:
-  - falcon.embeddedmq=<true/false> - Should server start embedded active mq, default true
-  - falcon.emeddedmq.port=<port> - Port for embedded active mq, default 61616
-  - falcon.embeddedmq.data=<path> - Data path for embedded active mq, default {package dir}/logs/data
-- falcon server starts with conf from {package dir}/conf. To override this (to use the same conf with multiple falcon upgrades), set environment variable FALCON_CONF to the path of conf dir
+* falcon server starts at port 15000. To change the port, use -port option
+* falcon server starts embedded active mq. To control this behaviour, set the following system properties using -D option in environment variable FALCON_OPTS:
+   * falcon.embeddedmq=<true/false> - Should server start embedded active mq, default true
+   * falcon.embeddedmq.port=<port> - Port for embedded active mq, default 61616
+   * falcon.embeddedmq.data=<path> - Data path for embedded active mq, default {package dir}/logs/data
+* falcon server starts with conf from {package dir}/conf. To override this (to use the same conf with multiple falcon upgrades), set environment variable FALCON_CONF to the path of conf dir
+
+__Adding Extension Libraries__
+Library extensions allow users to add custom libraries to entity lifecycles such as feed retention, feed replication and process execution. This is useful for use cases such as adding filesystem extensions. To enable this, add the following configs to startup.properties:
+*.libext.paths=<paths to be added to all entity lifecycles>
+*.libext.feed.paths=<paths to be added to all feed lifecycles>
+*.libext.feed.retention.paths=<paths to be added to feed retention workflow>
+*.libext.feed.replication.paths=<paths to be added to feed replication workflow>
+*.libext.process.paths=<paths to be added to process workflow>
+
+The configured jars are added to the falcon classpath and to the corresponding workflows.
+
 
 *Starting Prism*
 <verbatim>
@@ -103,8 +114,8 @@ bin/prism-start [-port <port>]
 </verbatim>
 
 By default, 
-- falcon server starts at port 16000. To change the port, use -port option
-- prism starts with conf from {package dir}/conf. To override this (to use the same conf with multiple prism upgrades), set environment variable FALCON_CONF to the path of conf dir
+* falcon server starts at port 16000. To change the port, use -port option
+* prism starts with conf from {package dir}/conf. To override this (to use the same conf with multiple prism upgrades), set environment variable FALCON_CONF to the path of conf dir
 
 *Using Falcon*
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index adef3ec..886e0e0 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -114,46 +114,39 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
         Feed feed = getEntity();
         ACTION retentionAction = new ACTION();
         WORKFLOW retentionWorkflow = new WORKFLOW();
-        try {
-            //
-            WORKFLOWAPP retWfApp = createRetentionWorkflow(cluster);
-            retWfApp.setName(wfName);
-            marshal(cluster, retWfApp, wfPath);
-            retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
+        createRetentionWorkflow(cluster, wfPath, wfName);
+        retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
 
-            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+        Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
 
-            org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-            String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
-            String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
-            String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
-            String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
-
-            StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
-            if (metaPathMask != null) {
-                feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
-            }
-            if (statsPathMask != null) {
-                feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
-            }
-            if (tmpPathMask != null) {
-                feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
-            }
-
-            props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
-            props.put("timeZone", feed.getTimezone().getID());
-            props.put("frequency", feed.getFrequency().getTimeUnit().name());
-            props.put("limit", feedCluster.getRetention().getLimit().toString());
-            props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
-            props.put(ARG.feedNames.getPropName(), feed.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-
-            retentionWorkflow.setConfiguration(getCoordConfig(props));
-            retentionAction.setWorkflow(retentionWorkflow);
-            return retentionAction;
-        } catch (IOException e) {
-            throw new FalconException("Unable to create parent/retention workflow", e);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+        String feedPathMask = getLocationURI(cluster, feed, LocationType.DATA);
+        String metaPathMask = getLocationURI(cluster, feed, LocationType.META);
+        String statsPathMask = getLocationURI(cluster, feed, LocationType.STATS);
+        String tmpPathMask = getLocationURI(cluster, feed, LocationType.TMP);
+
+        StringBuilder feedBasePaths = new StringBuilder(feedPathMask);
+        if (metaPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP).append(metaPathMask);
         }
+        if (statsPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP).append(statsPathMask);
+        }
+        if (tmpPathMask != null) {
+            feedBasePaths.append(FEED_PATH_SEP).append(tmpPathMask);
+        }
+
+        props.put("feedDataPath", feedBasePaths.toString().replaceAll("\\$\\{", "\\?\\{"));
+        props.put("timeZone", feed.getTimezone().getID());
+        props.put("frequency", feed.getFrequency().getTimeUnit().name());
+        props.put("limit", feedCluster.getRetention().getLimit().toString());
+        props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+        props.put(ARG.feedNames.getPropName(), feed.getName());
+        props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+
+        retentionWorkflow.setConfiguration(getCoordConfig(props));
+        retentionAction.setWorkflow(retentionWorkflow);
+        return retentionAction;
     }
 
     private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
@@ -333,13 +326,25 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
     }
 
     private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
-        WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
-        repWFapp.setName(wfName);
-        marshal(cluster, repWFapp, wfPath);
+        try {
+            WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+            repWFapp.setName(wfName);
+            addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+            marshal(cluster, repWFapp, wfPath);
+        } catch(IOException e) {
+            throw new FalconException("Unable to create replication workflow", e);
+        }
     }
 
-    private WORKFLOWAPP createRetentionWorkflow(Cluster cluster) throws IOException, FalconException {
-        return getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+    private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
+        try {
+            WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+            retWfApp.setName(wfName);
+            addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+            marshal(cluster, retWfApp, wfPath);
+        } catch(IOException e) {
+            throw new FalconException("Unable to create retention workflow", e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 7fbe179..51d3fd9 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -19,11 +19,17 @@ package org.apache.falcon.converter;
 
 import static org.testng.Assert.assertEquals;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Collection;
 import java.util.List;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.ClusterHelper;
@@ -36,6 +42,10 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -67,21 +77,27 @@ public class OozieFeedMapperTest {
 
         cleanupStore();
 
-        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH);
-        ClusterHelper.getInterface(srcCluster, Interfacetype.WRITE).setEndpoint(srcHdfsUrl);
+        srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
 
-        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH);
-        ClusterHelper.getInterface(trgCluster, Interfacetype.WRITE).setEndpoint(trgHdfsUrl);
+        trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
 
-        feed = (Feed) storeEntity(EntityType.FEED, FEED);
+        feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
 
     }
 
-    protected Entity storeEntity(EntityType type, String path) throws Exception {
+    protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
         Unmarshaller unmarshaller = type.getUnmarshaller();
         Entity entity = (Entity) unmarshaller
-                .unmarshal(OozieFeedMapperTest.class.getResource(path));
+                .unmarshal(OozieFeedMapperTest.class.getResource(template));
         store.publish(type, entity);
+
+        if (type == EntityType.CLUSTER) {
+            Cluster cluster = (Cluster) entity;
+            ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
+            FileSystem fs = new Path(writeEndpoint).getFileSystem(new Configuration());
+            fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
+            fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
+        }
         return entity;
     }
 
@@ -101,12 +117,16 @@ public class OozieFeedMapperTest {
     }
 
     @Test
-    public void testReplicationCoords() throws FalconException {
+    public void testFeedCoords() throws Exception {
         OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
         List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
                 new Path("/projects/falcon/"));
+        //Assert retention coord
         COORDINATORAPP coord = coords.get(0);
+        assertLibExtensions(coord, "retention");
 
+        //Assert replication coord
+        coord = coords.get(1);
         Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
         Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
                 .getAction().getWorkflow().getAppPath());
@@ -148,6 +168,32 @@ public class OozieFeedMapperTest {
                 break;
             }
         }
+        assertLibExtensions(coord, "replication");
+    }
 
+    private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) WORKFLOW_JAXB_CONTEXT.createUnmarshaller().unmarshal(
+                trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
+                        + lifecycle + "/ext.jar"));
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/feed/src/test/resources/feed.xml
----------------------------------------------------------------------
diff --git a/feed/src/test/resources/feed.xml b/feed/src/test/resources/feed.xml
index b003016..4da222e 100644
--- a/feed/src/test/resources/feed.xml
+++ b/feed/src/test/resources/feed.xml
@@ -26,12 +26,12 @@
     <late-arrival cut-off="minutes(3)"/>
     <clusters>
         <cluster name="corp1" type="source" delay="minutes(40)">
-            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <validity start="2010-01-01T00:00Z" end="2020-01-01T02:00Z"/>
             <retention limit="minutes(5)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
         </cluster>
         <cluster name="corp2" type="target">
-            <validity start="2010-01-01T00:00Z" end="2010-01-01T02:00Z"/>
+            <validity start="2010-01-01T00:00Z" end="2020-01-01T02:00Z"/>
             <retention limit="minutes(7)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
         </cluster>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/FileSystemExtension.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/FileSystemExtension.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/FileSystemExtension.java
new file mode 100644
index 0000000..beb79f5
--- /dev/null
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/FileSystemExtension.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hadoop;
+
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+/**
+ * For testing library extensions.
+ */
+public class FileSystemExtension extends DistributedFileSystem {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index 312a71b..00a8504 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -88,6 +88,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-hadoop-dependencies</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-core</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/hadoop-webapp/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/core-site.xml b/hadoop-webapp/src/main/resources/core-site.xml
index 02c8d42..484e904 100644
--- a/hadoop-webapp/src/main/resources/core-site.xml
+++ b/hadoop-webapp/src/main/resources/core-site.xml
@@ -20,6 +20,11 @@
 
 <configuration>
     <property>
+        <name>fs.fsext.impl</name>
+        <value>org.apache.falcon.hadoop.FileSystemExtension</value>
+    </property>
+
+    <property>
         <name>fs.default.name</name>
         <value>hdfs://localhost:41020</value>
     </property>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index 359365f..4a55bb6 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ExternalId;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
@@ -34,10 +35,12 @@ import org.apache.falcon.oozie.bundle.BUNDLEAPP;
 import org.apache.falcon.oozie.bundle.COORDINATOR;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.ObjectFactory;
+import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.service.FalconPathFilter;
 import org.apache.falcon.service.SharedLibraryHostingService;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -142,15 +145,53 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
         return true;
     }
 
+    private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+        FileStatus[] libs = fs.listStatus(path);
+        if (libs == null) {
+            return;
+        }
+
+        for(FileStatus lib : libs) {
+            if (lib.isDir()) {
+                continue;
+            }
+
+            for(Object obj: wf.getDecisionOrForkOrJoin()) {
+                if (!(obj instanceof ACTION)) {
+                    continue;
+                }
+                ACTION action = (ACTION) obj;
+                List<String> files = null;
+                if (action.getJava() != null) {
+                    files = action.getJava().getFile();
+                } else if (action.getPig() != null) {
+                    files = action.getPig().getFile();
+                } else if (action.getMapReduce() != null) {
+                    files = action.getMapReduce().getFile();
+                }
+                if (files != null) {
+                    files.add(lib.getPath().toString());
+                }
+            }
+        }
+    }
+
+    protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
+        throws IOException {
+        String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+        FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+        addExtensionJars(fs, new Path(libext), wf);
+        addExtensionJars(fs, new Path(libext, type.name()), wf);
+        if (StringUtils.isNotEmpty(lifecycle)) {
+            addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
+        }
+    }
+
     private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
         try {
             Path libPath = new Path(coordPath, "lib");
-            FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
-            if (!fs.exists(libPath)) {
-                fs.mkdirs(libPath);
-            }
-
-            SharedLibraryHostingService.pushLibsToHDFS(libPath.toString(), cluster, FALCON_JAR_FILTER);
+            SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+                    libPath, cluster, FALCON_JAR_FILTER);
         } catch (IOException e) {
             LOG.error("Failed to copy shared libs on cluster " + cluster.getName(), e);
             throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index b6c4f25..30856f6 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -18,13 +18,14 @@
 
 package org.apache.falcon.service;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -34,6 +35,7 @@ import org.apache.log4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Properties;
 
 /**
  * Host shared libraries in oozie shared lib dir upon creation or modification of cluster.
@@ -66,22 +68,31 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
     };
 
     private void addLibsTo(Cluster cluster) throws FalconException {
-        String libLocation = ClusterHelper.getLocation(cluster, "working") + "/lib";
+        Path lib = new Path(ClusterHelper.getLocation(cluster, "working"), "lib");
+        Path libext = new Path(ClusterHelper.getLocation(cluster, "working"), "libext");
         try {
-            pushLibsToHDFS(libLocation, cluster, NON_FALCON_JAR_FILTER);
+            Properties properties = StartupProperties.get();
+            pushLibsToHDFS(properties.getProperty("system.lib.location"), lib, cluster, NON_FALCON_JAR_FILTER);
+            pushLibsToHDFS(properties.getProperty("libext.paths"), libext, cluster, null);
+            pushLibsToHDFS(properties.getProperty("libext.feed.paths"),
+                    new Path(libext, EntityType.FEED.name()) , cluster, null);
+            pushLibsToHDFS(properties.getProperty("libext.feed.replication.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/replication"), cluster, null);
+            pushLibsToHDFS(properties.getProperty("libext.feed.retention.paths"),
+                    new Path(libext, EntityType.FEED.name() + "/retention"), cluster, null);
+            pushLibsToHDFS(properties.getProperty("libext.process.paths"),
+                    new Path(libext, EntityType.PROCESS.name()) , cluster, null);
         } catch (IOException e) {
             LOG.error("Failed to copy shared libs to cluster " + cluster.getName(), e);
         }
     }
 
-    public static void pushLibsToHDFS(String path, Cluster cluster, FalconPathFilter pathFilter)
+    public static void pushLibsToHDFS(String src, Path target, Cluster cluster, FalconPathFilter pathFilter)
         throws IOException, FalconException {
+        LOG.debug("Copying libs from " + src);
 
-        String localPaths = StartupProperties.get().getProperty("system.lib.location");
-        assert localPaths != null && !localPaths.isEmpty() : "Invalid value for system.lib.location";
-        if (!new File(localPaths).isDirectory()) {
-            throw new FalconException(
-                    localPaths + " configured for system.lib.location doesn't contain any valid libs");
+        if (StringUtils.isEmpty(src)) {
+            return;
         }
 
         Configuration conf = ClusterHelper.getConfiguration(cluster);
@@ -93,26 +104,37 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
             throw new FalconException("Unable to connect to HDFS: "
                     + ClusterHelper.getStorageUrl(cluster));
         }
-        Path clusterPath = new Path(path);
-        if (!fs.exists(clusterPath)) {
-            fs.mkdirs(clusterPath);
+        if (!fs.exists(target)) {
+            fs.mkdirs(target);
         }
 
-        for (File localFile : new File(localPaths).listFiles()) {
-            Path localPath = new Path(localFile.getAbsolutePath());
-            if (!pathFilter.accept(localPath)) {
-                continue;
+        for(String srcPaths : src.split(",")) {
+            File srcFile = new File(srcPaths);
+            File[] srcFiles = new File[] { srcFile };
+            if (srcFile.isDirectory()) {
+                srcFiles = srcFile.listFiles();
             }
 
-            Path clusterFile = new Path(path, pathFilter.getJarName(localPath) + ".jar");
-            if (fs.exists(clusterFile)) {
-                FileStatus fstat = fs.getFileStatus(clusterFile);
-                if (fstat.getLen() == localFile.length()) {
-                    continue;
+            for (File file : srcFiles) {
+                Path path = new Path(file.getAbsolutePath());
+                String jarName = StringUtils.removeEnd(path.getName(), ".jar");
+                if (pathFilter != null) {
+                    if (!pathFilter.accept(path)) {
+                        continue;
+                    }
+                    jarName = pathFilter.getJarName(path);
+                }
+
+                Path targetFile = new Path(target, jarName + ".jar");
+                if (fs.exists(targetFile)) {
+                    FileStatus fstat = fs.getFileStatus(targetFile);
+                    if (fstat.getLen() == file.length()) {
+                        continue;
+                    }
                 }
+                fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), targetFile);
+                LOG.info("Copied " + file.getAbsolutePath() + " to " + targetFile.toString() + " in " + fs.getUri());
             }
-            fs.copyFromLocalFile(false, true, new Path(localFile.getAbsolutePath()), clusterFile);
-            LOG.info("Copied " + localFile.getAbsolutePath() + " to " + path + " in " + fs.getUri());
         }
     }
 
@@ -123,10 +145,11 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
         }
 
         Cluster cluster = (Cluster) entity;
-        String currentColo = DeploymentUtil.getCurrentColo();
-        if (DeploymentUtil.isEmbeddedMode() || currentColo.equals(cluster.getColo())) {
-            addLibsTo(cluster);
+        if (!EntityUtil.responsibleFor(cluster.getColo())) {
+            return;
         }
+
+        addLibsTo(cluster);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d211e21..e5017ef 100644
--- a/pom.xml
+++ b/pom.xml
@@ -910,7 +910,7 @@
                 <configuration>
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
                     <forkMode>always</forkMode>
-                    <argLine>-Djava.security.krb5.realm= -Djava.security.krb5.kdc=</argLine>
+                    <argLine>-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc=</argLine>
                 </configuration>
             </plugin>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 8f75736..e4441cc 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -335,6 +335,11 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
                                 String wfName, Path wfPath) throws FalconException {
         WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
         wfApp.setName(wfName);
+        try {
+            addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
+        } catch (IOException e) {
+            throw new FalconException("Failed to add library extensions for the workflow", e);
+        }
 
         EngineType engineType = processWorkflow.getEngine();
         for (Object object : wfApp.getDecisionOrForkOrJoin()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 7be41da..6ed0d9c 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -73,6 +73,7 @@ import org.testng.annotations.Test;
 public class OozieProcessMapperTest extends AbstractTestBase {
 
     private String hdfsUrl;
+    private FileSystem fs;
 
     @BeforeClass
     public void setUpDFS() throws Exception {
@@ -88,6 +89,8 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         ConfigurationStore store = ConfigurationStore.get();
         Cluster cluster = store.get(EntityType.CLUSTER, "corp");
         ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+        fs = new Path(hdfsUrl).getFileSystem(new Configuration());
+        fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
 
         Process process = store.get(EntityType.PROCESS, "clicksummary");
         Path wfpath = new Path(process.getWorkflow().getPath());
@@ -155,6 +158,8 @@ public class OozieProcessMapperTest extends AbstractTestBase {
                 break;
             }
         }
+
+        assertLibExtensions(coord);
     }
 
     @Test
@@ -199,6 +204,32 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
     }
 
+    private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        JAXBContext WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+        WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) WORKFLOW_JAXB_CONTEXT.createUnmarshaller().unmarshal(
+                fs.open(new Path(wfPath, "workflow.xml")))).getValue();
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            List<String> files = null;
+            if (action.getJava() != null) {
+                files = action.getJava().getFile();
+            } else if (action.getPig() != null) {
+                files = action.getPig().getFile();
+            } else if (action.getMapReduce() != null) {
+                files = action.getMapReduce().getFile();
+            }
+            if (files != null) {
+                Assert.assertTrue(files.get(files.size() - 1)
+                        .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
+            }
+        }
+    }
+
     private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
         throws Exception {
         Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/rerun/pom.xml
----------------------------------------------------------------------
diff --git a/rerun/pom.xml b/rerun/pom.xml
index e054d22..a5d1d8c 100644
--- a/rerun/pom.xml
+++ b/rerun/pom.xml
@@ -32,6 +32,41 @@
     <name>Apache Falcon Rerun</name>
     <packaging>jar</packaging>
 
+    <profiles>
+        <profile>
+            <id>hadoop-1</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>hadoop.profile</name>
+                    <value>1</value>
+                </property>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-core</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <property>
+                    <name>hadoop.profile</name>
+                    <value>2</value>
+                </property>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.falcon</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index c8995b7..a0c7bb7 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -148,11 +148,12 @@ public class FeedEvictor extends Configured implements Tool {
 
     private void logInstancePaths(Path path) throws IOException {
         LOG.info("Writing deleted instances to path " + path);
-        OutputStream out = fs.create(path);
+        FileSystem logfs = path.getFileSystem(getConf());
+        OutputStream out = logfs.create(path);
         out.write(instancePaths.toString().getBytes());
         out.close();
         if (LOG.isDebugEnabled()) {
-            debug(path);
+            debug(logfs, path);
         }
     }
 
@@ -275,9 +276,9 @@ public class FeedEvictor extends Configured implements Tool {
         return fs.delete(path, true);
     }
 
-    private void debug(Path outPath) throws IOException {
+    private void debug(FileSystem myfs, Path outPath) throws IOException {
         ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(outPath);
+        InputStream instance = myfs.open(outPath);
         IOUtils.copyBytes(instance, writer, 4096, true);
         LOG.debug("Instance Paths copied to " + outPath);
         LOG.debug("Written " + writer);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/src/bin/service-start.sh
----------------------------------------------------------------------
diff --git a/src/bin/service-start.sh b/src/bin/service-start.sh
index 5ebcbdf..f72e723 100755
--- a/src/bin/service-start.sh
+++ b/src/bin/service-start.sh
@@ -13,8 +13,6 @@
 #  limitations under the License. See accompanying LICENSE file.
 #
 
-set -e
-
 # resolve links - $0 may be a softlink
 PRG="${0}"
 
@@ -56,6 +54,10 @@ for i in "${BASEDIR}/server/webapp/$APP_TYPE/WEB-INF/lib/"*.jar; do
   FALCONCPPATH="${FALCONCPPATH}:$i"
 done
 
+for i in "${BASEDIR}/libext/"*.jar; do
+  FALCONCPPATH="${FALCONCPPATH}:$i"
+done
+
 HADOOPDIR=`which hadoop`
 if [ "$HADOOPDIR" != "" ]; then
   echo "Hadoop is installed, adding hadoop classpath to falcon classpath"

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index ffeeb46..a3e12c9 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -357,6 +357,10 @@
                             <name>system.lib.location</name>
                             <value>${project.build.directory}/dependency</value>
                         </systemProperty>
+                        <systemProperty>
+                            <name>falcon.libext</name>
+                            <value>${project.build.directory}/../../hadoop-dependencies/target/falcon-hadoop-dependencies-${project.version}.jar</value>
+                        </systemProperty>
                     </systemProperties>
                     <stopKey>falcon-stop</stopKey>
                     <stopPort>41001</stopPort>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 803e7f6..3cebb60 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -17,9 +17,7 @@
  */
 package org.apache.falcon.resource;
 
-import java.io.File;
-import java.io.InputStream;
-import java.io.StringReader;
+import java.io.*;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -35,13 +33,14 @@ import javax.ws.rs.core.Response;
 import javax.xml.bind.JAXBException;
 
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.*;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.util.BuildProperties;
 import org.apache.falcon.util.DeploymentProperties;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
@@ -68,6 +67,38 @@ public class EntityManagerJerseyIT {
         TestContext.prepare();
     }
 
+    private void assertLibs(FileSystem fs, Path path) throws IOException {
+        FileStatus[] libs = fs.listStatus(path);
+        Assert.assertNotNull(libs);
+        Assert.assertEquals(libs.length, 1);
+        Assert.assertTrue(libs[0].getPath().getName().startsWith("falcon-hadoop-dependencies"));
+    }
+
+    @Test
+    public void testLibExtensions() throws Exception {
+        TestContext context = newContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        ClientResponse response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+        FileSystem fs = context.getCluster().getFileSystem();
+        assertLibs(fs, new Path("/project/falcon/working/libext/FEED/retention"));
+        assertLibs(fs, new Path("/project/falcon/working/libext/PROCESS"));
+
+        String tmpFileName = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName));
+        Location location = new Location();
+        location.setPath("fsext://localhost:41020/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}");
+        location.setType(LocationType.DATA);
+        Cluster cluster = feed.getClusters().getClusters().get(0);
+        cluster.setLocations(new Locations());
+        feed.getClusters().getClusters().get(0).getLocations().getLocations().add(location);
+
+        File tmpFile = context.getTempFile();
+        EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
+        response = context.submitAndSchedule(tmpFileName, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+    }
+
     @Test
     public void testUpdateCheckUser() throws Exception {
         TestContext context = newContext();
@@ -98,7 +129,7 @@ public class EntityManagerJerseyIT {
         EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
         response = context.service.path("api/entities/update/feed/"
                 + context.outputFeedName).header("Remote-User",
-                "testuser").accept(MediaType.TEXT_XML)
+                TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
                 .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
         context.assertSuccessful(response);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index f762c4b..af6f22c 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -273,7 +273,7 @@ public class TestContext {
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
         return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase())
-                .header("Remote-User", "testuser").accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+                .header("Remote-User", TestContext.REMOTE_USER).accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
                 .post(ClientResponse.class, rawlogStream);
     }
 
@@ -296,7 +296,7 @@ public class TestContext {
         ServletInputStream rawlogStream = getServletInputStream(tmpFile);
 
         return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
-                "testuser")
+                TestContext.REMOTE_USER)
                 .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0dc6c6c2/webapp/src/test/resources/feed-template1.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-template1.xml b/webapp/src/test/resources/feed-template1.xml
index 8d0c5e1..912e443 100644
--- a/webapp/src/test/resources/feed-template1.xml
+++ b/webapp/src/test/resources/feed-template1.xml
@@ -29,7 +29,7 @@
 
     <clusters>
         <cluster name="##cluster##" type="source">
-            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
+            <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
             <retention limit="hours(24)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
         </cluster>


Mime
View raw message