falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pall...@apache.org
Subject falcon git commit: FALCON-1852 Make optional input to a process truly optional
Date Tue, 22 Mar 2016 07:15:55 GMT
Repository: falcon
Updated Branches:
  refs/heads/master ade3079bf -> c2f7a2a5e


FALCON-1852 Make optional input to a process truly optional

The following changes have been made:
1. Creation of "empty dir" under the staging path of a cluster (during cluster creation).
2. Modify OozieELExtensions to look for availability flag and use only those instances that
have the entire dataset. Use "empty dir" when no instances resolve.
3. Updated UT and IT for additional validation.

Tested manually with and without availabilityFlag supplied.

Author: Pallavi Rao <pallavi.rao@inmobi.com>

Reviewers: @sandeepSamudrala , @sriksun, @pavankumar526, @ajayyadava

Closes #73 from pallavi-rao/1852 and squashes the following commits:

7e087ed [Pallavi Rao] FALCON-1852 Documentation update for optional feeds
fc9c490 [Pallavi Rao] FALCON-1852 Added mockito to the pom
92ad20c [Pallavi Rao] FALCON-1852 Added UTs
533e221 [Pallavi Rao] FALCON-1852 Make optional input to a process truly optional


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c2f7a2a5
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c2f7a2a5
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c2f7a2a5

