falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shweth...@apache.org
Subject [2/2] incubator-falcon git commit: FALCON-943 process update copying user lib is very slow. Contributed by Shwetha GS
Date Tue, 23 Dec 2014 07:01:52 GMT
FALCON-943 process update copying user lib is very slow. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/0bc6aef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/0bc6aef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/0bc6aef5

Branch: refs/heads/master
Commit: 0bc6aef5afc9a6f4e79c964937625d57b6adfede
Parents: c1ac6e6
Author: shwethags <shwetha.gs@inmobi.com>
Authored: Tue Dec 23 12:31:14 2014 +0530
Committer: shwethags <shwetha.gs@inmobi.com>
Committed: Tue Dec 23 12:31:32 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |   7 +-
 .../org/apache/falcon/client/FalconClient.java  |  11 +-
 .../org/apache/falcon/entity/EntityUtil.java    |   3 -
 .../apache/falcon/entity/FileSystemStorage.java |  10 -
 .../org/apache/falcon/entity/ProcessHelper.java |  42 ---
 .../org/apache/falcon/update/UpdateHelper.java  | 111 -------
 .../workflow/engine/AbstractWorkflowEngine.java |   3 +-
 .../apache/falcon/update/UpdateHelperTest.java  |  68 -----
 docs/src/site/twiki/FalconCLI.twiki             |   3 +-
 docs/src/site/twiki/FalconDocumentation.twiki   |  12 +-
 docs/src/site/twiki/restapi/EntityUpdate.twiki  |   5 +-
 .../src/main/conf/oozie-site.xml                | 209 +++++++++++++
 .../src/test/resources/oozie-site.xml           |  21 +-
 .../apache/falcon/oozie/OozieBundleBuilder.java |  17 +-
 .../falcon/oozie/OozieCoordinatorBuilder.java   |   5 -
 .../apache/falcon/oozie/OozieEntityBuilder.java |  20 +-
 .../OozieOrchestrationWorkflowBuilder.java      |   5 -
 .../falcon/oozie/feed/FeedBundleBuilder.java    |   7 +-
 .../process/HiveProcessWorkflowBuilder.java     |   5 +-
 .../process/OozieProcessWorkflowBuilder.java    |   4 +-
 .../process/PigProcessWorkflowBuilder.java      |   5 +-
 .../oozie/process/ProcessBundleBuilder.java     |  47 +--
 .../ProcessExecutionWorkflowBuilder.java        |   8 +-
 .../apache/falcon/workflow/LateDataHandler.java | 292 ++++++++++++++++++
 .../workflow/engine/OozieWorkflowEngine.java    |  42 ++-
 .../src/main/resources/action/post-process.xml  |  10 +-
 oozie/src/main/resources/action/pre-process.xml |   2 +-
 .../OozieProcessWorkflowBuilderTest.java        |  17 +-
 .../falcon/resource/AbstractEntityManager.java  |  17 +-
 .../proxy/SchedulableEntityManagerProxy.java    |   8 +-
 .../apache/falcon/latedata/LateDataHandler.java | 293 -------------------
 .../falcon/rerun/handler/LateRerunConsumer.java |   2 +-
 src/main/assemblies/distributed-package.xml     |   5 +
 src/main/assemblies/standalone-package.xml      |   5 +
 src/main/examples/data/generate.sh              |   2 +-
 .../src/main/java/org/apache/falcon/Debug.java  | 113 -------
 .../falcon/resource/ConfigSyncService.java      |   5 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java |   2 +-
 .../apache/falcon/late/LateDataHandlerIT.java   |   2 +-
 .../falcon/resource/EntityManagerJerseyIT.java  |  22 --
 41 files changed, 613 insertions(+), 856 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cb2edf5..5d529b9 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@ Trunk (Unreleased)
    Seetharam)
 
   OPTIMIZATIONS
