falcon-commits mailing list archives

From shweth...@apache.org
Subject [1/2] incubator-falcon git commit: FALCON-943 process update copying user lib is very slow. Contributed by Shwetha GS
Date Tue, 23 Dec 2014 07:01:51 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master c1ac6e6af -> 0bc6aef5a


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 3477258..ef21f4d 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -235,7 +235,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         ACTION pigActionNode = getAction(parentWorkflow, "user-action");
 
         final PIG pigAction = pigActionNode.getPig();
-        Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
+        Assert.assertEquals(pigAction.getScript(), "${nameNode}/apps/pig/id.pig");
         Assert.assertNotNull(pigAction.getPrepare());
         Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
         Assert.assertFalse(pigAction.getParam().isEmpty());
@@ -306,8 +306,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        Assert.assertEquals(hiveAction.getScript(),
-                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
         Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
         Assert.assertNull(hiveAction.getPrepare());
         Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
@@ -361,8 +360,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        Assert.assertEquals(hiveAction.getScript(),
-                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
         Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
         Assert.assertNull(hiveAction.getPrepare());
         Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
@@ -416,8 +414,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        Assert.assertEquals(hiveAction.getScript(),
-                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
         Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
         Assert.assertNotNull(hiveAction.getPrepare());
         Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
@@ -467,8 +464,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 hiveNode);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
 
-        Assert.assertEquals(hiveAction.getScript(),
-                "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
         Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
         Assert.assertNull(hiveAction.getPrepare());
         Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
@@ -635,8 +631,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         List<CONFIGURATION.Property> props = bundle.getCoordinator().get(0).getConfiguration().getProperty();
         for (CONFIGURATION.Property prop : props) {
             if (prop.getName().equals("oozie.libpath")) {
-                Assert.assertEquals(prop.getValue().replace("${nameNode}", ""), new Path(bundlePath,
-                    "userlib").toString());
+                Assert.assertEquals(prop.getValue().replace("${nameNode}", ""), process.getWorkflow().getLib());
             }
         }
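
The assertion changes above are the heart of FALCON-943: instead of copying the user's scripts and libraries into Falcon's staging directory (and pointing Oozie at the staged copies), the generated workflow now references the user's own locations, and oozie.libpath is derived from the process definition itself. A minimal sketch of that idea, assuming a hypothetical helper class (the builder's production code is not part of this message):

    import java.util.Properties;
    import org.apache.falcon.entity.v0.process.Process;

    public final class LibPathSketch {
        private LibPathSketch() {}

        // Derive oozie.libpath straight from the process definition, so the
        // user lib never needs to be copied into staging on update.
        public static Properties libPathProps(Process process) {
            Properties props = new Properties();
            props.put("oozie.libpath", "${nameNode}" + process.getWorkflow().getLib());
            return props;
        }
    }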
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index b6e1cec..8ec9e2d 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -248,8 +248,7 @@ public abstract class AbstractEntityManager {
 
     // Parallel update can get very clumsy if two feeds are updated which
     // are referred by a single process. Sequencing them.
-    public synchronized APIResult update(HttpServletRequest request, String type, String entityName,
-                                         String colo, String effectiveTimeStr) {
+    public synchronized APIResult update(HttpServletRequest request, String type, String entityName, String colo) {
         checkColo(colo);
         try {
             EntityType entityType = EntityType.valueOf(type.toUpperCase());
@@ -262,8 +261,6 @@ public abstract class AbstractEntityManager {
             validateUpdate(oldEntity, newEntity);
             configStore.initiateUpdate(newEntity);
 
-            Date effectiveTime =
-                StringUtils.isEmpty(effectiveTimeStr) ? null : EntityUtil.parseDateUTC(effectiveTimeStr);
             StringBuilder result = new StringBuilder("Updated successfully");
             //Update in workflow engine
             if (!DeploymentUtil.isPrism()) {
@@ -273,8 +270,7 @@ public abstract class AbstractEntityManager {
                 oldClusters.removeAll(newClusters); //deleted clusters
 
                 for (String cluster : newClusters) {
-                    Date myEffectiveTime = validateEffectiveTime(newEntity, cluster, effectiveTime);
-                    result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster, myEffectiveTime));
+                    result.append(getWorkflowEngine().update(oldEntity, newEntity, cluster));
                 }
                 for (String cluster : oldClusters) {
                     getWorkflowEngine().delete(oldEntity, cluster);
@@ -292,15 +288,6 @@ public abstract class AbstractEntityManager {
         }
     }
 
-    private Date validateEffectiveTime(Entity entity, String cluster, Date effectiveTime) {
-        Date start = EntityUtil.getStartTime(entity, cluster);
-        Date end = EntityUtil.getEndTime(entity, cluster);
-        if (effectiveTime == null || effectiveTime.before(start) || effectiveTime.after(end)) {
-            return null;
-        }
-        return effectiveTime;
-    }
-
     private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
         if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
             throw new FalconException(
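
A side note on the comment retained above ("Parallel update can get very clumsy..."): update() stays synchronized even with the slimmer signature, so two feeds referenced by a single process can never be updated concurrently. A contrived, self-contained illustration of that sequencing (not Falcon code; all names here are made up):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class UpdateSequencingSketch {
        // synchronized serializes callers, as AbstractEntityManager.update() does
        private synchronized void update(String feedName) {
            System.out.println("updating " + feedName + ", rebuilding dependent process");
        }

        public static void main(String[] args) {
            UpdateSequencingSketch mgr = new UpdateSequencingSketch();
            ExecutorService pool = Executors.newFixedThreadPool(2);
            pool.submit(() -> mgr.update("feed-one"));  // both feeds drive one process;
            pool.submit(() -> mgr.update("feed-two"));  // the lock sequences the updates
            pool.shutdown();
        }
    }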

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index fb9d9f3..075cb64 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -197,8 +197,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public APIResult update(
             @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
             @Dimension("entityName") @PathParam("entity") final String entityName,
-            @Dimension("colo") @QueryParam("colo") String ignore,
-            @Dimension("effective") @DefaultValue("") @QueryParam("effective") final String
effectiveTime) {
+            @Dimension("colo") @QueryParam("colo") String ignore) {
 
         final HttpServletRequest bufferedRequest = new BufferedRequest(request);
         final Set<String> oldColos = getApplicableColos(type, entityName);
@@ -233,8 +232,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
 
                 @Override
                 protected APIResult doExecute(String colo) throws FalconException {
-                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo,
-                            effectiveTime);
+                    return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo);
                 }
             }.execute());
         }
@@ -254,7 +252,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
         }
 
         if (!embeddedMode) {
-            results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo, effectiveTime));
+            results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo));
         }
 
         return consolidateResult(results, APIResult.class);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
deleted file mode 100644
index d35abfa..0000000
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.latedata;
-
-import org.apache.commons.cli.*;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogPartition;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * A tool for late data handling.
- */
-public class LateDataHandler extends Configured implements Tool {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LateDataHandler.class);
-
-    public static void main(String[] args) throws Exception {
-        Configuration conf = OozieActionConfigurationHelper.createActionConf();
-        ToolRunner.run(conf, new LateDataHandler(), args);
-    }
-
-    private static CommandLine getCommand(String[] args) throws ParseException {
-        Options options = new Options();
-
-        Option opt = new Option("out", true, "Out file name");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option("paths", true,
-                "Comma separated path list, further separated by #");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option(WorkflowExecutionArgs.INPUT_NAMES.getName(), true,
-                "Input feed names, further separated by #");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        opt = new Option(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), true,
-                "Feed storage types corresponding to Input feed names, separated by #");
-        opt.setRequired(true);
-        options.addOption(opt);
-
-        return new GnuParser().parse(options, args);
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        CommandLine command = getCommand(args);
-
-        String pathStr = getOptionValue(command, "paths");
-        if (pathStr == null) {
-            return 0;
-        }
-
-        String[] inputFeeds = getOptionValue(command, WorkflowExecutionArgs.INPUT_NAMES.getName()).split("#");
-        String[] pathGroups = pathStr.split("#");
-        String[] inputFeedStorageTypes =
-            getOptionValue(command, WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()).split("#");
-
-        Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
-
-        Path file = new Path(command.getOptionValue("out"));
-        LOG.info("Persisting late data metrics: {} to file: {}", metrics, file);
-        persistMetrics(metrics, file);
-
-        return 0;
-    }
-
-    private String getOptionValue(CommandLine command, String option) {
-        String value = command.getOptionValue(option);
-        if (value.equals("null")) {
-            return null;
-        }
-        return value;
-    }
-
-    private Map<String, Long> computeMetrics(String[] inputFeeds, String[] pathGroups,
-                                             String[] inputFeedStorageTypes)
-        throws IOException, FalconException, URISyntaxException {
-
-        Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
-        for (int index = 0; index < pathGroups.length; index++) {
-            long storageMetric = computeStorageMetric(pathGroups[index], inputFeedStorageTypes[index], getConf());
-            computedMetrics.put(inputFeeds[index], storageMetric);
-        }
-
-        return computedMetrics;
-    }
-
-    private void persistMetrics(Map<String, Long> metrics,
-                                Path file) throws IOException, FalconException {
-        OutputStream out = null;
-        try {  // created in a map job
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
-            out = fs.create(file);
-
-            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
-                out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-            }
-        } finally {
-            if (out != null) {
-                try {
-                    out.close();
-                } catch (IOException ignore) {
-                    // ignore
-                }
-            }
-        }
-    }
-
-    /**
-     * This method computes the storage metrics for a given feed's instance or partition.
-     * It uses size on disk as the metric for File System Storage.
-     * It uses create time as the metric for Catalog Table Storage.
-     *
-     * The assumption is that if a partition has changed or reinstated, the underlying
-     * metric would change, either size or create time.
-     *
-     * @param feedUri URI for the feed storage, filesystem path or table uri
-     * @param feedStorageType feed storage type
-     * @param conf configuration
-     * @return computed metric
-     * @throws IOException
-     * @throws FalconException
-     * @throws URISyntaxException
-     */
-    public long computeStorageMetric(String feedUri, String feedStorageType, Configuration conf)
-        throws IOException, FalconException, URISyntaxException {
-
-        Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
-
-        if (storageType == Storage.TYPE.FILESYSTEM) {
-            // usage on file system is the metric
-            return getFileSystemUsageMetric(feedUri, conf);
-        } else if (storageType == Storage.TYPE.TABLE) {
-            // todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
-            feedUri = feedUri.replace("hcat", "thrift");
-            // creation time of the given partition is the metric
-            return getTablePartitionCreateTimeMetric(feedUri);
-        }
-
-        throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
-    }
-
-    /**
-     * The storage metric for File System Storage is the size of content
-     * this feed's instance represented by the path uses on the file system.
-     *
-     * If this instance was reinstated, the assumption is that the size of
-     * this instance on disk would change.
-     *
-     * @param pathGroup path on file system
-     * @param conf configuration
-     * @return metric as the size of data on file system
-     * @throws IOException
-     */
-    private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
-        throws IOException, FalconException {
-        long usage = 0;
-        for (String pathElement : pathGroup.split(",")) {
-            Path inPath = new Path(pathElement);
-            usage += usage(inPath, conf);
-        }
-
-        return usage;
-    }
-
-    private long usage(Path inPath, Configuration conf) throws IOException, FalconException {
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(inPath.toUri(), conf);
-        FileStatus[] fileStatuses = fs.globStatus(inPath);
-        if (fileStatuses == null || fileStatuses.length == 0) {
-            return 0;
-        }
-        long totalSize = 0;
-        for (FileStatus fileStatus : fileStatuses) {
-            totalSize += fs.getContentSummary(fileStatus.getPath()).getLength();
-        }
-        return totalSize;
-    }
-
-    /**
-     * The storage metric for Table Storage is the create time of the given partition
-     * since there is no API in Hive or HCatalog to find if a partition has changed.
-     *
-     * If this partition was reinstated, the assumption is that the create time of
-     * this partition would change.
-     *
-     * @param feedUri catalog table uri
-     * @return metric as creation time of the given partition
-     * @throws IOException
-     * @throws URISyntaxException
-     * @throws FalconException
-     */
-    private long getTablePartitionCreateTimeMetric(String feedUri)
-        throws IOException, URISyntaxException, FalconException {
-
-        CatalogStorage storage = (CatalogStorage)
-                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUri, getConf());
-        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
-                getConf(), storage.getCatalogUrl(), storage.getDatabase(),
-                storage.getTable(), new ArrayList(storage.getPartitions().values()));
-        return partition == null ? 0 : partition.getCreateTime();
-    }
-
-    /**
-     * This method compares the recorded metrics persisted in file against
-     * the recently computed metrics and returns the list of feeds that have changed.
-     *
-     * @param file persisted metrics from the first run
-     * @param metrics newly computed metrics
-     * @param conf configuration
-     * @return list of feed names which have changed, empty string if none has changed
-     * @throws Exception
-     */
-    public String detectChanges(Path file, Map<String, Long> metrics, Configuration conf)
-        throws Exception {
-
-        StringBuilder buffer = new StringBuilder();
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), conf);
-        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(file)));
-        String line;
-        try {
-            Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
-            while ((line = in.readLine()) != null) {
-                if (line.isEmpty()) {
-                    continue;
-                }
-                int index = line.indexOf('=');
-                String key = line.substring(0, index);
-                long size = Long.parseLong(line.substring(index + 1));
-                recordedMetrics.put(key, size);
-            }
-
-            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
-                if (recordedMetrics.get(entry.getKey()) == null) {
-                    LOG.info("No matching key {}", entry.getKey());
-                    continue;
-                }
-                if (!recordedMetrics.get(entry.getKey()).equals(entry.getValue())) {
-                    LOG.info("Recorded size: {} is different from new size {}",
-                            recordedMetrics.get(entry.getKey()), entry.getValue());
-                    buffer.append(entry.getKey()).append(',');
-                }
-            }
-            if (buffer.length() == 0) {
-                return "";
-            } else {
-                return buffer.substring(0, buffer.length() - 1);
-            }
-
-        } finally {
-            in.close();
-        }
-    }
-}
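
This file is relocated rather than removed: the import changes below in LateRerunConsumer.java and LateDataHandlerIT.java show it moving to org.apache.falcon.workflow. A minimal invocation sketch, assuming the relocated class keeps the Tool interface and CLI options of the copy deleted above (the paths are hypothetical; "FILESYSTEM" is Storage.TYPE.FILESYSTEM):

    import org.apache.falcon.workflow.LateDataHandler;  // new package per this commit
    import org.apache.falcon.workflow.WorkflowExecutionArgs;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public final class LateDataHandlerSketch {
        private LateDataHandlerSketch() {}

        public static void main(String[] ignored) throws Exception {
            String[] args = {
                "-out", "/tmp/latedata/metrics",    // hypothetical metrics file
                "-paths", "/data/in/2013/11/15",    // hypothetical feed instance path
                "-" + WorkflowExecutionArgs.INPUT_NAMES.getName(), "input",
                "-" + WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), "FILESYSTEM",
            };
            ToolRunner.run(new Configuration(), new LateDataHandler(), args);
        }
    }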

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 9ba632e..51ccc5f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -23,7 +23,7 @@ import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.latedata.LateDataHandler;
+import org.apache.falcon.workflow.LateDataHandler;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/src/main/assemblies/distributed-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml
index 7320312..1e48c94 100644
--- a/src/main/assemblies/distributed-package.xml
+++ b/src/main/assemblies/distributed-package.xml
@@ -121,6 +121,11 @@
             <source>oozie-el-extensions/target/falcon-oozie-el-extension-${project.version}.jar</source>
             <outputDirectory>oozie/libext</outputDirectory>
         </file>
+
+        <file>
+            <source>oozie-el-extensions/src/main/conf/oozie-site.xml</source>
+            <outputDirectory>oozie/conf</outputDirectory>
+        </file>
     </files>
 </assembly>
     

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml
index f9d6009..682d52f 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -110,6 +110,11 @@
         </file>
 
         <file>
+            <source>oozie-el-extensions/src/main/conf/oozie-site.xml</source>
+            <outputDirectory>oozie/conf</outputDirectory>
+        </file>
+
+        <file>
             <source>webapp/target/falcon-webapp-${project.version}.war</source>
             <outputDirectory>server/webapp</outputDirectory>
             <destName>falcon.war</destName>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/src/main/examples/data/generate.sh
----------------------------------------------------------------------
diff --git a/src/main/examples/data/generate.sh b/src/main/examples/data/generate.sh
index 74ec3ba..54db3d7 100755
--- a/src/main/examples/data/generate.sh
+++ b/src/main/examples/data/generate.sh
@@ -48,6 +48,6 @@ do
 done
 
 hadoop fs -rmr /data/in/2013/11/15/
-hadoop fs -mkdir /data/in/2013/11/15/
+hadoop fs -mkdir -p /data/in/2013/11/15/
 hadoop fs -put generated-data/00 /data/in/2013/11/15/ 
 rm -rf generated-data
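
For context on the one-line fix above: on Hadoop 2, hadoop fs -mkdir fails when parent directories are missing; the -p flag creates them as needed (and makes the command a no-op if the directory already exists), matching the Hadoop 1 behavior this example script assumed.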

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/webapp/src/main/java/org/apache/falcon/Debug.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/Debug.java b/webapp/src/main/java/org/apache/falcon/Debug.java
deleted file mode 100644
index c606074..0000000
--- a/webapp/src/main/java/org/apache/falcon/Debug.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon;
-
-import org.apache.falcon.client.FalconClient;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.resource.EntityList;
-import org.apache.falcon.resource.EntityList.EntityElement;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.service.Services;
-import org.apache.falcon.util.DeploymentProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
-
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.ByteArrayInputStream;
-import java.util.Date;
-
-/**
- * A driver for debugging purposes.
- */
-public final class Debug {
-    // private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
-
-    /**
-     * Prevent users from constructing this.
-     */
-    private Debug() {
-    }
-
-    public static void main(String[] args) throws Exception {
-        String falconUrl = args[0];
-        String type = args[1];
-        String entity;
-
-        Services.get().register(ConfigurationStore.get());
-        ConfigurationStore.get().init();
-        CurrentUser.authenticate("testuser");
-        FalconClient client = new FalconClient(falconUrl);
-        for (int index = 2; index < args.length; index++) {
-            entity = args[index];
-            EntityList deps = client.getDependency(type, entity);
-            for (EntityElement dep : deps.getElements()) {
-                EntityType eType = EntityType.valueOf(dep.type.toUpperCase());
-                if (ConfigurationStore.get().get(eType, dep.name) != null) {
-                    continue;
-                }
-                String xml =
-                    client.getDefinition(eType.name().toLowerCase(), dep.name)
-                        .toString();
-                System.out.println(xml);
-                store(eType, xml);
-            }
-            String xml =
-                client.getDefinition(type.toLowerCase(), entity).toString();
-            System.out.println(xml);
-            store(EntityType.valueOf(type.toUpperCase()), xml);
-        }
-
-        entity = args[2];
-        Entity obj = EntityUtil.getEntity(type, entity);
-        Process newEntity = (Process) obj.copy();
-        newEntity.setFrequency(Frequency.fromString("minutes(5)"));
-        System.out.println("##############OLD ENTITY " + EntityUtil.md5(obj));
-        System.out.println("##############NEW ENTITY " + EntityUtil.md5(newEntity));
-
-
-//        OozieWorkflowEngine engine = new OozieWorkflowEngine();
-//        Date start = FORMATTER.parse("2010-01-02 01:05 UTC");
-//        Date end = FORMATTER.parse("2010-01-02 01:21 UTC");
-//        InstancesResult status = engine.suspendInstances(obj, start, end, new Properties());
-//        System.out.println(Arrays.toString(status.getInstances()));
-//        AbstractInstanceManager manager = new InstanceManager();
-//        InstancesResult result = manager.suspendInstance(new NullServletRequest(), type, entity,
-//                "2010-01-02T01:05Z", "2010-01-02T01:21Z", "*");
-
-        DeploymentProperties.get().setProperty("deploy.mode", "standalone");
-        StartupProperties.get().setProperty("current.colo", "ua1");
-        OozieWorkflowEngine engine = new OozieWorkflowEngine();
-        ConfigurationStore.get().initiateUpdate(newEntity);
-        engine.update(obj, newEntity, newEntity.getClusters().getClusters().get(0).getName(), new Date());
-        engine.delete(newEntity);
-        System.exit(0);
-    }
-
-    private static void store(EntityType eType, String xml) throws JAXBException, FalconException {
-        Unmarshaller unmarshaller = eType.getUnmarshaller();
-        Entity obj = (Entity) unmarshaller.unmarshal(new ByteArrayInputStream(xml.getBytes()));
-        ConfigurationStore.get().publish(eType, obj);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
index 98b8b3d..9722116 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
@@ -64,8 +64,7 @@ public class ConfigSyncService extends AbstractEntityManager {
     public APIResult update(@Context HttpServletRequest request,
                             @Dimension("entityType") @PathParam("type") String type,
                             @Dimension("entityName") @PathParam("entity") String entityName,
-                            @Dimension("colo") @QueryParam("colo") String colo,
-                            @Dimension("effective") @DefaultValue("") @QueryParam("effective")
String effectiveTime) {
-        return super.update(request, type, entityName, colo, effectiveTime);
+                            @Dimension("colo") @QueryParam("colo") String colo) {
+        return super.update(request, type, entityName, colo);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index a3acbdb..9c6ad80 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -118,7 +118,7 @@ public class FalconCLIIT {
         Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type process -file
" + filePath), 0);
 
         Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName")
-                + " -type process -file " + filePath + " -effective 2025-04-20T00:00Z"),
0);
+                + " -type process -file " + filePath), 0);
     }
 
     public void testValidateValidCommands() throws Exception {
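
With the -effective option gone from the proxy and the entity manager, the CLI update in this test reduces to its plain form, along the lines of:

    entity -update -name <process-name> -type process -file <path/to/process.xml>

(placeholders are illustrative; the literal argument string matches the assertion above).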

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
index 1885bb7..86da770 100644
--- a/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/late/LateDataHandlerIT.java
@@ -23,7 +23,7 @@ import org.apache.falcon.catalog.CatalogServiceFactory;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.latedata.LateDataHandler;
+import org.apache.falcon.workflow.LateDataHandler;
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.util.FSUtils;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/0bc6aef5/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index f7e6bdb..40f8e04 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -247,28 +247,6 @@ public class EntityManagerJerseyIT {
     }
 
     @Test
-    public void testUserWorkflowUpdate() throws Exception {
-        //schedule a process
-        TestContext context = newContext();
-        context.scheduleProcess();
-        OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
-        List<BundleJob> bundles = OozieTestUtils.getBundles(context);
-        Assert.assertEquals(bundles.size(), 1);
-
-        //create new file in user workflow
-        FileSystem fs = context.cluster.getFileSystem();
-        fs.create(new Path("/falcon/test/workflow", "newfile")).close();
-
-        //update process should create new bundle
-        Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
-        updateEndtime(process);
-        ClientResponse response = update(context, process, null);
-        context.assertSuccessful(response);
-        bundles = OozieTestUtils.getBundles(context);
-        Assert.assertEquals(bundles.size(), 2);
-    }
-
-    @Test
     public void testUpdateSuspendedEntity() throws Exception {
         TestContext context = newContext();
         context.scheduleProcess();