Branch: refs/heads/master
Commit: c2f7a2a5e925ce790391a9d718decd5c942a59b3
Parents: ade3079
Author: Pallavi Rao <pallavi.rao@inmobi.com>
Authored: Tue Mar 22 12:45:40 2016 +0530
Committer: Pallavi Rao <pallavi.rao@inmobi.com>
Committed: Tue Mar 22 12:45:40 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/ClusterHelper.java |   7 +-
 .../entity/parser/ClusterEntityParser.java      |   4 +
 .../falcon/hadoop/HadoopClientFactory.java      |   2 +
 .../entity/parser/ClusterEntityParserTest.java  |   9 +-
 docs/src/site/twiki/EntitySpecification.twiki   |   3 +-
 oozie-el-extensions/pom.xml                     |   5 +
 .../oozie/extensions/OozieELExtensions.java     |  47 +++++++--
 .../oozie/extensions/TestOozieELExtensions.java | 101 +++++++++++++++++--
 .../oozie/process/ProcessBundleBuilder.java     |   6 +-
 .../ProcessExecutionCoordinatorBuilder.java     |  20 ++--
 .../falcon/resource/EntityManagerJerseyIT.java  |   2 +
 11 files changed, 177 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 41c9369..24ba7d7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -42,7 +42,7 @@ public final class ClusterHelper {
     public static final String DEFAULT_BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     public static final String WORKINGDIR = "working";
     public static final String NO_USER_BROKER_URL = "NA";
-
+    public static final String EMPTY_DIR_NAME = "EMPTY_DIR_DONT_DELETE";
 
 
     private ClusterHelper() {
@@ -192,4 +192,9 @@ public final class ClusterHelper {
         }
         return null;
     }
+
+    public static String getEmptyDir(Cluster cluster) {
+        return getStorageUrl(cluster) + getLocation(cluster, ClusterLocationType.STAGING).getPath()
+                + "/" + EMPTY_DIR_NAME;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
index bef4b39..87db536 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
@@ -301,6 +301,10 @@ public class ClusterEntityParser extends EntityParser<Cluster>
{
                     "falcon/workflows/feed", HadoopClientFactory.ALL_PERMISSION);
             createStagingSubdirs(fs, cluster, stagingLocation,
                     "falcon/workflows/process", HadoopClientFactory.ALL_PERMISSION);
+
+            // Create empty dirs for optional input
+            createStagingSubdirs(fs, cluster, stagingLocation,
+                    ClusterHelper.EMPTY_DIR_NAME, HadoopClientFactory.READ_ONLY_PERMISSION);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
index e33d353..e970439 100644
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
@@ -57,6 +57,8 @@ public final class HadoopClientFactory {
             new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
 
     private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
+    public static final FsPermission READ_ONLY_PERMISSION =
+            new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ);
 
     private HadoopClientFactory() {
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index f98b6e4..c45909f 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -347,8 +347,8 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         Mockito.doNothing().when(clusterEntityParser).validateWorkflowInterface(cluster);
         Mockito.doNothing().when(clusterEntityParser).validateMessagingInterface(cluster);
         Mockito.doNothing().when(clusterEntityParser).validateRegistryInterface(cluster);
-        this.dfsCluster.getFileSystem().mkdirs(new Path(ClusterHelper.getLocation(cluster,
-                ClusterLocationType.STAGING).getPath()), HadoopClientFactory.ALL_PERMISSION);
+        String stagingPath = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING).getPath();
+        this.dfsCluster.getFileSystem().mkdirs(new Path(stagingPath), HadoopClientFactory.ALL_PERMISSION);
         clusterEntityParser.validate(cluster);
         String workingDirPath = cluster.getLocations().getLocations().get(0).getPath() +
"/working";
         Assert.assertEquals(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
workingDirPath);
@@ -357,6 +357,11 @@ public class ClusterEntityParserTest extends AbstractTestBase {
         Assert.assertEquals(workingDirStatus.getPermission(), HadoopClientFactory.READ_EXECUTE_PERMISSION);
         Assert.assertEquals(workingDirStatus.getOwner(), UserGroupInformation.getLoginUser().getShortUserName());
 
+        FileStatus emptyDirStatus = this.dfsCluster.getFileSystem().getFileStatus(new Path(stagingPath
+                + "/" + ClusterHelper.EMPTY_DIR_NAME));
+        Assert.assertEquals(emptyDirStatus.getPermission(), HadoopClientFactory.READ_ONLY_PERMISSION);
+        Assert.assertEquals(emptyDirStatus.getOwner(), UserGroupInformation.getLoginUser().getShortUserName());
+
         String stagingSubdirFeed = cluster.getLocations().getLocations().get(0).getPath()
+ "/falcon/workflows/feed";
         String stagingSubdirProcess =
                 cluster.getLocations().getLocations().get(0).getPath() + "/falcon/workflows/process";

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index d08c3a3..7eedf87 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -682,8 +682,7 @@ Example workflow configuration:
 
 
 ---+++ Optional Inputs
-User can mention one or more inputs as optional inputs. In such cases the job does not wait
on those inputs which are
-mentioned as optional. If they are present it considers them otherwise continue with the
compulsory ones.
+User can mention one or more inputs as optional inputs. In such cases the job does not wait
on those inputs which are mentioned as optional. If they are present it considers them otherwise
continues with the mandatory ones. If some instances of the optional feed are present for
the given data window, those are considered and passed on to the process. While checking for
presence of a feed instance, Falcon looks for __availabilityFlag__ in the directory, if specified
in the feed definition. If no __availabilityFlag__ is specified, presence of the instance
directory is treated as indication of availability of data.
 Example:
 <verbatim>
 <feed name="feed1">

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie-el-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/pom.xml b/oozie-el-extensions/pom.xml
index e8c1830..d0f753d 100644
--- a/oozie-el-extensions/pom.xml
+++ b/oozie-el-extensions/pom.xml
@@ -82,6 +82,11 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
index 167afde..a6ff487 100644
--- a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
+++ b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
@@ -20,11 +20,14 @@ package org.apache.oozie.extensions;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.coord.CoordCommandUtils;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.SyncCoordDataset;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.dependency.DependencyChecker;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
@@ -34,8 +37,10 @@ import org.jdom.Text;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.List;
 import java.util.TimeZone;
 
 /**
@@ -72,15 +77,31 @@ public final class OozieELExtensions {
         //optional input
         if (uristr == null) {
             Element dsEle = getDSElement(eval, dataInName);
-            Configuration conf = new Configuration();
             SyncCoordAction appInst = (SyncCoordAction) eval.getVariable(CoordELFunctions.COORD_ACTION);
+
+            Configuration conf = new Configuration();
+            conf.set(OozieClient.USER_NAME, (String)eval.getVariable(OozieClient.USER_NAME));
             try {
                 ELEvaluator instEval = CoordELEvaluator.createInstancesELEvaluator(dsEle,
appInst, conf);
                 StringBuilder instances = new StringBuilder();
-                CoordCommandUtils.resolveInstanceRange(dsEle, instances , appInst, conf,
instEval);
-                uristr = CoordCommandUtils.createEarlyURIs(dsEle, instances.toString(),
-                        new StringBuilder(), new StringBuilder());
-                uristr = uristr.replace(CoordELFunctions.INSTANCE_SEPARATOR, ",");
+                StringBuilder urisWithDoneFlag = new StringBuilder();
+                CoordCommandUtils.resolveInstanceRange(dsEle, instances, appInst, conf, instEval);
+                CoordCommandUtils.createEarlyURIs(dsEle, instances.toString(),
+                        new StringBuilder(), urisWithDoneFlag);
+                XLog.getLog(OozieELExtensions.class).debug("Resolved instances for " + dataInName
+ " : "
+                        + urisWithDoneFlag.toString());
+                // Check if availability flags are present for each instance.
+                ActionDependency actionDep = DependencyChecker.checkForAvailability(urisWithDoneFlag.toString(),
+                        conf, false);
+                String doneFlag = (String) eval.getVariable(dataInName + ".done-flag");
+                uristr = StringUtils.join(stripDoneFlag(actionDep.getAvailableDependencies(),
doneFlag), ",");
+                // If no instances are present, point the optional input to empty dir.
+                if (StringUtils.isEmpty(uristr)) {
+                    String emptyDir = (String) eval.getVariable(dataInName + ".empty-dir");
+                    XLog.getLog(OozieELExtensions.class).debug("No instances could be resolved.
Passing empty dir : "
+                            + emptyDir);
+                    uristr = emptyDir;
+                }
             } catch (Exception e) {
                 throw new RuntimeException("Failed to resolve instance range for " + dataInName,
e);
             }
@@ -108,6 +129,17 @@ public final class OozieELExtensions {
         return uristr;
     }
 
+    private static List<String> stripDoneFlag(List<String> availableDependencies,
String doneFlag) {
+        if (StringUtils.isEmpty(doneFlag)) {
+            return availableDependencies;
+        }
+        List<String> strippedAvailableDeps = new ArrayList<>();
+        for (String availableDep : availableDependencies) {
+            strippedAvailableDeps.add(StringUtils.stripEnd(availableDep, "/" + doneFlag));
+        }
+        return strippedAvailableDeps;
+    }
+
     private static Element getDSElement(ELEvaluator eval, String dataInName) {
         Element ele = new Element("datain");
         Element dsEle = new Element("dataset");
@@ -121,7 +153,10 @@ public final class OozieELExtensions {
         String[] children = {"done-flag", "uri-template"};
         for (String child : children) {
             Element childEle = new Element(child);
-            childEle.setContent(new Text(((String) eval.getVariable(dataInName + "." + child)).replace('%',
'$')));
+            String text = (String) eval.getVariable(dataInName + "." + child);
+            if (text != null) {
+                childEle.setContent(new Text(text.replace('%', '$')));
+            }
             dsEle.getChildren().add(childEle);
         }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
index c99a2b3..b9bf594 100644
--- a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
+++ b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
@@ -19,6 +19,10 @@
 package org.apache.oozie.extensions;
 
 import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
@@ -27,13 +31,23 @@ import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.SyncCoordDataset;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.URIHandler;
+import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
+
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -122,23 +136,47 @@ public class TestOozieELExtensions {
         String expuris =
                 "hdfs://localhost:8020/clicks/2009/09/02/10/*/US,hdfs://localhost:8020/clicks/2009/09/02/09/*/US";
         Assert.assertEquals(expuris, CoordELFunctions.evalAndWrap(eval, "${dataIn('clicks',
'*/US')}"));
+    }
 
-        //test optional input
+    @Test(dataProvider = "optionalDatasets")
+    public void testDataInOptional(String expuris, String partition, String doneFlag) throws
Exception {
+        ELEvaluator eval = createActionStartEvaluator();
         String inName = "clicks";
+        eval.setVariable(".datain.clicks", null);
+        Services.get().setService(DummyURIHandlerService.class);
+
         SyncCoordDataset ds = createDataSet("2007-09-30T010:00Z");
         eval.setVariable(inName + ".frequency", String.valueOf(ds.getFrequency()));
         eval.setVariable(inName + ".freq_timeunit", ds.getTimeUnit().name());
         eval.setVariable(inName + ".timezone", ds.getTimeZone().getID());
         eval.setVariable(inName + ".end_of_duration", Timeunit.NONE.name());
         eval.setVariable(inName + ".initial-instance", OozieELExtensions.formatDateUTC(ds.getInitInstance()));
-        eval.setVariable(inName + ".done-flag", "notused");
+        eval.setVariable(inName + ".done-flag", doneFlag);
         eval.setVariable(inName + ".uri-template", ds.getUriTemplate());
         eval.setVariable(inName + ".start-instance", "now(-1,0)");
         eval.setVariable(inName + ".end-instance", "now(0,0)");
-        // TODO Had to comment this out for this test to PASS else NPE in
-        // TODO org.apache.oozie.command.coord.CoordCommandUtils.createEarlyURIs(CoordCommandUtils.java:359)
-        // eval.setVariable(".datain.clicks", null);
-        Assert.assertEquals(expuris, CoordELFunctions.evalAndWrap(eval, "${dataIn('clicks',
'*/US')}"));
+        eval.setVariable(OozieClient.USER_NAME, "test");
+        eval.setVariable(inName + ".empty-dir", "hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE");
+        Assert.assertEquals(CoordELFunctions.evalAndWrap(eval, "${dataIn('clicks', '" + partition
+ "')}"), expuris);
+    }
+
+    @DataProvider(name = "optionalDatasets")
+    public Object[][] getOptionalDatasets() {
+        return new Object[][] {
+            // With partitions and availability flag. All instances available.
+            {"hdfs://localhost:8020/clicks/2009/09/02/10/*/US,hdfs://localhost:8020/clicks/2009/09/02/09/*/US",
+                "*/US", "_DONE", },
+            // With availability flag. All instances missing
+            {"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE", "null",
"_FINISH"},
+            // No availability flag. One instance missing
+            {"hdfs://localhost:8020/clicks/2009/09/02/09", "null", ""},
+            // With availability flag. One instance missing.
+            {"hdfs://localhost:8020/clicks/2009/09/02/10", "null", "_SUCCESS"},
+            // No availability flag and partition. One instance missing
+            {"hdfs://localhost:8020/clicks/2009/09/02/09/US", "US", ""},
+            // With availability flag and partition. One instance missing.
+            {"hdfs://localhost:8020/clicks/2009/09/02/10/US", "US", "_SUCCESS"},
+        };
     }
 
     @Test
@@ -309,4 +347,55 @@ public class TestOozieELExtensions {
         CoordELFunctions.configureEvaluator(eval, null, appInst);
         return eval;
     }
+
+    // A mock URIHandlerService that simulates availability of data as per testcase requirement
+    private static class DummyURIHandlerService extends URIHandlerService {
+        private URIHandler mockHandler = Mockito.mock(FSURIHandler.class);
+
+        @Override
+        public void init(Services services) throws ServiceException {
+            try {
+                Mockito.when(mockHandler.exists((URI)Mockito.argThat(new URIMatcher()),
+                        Mockito.any(Configuration.class), Mockito.matches("test"))).thenReturn(true);
+                Mockito.when(mockHandler.getURIWithDoneFlag(Mockito.anyString(),
+                        Mockito.anyString())).thenCallRealMethod();
+            } catch (URIHandlerException e) {
+                throw new ServiceException(e);
+            }
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+
+        public URIHandler getURIHandler(URI uri) {
+            return mockHandler;
+        }
+
+        @Override
+        public Class<? extends Service> getInterface() {
+            return URIHandlerService.class;
+        }
+    }
+
+    private static class URIMatcher extends ArgumentMatcher {
+        private List<URI> availableURIs = new ArrayList<>();
+
+        public URIMatcher() {
+            try {
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/10/_DONE"));
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/09/_DONE"));
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/10/_SUCCESS"));
+                availableURIs.add(new URI("hdfs://localhost:8020/clicks/2009/09/02/09"));
+            } catch (URISyntaxException e) {
+                //Shouldn't happen
+            }
+        }
+
+        @Override
+        public boolean matches(Object o) {
+            return availableURIs.contains(o);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
index 806810e..6661dd5 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.oozie.process;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.v0.EntityType;
@@ -65,12 +66,13 @@ public class ProcessBundleBuilder extends OozieBundleBuilder<Process>
{
                     properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
                     properties.put(inName + ".initial-instance",
                         SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
-                    properties.put(inName + ".done-flag", "notused");
+                    String doneFlag = feed.getAvailabilityFlag();
+                    properties.put(inName + ".done-flag", (doneFlag == null)? "" : doneFlag);
 
                     String locPath = FeedHelper.createStorage(cluster.getName(), feed)
                         .getUriTemplate(LocationType.DATA).replace('$', '%');
                     properties.put(inName + ".uri-template", locPath);
-
+                    properties.put(inName + ".empty-dir", ClusterHelper.getEmptyDir(cluster));
                     properties.put(inName + ".start-instance", in.getStart());
                     properties.put(inName + ".end-instance", in.getEnd());
                 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index f5c9948..91f4757 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -158,20 +158,20 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
             Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
 
+            if (coord.getDatasets() == null) {
+                coord.setDatasets(new DATASETS());
+            }
+
+            SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(),
LocationType.DATA);
+            if (syncdataset == null) {
+                return;
+            }
+            coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
             if (!input.isOptional()) {
-                if (coord.getDatasets() == null) {
-                    coord.setDatasets(new DATASETS());
-                }
                 if (coord.getInputEvents() == null) {
                     coord.setInputEvents(new INPUTEVENTS());
                 }
-
-                SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(),
LocationType.DATA);
-                if (syncdataset == null) {
-                    return;
-                }
-                coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
                 DATAIN datain = createDataIn(input);
                 coord.getInputEvents().getDataIn().add(datain);
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/c2f7a2a5/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index f336422..b6553e9 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -198,6 +198,8 @@ public class EntityManagerJerseyIT extends AbstractSchedulerManagerJerseyIT
{
         File tmpFile = TestContext.getTempFile();
         EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
         schedule(context);
+        waitForStatus(EntityType.PROCESS.name(), context.getProcessName(), START_INSTANCE,
+                InstancesResult.WorkflowStatus.SUCCEEDED);
     }
 
     public void testDryRun() throws Exception {


Mime
View raw message