+   FALCON-943 process update copying user lib is very slow. (Shwetha G S)
+
    FALCON-419 Update deprecated HCatalog API to use Hive Metastore API.
    (Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 93776d3..5797bbe 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -97,7 +97,6 @@ public class FalconCLI {
     public static final String INSTANCE_CMD = "instance";
     public static final String START_OPT = "start";
     public static final String END_OPT = "end";
-    public static final String EFFECTIVE_OPT = "effective";
     public static final String RUNNING_OPT = "running";
     public static final String KILL_OPT = "kill";
     public static final String RERUN_OPT = "rerun";
@@ -371,7 +370,6 @@ public class FalconCLI {
         String cluster = commandLine.getOptionValue(CLUSTER_OPT);
         String start = commandLine.getOptionValue(START_OPT);
         String end = commandLine.getOptionValue(END_OPT);
-        String time = commandLine.getOptionValue(EFFECTIVE_OPT);
         String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
         String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
         String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
@@ -394,8 +392,7 @@ public class FalconCLI {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
             validateNotEmpty(entityName, ENTITY_NAME_OPT);
-            Date effectiveTime = parseDateString(time);
-            result = client.update(entityType, entityName, filePath, effectiveTime).getMessage();
+            result = client.update(entityType, entityName, filePath).getMessage();
         } else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
             validateNotEmpty(filePath, "file");
             validateColo(optionsList);
@@ -646,7 +643,6 @@ public class FalconCLI {
         Option colo = new Option(COLO_OPT, true, "Colo name");
         Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
         colo.setRequired(false);
-        Option effective = new Option(EFFECTIVE_OPT, true, "Effective time for update");
         Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
         Option filterBy = new Option(FILTER_BY_OPT, true,
                 "Filter returned entities by the specified status");
@@ -670,7 +666,6 @@ public class FalconCLI {
         entityOptions.addOption(cluster);
         entityOptions.addOption(start);
         entityOptions.addOption(end);
-        entityOptions.addOption(effective);
         entityOptions.addOption(fields);
         entityOptions.addOption(filterBy);
         entityOptions.addOption(filterTags);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 23c8943..5c476ae 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.falcon.client;
 
-import org.apache.falcon.entity.v0.Entity;
-
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
@@ -30,8 +28,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.net.util.TrustManagerUtils;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.cli.FalconMetadataCLI;
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.recipe.RecipeTool;
 import org.apache.falcon.recipe.RecipeToolArgs;
 import org.apache.falcon.resource.APIResult;
@@ -43,6 +41,7 @@ import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
+
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLContext;
@@ -64,7 +63,6 @@ import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.security.SecureRandom;
-import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 
@@ -324,14 +322,11 @@ public class FalconClient {
                 entityStream, null);
     }
 
-    public APIResult update(String entityType, String entityName, String filePath, Date effectiveTime)
+    public APIResult update(String entityType, String entityName, String filePath)
         throws FalconCLIException {
         InputStream entityStream = getServletInputStream(filePath);
         Entities operation = Entities.UPDATE;
         WebResource resource = service.path(operation.path).path(entityType).path(entityName);
-        if (effectiveTime != null) {
-            resource = resource.queryParam("effective", SchemaHelper.formatDateUTC(effectiveTime));
-        }
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
                 .accept(operation.mimeType).type(MediaType.TEXT_XML)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 59e43fb..1a5d30c 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -65,9 +65,6 @@ public final class EntityUtil {
     private static final long DAY_IN_MS = 86400000L;
     private static final long MONTH_IN_MS = 2592000000L;
 
-    public static final String PROCESS_CHECKSUM_FILE = "checksums";
-    public static final String PROCESS_USER_DIR = "user";
-    public static final String PROCESS_USERLIB_DIR = "userlib";
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
     private EntityUtil() {}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index bbe274b..fe93048 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -260,16 +260,6 @@ public class FileSystemStorage extends Configured implements Storage {
         return null;
     }
 
-    public static Properties getFeedProperties(Feed feed) {
-        Properties feedProperties = new Properties();
-        if (feed.getProperties() != null) {
-            for (org.apache.falcon.entity.v0.feed.Property property : feed.getProperties().getProperties()) {
-                feedProperties.put(property.getName(), property.getValue());
-            }
-        }
-        return feedProperties;
-    }
-
     @Override
     public void validateACL(AccessControlList acl) throws FalconException {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 174f8f6..504509d 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -26,11 +26,6 @@ import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
 
 /**
  * Helper methods for accessing process members.
@@ -82,41 +77,4 @@ public final class ProcessHelper {
 
         return storageType;
     }
-
-    public static Path getUserWorkflowPath(Process process, org.apache.falcon.entity.v0.cluster.Cluster cluster,
-        Path buildPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-            Path wfPath = new Path(process.getWorkflow().getPath());
-            if (fs.isFile(wfPath)) {
-                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
-            } else {
-                return new Path(buildPath.getParent(), EntityUtil.PROCESS_USER_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get workflow path", e);
-        }
-    }
-
-    public static Path getUserLibPath(Process process, org.apache.falcon.entity.v0.cluster.Cluster cluster,
-        Path buildPath) throws FalconException {
-        try {
-            String userLibPath = process.getWorkflow().getLib();
-            if (StringUtils.isEmpty(userLibPath)) {
-                return null;
-            }
-            Path libPath = new Path(userLibPath);
-
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-            if (fs.isFile(libPath)) {
-                return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
-            } else {
-                return new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR);
-            }
-        } catch(IOException e) {
-            throw new FalconException("Failed to get user lib path", e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 5a86ae3..d0af011 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -20,32 +20,20 @@ package org.apache.falcon.update;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Helper methods to facilitate entity updates.
@@ -87,105 +75,6 @@ public final class UpdateHelper {
         throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType());
     }
 
-    //Read checksum file
-    private static Map<String, String> readChecksums(FileSystem fs, Path oldStagingPath) throws FalconException {
-        try {
-            Map<String, String> checksums = new HashMap<String, String>();
-            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(oldStagingPath)));
-            try {
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    String[] parts = line.split("=");
-                    checksums.put(parts[0], parts[1]);
-                }
-            } finally {
-                reader.close();
-            }
-            return checksums;
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
-    //Checks if the user workflow or lib is updated
-    public static boolean isWorkflowUpdated(String cluster, Entity entity,
-                                            Path bundleAppPath) throws FalconException {
-        if (entity.getEntityType() != EntityType.PROCESS) {
-            return false;
-        }
-
-        if (bundleAppPath == null) {
-            return true;
-        }
-
-        try {
-            Process process = (Process) entity;
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-            Path checksum = new Path(bundleAppPath, EntityUtil.PROCESS_CHECKSUM_FILE);
-            Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-            if (!fs.exists(checksum)) {
-                //Update if there is no checksum file(for backward compatibility)
-                return true;
-            }
-            Map<String, String> checksums = readChecksums(fs, checksum);
-
-            //Get checksum from user wf/lib
-            Map<String, String> wfPaths = checksumAndCopy(fs, new Path(process.getWorkflow().getPath()), null);
-            if (StringUtils.isNotEmpty(process.getWorkflow().getLib())) {
-                wfPaths.putAll(checksumAndCopy(fs, new Path(process.getWorkflow().getLib()), null));
-            }
-
-            //Update if the user wf/lib is updated i.e., if checksums are different
-            return !wfPaths.equals(checksums);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
-    /**
-     * Recursively traverses each file and tracks checksum. If dest != null, each traversed file is copied to dest
-     * @param fs FileSystem
-     * @param src file/directory
-     * @param dest directory always
-     * @return checksums
-     * @throws FalconException
-     */
-    public static Map<String, String> checksumAndCopy(FileSystem fs, Path src, Path dest) throws FalconException {
-        try {
-            Configuration conf = new Configuration();
-            Map<String, String> paths = new HashMap<String, String>();
-            if (dest != null && !fs.exists(dest) && !fs.mkdirs(dest)) {
-                throw new FalconException("mkdir failed on " + dest);
-            }
-
-            if (fs.isFile(src)) {
-                paths.put(src.toString(), fs.getFileChecksum(src).toString());
-                if (dest != null) {
-                    Path target = new Path(dest, src.getName());
-                    FileUtil.copy(fs, src, fs, target, false, conf);
-                    LOG.debug("Copied {} to {}", src, target);
-                }
-            } else {
-                FileStatus[] files = fs.listStatus(src);
-                if (files != null) {
-                    for (FileStatus file : files) {
-                        if (fs.isFile(file.getPath())) {
-                            paths.putAll(checksumAndCopy(fs, file.getPath(), dest));
-                        } else {
-                            paths.putAll(checksumAndCopy(fs, file.getPath(),
-                                    ((dest == null) ? null : new Path(dest, file.getPath().getName()))));
-                        }
-                    }
-                }
-            }
-            return paths;
-        } catch(IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
     public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity, String cluster)
         throws FalconException {
         if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index ae158f7..bbf5a30 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -88,8 +88,7 @@ public abstract class AbstractWorkflowEngine {
     public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
                                                       List<LifeCycle> lifeCycles) throws FalconException;
 
-    public abstract String update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime)
-        throws FalconException;
+    public abstract String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException;
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index ef0d769..73791d3 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -41,8 +41,6 @@ import org.apache.falcon.entity.v0.feed.Properties;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.process.PolicyType;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -94,72 +92,6 @@ public class UpdateHelperTest extends AbstractTestBase {
     }
 
     @Test
-    public void testCopyUserWorkflow() throws Exception {
-        FileSystem fs = dfsCluster.getFileSystem();
-        Path basePath = new Path("/tmp/basepath");
-        fs.mkdirs(basePath);
-        Path wfdir = new Path(basePath, "workflow");
-        fs.mkdirs(wfdir);
-        Path wf = new Path(wfdir, "workflow.xml");
-        Path lib = new Path(wfdir, "common.jar");
-        fs.create(wf).close();
-        fs.create(lib).close();
-        Path dest = new Path("/tmp/dest");
-        UpdateHelper.checksumAndCopy(fs, wfdir, dest);
-        Assert.assertTrue(fs.exists(new Path(dest, "workflow.xml")) && fs.isFile(new Path(dest, "workflow.xml")));
-        Assert.assertTrue(fs.exists(new Path(dest, "common.jar")) && fs.isFile(new Path(dest, "common.jar")));
-
-        fs.delete(dest, true);
-        UpdateHelper.checksumAndCopy(fs, wf, dest);
-        Assert.assertTrue(fs.exists(new Path(dest, "workflow.xml")) && fs.isFile(new Path(dest, "workflow.xml")));
-    }
-
-    @Test
-    public void testIsWorkflowUpdated() throws IOException, FalconException {
-        FileSystem fs = dfsCluster.getFileSystem();
-        Process process = processParser.parseAndValidate(this.getClass().getResourceAsStream(PROCESS_XML));
-        String cluster = "testCluster";
-        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-        Path staging = EntityUtil.getNewStagingPath(clusterEntity, process);
-        fs.mkdirs(staging);
-        fs.create(new Path(staging, "workflow.xml")).close();
-
-        //Update if there is no checksum file
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
-
-        //No update if there is no new file
-        fs.create(new Path(staging, "checksums")).close();
-        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
-
-        //Update if there is new lib
-        Path libpath = new Path("/falcon/test/lib");
-        process.getWorkflow().setLib(libpath.toString());
-        fs.mkdirs(libpath);
-        Path lib = new Path(libpath, "new.jar");
-        fs.create(lib).close();
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
-
-        //Don't Update if the lib is not updated
-        fs.delete(new Path(staging, "checksums"), true);
-        FSDataOutputStream stream = fs.create(new Path(staging, "checksums"));
-        stream.write((dfsCluster.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY)
-                + lib.toString() + "=" + fs.getFileChecksum(lib).toString() + "\n").getBytes());
-        stream.close();
-        Assert.assertFalse(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
-
-        //Update if the lib is updated
-        fs.delete(lib, true);
-        stream = fs.create(lib);
-        stream.writeChars("some new jar");
-        stream.close();
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
-
-        //Update if the lib is deleted
-        fs.delete(lib, true);
-        Assert.assertTrue(UpdateHelper.isWorkflowUpdated(cluster, process, staging));
-    }
-
-    @Test
     public void testIsEntityUpdated() throws Exception {
         Feed oldFeed = parser.parseAndValidate(this.getClass().getResourceAsStream(FEED_XML));
         String cluster = "testCluster";

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index 202af63..d8199dd 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -75,8 +75,7 @@ Update operation allows an already submitted/scheduled entity to be updated. Clu
 not allowed.
 
 Usage:
-$FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>> -update [-effective <<effective time>>] -file
-<<path_to_file>>
+$FALCON_HOME/bin/falcon entity  -type [feed|process] -name <<name>> -update -file <<path_to_file>>
 
 Example:
 $FALCON_HOME/bin/falcon entity -type process -name HourlyReportsGenerator -update -file /process/definition.xml

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/docs/src/site/twiki/FalconDocumentation.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconDocumentation.twiki b/docs/src/site/twiki/FalconDocumentation.twiki
index 0d2a692..c374966 100644
--- a/docs/src/site/twiki/FalconDocumentation.twiki
+++ b/docs/src/site/twiki/FalconDocumentation.twiki
@@ -202,20 +202,12 @@ no dependent entities on the deleted entity.
 ---+++ Update
 Update operation allows an already submitted/scheduled entity to be updated. Cluster update is currently
 not allowed. Feed update can cause cascading update to all the processes already scheduled. Process update triggers
-update in falcon if entity is updated/the user specified workflow/lib is updated. The following set of actions are
-performed in Oozie to realize an update:
-   * Suspend the previously scheduled Oozie coordinator. This is to prevent any new action from being triggered.
-   * Update the coordinator to set the end time to "now"
-   * Resume the suspended coordinators
+update in falcon if entity is updated. The following set of actions are performed in scheduler to realize an update:
+   * Update the old scheduled entity to set the end time to "now"
    * Schedule as per the new process/feed definition with the start time as "now"
 
-Update optionally takes effective time as a parameter which is used as the end time of previously scheduled coordinator.
-So, the updated configuration will be effective since the given timestamp.
-
-
 ---++ Instance Management actions
 
-
 Instance Manager gives user the option to control individual instances of the process based on their instance start time (start time of that instance). Start time needs to be given in standard TZ format. Example: 01 Jan 2012 01:00 => 2012-01-01T01:00Z
 
 All the instance management operations (except running) allow single instance or list of instance within a Date range to be acted on. Make sure the dates are valid. i.e. are within the start and end time of process itself. 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/docs/src/site/twiki/restapi/EntityUpdate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityUpdate.twiki b/docs/src/site/twiki/restapi/EntityUpdate.twiki
index f2c2e7e..0e5f56f 100644
--- a/docs/src/site/twiki/restapi/EntityUpdate.twiki
+++ b/docs/src/site/twiki/restapi/EntityUpdate.twiki
@@ -10,7 +10,6 @@ Updates the submitted entity.
 ---++ Parameters
    * :entity-type can be feed or process.
    * :entity-name is name of the feed or process.
-   * :effective is optional effective time
 
 ---++ Results
 Result of the validation.
@@ -18,7 +17,7 @@ Result of the validation.
 ---++ Examples
 ---+++ Rest Call
 <verbatim>
-POST http://localhost:15000/api/entities/update/process/SampleProcess?effective=2014-01-01T00:00Z
+POST http://localhost:15000/api/entities/update/process/SampleProcess
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Daily sample process. Runs at 6th hour every day. Input - last day's hourly data. Generates output for yesterday -->
 <process xmlns="uri:falcon:process:0.1" name="SampleProcess" >
@@ -58,7 +57,7 @@ POST http://localhost:15000/api/entities/update/process/SampleProcess?effective=
 <verbatim>
 {
     "requestId": "update\/default\/d6aaa328-6836-4818-a212-515bb43d8b86\n\n",
-    "message": "update\/default\/SampleProcess updated successfully with effective time [(local/2014-01-01T00:00Z)]\n\n",
+    "message": "update\/default\/SampleProcess updated successfully\n\n",
     "status": "SUCCEEDED"
 }
 </verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie-el-extensions/src/main/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/main/conf/oozie-site.xml b/oozie-el-extensions/src/main/conf/oozie-site.xml
new file mode 100644
index 0000000..0925b41
--- /dev/null
+++ b/oozie-el-extensions/src/main/conf/oozie-site.xml
@@ -0,0 +1,209 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+<configuration>
+
+    <!--
+    Refer to the oozie-default.xml file for the complete list of
+    Oozie configuration properties and their default values.
+    -->
+
+    <!-- Proxyuser Configuration -->
+
+    <!--
+
+    <property>
+        <name>oozie.service.ProxyUserService.proxyuser.#USER#.hosts</name>
+        <value>*</value>
+        <description>
+            List of hosts the '#USER#' user is allowed to perform 'doAs'
+            operations.
+
+            The '#USER#' must be replaced with the username of the user who is
+            allowed to perform 'doAs' operations.
+
+            The value can be the '*' wildcard or a list of hostnames.
+
+            For multiple users copy this property and replace the user name
+            in the property name.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.service.ProxyUserService.proxyuser.#USER#.groups</name>
+        <value>*</value>
+        <description>
+            List of groups the '#USER#' user is allowed to impersonate users
+            from to perform 'doAs' operations.
+
+            The '#USER#' must be replaced with the username of the user who is
+            allowed to perform 'doAs' operations.
+
+            The value can be the '*' wildcard or a list of groups.
+
+            For multiple users copy this property and replace the user name
+            in the property name.
+        </description>
+    </property>
+    -->
+
+    <!-- Oozie EL Extension configurations for falcon -->
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-job-submit-instances</name>
+        <value>
+            now=org.apache.oozie.extensions.OozieELExtensions#ph1_now_echo,
+            today=org.apache.oozie.extensions.OozieELExtensions#ph1_today_echo,
+            yesterday=org.apache.oozie.extensions.OozieELExtensions#ph1_yesterday_echo,
+            currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_currentWeek_echo,
+            lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_lastWeek_echo,
+            currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_currentMonth_echo,
+            lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_lastMonth_echo,
+            currentYear=org.apache.oozie.extensions.OozieELExtensions#ph1_currentYear_echo,
+            lastYear=org.apache.oozie.extensions.OozieELExtensions#ph1_lastYear_echo,
+            formatTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_formatTime_echo,
+            latest=org.apache.oozie.coord.CoordELFunctions#ph2_coord_latest_echo,
+            future=org.apache.oozie.coord.CoordELFunctions#ph2_coord_future_echo
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+            This property is a convenience property to add extensions to the built in
+            executors without having to
+            include all the built in ones.
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-action-create-inst</name>
+        <value>
+            now=org.apache.oozie.extensions.OozieELExtensions#ph2_now_inst,
+            today=org.apache.oozie.extensions.OozieELExtensions#ph2_today_inst,
+            yesterday=org.apache.oozie.extensions.OozieELExtensions#ph2_yesterday_inst,
+            currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_currentWeek_inst,
+            lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_lastWeek_inst,
+            currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_currentMonth_inst,
+            lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_lastMonth_inst,
+            currentYear=org.apache.oozie.extensions.OozieELExtensions#ph2_currentYear_inst,
+            lastYear=org.apache.oozie.extensions.OozieELExtensions#ph2_lastYear_inst,
+            latest=org.apache.oozie.coord.CoordELFunctions#ph2_coord_latest_echo,
+            future=org.apache.oozie.coord.CoordELFunctions#ph2_coord_future_echo,
+            formatTime=org.apache.oozie.coord.CoordELFunctions#ph2_coord_formatTime,
+            user=org.apache.oozie.coord.CoordELFunctions#coord_user
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+            This property is a convenience property to add extensions to the built in
+            executors without having to
+            include all the built in ones.
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-action-create</name>
+        <value>
+            now=org.apache.oozie.extensions.OozieELExtensions#ph2_now,
+            today=org.apache.oozie.extensions.OozieELExtensions#ph2_today,
+            yesterday=org.apache.oozie.extensions.OozieELExtensions#ph2_yesterday,
+            currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_currentWeek,
+            lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_lastWeek,
+            currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_currentMonth,
+            lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_lastMonth,
+            currentYear=org.apache.oozie.extensions.OozieELExtensions#ph2_currentYear,
+            lastYear=org.apache.oozie.extensions.OozieELExtensions#ph2_lastYear,
+            latest=org.apache.oozie.coord.CoordELFunctions#ph2_coord_latest_echo,
+            future=org.apache.oozie.coord.CoordELFunctions#ph2_coord_future_echo,
+            formatTime=org.apache.oozie.coord.CoordELFunctions#ph2_coord_formatTime,
+            user=org.apache.oozie.coord.CoordELFunctions#coord_user
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+            This property is a convenience property to add extensions to the built in
+            executors without having to
+            include all the built in ones.
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-job-submit-data</name>
+        <value>
+            now=org.apache.oozie.extensions.OozieELExtensions#ph1_now_echo,
+            today=org.apache.oozie.extensions.OozieELExtensions#ph1_today_echo,
+            yesterday=org.apache.oozie.extensions.OozieELExtensions#ph1_yesterday_echo,
+            currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_currentWeek_echo,
+            lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph1_lastWeek_echo,
+            currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_currentMonth_echo,
+            lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph1_lastMonth_echo,
+            currentYear=org.apache.oozie.extensions.OozieELExtensions#ph1_currentYear_echo,
+            lastYear=org.apache.oozie.extensions.OozieELExtensions#ph1_lastYear_echo,
+            dataIn=org.apache.oozie.extensions.OozieELExtensions#ph1_dataIn_echo,
+            instanceTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_nominalTime_echo_wrap,
+            formatTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_formatTime_echo,
+            dateOffset=org.apache.oozie.coord.CoordELFunctions#ph1_coord_dateOffset_echo,
+            user=org.apache.oozie.coord.CoordELFunctions#coord_user
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+            This property is a convenience property to add extensions to the built in
+            executors without having to
+            include all the built in ones.
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-action-start</name>
+        <value>
+            now=org.apache.oozie.extensions.OozieELExtensions#ph2_now,
+            today=org.apache.oozie.extensions.OozieELExtensions#ph2_today,
+            yesterday=org.apache.oozie.extensions.OozieELExtensions#ph2_yesterday,
+            currentWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_currentWeek,
+            lastWeek=org.apache.oozie.extensions.OozieELExtensions#ph2_lastWeek,
+            currentMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_currentMonth,
+            lastMonth=org.apache.oozie.extensions.OozieELExtensions#ph2_lastMonth,
+            currentYear=org.apache.oozie.extensions.OozieELExtensions#ph2_currentYear,
+            lastYear=org.apache.oozie.extensions.OozieELExtensions#ph2_lastYear,
+            latest=org.apache.oozie.coord.CoordELFunctions#ph3_coord_latest,
+            future=org.apache.oozie.coord.CoordELFunctions#ph3_coord_future,
+            dataIn=org.apache.oozie.extensions.OozieELExtensions#ph3_dataIn,
+            instanceTime=org.apache.oozie.coord.CoordELFunctions#ph3_coord_nominalTime,
+            dateOffset=org.apache.oozie.coord.CoordELFunctions#ph3_coord_dateOffset,
+            formatTime=org.apache.oozie.coord.CoordELFunctions#ph3_coord_formatTime,
+            user=org.apache.oozie.coord.CoordELFunctions#coord_user
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+            This property is a convenience property to add extensions to the built in
+            executors without having to
+            include all the built in ones.
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-sla-submit</name>
+        <value>
+            instanceTime=org.apache.oozie.coord.CoordELFunctions#ph1_coord_nominalTime_echo_fixed,
+            user=org.apache.oozie.coord.CoordELFunctions#coord_user
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+        </description>
+    </property>
+    <property>
+        <name>oozie.service.ELService.ext.functions.coord-sla-create</name>
+        <value>
+            instanceTime=org.apache.oozie.coord.CoordELFunctions#ph2_coord_nominalTime,
+            user=org.apache.oozie.coord.CoordELFunctions#coord_user
+        </value>
+        <description>
+            EL functions declarations, separated by commas, format is [PREFIX:]NAME=CLASS#METHOD.
+        </description>
+    </property>
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie-el-extensions/src/test/resources/oozie-site.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/test/resources/oozie-site.xml b/oozie-el-extensions/src/test/resources/oozie-site.xml
index a106aa6..ba0333a 100644
--- a/oozie-el-extensions/src/test/resources/oozie-site.xml
+++ b/oozie-el-extensions/src/test/resources/oozie-site.xml
@@ -1,9 +1,20 @@
 <?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding 
-    copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may 
-    obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed 
-    on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the 
-    License. -->
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
 <configuration>
 
     <!-- Refer to the oozie-default.xml file for the complete list of Oozie configuration properties and their default values. -->

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index c73401a..de11df6 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -58,7 +58,8 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         super(entity);
     }
 
-    @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+    @Override
+    public Properties build(Cluster cluster, Path buildPath) throws FalconException {
         String clusterName = cluster.getName();
         if (EntityUtil.getStartTime(entity, clusterName).compareTo(EntityUtil.getEndTime(entity, clusterName)) >= 0) {
             LOG.info("process validity start <= end for cluster {}. Skipping schedule", clusterName);
@@ -92,6 +93,12 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         properties.setProperty(OozieClient.BUNDLE_APP_PATH, getStoragePath(buildPath));
         properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
 
+        //Add libpath
+        String libPath = getLibPath(buildPath);
+        if (libPath != null) {
+            properties.put(OozieClient.LIBPATH, getStoragePath(libPath));
+        }
+
         return properties;
     }
 
@@ -128,12 +135,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             }
         }
 
-        //Add libpath
-        Path libPath = getLibPath(cluster, buildPath);
-        if (libPath != null) {
-            properties.put(OozieClient.LIBPATH, getStoragePath(libPath));
-        }
-
         return properties;
     }
 
@@ -160,4 +161,6 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             throw new FalconException(e);
         }
     }
+
+    public abstract String getLibPath(Path buildPath);
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 2ceb91e..e5d75fb 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -189,9 +189,4 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
     protected COORDINATORAPP unmarshal(String template) throws FalconException {
         return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
     }
-
-    @Override
-    protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
-        return super.getLibPath(cluster, buildPath.getParent());
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index e341fb8..a9d2f57 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
@@ -75,7 +74,11 @@ public abstract class OozieEntityBuilder<T extends Entity> {
     private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
         @Override
         public boolean accept(Path path) {
-            return path.getName().startsWith("falcon");
+            String fileName = path.getName();
+            if (fileName.startsWith("falcon")) {
+                return true;
+            }
+            return false;
         }
 
         @Override
@@ -294,17 +297,4 @@ public abstract class OozieEntityBuilder<T extends Entity> {
             IOUtils.closeQuietly(resourceAsStream);
         }
     }
-
-    protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
-        switch (entity.getEntityType()) {
-        case PROCESS:
-            return ProcessHelper.getUserLibPath((Process) entity, cluster, buildPath);
-
-        case FEED:
-            return new Path(buildPath, "lib");
-
-        default:
-        }
-        throw new IllegalArgumentException("Unhandled type " + entity.getEntityType());
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 771295f..3186c4a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -365,9 +365,4 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
         action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
     }
-
-    @Override
-    protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
-        return super.getLibPath(cluster, buildPath.getParent());
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
index 3347fbf..b819dee 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -53,9 +53,14 @@ public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
         }
 
         if (!props.isEmpty()) {
-            copySharedLibs(cluster, getLibPath(cluster, buildPath));
+            copySharedLibs(cluster, new Path(getLibPath(buildPath)));
         }
 
         return props;
     }
+
+    @Override
+    public String getLibPath(Path buildPath) {
+        return new Path(buildPath, "lib").toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
index 1db4ca4..9f9579c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.oozie.hive.CONFIGURATION.Property;
@@ -47,7 +46,7 @@ public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(action);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+        Path userWfPath = new Path(entity.getWorkflow().getPath());
         hiveAction.setScript(getStoragePath(userWfPath));
 
         addPrepareDeleteOutputPath(hiveAction);
@@ -61,7 +60,7 @@ public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
         // adds hive-site.xml in hive classpath
         hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
 
-        addArchiveForCustomJars(cluster, hiveAction.getArchive(), getLibPath(cluster, buildPath));
+        addArchiveForCustomJars(cluster, hiveAction.getArchive(), entity.getWorkflow().getLib());
 
         OozieUtils.marshalHiveAction(action, actionJaxbElement);
         return action;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
index 14668f0..f93a599 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.oozie.workflow.ACTION;
@@ -37,8 +36,7 @@ public class OozieProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
 
     @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
         ACTION action = unmarshalAction(ACTION_TEMPLATE);
-        action.getSubWorkflow().setAppPath(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster,
-            buildPath)));
+        action.getSubWorkflow().setAppPath(getStoragePath(entity.getWorkflow().getPath()));
         return action;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
