falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srik...@apache.org
Subject [6/8] git commit: FALCON-129 Disable Late data handling for hive tables. Contributed by Venkatesh Seetharam
Date Thu, 17 Oct 2013 15:45:31 GMT
FALCON-129 Disable Late data handling for hive tables. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7dbf4e8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7dbf4e8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7dbf4e8d

Branch: refs/heads/FALCON-85
Commit: 7dbf4e8d580f39cbffe7ad46f9c0c0ea3dc5ccd4
Parents: 52633a1
Author: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Authored: Thu Oct 17 20:12:30 2013 +0530
Committer: srikanth.sundarrajan <srikanth.sundarrajan@inmobi.com>
Committed: Thu Oct 17 20:12:30 2013 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/client/FalconClient.java  |  4 +-
 .../apache/falcon/catalog/CatalogPartition.java | 12 +++---
 .../falcon/cleanup/AbstractCleanupHandler.java  |  5 ++-
 .../org/apache/falcon/entity/EntityUtil.java    | 12 +++---
 .../org/apache/falcon/entity/FeedHelper.java    | 30 ++++++++++++++
 .../apache/falcon/entity/FileSystemStorage.java |  2 +-
 .../falcon/entity/parser/FeedEntityParser.java  | 33 +++++++++++++++
 .../entity/parser/ProcessEntityParser.java      | 34 ++++++++++------
 .../workflow/engine/AbstractWorkflowEngine.java |  2 +-
 .../resources/config/feed/hive-table-feed.xml   |  6 ---
 .../falcon/converter/OozieFeedMapper.java       |  1 -
 .../config/workflow/replication-workflow.xml    |  4 ++
 .../config/workflow/retention-workflow.xml      |  2 +
 .../falcon/messaging/EntityInstanceMessage.java | 24 +++++++----
 .../falcon/messaging/MessageProducer.java       |  8 +++-
 .../messaging/FalconTopicProducerTest.java      |  7 +++-
 .../falcon/messaging/FeedProducerTest.java      |  4 +-
 .../falcon/messaging/ProcessProducerTest.java   |  4 +-
 .../falcon/workflow/FalconPostProcessing.java   |  5 ++-
 .../workflow/engine/OozieWorkflowEngine.java    |  6 +--
 .../workflow/FalconPostProcessingTest.java      |  4 +-
 .../falcon/service/FalconTopicSubscriber.java   |  5 ++-
 .../falcon/converter/OozieProcessMapper.java    | 37 +++++++++++++----
 .../config/workflow/process-parent-workflow.xml |  6 +++
 .../converter/OozieProcessMapperTest.java       | 10 ++---
 .../resources/config/process/process-0.1.xml    |  2 +-
 .../apache/falcon/latedata/LateDataHandler.java | 15 ++++++-
 .../apache/falcon/rerun/event/LaterunEvent.java | 15 +++++--
 .../apache/falcon/rerun/event/RerunEvent.java   |  4 +-
 .../falcon/rerun/event/RerunEventFactory.java   |  2 +-
 .../apache/falcon/rerun/event/RetryEvent.java   |  4 +-
 .../rerun/handler/AbstractRerunHandler.java     |  4 +-
 .../falcon/rerun/handler/LateRerunConsumer.java | 43 +++++++++++---------
 .../falcon/rerun/handler/LateRerunHandler.java  | 33 ++++++++-------
 .../falcon/rerun/handler/RetryHandler.java      |  5 ++-
 .../apache/falcon/rerun/queue/ActiveMQTest.java |  3 +-
 .../falcon/resource/FeedEntityValidationIT.java | 39 ++++++++++++++++++
 webapp/src/test/resources/hive-table-feed.xml   |  2 +-
 38 files changed, 318 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 24a5bfe..1ad9831 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -312,7 +312,7 @@ public class FalconClient {
                 getServletInputStream(clusters, sourceClusters, null), null, colo);
     }
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public String rerunInstances(String type, String entity, String start,
                                  String end, String filePath, String colo,
                                  String clusters, String sourceClusters)
