falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [11/13] git commit: FALCON-143 Enable Late data handling for hive tables. Contributed by Venkatesh Seetharam
Date Fri, 01 Nov 2013 23:42:12 GMT
FALCON-143 Enable Late data handling for hive tables. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/eb8ca3de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/eb8ca3de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/eb8ca3de

Branch: refs/heads/FALCON-85
Commit: eb8ca3de624fdaf6ea300fac4f1a9387afefe78b
Parents: 502990c
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu Oct 31 22:41:06 2013 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Fri Nov 1 13:40:40 2013 -0700

----------------------------------------------------------------------
 .../falcon/catalog/AbstractCatalogService.java  |   14 +
 .../falcon/catalog/HiveCatalogService.java      |   16 +-
 .../org/apache/falcon/entity/EntityUtil.java    |    4 -
 .../falcon/entity/parser/FeedEntityParser.java  |    8 -
 .../entity/parser/ProcessEntityParser.java      |    5 -
 .../falcon/entity/CatalogStorageTest.java       |   13 +
 .../falcon/converter/OozieFeedMapper.java       |   12 +-
 .../config/workflow/replication-workflow.xml    |   17 +-
 .../config/workflow/retention-workflow.xml      |    2 -
 .../falcon/converter/OozieFeedMapperTest.java   |   99 +-
 .../falcon/messaging/EntityInstanceMessage.java |    3 +-
 .../falcon/messaging/MessageProducer.java       |    3 -
 .../messaging/FalconTopicProducerTest.java      |    7 +-
 .../falcon/messaging/FeedProducerTest.java      |    7 +-
 .../falcon/messaging/ProcessProducerTest.java   |    4 +-
 .../falcon/workflow/FalconPostProcessing.java   |    7 +-
 .../workflow/FalconPostProcessingTest.java      |    4 +-
 .../falcon/service/FalconTopicSubscriber.java   |   10 +-
 .../falcon/converter/OozieProcessMapper.java    |   38 +-
 .../config/workflow/process-parent-workflow.xml |   17 +-
 .../converter/OozieProcessMapperTest.java       |   39 +-
 .../apache/falcon/latedata/LateDataHandler.java |  205 +-
 .../apache/falcon/rerun/event/LaterunEvent.java |   13 +-
 .../falcon/rerun/event/RerunEventFactory.java   |    4 +-
 .../rerun/handler/AbstractRerunHandler.java     |    7 +-
 .../falcon/rerun/handler/LateRerunConsumer.java |   38 +-
 .../falcon/rerun/handler/LateRerunHandler.java  |   17 +-
 .../falcon/rerun/handler/RetryHandler.java      |    5 +-
 .../apache/falcon/rerun/queue/ActiveMQTest.java |    3 +-
 .../falcon/catalog/HiveCatalogServiceIT.java    |   53 +-
 .../apache/falcon/late/LateDataHandlerIT.java   |  214 ++
 .../TableStorageFeedReplicationIT.java          |    2 +-
 webapp/src/test/resources/apps/data/data.txt    | 2000 +++++++++---------
 .../table/customer-table-replicating-feed.xml   |    4 +-
 .../resources/table/hive-process-template.xml   |    3 +
 .../test/resources/table/pig-process-tables.xml |    3 +
 .../test/resources/table/table-feed-input.xml   |    3 +-
 .../test/resources/table/table-feed-output.xml  |    3 +-
 38 files changed, 1657 insertions(+), 1249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index a5d7d6b..691d805 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -90,4 +90,18 @@ public abstract class AbstractCatalogService {
      */
     public abstract boolean dropPartitions(String catalogUrl, String database, String tableName,
                                            Map<String, String> partitions) throws FalconException;
+
+    /**
+     * Gets the partition.
+     *
+     * @param catalogUrl url for the catalog service
+     * @param database database the table belongs to
+     * @param tableName tableName to check if it exists
+     * @param partitionSpec The partition specification, {[col_name,value],[col_name2,value2]}.
+     *                      All partition-key-values must be specified.
+     * @return An instance of CatalogPartition.
+     * @throws FalconException
+     */
+    public abstract CatalogPartition getPartition(String catalogUrl, String database, String tableName,
+                                                  Map<String, String> partitionSpec) throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index e3ddc3b..51e4d6e 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -129,7 +129,7 @@ public class HiveCatalogService extends AbstractCatalogService {
 
     @Override
     public List<CatalogPartition> listPartitionsByFilter(String catalogUrl, String database,
-                                                      String tableName, String filter)
+                                                         String tableName, String filter)
         throws FalconException {
         LOG.info("List partitions for : " + tableName + ", partition filter: " + filter);
 
@@ -185,4 +185,18 @@ public class HiveCatalogService extends AbstractCatalogService {
 
         return true;
     }
+
+    @Override
+    public CatalogPartition getPartition(String catalogUrl, String database, String tableName,
+                                         Map<String, String> partitionSpec) throws FalconException {
+        LOG.info("List partitions for : " + tableName + ", partition spec: " + partitionSpec);
+
+        try {
+            HCatClient client = get(catalogUrl);
+            HCatPartition hCatPartition = client.getPartition(database, tableName, partitionSpec);
+            return createCatalogPartition(hCatPartition);
+        } catch (HCatException e) {
+            throw new FalconException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 658764a..ba80cac 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -559,10 +559,6 @@ public final class EntityUtil {
                 return null;
             }
 
-            if (FeedHelper.getStorageType((Feed) entity) == Storage.TYPE.TABLE) {
-                return null;
-            }
-
             LateProcess lateProcess = new LateProcess();
             lateProcess.setDelay(new Frequency(RuntimeProperties.get()
                     .getProperty("feed.late.frequency", "hours(3)")));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 58d04d2..0e687bd 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -291,7 +291,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
         final Storage.TYPE storageType = FeedHelper.getStorageType(feed);
         validateUniformStorageType(feed, storageType);
         validatePartitions(feed, storageType);
-        validateLateData(feed, storageType);
         validateStorageExists(feed);
     }
 
@@ -313,13 +312,6 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
     }
 
-    private void validateLateData(Feed feed, Storage.TYPE storageType) throws FalconException {
-        if (storageType == Storage.TYPE.TABLE && feed.getLateArrival() != null) {
-            throw new ValidationException("Late data handling is not supported for feeds with table storage! "
-                    + feed.getName());
-        }
-    }
-
     private void validateStorageExists(Feed feed) throws FalconException {
         StringBuilder buffer = new StringBuilder();
         for (Cluster cluster : feed.getClusters().getClusters()) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index e74745d..81bfe0f 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -193,11 +193,6 @@ public class ProcessEntityParser extends EntityParser<Process> {
 
             try {
                 Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
-                if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-                    throw new ValidationException("Late data handling is not supported for feeds with table storage! "
-                            + feed.getName());
-                }
-
                 if (feed.getLateArrival() == null) {
                     throw new ValidationException(
                             "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
index ced3026..37f3f3e 100644
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -183,4 +183,17 @@ public class CatalogStorageTest {
         CatalogStorage table = new CatalogStorage(catalogUrl, tableUri);
         Assert.assertEquals(table.toPartitionAsPath(), partitionPath);
     }
+
+    @Test
+    public void testCreateFromURL() throws Exception {
+        String url = "thrift://localhost:29083/falcon_db/output_table/ds=2012-04-21-00";
+        CatalogStorage storage = new CatalogStorage(url);
+        Assert.assertEquals("thrift://localhost:29083", storage.getCatalogUrl());
+        Assert.assertEquals("falcon_db", storage.getDatabase());
+        Assert.assertEquals("output_table", storage.getTable());
+        Assert.assertEquals(Storage.TYPE.TABLE, storage.getType());
+        Assert.assertEquals(1, storage.getPartitions().size());
+        Assert.assertEquals("2012-04-21-00", storage.getPartitionValue("ds"));
+        Assert.assertTrue(storage.hasPartition("ds"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index db69a7c..a145cc5 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -405,6 +405,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                 props.put("srcClusterName", srcCluster.getName());
                 props.put("srcClusterColo", srcCluster.getColo());
 
+                // the storage type is uniform across source and target feeds for replication
                 props.put("falconFeedStorageType", sourceStorage.getType().name());
 
                 String instancePaths = null;
@@ -425,7 +426,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                     setupHiveConfiguration(trgCluster, sourceTableStorage, targetTableStorage, wfPath);
                 }
 
-                propagateLateDataProperties(feed, instancePaths, props);
+                propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
 
                 replicationWF.setConfiguration(getCoordConfig(props));
                 replicationAction.setWorkflow(replicationWF);
@@ -524,13 +525,18 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
             props.put("sourceRelativePaths", "IGNORE"); // this will not be used for Table storage.
         }
 
-        private void propagateLateDataProperties(Feed feed, String instancePaths, Map<String, String> props) {
+        private void propagateLateDataProperties(Feed feed, String instancePaths,
+                                                 String falconFeedStorageType, Map<String, String> props) {
             // todo these pairs are the same but used in different context
             // late data handler - should-record action
             props.put("falconInputFeeds", feed.getName());
             props.put("falconInPaths", instancePaths);
 
-            // late data consumer - falcon post processing
+            // storage type for each corresponding feed - in this case only one feed is involved
+            // needed to compute usage based on storage type in LateDataHandler
+            props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+            // falcon post processing
             props.put(ARG.feedNames.getPropName(), feed.getName());
             props.put(ARG.feedInstancePaths.getPropName(), instancePaths);
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index bc4ba48..91d0285 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -38,6 +38,15 @@
                     <name>oozie.launcher.mapred.job.priority</name>
                     <value>${jobPriority}</value>
                 </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
             <arg>-out</arg>
@@ -46,8 +55,8 @@
             <arg>${falconInPaths}</arg>
             <arg>-falconInputFeeds</arg>
             <arg>${falconInputFeeds}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
+            <arg>-falconInputFeedStorageTypes</arg>
+            <arg>${falconInputFeedStorageTypes}</arg>
             <capture-output/>
         </java>
         <ok to="replication-decision"/>
@@ -201,8 +210,6 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
             <arg>-workflowEngineUrl</arg>
@@ -268,8 +275,6 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}-${srcClusterName}.csv</arg>
             <arg>-workflowEngineUrl</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 8db5576..8b444f5 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -107,8 +107,6 @@
             <arg>${wf:conf("broker.ttlInMins")}</arg>
             <arg>-cluster</arg>
             <arg>${cluster}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
             <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
             <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
             <file>${wf:conf("falcon.libpath")}/jms.jar</file>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index efa923e..978c370 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -18,7 +18,9 @@
 package org.apache.falcon.converter;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
@@ -46,7 +48,9 @@ import javax.xml.bind.JAXBElement;
 import javax.xml.bind.Unmarshaller;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Tests for Oozie workflow definition for feed replication & retention.
@@ -161,12 +165,26 @@ public class OozieFeedMapperTest {
         String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
         Assert.assertEquals("${now(0,-40)}", outEventInstance);
 
+        HashMap<String, String> props = new HashMap<String, String>();
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            if (prop.getName().equals("mapred.job.priority")) {
-                Assert.assertEquals(prop.getValue(), "NORMAL");
-                break;
-            }
+            props.put(prop.getName(), prop.getValue());
         }
+
+        // verify the replication param that feed replicator depends on
+        Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}/");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}/");
+
         assertLibExtensions(coord, "replication");
     }
 
@@ -251,12 +269,54 @@ public class OozieFeedMapperTest {
         Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
         Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
 
+        HashMap<String, String> props = new HashMap<String, String>();
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
-            if (prop.getName().equals("shouldRecord")) {
-                Assert.assertEquals(prop.getValue(), "false");
-                break;
-            }
+            props.put(prop.getName(), prop.getValue());
         }
+
+        final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+        final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+
+        // verify the replication param that feed replicator depends on
+        Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
+
+        Assert.assertTrue(props.containsKey("distcpSourcePaths"));
+        Assert.assertEquals(props.get("distcpSourcePaths"),
+                FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertTrue(props.containsKey("distcpTargetPaths"));
+        Assert.assertEquals(props.get("distcpTargetPaths"),
+                FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
+                        + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+                        + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
+
+        // verify table props
+        assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
+        assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataIn('input')}");
+    }
+
+    private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+                                              Map<String, String> props, String prefix) {
+        Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
+        Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
+        Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
+
+        Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
+        Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
+        Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
     }
 
     @Test
@@ -274,18 +334,17 @@ public class OozieFeedMapperTest {
         Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
         Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
 
-        String feedDataPath = null;
-        String storageType = null;
-        org.apache.falcon.oozie.coordinator.CONFIGURATION configuration =
-                coord.getAction().getWorkflow().getConfiguration();
-        for (Property property : configuration.getProperty()) {
-            if ("feedDataPath".equals(property.getName())) {
-                feedDataPath = property.getValue();
-            } else if ("falconFeedStorageType".equals(property.getName())) {
-                storageType = property.getValue();
-            }
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
         }
 
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the param that feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
+
         final Storage storage = FeedHelper.createStorage(cluster, feed);
         if (feedDataPath != null) {
             Assert.assertEquals(feedDataPath, storage.getUriTemplate()
@@ -295,5 +354,9 @@ public class OozieFeedMapperTest {
         if (storageType != null) {
             Assert.assertEquals(storageType, storage.getType().name());
         }
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), feed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
index f1783b5..eb49fd5 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/EntityInstanceMessage.java
@@ -70,8 +70,7 @@ public class EntityInstanceMessage {
         topicName("topicName"),
         status("status"),
         brokerTTL("broker.ttlInMins"),
-        cluster("cluster"),
-        falconFeedStorageType("falconFeedStorageType");
+        cluster("cluster");
 
         private String propName;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
index 3a0193a..b37931c 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/MessageProducer.java
@@ -124,8 +124,6 @@ public class MessageProducer extends Configured implements Tool {
                 "workflow id"));
         addOption(options, new Option(ARG.cluster.getArgName(), true,
                 "cluster name"));
-        addOption(options, new Option(ARG.falconFeedStorageType.getArgName(), true,
-                "feed storage type: filesystem or table"), false);
 
         return new GnuParser().parse(options, arguments);
     }
@@ -178,5 +176,4 @@ public class MessageProducer extends Configured implements Tool {
         }
         return 0;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index 44d4bd8..9912678 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -74,8 +74,7 @@ public class FalconTopicProducerTest {
                                      "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                                      "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                                      "-" + ARG.brokerTTL.getArgName(), "10",
-                                     "-" + ARG.cluster.getArgName(), "corp",
-                                     "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
+                                     "-" + ARG.cluster.getArgName(), "corp", };
         testProcessMessageCreator(args);
     }
 
@@ -97,8 +96,7 @@ public class FalconTopicProducerTest {
                                      "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                                      "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                                      "-" + ARG.brokerTTL.getArgName(), "10",
-                                     "-" + ARG.cluster.getArgName(), "corp",
-                                     "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
+                                     "-" + ARG.cluster.getArgName(), "corp", };
         testProcessMessageCreator(args);
     }
 
@@ -165,6 +163,5 @@ public class FalconTopicProducerTest {
         Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
                 "2012-01-01T01:00Z");
         Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-        Assert.assertEquals(m.getString(ARG.falconFeedStorageType.getArgName()), "FILESYSTEM");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 3fc5a80..a1609af 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -50,8 +50,6 @@ public class FeedProducerTest {
 
     private String[] args;
     private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
-    // private static final String BROKER_URL =
-    // "tcp://localhost:61616?daemon=true";
     private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
     private static final String TOPIC_NAME = "Falcon.process1.click-logs";
     private BrokerService broker;
@@ -86,8 +84,7 @@ public class FeedProducerTest {
                             "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                             "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                             "-" + ARG.brokerTTL.getArgName(), "10",
-                            "-" + ARG.cluster.getArgName(), "corp",
-                            "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
+                            "-" + ARG.cluster.getArgName(), "corp", };
 
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
@@ -209,7 +206,5 @@ public class FeedProducerTest {
         Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
                 "2012-01-01T01:00Z");
         Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-        Assert.assertEquals(m.getString(ARG.falconFeedStorageType.getArgName()), "FILESYSTEM");
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index a5fc60a..078b9c2 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -61,8 +61,7 @@ public class ProcessProducerTest {
                             "-" + ARG.topicName.getArgName(), (TOPIC_NAME),
                             "-" + ARG.status.getArgName(), ("SUCCEEDED"),
                             "-" + ARG.brokerTTL.getArgName(), "10",
-                            "-" + ARG.cluster.getArgName(), "corp",
-                            "-" + ARG.falconFeedStorageType.getArgName(), "FILESYSTEM", };
+                            "-" + ARG.cluster.getArgName(), "corp", };
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
@@ -146,6 +145,5 @@ public class ProcessProducerTest {
         Assert.assertEquals(m.getString(ARG.timeStamp.getArgName()),
                 "2012-01-01T01:00Z");
         Assert.assertEquals(m.getString(ARG.status.getArgName()), "SUCCEEDED");
-        Assert.assertEquals(m.getString(ARG.falconFeedStorageType.getArgName()), "FILESYSTEM");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 0f98c3a..3f9256c 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -56,13 +56,12 @@ public class FalconPostProcessing extends Configured implements Tool {
         USER_BRKR_URL("userBrokerUrl", "user broker url"),
         BRKR_TTL("brokerTTL", "time to live for broker message in sec"),
         FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
-        FEED_INSTANCE_PATHS("feedInstancePaths", "comma seperated feed instance paths"),
+        FEED_INSTANCE_PATHS("feedInstancePaths", "comma separated feed instance paths"),
         LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
         WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
         USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
         USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
-        LOG_DIR("logDir", "log dir where job logs are copied"),
-        FEED_STORAGE_TYPE("falconFeedStorageType", "feed's storage type");
+        LOG_DIR("logDir", "log dir where job logs are copied");
 
         private String name;
         private String description;
@@ -156,7 +155,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         addArg(args, cmd, Arg.FEED_NAMES);
         addArg(args, cmd, Arg.FEED_INSTANCE_PATHS);
         addArg(args, cmd, Arg.LOG_FILE);
-        addArg(args, cmd, Arg.FEED_STORAGE_TYPE);
 
         MessageProducer.main(args.toArray(new String[0]));
     }
@@ -206,7 +204,6 @@ public class FalconPostProcessing extends Configured implements Tool {
         addOption(options, Arg.USER_SUBFLOW_ID);
         addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
         addOption(options, Arg.LOG_DIR);
-        addOption(options, Arg.FEED_STORAGE_TYPE);
         return new GnuParser().parse(options, arguments);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index da18652..c6485cd 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -69,8 +69,7 @@ public class FalconPostProcessingTest {
                             "http://localhost:11000/oozie/",
                             "-" + Arg.LOG_DIR.getOptionName(), "target/log",
                             "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
-                            "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie",
-                            "-" + Arg.FEED_STORAGE_TYPE.getOptionName(), "FILESYSTEM", };
+                            "-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie", };
         broker = new BrokerService();
         broker.addConnector(BROKER_URL);
         broker.setDataDirectory("target/activemq");
@@ -134,7 +133,6 @@ public class FalconPostProcessingTest {
             Assert.assertEquals(
                     m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
                     "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20");
-            Assert.assertEquals(m.getString(Arg.FEED_STORAGE_TYPE.getOptionName()), "FILESYSTEM");
         } else {
             Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
                     "click-logs");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index b29b9e1..6ac926d 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -91,7 +91,6 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
             String nominalTime = mapMessage.getString(ARG.nominalTime.getArgName());
             String status = mapMessage.getString(ARG.status.getArgName());
             String operation = mapMessage.getString(ARG.operation.getArgName());
-            String feedStorageType = mapMessage.getString(ARG.falconFeedStorageType.getArgName());
 
             AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
             InstancesResult result = wfEngine.getJobDetails(cluster, workflowId);
@@ -101,22 +100,25 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
             if (status.equalsIgnoreCase("FAILED")) {
                 retryHandler.handleRerun(cluster, entityType, entityName,
                         nominalTime, runId, workflowId,
-                        System.currentTimeMillis(), feedStorageType);
+                        System.currentTimeMillis());
+
                 GenericAlert.instrumentFailedInstance(cluster, entityType,
                         entityName, nominalTime, workflowId, runId, operation,
                         SchemaHelper.formatDateUTC(startTime),
                         "", "", duration);
+
             } else if (status.equalsIgnoreCase("SUCCEEDED")) {
                 latedataHandler.handleRerun(cluster, entityType, entityName,
                         nominalTime, runId, workflowId,
-                        System.currentTimeMillis(), feedStorageType);
+                        System.currentTimeMillis());
+
                 GenericAlert.instrumentSucceededInstance(cluster, entityType,
                         entityName, nominalTime, workflowId, runId, operation,
                         SchemaHelper.formatDateUTC(startTime),
                         duration);
+
                 notifySLAService(cluster, entityName, entityType, nominalTime, duration);
             }
-
         } catch (JMSException e) {
             LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
         } catch (FalconException e) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index 3460d95..eed7fa3 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -27,7 +27,6 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
@@ -37,7 +36,6 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.LateInput;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Property;
@@ -126,7 +124,6 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         initializeInputPaths(cluster, process, coord, props); // inputs
         initializeOutputPaths(cluster, process, coord, props);  // outputs
-        propagateStorageType(process, props);  // falconFeedStorageType
 
         Workflow processWorkflow = process.getWorkflow();
         props.put("userWorkflowEngine", processWorkflow.getEngine().value());
@@ -191,6 +188,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
         List<String> inputFeeds = new ArrayList<String>();
         List<String> inputPaths = new ArrayList<String>();
+        List<String> inputFeedStorageTypes = new ArrayList<String>();
         for (Input input : process.getInputs().getInputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
@@ -221,11 +219,21 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
 
             inputFeeds.add(input.getName());
             inputPaths.add(inputExpr);
+            inputFeedStorageTypes.add(storage.getType().name());
         }
 
+        propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
+    }
+
+    private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+                                             List<String> inputFeedStorageTypes, Map<String, String> props) {
         // populate late data handler - should-record action
         props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
         props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+
+        // storage type for each corresponding feed sent as a param to LateDataHandler
+        // needed to compute usage based on storage type in LateDataHandler
+        props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
     }
 
     private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