index 6bd5dd8..a1a7c12 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -20,7 +20,6 @@ package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.oozie.workflow.ACTION;
@@ -45,7 +44,7 @@ public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
         ACTION action = unmarshalAction(ACTION_TEMPLATE);
 
         PIG pigAction = action.getPig();
-        Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+        Path userWfPath = new Path(entity.getWorkflow().getPath());
         pigAction.setScript(getStoragePath(userWfPath));
 
         addPrepareDeleteOutputPath(pigAction);
@@ -60,7 +59,7 @@ public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 
-        addArchiveForCustomJars(cluster, pigAction.getArchive(), getLibPath(cluster, buildPath));
+        addArchiveForCustomJars(cluster, pigAction.getArchive(), entity.getWorkflow().getLib());
 
         return action;
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 8691ee5..806810e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -18,10 +18,8 @@
 
 package org.apache.falcon.oozie.process;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.v0.EntityType;
@@ -32,18 +30,12 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.oozie.OozieBundleBuilder;
 import org.apache.falcon.oozie.OozieCoordinatorBuilder;
-import org.apache.falcon.update.UpdateHelper;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -84,6 +76,7 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
                 }
             }
         }
+
         return  properties;
     }
 
@@ -107,8 +100,6 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
     }
 
     @Override protected List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