@@ -336,7 +336,7 @@ public class FalconClient {
         return sendInstanceRequest(Instances.RERUN, type, entity, start, end,
                 getServletInputStream(clusters, sourceClusters, temp), null, colo);
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public String rerunInstances(String type, String entity, String start,
                                  String end, String colo, String clusters, String sourceClusters)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index ec656a3..ab312e9 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -28,8 +28,8 @@ public class CatalogPartition {
     private String databaseName;
     private String tableName;
     private List<String> values;
-    private int createTime;
-    private int lastAccessTime;
+    private long createTime;
+    private long lastAccessTime;
     private List<String> tableColumns;
     private String inputFormat;
     private String outputFormat;
@@ -51,11 +51,11 @@ public class CatalogPartition {
         this.values = values;
     }
 
-    protected void setCreateTime(int createTime) {
+    protected void setCreateTime(long createTime) {
         this.createTime = createTime;
     }
 
-    protected void setLastAccessTime(int lastAccessTime) {
+    protected void setLastAccessTime(long lastAccessTime) {
         this.lastAccessTime = lastAccessTime;
     }
 
@@ -147,7 +147,7 @@ public class CatalogPartition {
      *
      * @return the last access time
      */
-    public int getLastAccessTime() {
+    public long getLastAccessTime() {
         return this.lastAccessTime;
     }
 
@@ -156,7 +156,7 @@ public class CatalogPartition {
      *
      * @return the creates the time
      */
-    public int getCreateTime() {
+    public long getCreateTime() {
         return this.createTime;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index e2943e1..644afd2 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -106,6 +106,10 @@ public abstract class AbstractCleanupHandler {
 
     protected void delete(Cluster cluster, Entity entity, long retention, FileStatus[] logs)
         throws FalconException {
+        if (logs == null || logs.length == 0) {
+            LOG.info("Nothing to delete for cluster: " + cluster.getName() + ", entity: " + entity.getName());
+            return;
+        }
 
         long now = System.currentTimeMillis();
 
@@ -141,7 +145,6 @@ public abstract class AbstractCleanupHandler {
             fs.delete(parent, true);
             deleteParentIfEmpty(fs, parent.getParent());
         }
-
     }
 
     public abstract void cleanup() throws FalconException;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 7ceaf97..658764a 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -558,6 +558,11 @@ public final class EntityUtil {
                     .equalsIgnoreCase("true")) {
                 return null;
             }
+
+            if (FeedHelper.getStorageType((Feed) entity) == Storage.TYPE.TABLE) {
+                return null;
+            }
+
             LateProcess lateProcess = new LateProcess();
             lateProcess.setDelay(new Frequency(RuntimeProperties.get()
                     .getProperty("feed.late.frequency", "hours(3)")));
@@ -598,10 +603,7 @@ public final class EntityUtil {
     }
 
     public static boolean responsibleFor(String colo) {
-        if (DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
-                && colo.equals(DeploymentUtil.getCurrentColo()))) {
-            return true;
-        }
-        return false;
+        return DeploymentUtil.isEmbeddedMode() || (!DeploymentUtil.isPrism()
+                && colo.equals(DeploymentUtil.getCurrentColo()));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index e76ed2f..a1b9cb8 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -155,6 +155,36 @@ public final class FeedHelper {
         throw new IllegalArgumentException("Bad type: " + type);
     }
 
+    public static Storage.TYPE getStorageType(Feed feed) throws FalconException {
+        final Locations feedLocations = feed.getLocations();
+        if (feedLocations != null
+                && feedLocations.getLocations().size() != 0) {
+            return Storage.TYPE.FILESYSTEM;
+        }
+
+        final CatalogTable table = feed.getTable();
+        if (table != null) {
+            return Storage.TYPE.TABLE;
+        }
+
+        throw new FalconException("Both catalog and locations are not defined.");
+    }
+
+    public static Storage.TYPE getStorageType(Feed feed,
+                                              Cluster cluster) throws FalconException {
+        final List<Location> locations = getLocations(cluster, feed);
+        if (locations != null) {
+            return Storage.TYPE.FILESYSTEM;
+        }
+
+        final CatalogTable table = getTable(cluster, feed);
+        if (table != null) {
+            return Storage.TYPE.TABLE;
+        }
+
+        throw new FalconException("Both catalog and locations are not defined.");
+    }
+
     private static List<Location> getLocations(Cluster cluster, Feed feed) {
         // check if locations are overridden in cluster
         final Locations clusterLocations = cluster.getLocations();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 4e2dae9..099ee71 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -72,7 +72,7 @@ public class FileSystemStorage implements Storage {
      * Create an instance from the URI Template that was generated using
      * the getUriTemplate() method.
      *
-     * @param uriTemplate the uri template from org.apache.falcon.entity.CatalogStorage#getUriTemplate
+     * @param uriTemplate the uri template from org.apache.falcon.entity.FileSystemStorage#getUriTemplate
      * @throws URISyntaxException
      */
     protected FileSystemStorage(String uriTemplate) throws URISyntaxException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index b72efc6..58d04d2 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -288,6 +288,39 @@ public class FeedEntityParser extends EntityParser<Feed> {
      * Does not matter for FileSystem storage.
      */
     private void validateFeedStorage(Feed feed) throws FalconException {
+        final Storage.TYPE storageType = FeedHelper.getStorageType(feed);
+        validateUniformStorageType(feed, storageType);
+        validatePartitions(feed, storageType);
+        validateLateData(feed, storageType);
+        validateStorageExists(feed);
+    }
+
+    private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
+        for (Cluster cluster : feed.getClusters().getClusters()) {
+            Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
+
+            if (feedStorageType != feedClusterStorageType) {
+                throw new ValidationException("The storage type is not uniform for cluster: " + cluster.getName());
+            }
+        }
+    }
+
+    private void validatePartitions(Feed feed, Storage.TYPE storageType) throws  FalconException {
+        if (storageType == Storage.TYPE.TABLE && feed.getPartitions() != null) {
+            throw new ValidationException("Partitions are not supported for feeds with table storage. "
+                    + "It should be defined as part of the table URI. "
+                    + feed.getName());
+        }
+    }
+
+    private void validateLateData(Feed feed, Storage.TYPE storageType) throws FalconException {
+        if (storageType == Storage.TYPE.TABLE && feed.getLateArrival() != null) {
+            throw new ValidationException("Late data handling is not supported for feeds with table storage! "
+                    + feed.getName());
+        }
+    }
+
+    private void validateStorageExists(Feed feed) throws FalconException {
         StringBuilder buffer = new StringBuilder();
         for (Cluster cluster : feed.getClusters().getClusters()) {
             final Storage storage = FeedHelper.createStorage(cluster, feed);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 8fa4364..e74745d 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -175,6 +175,10 @@ public class ProcessEntityParser extends EntityParser<Process> {
     }
 
     private void validateLateInputs(Process process) throws ValidationException {
+        if (process.getLateProcess() == null) {
+            return;
+        }
+
         Map<String, String> feeds = new HashMap<String, String>();
         if (process.getInputs() != null) {
             for (Input in : process.getInputs().getInputs()) {
@@ -182,26 +186,30 @@ public class ProcessEntityParser extends EntityParser<Process> {
             }
         }
 
-        if (process.getLateProcess() != null) {
-            for (LateInput lp : process.getLateProcess().getLateInputs()) {
-                if (!feeds.keySet().contains(lp.getInput())) {
-                    throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
+        for (LateInput lp : process.getLateProcess().getLateInputs()) {
+            if (!feeds.keySet().contains(lp.getInput())) {
+                throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
+            }
+
+            try {
+                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
+                if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+                    throw new ValidationException("Late data handling is not supported for feeds with table storage! "
+                            + feed.getName());
                 }
-                try {
-                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
-                    if (feed.getLateArrival() == null) {
-                        throw new ValidationException(
-                                "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
-                    }
-                } catch (FalconException e) {
-                    throw new ValidationException(e);
+
+                if (feed.getLateArrival() == null) {
+                    throw new ValidationException(
+                            "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
                 }
+            } catch (FalconException e) {
+                throw new ValidationException(e);
             }
         }
     }
 
     private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
-        if (input.isOptional() && FeedHelper.createStorage(feed).getType() == Storage.TYPE.TABLE) {
+        if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
             throw new ValidationException("Optional Input is not supported for feeds with table storage! "
                     + input.getName());
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 17695d2..b86a715 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -81,7 +81,7 @@ public abstract class AbstractWorkflowEngine {
 
     public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
 
-    public abstract String getWorkflowProperty(String cluster, String jobId, String property) throws FalconException;
+    public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
 
     public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/common/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed.xml b/common/src/test/resources/config/feed/hive-table-feed.xml
index dfc2c2b..ef936d2 100644
--- a/common/src/test/resources/config/feed/hive-table-feed.xml
+++ b/common/src/test/resources/config/feed/hive-table-feed.xml
@@ -17,16 +17,10 @@
   limitations under the License.
   -->
 <feed description="clicks log" name="clicks" xmlns="uri:falcon:feed:0.1">
-    <partitions>
-        <partition name="fraud"/>
-        <partition name="good"/>
-    </partitions>
-
     <groups>online,bi</groups>
 
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(6)"/>
 
     <clusters>
         <cluster name="testCluster" type="source" partition="*/${cluster.colo}">

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index 4e11a98..e32f5d9 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -416,7 +416,6 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                 } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
                     instancePaths = "${coord:dataIn('input')}";
 
-                    props.put("shouldRecord", "false"); // todo - override until late data is handled
                     final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
                     propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
                     final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 306a249..ad9edb8 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -46,6 +46,8 @@
             <arg>${falconInPaths}</arg>
             <arg>-falconInputFeeds</arg>
             <arg>${falconInputFeeds}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <capture-output/>
         </java>
         <ok to="replication-decision"/>
@@ -264,6 +266,8 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
             <arg>-workflowEngineUrl</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 8b444f5..8db5576 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -107,6 +107,8 @@
             <arg>${wf:conf("broker.ttlInMins")}</arg>
             <arg>-cluster</arg>
             <arg>${cluster}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index ddd6781..f1783b5 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -55,12 +55,23 @@ public class EntityInstanceMessage {
      * properties available in feed entity operation workflow.
      */
     public enum ARG {
-        entityName("entityName"), feedNames("feedNames"), feedInstancePaths(
-                "feedInstancePaths"), workflowId("workflowId"), runId("runId"), nominalTime(
-                "nominalTime"), timeStamp("timeStamp"), brokerUrl("broker.url"), brokerImplClass(
-                "broker.impl.class"), entityType("entityType"), operation(
-                "operation"), logFile("logFile"), topicName("topicName"), status(
-                "status"), brokerTTL("broker.ttlInMins"), cluster("cluster");
+        entityName("entityName"),
+        feedNames("feedNames"),
+        feedInstancePaths("feedInstancePaths"),
+        workflowId("workflowId"),
+        runId("runId"),
+        nominalTime("nominalTime"),
+        timeStamp("timeStamp"),
+        brokerUrl("broker.url"),
+        brokerImplClass("broker.impl.class"),
+        entityType("entityType"),
+        operation("operation"),
+        logFile("logFile"),
+        topicName("topicName"),
+        status("status"),
+        brokerTTL("broker.ttlInMins"),
+        cluster("cluster"),
+        falconFeedStorageType("falconFeedStorageType");
 
         private String propName;
 
@@ -219,6 +230,5 @@ public class EntityInstanceMessage {
         DateFormat falconFormat = new SimpleDateFormat(
                 "yyyy'-'MM'-'dd'T'HH':'mm'Z'");
         return falconFormat.format(nominalDate);
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
index cf5c2d7..3a0193a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
@@ -124,12 +124,18 @@ public class MessageProducer extends Configured implements Tool {
                 "workflow id"));
         addOption(options, new Option(ARG.cluster.getArgName(), true,
                 "cluster name"));
+        addOption(options, new Option(ARG.falconFeedStorageType.getArgName(), true,
+                "feed storage type: filesystem or table"), false);
 
         return new GnuParser().parse(options, arguments);
     }
 
     private static void addOption(Options options, Option opt) {
-        opt.setRequired(true);
+        addOption(options, opt, true);
+    }
+
+    private static void addOption(Options options, Option opt, boolean isRequired) {
+        opt.setRequired(isRequired);
         options.addOption(opt);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index 9912678..44d4bd8 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -74,7 +74,8 @@ public class FalconTopicProducerTest {
                                      "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                                      "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                                      "-" + ARG.brokerTTL.getArgName(), "10",
-                                     "-" + ARG.cluster.getArgName(), "corp", };
+                                     "-" + ARG.cluster.getArgName(), "corp",
+                                     "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
         testProcessMessageCreator(args);
     }
 
@@ -96,7 +97,8 @@ public class FalconTopicProducerTest {
                                      "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                                      "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                                      "-" + ARG.brokerTTL.getArgName(), "10",
-                                     "-" + ARG.cluster.getArgName(), "corp", };
+                                     "-" + ARG.cluster.getArgName(), "corp",
+                                     "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
         testProcessMessageCreator(args);
     }
 
@@ -163,5 +165,6 @@ public class FalconTopicProducerTest {
         Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
                 "2012-01-01T01:00Z");
         Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+        Assert.assertEquals(m.getString(ARG.falconFeedStorageType.getArgName()), "FILESYSTEM");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 6a6dc35..3fc5a80 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -86,7 +86,8 @@ public class FeedProducerTest {
                             "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                             "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                             "-" + ARG.brokerTTL.getArgName(), "10",
-                            "-" + ARG.cluster.getArgName(), "corp", };
+                            "-" + ARG.cluster.getArgName(), "corp",
+                            "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
 
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
@@ -208,6 +209,7 @@ public class FeedProducerTest {
         Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
                 "2012-01-01T01:00Z");
         Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+        Assert.assertEquals(m.getString(ARG.falconFeedStorageType.getArgName()), "FILESYSTEM");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 078b9c2..a5fc60a 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -61,7 +61,8 @@ public class ProcessProducerTest {
                             "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                             "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                             "-" + ARG.brokerTTL.getArgName(), "10",
-                            "-" + ARG.cluster.getArgName(), "corp", };
+                            "-" + ARG.cluster.getArgName(), "corp",
+                            "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
@@ -145,5 +146,6 @@ public class ProcessProducerTest {
         Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
                 "2012-01-01T01:00Z");
         Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
+        Assert.assertEquals(m.getString(ARG.falconFeedStorageType.getArgName()), "FILESYSTEM");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 21a2f8e..0f98c3a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -61,7 +61,8 @@ public class FalconPostProcessing extends Configured implements Tool {
         WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
         USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
         USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
-        LOG_DIR("logDir", "log dir where job logs are copied");
+        LOG_DIR("logDir", "log dir where job logs are copied"),
+        FEED_STORAGE_TYPE("falconFeedStorageType", "feed's storage type");
 
         private String name;
         private String description;
@@ -155,6 +156,7 @@ public class FalconPostProcessing extends Configured implements Tool {
         addArg(args, cmd, Arg.FEED_NAMES);
         addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
         addArg(args, cmd, Arg.LOG_FILE);
+        addArg(args, cmd, Arg.FEED_STORAGE_TYPE);
 
         MessageProducer.main(args.toArray(new String[0]));
     }
@@ -204,6 +206,7 @@ public class FalconPostProcessing extends Configured implements Tool {
         addOption(options, Arg.USER_SUBFLOW_ID);
         addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
         addOption(options, Arg.LOG_DIR);
+        addOption(options, Arg.FEED_STORAGE_TYPE);
         return new GnuParser().parse(options, arguments);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index c086430..a6ecf3e 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -1267,14 +1267,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public String getWorkflowProperty(String cluster, String jobId,
-                                      String property) throws FalconException {
+    public Properties getWorkflowProperties(String cluster, String jobId) throws FalconException {
         OozieClient client = OozieClientFactory.get(cluster);
         try {
             WorkflowJob jobInfo = client.getJobInfo(jobId);
             String conf = jobInfo.getConf();
-            Properties props = OozieUtils.toProperties(conf);
-            return props.getProperty(property);
+            return OozieUtils.toProperties(conf);
         } catch (Exception e) {
             throw new FalconException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index c6485cd..da18652 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -69,7 +69,8 @@ public class FalconPostProcessingTest {
                             "http://localhost:11000/oozie/",
                             "-" + Arg.LOG_DIR.getOptionName(), "target/log",
                             "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
-                            "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie", };
+                            "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie",
+                            "-" + Arg.FEED_STORAGE_TYPE.getOptionName(), "FILESYSTEM", };
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
@@ -133,6 +134,7 @@ public class FalconPostProcessingTest {
             Assert.assertEquals(
                     m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
                     "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20");
+            Assert.assertEquals(m.getString(Arg.FEED_STORAGE_TYPE.getOptionName()), "FILESYSTEM");
         } else {
             Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
                     "click-logs");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index f11998a..b29b9e1 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -91,6 +91,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
             String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName());
             String status = mapMessage.getString(ARG.status.getArgName());
             String operation = mapMessage.getString(ARG.operation.getArgName());
+            String feedStorageType = mapMessage.getString(ARG.falconFeedStorageType.getArgName());
 
             AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
             InstancesResult result = wfEngine.getJobDetails(cluster, workflowId);
@@ -100,7 +101,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
             if (status.equalsIgnoreCase("FAILED")) {
                 retryHandler.handleRerun(cluster, entityType, entityName,
                         nominalTime, runId, workflowId,
-                        System.currentTimeMillis());
+                        System.currentTimeMillis(), feedStorageType);
                 GenericAlert.instrumentFailedInstance(cluster, entityType,
                         entityName, nominalTime, workflowId, runId, operation,
                         SchemaHelper.formatDateUTC(startTime),
@@ -108,7 +109,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
             } else if (status.equalsIgnoreCase("SUCCEEDED")) {
                 latedataHandler.handleRerun(cluster, entityType, entityName,
                         nominalTime, runId, workflowId,
-                        System.currentTimeMillis());
+                        System.currentTimeMillis(), feedStorageType);
                 GenericAlert.instrumentSucceededInstance(cluster, entityType,
                         entityName, nominalTime, workflowId, runId, operation,
                         SchemaHelper.formatDateUTC(startTime),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 1db4dbc..ee7167d 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -27,6 +27,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -35,6 +36,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
@@ -116,6 +118,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         initializeInputPaths(cluster, process, coord, props); // inputs
         initializeOutputPaths(cluster, process, coord, props);  // outputs
+        propagateStorageType(process, props);  // falconFeedStorageType
 
         Workflow processWorkflow = process.getWorkflow();
         props.put("userWorkflowEngine", processWorkflow.getEngine().value());
@@ -231,15 +234,11 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             coord.setOutputEvents(new OUTPUTEVENTS());
         }
 
-        String feedStorageType = null;
         List<String> outputFeeds = new ArrayList<String>();
         List<String> outputPaths = new ArrayList<String>();
         for (Output output : process.getOutputs().getOutputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
-            if (feedStorageType == null) {
-                feedStorageType = storage.getType().name();
-            }
 
             SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
             coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
@@ -263,8 +262,30 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         // Output feed name and path for parent workflow
         props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
         props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+    }
+
+    private void propagateStorageType(Process process, Map<String, String> props) throws FalconException {
+        Storage.TYPE feedStorageType = Storage.TYPE.FILESYSTEM; // defaults to FS.
+
+        if (process.getLateProcess() != null) {
+            Map<String, String> feeds = new HashMap<String, String>();
+            if (process.getInputs() != null) {
+                for (Input in : process.getInputs().getInputs()) {
+                    feeds.put(in.getName(), in.getFeed());
+                }
+            }
 
-        props.put("falconFeedStorageType", feedStorageType);
+            for (LateInput lp : process.getLateProcess().getLateInputs()) {
+                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
+                if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+                    feedStorageType = Storage.TYPE.TABLE;
+                    break;  // break if one of 'em is a table as late data wont apply
+                }
+            }
+        }
+
+        // this is currently only used for late data handling.
+        props.put("falconFeedStorageType", feedStorageType.name());
     }
 
     private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
@@ -320,7 +341,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
     }
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
                                    COORDINATORAPP coord, Map<String, String> props, Storage storage)
         throws FalconException {
@@ -346,7 +367,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         String outputExpr = "${coord:dataOut('" + name + type + "')}";
         props.put(name + "." + type, outputExpr);
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
                                                        Map<String, String> props, String prefix) {
@@ -365,8 +386,6 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
         props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
         props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
-
-        props.put("shouldRecord", "false"); // todo - override until late data is handled
     }
 
     private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 565c4ac..048191c 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -46,6 +46,8 @@
             <arg>${falconInPaths}</arg>
             <arg>-falconInputFeeds</arg>
             <arg>${falconInputFeeds}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <capture-output/>
         </java>
         <ok to="user-workflow"/>
@@ -147,6 +149,8 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
             <arg>-workflowEngineUrl</arg>
@@ -214,6 +218,8 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
+            <arg>-falconFeedStorageType</arg>
+            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
             <arg>-workflowEngineUrl</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 3313b2c..5330b9f 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -45,6 +45,7 @@ import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -163,12 +164,12 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         assertEquals(ds.getUriTemplate(),
                 FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
 
+        HashMap<String, String> props = new HashMap<String, String>();
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            if (prop.getName().equals("mapred.job.priority")) {
-                assertEquals(prop.getValue(), "LOW");
-                break;
-            }
+            props.put(prop.getName(), prop.getValue());
         }
+        assertEquals(props.get("mapred.job.priority"), "LOW");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
 
         assertLibExtensions(coord);
     }
@@ -285,7 +286,6 @@ public class OozieProcessMapperTest extends AbstractTestBase {
             props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
         } else if (prefix.equals("output")) {
             props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
-            props.put("shouldRecord", "false"); // todo - override until late data is handled
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index 91d5e0f..975d1a4 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -40,6 +40,6 @@
 
     <late-process policy="exp-backoff" delay="hours(1)">
         <late-input input="impression" workflow-path="hdfs://impression/late/workflow"/>
-        <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+        <late-input input="click" workflow-path="hdfs://clicks/late/workflow"/>
     </late-process>
 </process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 5b758b8..0c7b7b7 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.latedata;
 
 import org.apache.commons.cli.*;
+import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -56,15 +57,20 @@ public class LateDataHandler extends Configured implements Tool {
         Option opt = new Option("out", true, "Out file name");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("paths", true,
                 "Comma separated path list, further separated by #");
         opt.setRequired(true);
         options.addOption(opt);
+
         opt = new Option("falconInputFeeds", true,
                 "Input feed names, further separated by #");
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("falconFeedStorageType", true, "Feed storage type: FileSystem or Table");
+        opt.setRequired(true);
+
         return new GnuParser().parse(options, args);
     }
 
@@ -73,6 +79,12 @@ public class LateDataHandler extends Configured implements Tool {
 
         CommandLine command = getCommand(args);
 
+        final String falconFeedStorageType = command.getOptionValue("falconFeedStorageType");
+        if (Storage.TYPE.valueOf(falconFeedStorageType) == Storage.TYPE.TABLE) {
+            LOG.info("Late data not supported for table storage.");
+            return 0; // Late Data is not handled for table storage
+        }
+
         Path file = new Path(command.getOptionValue("out"));
         Map<String, Long> map = new LinkedHashMap<String, Long>();
         String pathStr = getOptionValue(command, "paths");
@@ -81,8 +93,7 @@ public class LateDataHandler extends Configured implements Tool {
         }
 
         String[] pathGroups = pathStr.split("#");
-        String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
-                "#");
+        String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split("#");
         for (int index = 0; index < pathGroups.length; index++) {
             long usage = 0;
             for (String pathElement : pathGroups[index].split(",")) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index dcde876..6523acc 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -22,14 +22,21 @@ package org.apache.falcon.rerun.event;
  */
 public class LaterunEvent extends RerunEvent {
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    private String feedStorageType;
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
                         long delay, String entityType, String entityName, String instance,
-                        int runId) {
+                        int runId, String feedStorageType) {
         super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
                 instance, runId);
+        this.feedStorageType = feedStorageType;
+    }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+
+    public String getFeedStorageType() {
+        return feedStorageType;
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
 
     @Override
     public String toString() {
@@ -37,6 +44,6 @@ public class LaterunEvent extends RerunEvent {
                 + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
                 + delayInMilliSec + SEP + "entityType=" + entityType + SEP
                 + "entityName=" + entityName + SEP + "instance=" + instance
-                + SEP + "runId=" + runId;
+                + SEP + "runId=" + runId + ", falconFeedStorageType=" + feedStorageType;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 0dcc93d..baf4601 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -45,7 +45,7 @@ public class RerunEvent implements Delayed {
     protected String instance;
     protected int runId;
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
                       String entityType, String entityName, String instance, int runId) {
         this.clusterName = clusterName;
@@ -57,7 +57,7 @@ public class RerunEvent implements Delayed {
         this.runId = runId;
         this.entityType = entityType;
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public String getClusterName() {
         return clusterName;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index 54bbecf..a949c19 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -45,7 +45,7 @@ public class RerunEventFactory<T extends RerunEvent> {
         return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
                 Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
-                Integer.parseInt(map.get("runId")));
+                Integer.parseInt(map.get("runId")), map.get("falconFeedStorageType"));
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
index 44bf96e..1396f19 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RetryEvent.java
@@ -25,7 +25,7 @@ public class RetryEvent extends RerunEvent {
     private int attempts;
     private int failRetryCount;
 
-    //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public RetryEvent(String clusterName, String wfId, long msgInsertTime,
                       long delay, String entityType, String entityName, String instance,
                       int runId, int attempts, int failRetryCount) {
@@ -34,7 +34,7 @@ public class RetryEvent extends RerunEvent {
         this.attempts = attempts;
         this.failRetryCount = failRetryCount;
     }
-    //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public int getAttempts() {
         return attempts;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index 8a41bf8..d53d05f 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -46,9 +46,11 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
         this.delayQueue.init();
     }
 
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public abstract void handleRerun(String cluster, String entityType,
                                      String entityName, String nominalTime, String runId, String wfId,
-                                     long msgReceivedTime);
+                                     long msgReceivedTime, String feedStorageType);
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     public AbstractWorkflowEngine getWfEngine() {
         return wfEngine;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 4088a59..0c66f00 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -19,12 +19,14 @@ package org.apache.falcon.rerun.handler;
 
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.latedata.LateDataHandler;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +59,12 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                 return;
             }
 
+            final String falconFeedStorageType = message.getFeedStorageType();
+            if (Storage.TYPE.valueOf(falconFeedStorageType) == Storage.TYPE.TABLE) {
+                LOG.info("Late rerun not supported for table storage in entity: " + message.getEntityName());
+                return;
+            }
+
             String detectLate = detectLate(message);
 
             if (detectLate.equals("")) {
@@ -67,15 +75,13 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                 handler.handleRerun(cluster, message.getEntityType(),
                         message.getEntityName(), message.getInstance(),
                         Integer.toString(message.getRunId()),
-                        message.getWfId(), System.currentTimeMillis());
+                        message.getWfId(), System.currentTimeMillis(), message.getFeedStorageType());
                 return;
             }
 
-            LOG.info("Late changes detected in the following feeds: "
-                    + detectLate);
+            LOG.info("Late changes detected in the following feeds: " + detectLate);
 
-            handler.getWfEngine().reRun(message.getClusterName(),
-                    message.getWfId(), null);
+            handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null);
             LOG.info("Scheduled late rerun for wf-id: " + message.getWfId()
                     + " on cluster: " + message.getClusterName());
         } catch (Exception e) {
@@ -95,26 +101,23 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
 
     public String detectLate(LaterunEvent message) throws Exception {
         LateDataHandler late = new LateDataHandler();
-        String falconInputFeeds = handler.getWfEngine().getWorkflowProperty(
-                message.getClusterName(), message.getWfId(), "falconInputFeeds");
-        String logDir = handler.getWfEngine().getWorkflowProperty(
-                message.getClusterName(), message.getWfId(), "logDir");
-        String falconInPaths = handler.getWfEngine().getWorkflowProperty(
-                message.getClusterName(), message.getWfId(), "falconInPaths");
-        String nominalTime = handler.getWfEngine().getWorkflowProperty(
-                message.getClusterName(), message.getWfId(), "nominalTime");
-        String srcClusterName = handler.getWfEngine().getWorkflowProperty(
-                message.getClusterName(), message.getWfId(), "srcClusterName");
-
-        Configuration conf = handler.getConfiguration(message.getClusterName(),
-                message.getWfId());
-        Path lateLogPath = handler.getLateLogPath(logDir, nominalTime,
-                srcClusterName);
+        Properties properties = handler.getWfEngine().getWorkflowProperties(
+                message.getClusterName(), message.getWfId());
+        String falconInputFeeds = properties.getProperty("falconInputFeeds");
+        String logDir = properties.getProperty("logDir");
+        String falconInPaths = properties.getProperty("falconInPaths");
+        String nominalTime = properties.getProperty("nominalTime");
+        String srcClusterName = properties.getProperty("srcClusterName");
+        Path lateLogPath = handler.getLateLogPath(logDir, nominalTime, srcClusterName);
+
+        final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
+        Configuration conf = LateRerunHandler.getConfiguration(storageEndpoint);
         FileSystem fs = FileSystem.get(conf);
         if (!fs.exists(lateLogPath)) {
             LOG.warn("Late log file:" + lateLogPath + " not found:");
             return "";
         }
+
         Map<String, Long> feedSizes = new LinkedHashMap<String, Long>();
         String[] pathGroups = falconInPaths.split("#");
         String[] inputFeeds = falconInputFeeds.split("#");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 1e4bd25..b37a2b6 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -20,6 +20,7 @@ package org.apache.falcon.rerun.handler;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -38,7 +39,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import java.util.Date;
+import java.util.*;
 
 /**
  * An implementation of handler for late reruns.
@@ -49,9 +50,13 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         AbstractRerunHandler<LaterunEvent, M> {
 
     @Override
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String cluster, String entityType,
                             String entityName, String nominalTime, String runId, String wfId,
-                            long msgReceivedTime) {
+                            long msgReceivedTime, String feedStorageType) {
+        if (Storage.TYPE.TABLE.name().equals(feedStorageType)) {
+            return;
+        }
 
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
@@ -72,15 +77,17 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             Long wait = getEventDelay(entity, nominalTime);
             if (wait == -1) {
                 LOG.info("Late rerun expired for entity: " + entityType + "(" + entityName + ")");
-                String logDir = this.getWfEngine().getWorkflowProperty(cluster,
-                        wfId, "logDir");
-                String srcClusterName = this.getWfEngine().getWorkflowProperty(
-                        cluster, wfId, "srcClusterName");
+
+                java.util.Properties properties =
+                        this.getWfEngine().getWorkflowProperties(cluster, wfId);
+                String logDir = properties.getProperty("logDir");
+                String srcClusterName = properties.getProperty("srcClusterName");
                 Path lateLogPath = this.getLateLogPath(logDir,
                         EntityUtil.fromUTCtoURIDate(nominalTime), srcClusterName);
+
                 LOG.info("Going to delete path:" + lateLogPath);
-                FileSystem fs = FileSystem.get(getConfiguration(cluster,
-                        wfId));
+                final String storageEndpoint = properties.getProperty(AbstractWorkflowEngine.NAME_NODE);
+                FileSystem fs = FileSystem.get(getConfiguration(storageEndpoint));
                 if (fs.exists(lateLogPath)) {
                     boolean deleted = fs.delete(lateLogPath, true);
                     if (deleted) {
@@ -95,7 +102,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                     + " And WorkflowId: " + wfId);
             LaterunEvent event = new LaterunEvent(cluster, wfId,
                     msgInsertTime.getTime(), wait, entityType, entityName,
-                    nominalTime, intRunId);
+                    nominalTime, intRunId, feedStorageType);
             offerToQueue(event);
         } catch (Exception e) {
             LOG.error("Unable to schedule late rerun for entity instance : "
@@ -105,6 +112,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                     nominalTime, wfId, runId, e.getMessage());
         }
     }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
 
@@ -216,12 +224,9 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
 
     }
 
-    public Configuration getConfiguration(String cluster, String wfId) throws FalconException {
+    public static Configuration getConfiguration(String storageEndpoint) throws FalconException {
         Configuration conf = new Configuration();
-        conf.set(
-                CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
-                this.getWfEngine().getWorkflowProperty(cluster, wfId,
-                        AbstractWorkflowEngine.NAME_NODE));
+        conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, storageEndpoint);
         return conf;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 2b41a7c..28e40d2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,8 +38,10 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
         AbstractRerunHandler<RetryEvent, M> {
 
     @Override
+    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String cluster, String entityType, String entityName,
-                            String nominalTime, String runId, String wfId, long msgReceivedTime) {
+                            String nominalTime, String runId, String wfId,
+                            long msgReceivedTime, String feedStorageType) {
         try {
             Entity entity = getEntity(entityType, entityName);
             Retry retry = getRetry(entity);
@@ -78,6 +80,7 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
             GenericAlert.alertRetryFailed(entityType, entityName, nominalTime, wfId, runId, e.getMessage());
         }
     }
+    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     @Override
     public void init(M aDelayQueue) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 01d0415..66a83f2 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.rerun.queue;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.testng.Assert;
@@ -51,7 +52,7 @@ public class ActiveMQTest {
 
         RerunEvent event = new LaterunEvent("clusterName", "wfId",
                 System.currentTimeMillis(), 60 * 1000, "entityType",
-                "entityName", "instance", 0);
+                "entityName", "instance", 0, Storage.TYPE.FILESYSTEM.name());
 
         try {
             activeMQueue.offer(event);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
index 93de109..540691f 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/FeedEntityValidationIT.java
@@ -19,17 +19,28 @@
 package org.apache.falcon.resource;
 
 import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.catalog.HiveCatalogService;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.FeedEntityParser;
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LateArrival;
 import org.apache.hcatalog.api.HCatClient;
 import org.apache.hcatalog.api.HCatCreateDBDesc;
 import org.apache.hcatalog.api.HCatCreateTableDesc;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.Marshaller;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Map;
 
@@ -110,6 +121,34 @@ public class FeedEntityValidationIT {
         context.assertSuccessful(response);
     }
 
+    /**
+     * Late data handling test.
+     *
+     * @throws Exception
+     */
+    @Test (expectedExceptions = FalconException.class)
+    public void testFeedEntityWithValidTableAndLateArrival() throws Exception {
+        Map<String, String> overlay = context.getUniqueOverlay();
+        overlay.put("colo", "default"); // validations will be ignored if not default & tests fail
+        overlay.put("tableUri", TABLE_URI);
+
+        String filePath = context.overlayParametersOverTemplate("/hive-table-feed.xml", overlay);
+        InputStream stream = new FileInputStream(filePath);
+        FeedEntityParser parser = (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+        Feed feed = parser.parse(stream);
+        Assert.assertNotNull(feed);
+
+        final LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("4", Frequency.TimeUnit.hours));
+        feed.setLateArrival(lateArrival);
+
+        StringWriter stringWriter = new StringWriter();
+        Marshaller marshaller = EntityType.FEED.getMarshaller();
+        marshaller.marshal(feed, stringWriter);
+        System.out.println(stringWriter.toString());
+        parser.parseAndValidate(stringWriter.toString());
+    }
+
     @DataProvider(name = "invalidTableUris")
     public Object[][] createInvalidTableUriData() {
         return new Object[][] {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7dbf4e8d/webapp/src/test/resources/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/hive-table-feed.xml b/webapp/src/test/resources/hive-table-feed.xml
index 8f9d80a..4434e64 100644
--- a/webapp/src/test/resources/hive-table-feed.xml
+++ b/webapp/src/test/resources/hive-table-feed.xml
@@ -21,7 +21,7 @@
 
     <frequency>hours(1)</frequency>
     <timezone>UTC</timezone>
-    <late-arrival cut-off="hours(6)"/>
+    <!--<late-arrival cut-off="hours(6)"/>-->
 
     <clusters>
         <cluster name="##cluster##" type="source">


Mime
View raw message