@@ -272,30 +280,6 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
     }
 
-    private void propagateStorageType(Process process, Map<String, String> props) throws FalconException {
-        Storage.TYPE feedStorageType = Storage.TYPE.FILESYSTEM; // defaults to FS.
-
-        if (process.getLateProcess() != null) {
-            Map<String, String> feeds = new HashMap<String, String>();
-            if (process.getInputs() != null) {
-                for (Input in : process.getInputs().getInputs()) {
-                    feeds.put(in.getName(), in.getFeed());
-                }
-            }
-
-            for (LateInput lp : process.getLateProcess().getLateInputs()) {
-                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
-                if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-                    feedStorageType = Storage.TYPE.TABLE;
-                    break;  // break if one of 'em is a table as late data wont apply
-                }
-            }
-        }
-
-        // this is currently only used for late data handling.
-        props.put("falconFeedStorageType", feedStorageType.name());
-    }
-
     private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
                                       String datasetName, LocationType locationType) throws FalconException {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 4668ce3..494bf20 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -38,6 +38,15 @@
                     <name>oozie.launcher.mapred.job.priority</name>
                     <value>${jobPriority}</value>
                 </property>
+                <!-- HCatalog jars -->
+                <property>
+                    <name>oozie.use.system.libpath</name>
+                    <value>true</value>
+                </property>
+                <property>
+                    <name>oozie.action.sharelib.for.java</name>
+                    <value>hcatalog</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
             <arg>-out</arg>
@@ -46,8 +55,8 @@
             <arg>${falconInPaths}</arg>
             <arg>-falconInputFeeds</arg>
             <arg>${falconInputFeeds}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
+            <arg>-falconInputFeedStorageTypes</arg>
+            <arg>${falconInputFeedStorageTypes}</arg>
             <capture-output/>
         </java>
         <ok to="user-workflow"/>
@@ -170,8 +179,6 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
             <arg>-workflowEngineUrl</arg>
@@ -239,8 +246,6 @@
             <arg>${feedNames}</arg>
             <arg>-feedInstancePaths</arg>
             <arg>${feedInstancePaths}</arg>
-            <arg>-falconFeedStorageType</arg>
-            <arg>${falconFeedStorageType}</arg>
             <arg>-logFile</arg>
             <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
             <arg>-workflowEngineUrl</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index aef0850..b6f03e5 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -39,7 +39,6 @@ import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.entity.v0.process.Validity;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION;
 import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
@@ -169,7 +168,6 @@ public class OozieProcessMapperTest extends AbstractTestBase {
             props.put(prop.getName(), prop.getValue());
         }
         assertEquals(props.get("mapred.job.priority"), "LOW");
-        Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
 
         assertLibExtensions(coord);
     }