-        copyUserWorkflow(cluster, buildPath);
-
         List<Properties> props = OozieCoordinatorBuilder.get(entity, Tag.DEFAULT).buildCoords(cluster, buildPath);
         if (props != null) {
             assert props.size() == 1 : "Process should have only 1 coord";
@@ -118,38 +109,8 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
         return props;
     }
 
-    private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
-        try {
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                ClusterHelper.getConfiguration(cluster));
-
-            //Copy user workflow and lib to staging dir
-            Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
-                new Path(buildPath, EntityUtil.PROCESS_USER_DIR));
-            if (StringUtils.isNotEmpty(entity.getWorkflow().getLib())
-                    && fs.exists(new Path(entity.getWorkflow().getLib()))) {
-                checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
-                    new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR)));
-            }
-
-            writeChecksums(fs, new Path(buildPath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
-    }
-
-    private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
-        try {
-            FSDataOutputStream stream = fs.create(path);
-            try {
-                for (Map.Entry<String, String> entry : checksums.entrySet()) {
-                    stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-                }
-            } finally {
-                stream.close();
-            }
-        } catch (IOException e) {
-            throw new FalconException("Failed to copy user workflow/lib", e);
-        }
+    @Override
+    public String getLibPath(Path buildPath) {
+        return entity.getWorkflow().getLib();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index d271695..75faceb 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.oozie.process;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
@@ -215,12 +216,13 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
         return deleteList;
     }
 
-    protected void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
-        Path libPath) throws FalconException {
-        if (libPath == null) {
+    protected void addArchiveForCustomJars(Cluster cluster, List<String> archiveList, String lib)
+        throws FalconException {
+        if (StringUtils.isEmpty(lib)) {
             return;
         }
 
+        Path libPath = new Path(lib);
         try {
             final FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
                 ClusterHelper.getConfiguration(cluster));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/workflow/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/LateDataHandler.java b/oozie/src/main/java/org/apache/falcon/workflow/LateDataHandler.java
new file mode 100644
index 0000000..2f5fb4c
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/workflow/LateDataHandler.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.workflow;
+
+import org.apache.commons.cli.*;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogPartition;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A tool for late data handling.
+ */
+public class LateDataHandler extends Configured implements Tool {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LateDataHandler.class);
+
+    /** Explicit charset for the metrics file so writer and reader agree regardless of platform default. */
+    private static final String UTF_8 = "UTF-8";
+
+    /** Scheme returned by the EL ${coord:dataIn('input')} for table inputs. */
+    private static final String HCAT_SCHEME = "hcat://";
+
+    /** Scheme a table input URI is rewritten to before it is parsed into a CatalogStorage. */
+    private static final String THRIFT_SCHEME = "thrift://";
+
+    /**
+     * Command-line entry point: runs this tool with the configuration built by
+     * {@link OozieActionConfigurationHelper#createActionConf()}.
+     */
+    public static void main(String[] args) throws Exception {
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
+        ToolRunner.run(conf, new LateDataHandler(), args);
+    }
+
+    /**
+     * Parses the command line. All four options are required: -out (metrics file to write),
+     * -paths (path groups separated by #), plus the input feed names and their storage types,
+     * both separated by # and positionally aligned with the path groups.
+     */
+    private static CommandLine getCommand(String[] args) throws ParseException {
+        Options options = new Options();
+
+        Option opt = new Option("out", true, "Out file name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("paths", true,
+                "Comma separated path list, further separated by #");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(WorkflowExecutionArgs.INPUT_NAMES.getName(), true,
+                "Input feed names, further separated by #");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), true,
+                "Feed storage types corresponding to Input feed names, separated by #");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        return new GnuParser().parse(options, args);
+    }
+
+    /**
+     * Computes one storage metric per input feed and persists them to the -out file.
+     *
+     * @param args command-line arguments, see {@link #getCommand(String[])}
+     * @return always 0; when -paths is the literal "null" nothing is computed or written
+     * @throws Exception on argument parsing, metric computation or persistence failures
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        CommandLine command = getCommand(args);
+
+        String pathStr = getOptionValue(command, "paths");
+        if (pathStr == null) {
+            // no input paths to track, so there is nothing to record
+            return 0;
+        }
+
+        String[] inputFeeds = getOptionValue(command, WorkflowExecutionArgs.INPUT_NAMES.getName()).split("#");
+        String[] pathGroups = pathStr.split("#");
+        String[] inputFeedStorageTypes =
+            getOptionValue(command, WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()).split("#");
+
+        Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
+
+        Path file = new Path(command.getOptionValue("out"));
+        LOG.info("Persisting late data metrics: {} to file: {}", metrics, file);
+        persistMetrics(metrics, file);
+
+        return 0;
+    }
+
+    /**
+     * Returns the value of the given option, mapping the literal string "null" to {@code null}.
+     * NOTE(review): presumably this is how an unset workflow parameter arrives — confirm.
+     */
+    private String getOptionValue(CommandLine command, String option) {
+        String value = command.getOptionValue(option);
+        // constant-first comparison: an absent option yields null instead of an NPE
+        if ("null".equals(value)) {
+            return null;
+        }
+        return value;
+    }
+
+    /**
+     * Computes one metric per path group, keyed by the input feed name at the same position.
+     *
+     * @throws IllegalArgumentException if there are fewer feed names or storage types than path groups
+     */
+    private Map<String, Long> computeMetrics(String[] inputFeeds, String[] pathGroups,
+                                             String[] inputFeedStorageTypes)
+        throws IOException, FalconException, URISyntaxException {
+
+        if (inputFeeds.length < pathGroups.length || inputFeedStorageTypes.length < pathGroups.length) {
+            throw new IllegalArgumentException("Mismatched late data arguments: " + pathGroups.length
+                + " path groups, " + inputFeeds.length + " input names, "
+                + inputFeedStorageTypes.length + " storage types");
+        }
+
+        Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
+        for (int index = 0; index < pathGroups.length; index++) {
+            long storageMetric = computeStorageMetric(pathGroups[index], inputFeedStorageTypes[index], getConf());
+            computedMetrics.put(inputFeeds[index], storageMetric);
+        }
+
+        return computedMetrics;
+    }
+
+    /**
+     * Writes the metrics as UTF-8 "name=value" lines, one per feed, to the given file.
+     * A failure while closing the stream is propagated, because the file may otherwise be
+     * left incomplete while the tool reports success.
+     */
+    private void persistMetrics(Map<String, Long> metrics,
+                                Path file) throws IOException, FalconException {
+        OutputStream out = null;
+        try {  // created in a map job
+            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
+            out = fs.create(file);
+
+            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+                out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes(UTF_8));
+            }
+
+            // close on the success path so a failed flush/close surfaces to the caller
+            out.close();
+            out = null;
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException ignore) {
+                    // already failing with the original exception; do not mask it
+                }
+            }
+        }
+    }
+
+    /**
+     * This method computes the storage metrics for a given feed's instance or partition.
+     * It uses size on disk as the metric for File System Storage.
+     * It uses create time as the metric for Catalog Table Storage.
+     *
+     * The assumption is that if a partition has changed or been reinstated, the underlying
+     * metric would change, either size or create time.
+     *
+     * @param feedUri URI for the feed storage, filesystem path or table uri
+     * @param feedStorageType feed storage type, the name of a {@link Storage.TYPE} constant
+     * @param conf configuration
+     * @return computed metric
+     * @throws IOException
+     * @throws FalconException
+     * @throws URISyntaxException
+     */
+    public long computeStorageMetric(String feedUri, String feedStorageType, Configuration conf)
+        throws IOException, FalconException, URISyntaxException {
+
+        Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            // usage on file system is the metric
+            return getFileSystemUsageMetric(feedUri, conf);
+        } else if (storageType == Storage.TYPE.TABLE) {
+            // todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
+            // Rewrite only the scheme: a blanket replace("hcat", ...) would also corrupt database,
+            // table or partition names that happen to contain "hcat".
+            feedUri = feedUri.replace(HCAT_SCHEME, THRIFT_SCHEME);
+            // creation time of the given partition is the metric
+            return getTablePartitionCreateTimeMetric(feedUri);
+        }
+
+        throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
+    }
+
+    /**
+     * The storage metric for File System Storage is the size of content
+     * this feed's instance represented by the path uses on the file system.
+     *
+     * If this instance was reinstated, the assumption is that the size of
+     * this instance on disk would change.
+     *
+     * @param pathGroup comma separated paths (globs allowed) on the file system
+     * @param conf configuration
+     * @return metric as the size of data on file system
+     * @throws IOException
+     */
+    private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
+        throws IOException, FalconException {
+        long usage = 0;
+        for (String pathElement : pathGroup.split(",")) {
+            Path inPath = new Path(pathElement);
+            usage += usage(inPath, conf);
+        }
+
+        return usage;
+    }
+
+    /**
+     * Returns the total content length of everything matching the given glob, or 0 when
+     * nothing matches.
+     */
+    private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(inPath.toUri(), conf);
+        FileStatus[] fileStatuses = fs.globStatus(inPath);
+        if (fileStatuses == null || fileStatuses.length == 0) {
+            return 0;
+        }
+        long totalSize = 0;
+        for (FileStatus fileStatus : fileStatuses) {
+            totalSize += fs.getContentSummary(fileStatus.getPath()).getLength();
+        }
+        return totalSize;
+    }
+
+    /**
+     * The storage metric for Table Storage is the create time of the given partition,
+     * since there is no API in Hive or HCatalog to find whether a partition has changed.
+     *
+     * If this partition was reinstated, the assumption is that the create time of
+     * this partition would change.
+     *
+     * @param feedUri catalog table uri
+     * @return metric as creation time of the given partition, or 0 if the partition is not found
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws FalconException
+     */
+    private long getTablePartitionCreateTimeMetric(String feedUri)
+        throws IOException, URISyntaxException, FalconException {
+
+        CatalogStorage storage = (CatalogStorage)
+                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUri, getConf());
+        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+                getConf(), storage.getCatalogUrl(), storage.getDatabase(),
+                storage.getTable(), new ArrayList<String>(storage.getPartitions().values()));
+        return partition == null ? 0 : partition.getCreateTime();
+    }
+
+    /**
+     * This method compares the recorded metrics persisted in file against
+     * the recently computed metrics and returns the list of feeds that have changed.
+     * Feeds with no recorded metric are logged and ignored.
+     *
+     * @param file persisted metrics from the first run, as written by the -out option of {@link #run}
+     * @param metrics newly computed metrics
+     * @param conf configuration
+     * @return comma separated names of the feeds that have changed, or an empty string if none has changed
+     * @throws Exception if the file cannot be read or contains a malformed line
+     */
+    public String detectChanges(Path file, Map<String, Long> metrics, Configuration conf)
+        throws Exception {
+
+        StringBuilder buffer = new StringBuilder();
+        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), conf);
+        InputStream stream = fs.open(file);
+        try {
+            BufferedReader in = new BufferedReader(new InputStreamReader(stream, UTF_8));
+            Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
+            String line;
+            while ((line = in.readLine()) != null) {
+                if (line.isEmpty()) {
+                    continue;
+                }
+                int index = line.indexOf('=');
+                if (index < 0) {
+                    throw new IOException("Malformed late data metric '" + line + "' in file: " + file);
+                }
+                recordedMetrics.put(line.substring(0, index), Long.parseLong(line.substring(index + 1)));
+            }
+
+            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+                Long recorded = recordedMetrics.get(entry.getKey());
+                if (recorded == null) {
+                    LOG.info("No matching key {}", entry.getKey());
+                    continue;
+                }
+                if (!recorded.equals(entry.getValue())) {
+                    LOG.info("Recorded size: {} is different from new size {}", recorded, entry.getValue());
+                    if (buffer.length() > 0) {
+                        buffer.append(',');
+                    }
+                    buffer.append(entry.getKey());
+                }
+            }
+            return buffer.toString();
+        } finally {
+            // closing the underlying stream releases the file handle held by the reader chain
+            stream.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 745aaac..62c1457 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1024,26 +1024,23 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
 
-    private boolean canUpdateBundle(Entity oldEntity, Entity newEntity, boolean wfUpdated) throws FalconException {
-        return !wfUpdated && EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS);
+    private boolean canUpdateBundle(Entity oldEntity, Entity newEntity) throws FalconException {
+        return EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS);
     }
 
     @Override
-    public String update(Entity oldEntity, Entity newEntity, String cluster, Date effectiveTime)
-        throws FalconException {
+    public String update(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
         BundleJob bundle = findLatestBundle(oldEntity, cluster);
 
         boolean entityUpdated = false;
-        boolean wfUpdated = false;
         if (bundle != MISSING) {
             entityUpdated = UpdateHelper.isEntityUpdated(oldEntity, newEntity, cluster, new Path(bundle.getAppPath()));
-            wfUpdated = UpdateHelper.isWorkflowUpdated(cluster, newEntity, new Path(bundle.getAppPath()));
         }
 
         Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
         StringBuilder result = new StringBuilder();
-        //entity is scheduled before and either entity or workflow is updated
-        if (bundle != MISSING && (entityUpdated || wfUpdated)) {
+        //entity is scheduled before and entity is updated
+        if (bundle != MISSING && entityUpdated) {
             LOG.info("Updating entity through Workflow Engine {}", newEntity.toShortString());
             Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
             if (newEndTime.before(now())) {
@@ -1053,7 +1050,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             LOG.debug("Updating for cluster: {}, bundle: {}", cluster, bundle.getId());
 
-            if (canUpdateBundle(oldEntity, newEntity, wfUpdated)) {
+            if (canUpdateBundle(oldEntity, newEntity)) {
                 // only concurrency and endtime are changed. So, change coords
                 LOG.info("Change operation is adequate! : {}, bundle: {}", cluster, bundle.getId());
                 updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
@@ -1063,20 +1060,20 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
             LOG.debug("Going to update! : {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster, bundle
                 .getId());
-            result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle, effectiveTime,
-                CurrentUser.getUser())).append("\n");
+            result.append(updateInternal(oldEntity, newEntity, clusterEntity, bundle,
+                    CurrentUser.getUser())).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}", newEntity.toShortString(), cluster,
                 bundle.getId());
         }
 
-        result.append(updateDependents(clusterEntity, oldEntity, newEntity, effectiveTime));
+        result.append(updateDependents(clusterEntity, oldEntity, newEntity));
         return result.toString();
     }
 
     private String getUpdateString(Entity entity, Date date, BundleJob oldBundle, BundleJob newBundle) {
         StringBuilder builder = new StringBuilder();
         builder.append(entity.toShortString()).append("/Effective Time: ").append(SchemaHelper.formatDateUTC(date));
-        builder.append(". Old workflow id: ");
+        builder.append(". Old bundle id: ");
         List<String> coords = new ArrayList<String>();
         for (CoordinatorJob coord : oldBundle.getCoordinators()) {
             coords.add(coord.getId());
@@ -1084,22 +1081,22 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         builder.append(StringUtils.join(coords, ','));
 
         if (newBundle != null) {
-            builder.append(". New workflow id: ");
             coords.clear();
             for (CoordinatorJob coord : newBundle.getCoordinators()) {
                 coords.add(coord.getId());
             }
             if (coords.isEmpty()) {
+                builder.append(". New bundle id: ");
                 builder.append(newBundle.getId());
             } else {
+                builder.append(". New coordinator id: ");
                 builder.append(StringUtils.join(coords, ','));
             }
         }
         return  builder.toString();
     }
 
-    private String updateDependents(Cluster cluster, Entity oldEntity, Entity newEntity,
-        Date effectiveTime) throws FalconException {
+    private String updateDependents(Cluster cluster, Entity oldEntity, Entity newEntity) throws FalconException {
         //Update affected entities
         Set<Entity> affectedEntities = EntityGraph.get().getDependents(oldEntity);
         StringBuilder result = new StringBuilder();
@@ -1122,7 +1119,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
             LOG.info("Triggering update for {}, {}", cluster, affectedProcBundle.getId());
 
             result.append(updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle,
-                effectiveTime, affectedProcBundle.getUser())).append("\n");
+                affectedProcBundle.getUser())).append("\n");
             LOG.info("Entity update complete: {} for cluster {}, bundle: {}",
                 affectedEntity.toShortString(), cluster, affectedProcBundle.getId());
         }