@@ -251,14 +249,16 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        CONFIGURATION conf = coord.getAction().getWorkflow().getConfiguration();
-        List<Property> properties = conf.getProperty();
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
 
+        // verify table props
         Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
-
-        for (Property property : properties) {
-            if (expected.containsKey(property.getName())) {
-                Assert.assertEquals(property.getValue(), expected.get(property.getName()));
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
 
@@ -309,16 +309,27 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
 
         COORDINATORAPP coord = getCoordinator(new Path(coordPath));
-        CONFIGURATION conf = coord.getAction().getWorkflow().getConfiguration();
-        List<Property> properties = conf.getProperty();
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
 
+        // verify table props
         Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
-
-        for (Property property : properties) {
-            if (expected.containsKey(property.getName())) {
-                Assert.assertEquals(property.getValue(), expected.get(property.getName()));
+        for (Map.Entry<String, String> entry : props.entrySet()) {
+            if (expected.containsKey(entry.getKey())) {
+                Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
+
+        // verify the late data params
+        Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getName());
+        Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+        Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
     }
 
     private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index dab809b..4b35760 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -19,6 +19,11 @@
 package org.apache.falcon.latedata;
 
 import org.apache.commons.cli.*;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogPartition;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -30,6 +35,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
 import java.io.*;
+import java.net.URISyntaxException;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -68,7 +74,8 @@ public class LateDataHandler extends Configured implements Tool {
         opt.setRequired(true);
         options.addOption(opt);
 
-        opt = new Option("falconFeedStorageType", true, "Feed storage type: FileSystem or Table");
+        opt = new Option("falconInputFeedStorageTypes", true,
+                "Feed storage types corresponding to Input feed names, separated by #");
         opt.setRequired(true);
         options.addOption(opt);
 
@@ -77,39 +84,23 @@ public class LateDataHandler extends Configured implements Tool {
 
     @Override
     public int run(String[] args) throws Exception {
-
         CommandLine command = getCommand(args);
 
-        final String falconFeedStorageType = command.getOptionValue("falconFeedStorageType");
-        if (Storage.TYPE.valueOf(falconFeedStorageType) == Storage.TYPE.TABLE) {
-            LOG.info("Late data not supported for table storage.");
-            return 0; // Late Data is not handled for table storage
-        }
-
-        Path file = new Path(command.getOptionValue("out"));
-        Map<String, Long> map = new LinkedHashMap<String, Long>();
         String pathStr = getOptionValue(command, "paths");
         if (pathStr == null) {
             return 0;
         }
 
-        String[] pathGroups = pathStr.split("#");
         String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split("#");
-        for (int index = 0; index < pathGroups.length; index++) {
-            long usage = 0;
-            for (String pathElement : pathGroups[index].split(",")) {
-                Path inPath = new Path(pathElement);
-                usage += usage(inPath, getConf());
-            }
-            map.put(inputFeeds[index], usage);
-        }
-        LOG.info("MAP data: " + map);
+        String[] pathGroups = pathStr.split("#");
+        String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
+
+        Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
+        LOG.info("MAP data: " + metrics);
+
+        Path file = new Path(command.getOptionValue("out"));
+        persistMetrics(metrics, file);
 
-        OutputStream out = file.getFileSystem(getConf()).create(file);
-        for (Map.Entry<String, Long> entry : map.entrySet()) {
-            out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
-        }
-        out.close();
         return 0;
     }
 
@@ -121,7 +112,142 @@ public class LateDataHandler extends Configured implements Tool {
         return value;
     }
 
-    public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
+    private Map<String, Long> computeMetrics(String[] inputFeeds, String[] pathGroups,
+                                             String[] inputFeedStorageTypes)
+        throws IOException, FalconException, URISyntaxException {
+
+        Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
+        for (int index = 0; index < pathGroups.length; index++) {
+            long storageMetric = computeStorageMetric(pathGroups[index], inputFeedStorageTypes[index], getConf());
+            computedMetrics.put(inputFeeds[index], storageMetric);
+        }
+
+        return computedMetrics;
+    }
+
+    private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException {
+        OutputStream out = null;
+        try {
+            out = file.getFileSystem(getConf()).create(file);
+
+            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+                out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+            }
+        } finally {
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException ignore) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    /**
+     * This method computes the storage metrics for a given feed's instance or partition.
+     * It uses size on disk as the metric for File System Storage.
+     * It uses create time as the metric for Catalog Table Storage.
+     *
+     * The assumption is that if a partition has changed or reinstated, the underlying
+     * metric would change, either size or create time.
+     *
+     * @param feedUriTemplate URI for the feed storage, filesystem path or table uri
+     * @param feedStorageType feed storage type
+     * @param conf configuration
+     * @return computed metric
+     * @throws IOException
+     * @throws FalconException
+     * @throws URISyntaxException
+     */
+    public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf)
+        throws IOException, FalconException, URISyntaxException {
+
+        Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            // usage on file system is the metric
+            return getFileSystemUsageMetric(feedUriTemplate, conf);
+        } else if (storageType == Storage.TYPE.TABLE) {
+            // todo: this should have been done in oozie mapper but el ${coord:dataIn('input')} returns hcat scheme
+            feedUriTemplate = feedUriTemplate.replace("hcat", "thrift");
+            // creation time of the given partition is the metric
+            return getTablePartitionCreateTimeMetric(feedUriTemplate);
+        }
+
+        throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
+    }
+
+    /**
+     * The storage metric for File System Storage is the size of content
+     * this feed's instance represented by the path uses on the file system.
+     *
+     * If this instance was reinstated, the assumption is that the size of
+     * this instance on disk would change.
+     *
+     * @param pathGroup path on file system
+     * @param conf configuration
+     * @return metric as the size of data on file system
+     * @throws IOException
+     */
+    private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
+        throws IOException {
+        long usage = 0;
+        for (String pathElement : pathGroup.split(",")) {
+            Path inPath = new Path(pathElement);
+            usage += usage(inPath, conf);
+        }
+
+        return usage;
+    }
+
+    private long usage(Path inPath, Configuration conf) throws IOException {
+        FileSystem fs = inPath.getFileSystem(conf);
+        FileStatus[] fileStatuses = fs.globStatus(inPath);
+        if (fileStatuses == null || fileStatuses.length == 0) {
+            return 0;
+        }
+        long totalSize = 0;
+        for (FileStatus fileStatus : fileStatuses) {
+            totalSize += fs.getContentSummary(fileStatus.getPath()).getLength();
+        }
+        return totalSize;
+    }
+
+    /**
+     * The storage metric for Table Storage is the create time of the given partition
+     * since there is no API in Hive or HCatalog to find if a partition has changed.
+     *
+     * If this partition was reinstated, the assumption is that the create time of
+     * this partition would change.
+     *
+     * @param feedUriTemplate catalog table uri
+     * @return metric as creation time of the given partition
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws FalconException
+     */
+    private long getTablePartitionCreateTimeMetric(String feedUriTemplate)
+        throws IOException, URISyntaxException, FalconException {
+
+        CatalogStorage storage = (CatalogStorage)
+                FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate);
+        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+                storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), storage.getPartitions());
+        return partition == null ? 0 : partition.getCreateTime();
+    }
+
+    /**
+     * This method compares the recorded metrics persisted in file against
+     * the recently computed metrics and returns the list of feeds that has changed.
+     *
+     * @param file persisted metrics from the first run
+     * @param metrics newly computed metrics
+     * @param conf configuration
+     * @return list of feed names which have changed, empty string if none has changed
+     * @throws Exception
+     */
+    public String detectChanges(Path file, Map<String, Long> metrics, Configuration conf)
         throws Exception {
 
         StringBuilder buffer = new StringBuilder();
@@ -129,7 +255,7 @@ public class LateDataHandler extends Configured implements Tool {
                 file.getFileSystem(conf).open(file)));
         String line;
         try {
-            Map<String, Long> recorded = new LinkedHashMap<String, Long>();
+            Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
             while ((line = in.readLine()) != null) {
                 if (line.isEmpty()) {
                     continue;
@@ -137,17 +263,17 @@ public class LateDataHandler extends Configured implements Tool {
                 int index = line.indexOf('=');
                 String key = line.substring(0, index);
                 long size = Long.parseLong(line.substring(index + 1));
-                recorded.put(key, size);
+                recordedMetrics.put(key, size);
             }
 
-            for (Map.Entry<String, Long> entry : map.entrySet()) {
-                if (recorded.get(entry.getKey()) == null) {
+            for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+                if (recordedMetrics.get(entry.getKey()) == null) {
                     LOG.info("No matching key " + entry.getKey());
                     continue;
                 }
-                if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
-                    LOG.info("Recorded size:" + recorded.get(entry.getKey()) + "  is different from new size"
-                            + entry.getValue());
+                if (!recordedMetrics.get(entry.getKey()).equals(entry.getValue())) {
+                    LOG.info("Recorded size:" + recordedMetrics.get(entry.getKey())
+                            + " is different from new size" + entry.getValue());
                     buffer.append(entry.getKey()).append(',');
                 }
             }
@@ -161,17 +287,4 @@ public class LateDataHandler extends Configured implements Tool {
             in.close();
         }
     }
-
-    public long usage(Path inPath, Configuration conf) throws IOException {
-        FileSystem fs = inPath.getFileSystem(conf);
-        FileStatus[] status = fs.globStatus(inPath);
-        if (status == null || status.length == 0) {
-            return 0;
-        }
-        long totalSize = 0;
-        for (FileStatus statu : status) {
-            totalSize += fs.getContentSummary(statu.getPath()).getLength();
-        }
-        return totalSize;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index 6523acc..b5ac121 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -22,28 +22,21 @@ package org.apache.falcon.rerun.event;
  */
 public class LaterunEvent extends RerunEvent {
 
-    private String feedStorageType;
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
-                        long delay, String entityType, String entityName, String instance,
-                        int runId, String feedStorageType) {
+    public LaterunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
+                        String entityType, String entityName, String instance, int runId) {
         super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
                 instance, runId);
-        this.feedStorageType = feedStorageType;
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
 
-    public String getFeedStorageType() {
-        return feedStorageType;
-    }
-
     @Override
     public String toString() {
         return "clusterName=" + clusterName + SEP + "wfId=" + wfId + SEP
                 + "msgInsertTime=" + msgInsertTime + SEP + "delayInMilliSec="
                 + delayInMilliSec + SEP + "entityType=" + entityType + SEP
                 + "entityName=" + entityName + SEP + "instance=" + instance
-                + SEP + "runId=" + runId + ", falconFeedStorageType=" + feedStorageType;
+                + SEP + "runId=" + runId;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
index a949c19..03230f9 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEventFactory.java
@@ -45,7 +45,7 @@ public class RerunEventFactory<T extends RerunEvent> {
         return (T) new LaterunEvent(map.get("clusterName"), map.get("wfId"),
                 Long.parseLong(map.get("msgInsertTime")), Long.parseLong(map.get("delayInMilliSec")),
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
-                Integer.parseInt(map.get("runId")), map.get("falconFeedStorageType"));
+                Integer.parseInt(map.get("runId")));
     }
 
     @SuppressWarnings("unchecked")
@@ -56,7 +56,6 @@ public class RerunEventFactory<T extends RerunEvent> {
                 map.get("entityType"), map.get("entityName"), map.get("instance"),
                 Integer.parseInt(map.get("runId")), Integer.parseInt(map.get("attempts")),
                 Integer.parseInt(map.get("failRetryCount")));
-
     }
 
     private Map<String, String> getMap(String message) {
@@ -68,5 +67,4 @@ public class RerunEventFactory<T extends RerunEvent> {
         }
         return map;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
index d53d05f..ab7f472 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java
@@ -46,11 +46,8 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay
         this.delayQueue.init();
     }
 
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public abstract void handleRerun(String cluster, String entityType,
-                                     String entityName, String nominalTime, String runId, String wfId,
-                                     long msgReceivedTime, String feedStorageType);
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+    public abstract void handleRerun(String cluster, String entityType, String entityName,
+                                     String nominalTime, String runId, String wfId, long msgReceivedTime);
 
     public AbstractWorkflowEngine getWfEngine() {
         return wfEngine;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
index 0c66f00..fffd5cd 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java
@@ -19,7 +19,6 @@ package org.apache.falcon.rerun.handler;
 
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.process.LateInput;
@@ -59,12 +58,6 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                 return;
             }
 
-            final String falconFeedStorageType = message.getFeedStorageType();
-            if (Storage.TYPE.valueOf(falconFeedStorageType) == Storage.TYPE.TABLE) {
-                LOG.info("Late rerun not supported for table storage in entity: " + message.getEntityName());
-                return;
-            }
-
             String detectLate = detectLate(message);
 
             if (detectLate.equals("")) {
@@ -75,7 +68,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
                 handler.handleRerun(cluster, message.getEntityType(),
                         message.getEntityName(), message.getInstance(),
                         Integer.toString(message.getRunId()),
-                        message.getWfId(), System.currentTimeMillis(), message.getFeedStorageType());
+                        message.getWfId(), System.currentTimeMillis());
                 return;
             }
 
@@ -104,8 +97,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
         Properties properties = handler.getWfEngine().getWorkflowProperties(
                 message.getClusterName(), message.getWfId());
         String falconInputFeeds = properties.getProperty("falconInputFeeds");
-        String logDir = properties.getProperty("logDir");
         String falconInPaths = properties.getProperty("falconInPaths");
+        String falconInputFeedStorageTypes = properties.getProperty("falconInputFeedStorageTypes");
+        String logDir = properties.getProperty("logDir");
         String nominalTime = properties.getProperty("nominalTime");
         String srcClusterName = properties.getProperty("srcClusterName");
         Path lateLogPath = handler.getLateLogPath(logDir, nominalTime, srcClusterName);
@@ -118,34 +112,30 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv
             return "";
         }
 
-        Map<String, Long> feedSizes = new LinkedHashMap<String, Long>();
         String[] pathGroups = falconInPaths.split("#");
         String[] inputFeeds = falconInputFeeds.split("#");
-        Entity entity = EntityUtil.getEntity(message.getEntityType(),
-                message.getEntityName());
+        String[] inputFeedStorageTypes = falconInputFeedStorageTypes.split("#");
 
-        List<String> lateFeed = new ArrayList<String>();
+        Map<String, Long> computedMetrics = new LinkedHashMap<String, Long>();
+        Entity entity = EntityUtil.getEntity(message.getEntityType(), message.getEntityName());
         if (EntityUtil.getLateProcess(entity) != null) {
-            for (LateInput li : EntityUtil.getLateProcess(entity)
-                    .getLateInputs()) {
+            List<String> lateFeed = new ArrayList<String>();
+            for (LateInput li : EntityUtil.getLateProcess(entity).getLateInputs()) {
                 lateFeed.add(li.getInput());
             }
+
             for (int index = 0; index < pathGroups.length; index++) {
                 if (lateFeed.contains(inputFeeds[index])) {
-                    long usage = 0;
-                    for (String pathElement : pathGroups[index].split(",")) {
-                        Path inPath = new Path(pathElement);
-                        usage += late.usage(inPath, conf);
-                    }
-                    feedSizes.put(inputFeeds[index], usage);
+                    long computedMetric = late.computeStorageMetric(
+                            pathGroups[index], inputFeedStorageTypes[index], conf);
+                    computedMetrics.put(inputFeeds[index], computedMetric);
                 }
             }
         } else {
             LOG.warn("Late process is not configured for entity: "
-                    + message.getEntityType() + "(" + message.getEntityName()
-                    + ")");
+                    + message.getEntityType() + "(" + message.getEntityName() + ")");
         }
 
-        return late.detectChanges(lateLogPath, feedSizes, conf);
+        return late.detectChanges(lateLogPath, computedMetrics, conf);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index b37a2b6..ee12332 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -20,7 +20,6 @@ package org.apache.falcon.rerun.handler;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.GenericAlert;
 import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -50,14 +49,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
         AbstractRerunHandler<LaterunEvent, M> {
 
     @Override
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public void handleRerun(String cluster, String entityType,
-                            String entityName, String nominalTime, String runId, String wfId,
-                            long msgReceivedTime, String feedStorageType) {
-        if (Storage.TYPE.TABLE.name().equals(feedStorageType)) {
-            return;
-        }
-
+    public void handleRerun(String cluster, String entityType, String entityName,
+                            String nominalTime, String runId, String wfId, long msgReceivedTime) {
         try {
             Entity entity = EntityUtil.getEntity(entityType, entityName);
             try {
@@ -100,9 +93,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
             LOG.debug("Scheduling the late rerun for entity instance : "
                     + entityType + "(" + entityName + ")" + ":" + nominalTime
                     + " And WorkflowId: " + wfId);
-            LaterunEvent event = new LaterunEvent(cluster, wfId,
-                    msgInsertTime.getTime(), wait, entityType, entityName,
-                    nominalTime, intRunId, feedStorageType);
+            LaterunEvent event = new LaterunEvent(cluster, wfId, msgInsertTime.getTime(),
+                    wait, entityType, entityName, nominalTime, intRunId);
             offerToQueue(event);
         } catch (Exception e) {
             LOG.error("Unable to schedule late rerun for entity instance : "
@@ -112,7 +104,6 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
                     nominalTime, wfId, runId, e.getMessage());
         }
     }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
index 28e40d2..2b41a7c 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryHandler.java
@@ -38,10 +38,8 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
         AbstractRerunHandler<RetryEvent, M> {
 
     @Override
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     public void handleRerun(String cluster, String entityType, String entityName,
-                            String nominalTime, String runId, String wfId,
-                            long msgReceivedTime, String feedStorageType) {
+                            String nominalTime, String runId, String wfId, long msgReceivedTime) {
         try {
             Entity entity = getEntity(entityType, entityName);
             Retry retry = getRetry(entity);
@@ -80,7 +78,6 @@ public class RetryHandler<M extends DelayedQueue<RetryEvent>> extends
             GenericAlert.alertRetryFailed(entityType, entityName, nominalTime, wfId, runId, e.getMessage());
         }
     }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
     @Override
     public void init(M aDelayQueue) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
----------------------------------------------------------------------
diff --git a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
index 66a83f2..01d0415 100644
--- a/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
+++ b/rerun/src/test/java/org/apache/falcon/rerun/queue/ActiveMQTest.java
@@ -18,7 +18,6 @@
 package org.apache.falcon.rerun.queue;
 
 import org.apache.activemq.broker.BrokerService;
-import org.apache.falcon.entity.Storage;
 import org.apache.falcon.rerun.event.LaterunEvent;
 import org.apache.falcon.rerun.event.RerunEvent;
 import org.testng.Assert;
@@ -52,7 +51,7 @@ public class ActiveMQTest {
 
         RerunEvent event = new LaterunEvent("clusterName", "wfId",
                 System.currentTimeMillis(), 60 * 1000, "entityType",
-                "entityName", "instance", 0, Storage.TYPE.FILESYSTEM.name());
+                "entityName", "instance", 0);
 
         try {
             activeMQueue.offer(event);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/eb8ca3de/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index 9e42b94..c4d6671 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -283,20 +283,49 @@ public class HiveCatalogServiceIT {
         Assert.assertEquals(partitions.size(), 0, "Unexpected number of partitions");
     }
 
-    /*
-    // this is NOT possible to do in Hive
     @Test
-    public void testDropPartitionBulk() throws Exception {
-        Map<String, String> partialPartitionSpec = new HashMap<String, String>();
-        partialPartitionSpec.put("ds", "20130903");
-        partialPartitionSpec.put("ds", "20130902");
-        partialPartitionSpec.put("ds", "20130901");
+    public void testGetPartition() throws Exception {
+        Map<String, String> partitionSpec = new HashMap<String, String>();
+        partitionSpec.put("ds", "20130902");
+        partitionSpec.put("region", "in");
 
-        Assert.assertTrue(hiveCatalogService.dropPartitions(
-                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partialPartitionSpec));
+        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+        Assert.assertNotNull(partition);
 
-        List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
-        Assert.assertEquals(0, partitions.size(), "Unexpected number of partitions");
+        long createTime = partition.getCreateTime();
+        Assert.assertTrue(createTime > 0);
+    }
+
+    @Test
+    public void testReInstatePartition() throws Exception {
+        Map<String, String> partitionSpec = new HashMap<String, String>();
+        partitionSpec.put("ds", "20130918");
+        partitionSpec.put("region", "blah");
+
+        HCatAddPartitionDesc first = HCatAddPartitionDesc.create(
+                DATABASE_NAME, TABLE_NAME, null, partitionSpec).build();
+        client.addPartition(first);
+
+        CatalogPartition partition = CatalogServiceFactory.getCatalogService().getPartition(
+                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+        Assert.assertNotNull(partition);
+        final long originalCreateTime = partition.getCreateTime();
+
+        Thread.sleep(1000); // sleep before deletion
+        client.dropPartitions(DATABASE_NAME, TABLE_NAME, partitionSpec, true);
+
+        Thread.sleep(1000); // sleep so the next add is delayed a bit
+
+        HCatAddPartitionDesc second = HCatAddPartitionDesc.create(
+                DATABASE_NAME, TABLE_NAME, null, partitionSpec).build();
+        client.addPartition(second);
+
+        CatalogPartition reInstatedPartition = CatalogServiceFactory.getCatalogService().getPartition(
+                METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionSpec);
+        Assert.assertNotNull(reInstatedPartition);
+        final long reInstatedCreateTime = reInstatedPartition.getCreateTime();
+
+        Assert.assertTrue(reInstatedCreateTime > originalCreateTime);
     }
-    */
 }


Mime
View raw message