@@ -1192,10 +1189,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle,
-        Date inEffectiveTime, String user) throws FalconException {
+        String user) throws FalconException {
         String clusterName = cluster.getName();
 
-        Date effectiveTime = getEffectiveTime(cluster, newEntity, inEffectiveTime);
+        Date effectiveTime = getEffectiveTime(cluster, newEntity);
         LOG.info("Effective time " + effectiveTime);
 
         //Validate that new entity can be scheduled
@@ -1222,15 +1219,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return getUpdateString(newEntity, effectiveTime, oldBundle, newBundle);
     }
 
-    private Date getEffectiveTime(Cluster cluster, Entity newEntity, Date inEffectiveTime) {
+    private Date getEffectiveTime(Cluster cluster, Entity newEntity) {
         //pick effective time as now() + 3 min to handle any time diff between falcon and oozie
         //oozie rejects changes with endtime < now
         Date effectiveTime = offsetTime(now(), 3);
-        if (inEffectiveTime != null && inEffectiveTime.after(effectiveTime)) {
-            //If the user has specified effective time and is valid,
-            // pick user specified effective time
-            effectiveTime = inEffectiveTime;
-        }
 
         //pick start time for new bundle which is after effectiveTime
         return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index 440a131..f354351 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -28,6 +28,10 @@
                 <name>oozie.launcher.mapred.job.priority</name>
                 <value>${jobPriority}</value>
             </property>
+            <property>
+                <name>oozie.launcher.oozie.libpath</name>
+                <value>${wf:conf("falcon.libpath")}</value>
+            </property>
         </configuration>
         <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
         <arg>-cluster</arg>
@@ -82,12 +86,6 @@
         <arg>${falconInputFeeds}</arg>
         <arg>-falconInPaths</arg>
         <arg>${falconInPaths}</arg>
-        <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
-        <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
-        <file>${wf:conf("falcon.libpath")}/jms.jar</file>
-        <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
-        <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
-        <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
     </java>
     <ok to="end"/>
     <error to="fail"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/main/resources/action/pre-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/pre-process.xml b/oozie/src/main/resources/action/pre-process.xml
index 070c42b..50306e5 100644
--- a/oozie/src/main/resources/action/pre-process.xml
+++ b/oozie/src/main/resources/action/pre-process.xml
@@ -34,7 +34,7 @@
                 <value>hcatalog</value>
             </property>
         </configuration>
-        <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
+        <main-class>org.apache.falcon.workflow.LateDataHandler</main-class>
         <arg>-out</arg>
         <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg>
         <arg>-paths</arg>


Mime
View raw message