falcon-commits mailing list archives

From venkat...@apache.org
Subject [08/13] git commit: FALCON-95 Enable embedding hive scripts directly in a process. Contributed by Venkatesh Seetharam
Date Fri, 01 Nov 2013 23:42:09 GMT
FALCON-95 Enable embedding hive scripts directly in a process. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/502990c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/502990c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/502990c9

Branch: refs/heads/FALCON-85
Commit: 502990c929d3ab9f6def03c4c40a621ad9815f9d
Parents: 8df7a12
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu Oct 31 22:26:29 2013 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Fri Nov 1 13:40:36 2013 -0700

----------------------------------------------------------------------
 client/src/main/resources/process-0.1.xsd       |    1 +
 .../falcon/cleanup/FeedCleanupHandler.java      |    2 +-
 .../org/apache/falcon/entity/FeedHelper.java    |   13 +-
 .../apache/falcon/entity/FileSystemStorage.java |    5 -
 .../falcon/entity/StorageFactoryTest.java       |  306 ++++++
 .../src/test/resources/config/feed/feed-0.2.xml |   10 +
 .../config/feed/hive-table-feed-out.xml         |    2 +
 docs/src/site/twiki/EntitySpecification.twiki   |   20 +
 .../falcon/converter/OozieFeedMapper.java       |    6 +-
 .../config/workflow/replication-workflow.xml    |    6 +-
 oozie/pom.xml                                   |   17 +
 .../converter/AbstractOozieEntityMapper.java    |    3 +
 oozie/src/main/resources/hive-action-0.2.xsd    |   68 ++
 .../falcon/converter/OozieProcessMapper.java    |  222 +++-
 .../config/workflow/process-parent-workflow.xml |   25 +-
 .../converter/OozieProcessMapperTest.java       |  141 ++-
 .../resources/config/process/hive-process.xml   |   46 +
 .../config/process/pig-process-table.xml        |   46 +
 .../resources/config/process/process-table.xml  |   46 -
 .../apache/falcon/latedata/LateDataHandler.java |    1 +
 webapp/pom.xml                                  |    9 -
 .../catalog/TableStorageFeedEvictorIT.java      |  325 ------
 .../catalog/TableStorageFeedReplicationIT.java  |  174 ---
 .../lifecycle/FileSystemFeedReplicationIT.java  |  205 ++++
 .../lifecycle/TableStorageFeedEvictorIT.java    |  326 ++++++
 .../TableStorageFeedReplicationIT.java          |  180 ++++
 .../org/apache/falcon/process/PigProcessIT.java |  126 +++
 .../falcon/process/TableStorageProcessIT.java   |  211 ++++
 .../resource/ClusterEntityValidationIT.java     |  104 --
 .../falcon/resource/FeedEntityValidationIT.java |  180 ----
 .../resource/PigProcessForTableStorageIT.java   |  156 ---
 .../apache/falcon/resource/PigProcessIT.java    |  119 ---
 .../org/apache/falcon/resource/TestContext.java |   80 +-
 .../java/org/apache/falcon/util/FSUtils.java    |  101 ++
 .../org/apache/falcon/util/HiveTestUtils.java   |   30 +-
 .../validation/ClusterEntityValidationIT.java   |  104 ++
 .../validation/FeedEntityValidationIT.java      |  145 +++
 webapp/src/test/resources/apps/data/data.txt    | 1000 ++++++++++++++++++
 webapp/src/test/resources/apps/hive/script.hql  |   19 +
 webapp/src/test/resources/apps/pig/data.txt     |  585 ----------
 .../table/customer-fs-replicating-feed.xml      |   56 +
 .../table/customer-table-replicating-feed.xml   |    8 +-
 .../resources/table/hive-process-template.xml   |   48 +
 .../table/multiple-targets-replicating-feed.xml |   74 ++
 .../resources/table/target-cluster-alpha.xml    |   52 +
 .../resources/table/target-cluster-beta.xml     |   52 +
 .../resources/table/target-cluster-gamma.xml    |   52 +
 47 files changed, 3627 insertions(+), 1880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index c96700c..ed01b0e 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -292,6 +292,7 @@
         <xs:restriction base="xs:string">
             <xs:enumeration value="oozie"/>
             <xs:enumeration value="pig"/>
+            <xs:enumeration value="hive"/>
         </xs:restriction>
     </xs:simpleType>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 7e35c88..7dbac58 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -81,7 +81,7 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
 
         final CatalogStorage tableStorage = (CatalogStorage) storage;
         String stagingDir = FeedHelper.getStagingDir(cluster, feed, tableStorage, Tag.REPLICATION);
-        Path stagingPath = new Path(stagingDir + "/*/*");
+        Path stagingPath = new Path(stagingDir + "/*/*/*");  // stagingDir/dataOutPartitionValue/nominal-time/data
         FileSystem fs = getFileSystem(cluster);
         try {
             FileStatus[] paths = fs.globStatus(stagingPath);
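
The extra "/*" level above reflects the new replication staging layout, where each
dated-partition directory now holds a per-nominal-time subdirectory with the data
underneath (stagingDir/dataOutPartitionValue/nominal-time/data). A minimal sketch
of how such a three-level glob resolves, assuming a staging path and cluster
configuration that are illustrative and not taken from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class StagingGlobSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // matches e.g. .../clicks/ds=2013-11-01-00/2013-11-01-00-00/data
            Path stagingGlob = new Path("/projects/falcon/staging/clicks/*/*/*");
            FileStatus[] matches = fs.globStatus(stagingGlob); // null if nothing matches
            if (matches != null) {
                for (FileStatus status : matches) {
                    System.out.println(status.getPath());
                }
            }
        }
    }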

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index a1b9cb8..67257e3 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -99,7 +99,7 @@ public final class FeedHelper {
 
         final List<Location> locations = getLocations(cluster, feed);
         if (locations != null) {
-            return new FileSystemStorage(clusterEntity, feed);
+            return new FileSystemStorage(ClusterHelper.getStorageUrl(clusterEntity), locations);
         }
 
         try {
@@ -185,7 +185,14 @@ public final class FeedHelper {
         throw new FalconException("Both catalog and locations are not defined.");
     }
 
-    private static List<Location> getLocations(Cluster cluster, Feed feed) {
+    public static Storage.TYPE getStorageType(Feed feed,
+                                              org.apache.falcon.entity.v0.cluster.Cluster clusterEntity)
+        throws FalconException {
+        Cluster feedCluster = getCluster(feed, clusterEntity.getName());
+        return getStorageType(feed, feedCluster);
+    }
+
+    protected static List<Location> getLocations(Cluster cluster, Feed feed) {
         // check if locations are overridden in cluster
         final Locations clusterLocations = cluster.getLocations();
         if (clusterLocations != null
@@ -197,7 +204,7 @@ public final class FeedHelper {
         return feedLocations == null ? null : feedLocations.getLocations();
     }
 
-    private static CatalogTable getTable(Cluster cluster, Feed feed) {
+    protected static CatalogTable getTable(Cluster cluster, Feed feed) {
         // check if table is overridden in cluster
         if (cluster.getTable() != null) {
             return cluster.getTable();

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
index 099ee71..72d9e07 100644
--- a/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
+++ b/common/src/main/java/org/apache/falcon/entity/FileSystemStorage.java
@@ -19,7 +19,6 @@
 package org.apache.falcon.entity;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.entity.v0.feed.LocationType;
@@ -47,10 +46,6 @@ public class FileSystemStorage implements Storage {
         this(FILE_SYSTEM_URL, feed.getLocations());
     }
 
-    protected FileSystemStorage(Cluster cluster, Feed feed) {
-        this(ClusterHelper.getStorageUrl(cluster), feed.getLocations());
-    }
-
     protected FileSystemStorage(String storageUrl, Locations locations) {
         this(storageUrl, locations.getLocations());
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java b/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java
new file mode 100644
index 0000000..eb0127d
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/StorageFactoryTest.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.FeedEntityParser;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Test for storage factory methods in feed helper.
+ */
+public class StorageFactoryTest {
+
+    private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
+
+    private static final String FS_FEED_UNIFORM = "/config/feed/feed-0.1.xml";
+    private static final String FS_FEED_OVERRIDE = "/config/feed/feed-0.2.xml";
+
+    private static final String TABLE_FEED_UNIFORM = "/config/feed/hive-table-feed.xml";
+    private static final String TABLE_FEED_OVERRIDE = "/config/feed/hive-table-feed-out.xml";
+
+    private static final String OVERRIDE_TBL_LOC = "/testCluster/clicks-summary/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+
+    private final ClusterEntityParser clusterParser =
+            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
+    private final FeedEntityParser feedParser =
+            (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
+
+    private Cluster clusterEntity;
+    private Feed fsFeedWithUniformStorage;
+    private Feed fsFeedWithOverriddenStorage;
+    private Feed tableFeedWithUniformStorage;
+    private Feed tableFeedWithOverriddenStorage;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        InputStream stream = this.getClass().getResourceAsStream(CLUSTER_XML);
+        clusterEntity = clusterParser.parse(stream);
+        stream.close();
+        Interface registry = ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY);
+        registry.setEndpoint("thrift://localhost:9083");
+        ConfigurationStore.get().publish(EntityType.CLUSTER, clusterEntity);
+
+        stream = this.getClass().getResourceAsStream(FS_FEED_UNIFORM);
+        fsFeedWithUniformStorage = feedParser.parse(stream);
+        stream.close();
+
+        stream = this.getClass().getResourceAsStream(FS_FEED_OVERRIDE);
+        fsFeedWithOverriddenStorage = feedParser.parse(stream);
+        stream.close();
+
+        stream = this.getClass().getResourceAsStream(TABLE_FEED_UNIFORM);
+        tableFeedWithUniformStorage = feedParser.parse(stream);
+        stream.close();
+
+        stream = this.getClass().getResourceAsStream(TABLE_FEED_OVERRIDE);
+        tableFeedWithOverriddenStorage = feedParser.parse(stream);
+        stream.close();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        ConfigurationStore.get().remove(EntityType.CLUSTER, clusterEntity.getName());
+    }
+
+    @DataProvider (name = "locationsDataProvider")
+    private Object[][] createLocationsDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, "/projects/falcon/clicks"},
+            {fsFeedWithOverriddenStorage, "/testCluster/projects/falcon/clicks"},
+        };
+    }
+
+    @Test (dataProvider = "locationsDataProvider")
+    public void testGetLocations(Feed feed, String dataPath) {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
+        for (Location location : locations) {
+            if (location.getType() == LocationType.DATA) {
+                Assert.assertEquals(location.getPath(), dataPath);
+            }
+        }
+    }
+
+    @DataProvider (name = "tableDataProvider")
+    private Object[][] createTableDataProvider() {
+        return new Object[][] {
+            {tableFeedWithUniformStorage, "catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+            {tableFeedWithOverriddenStorage, "catalog:testCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+        };
+    }
+
+    @Test (dataProvider = "tableDataProvider")
+    public void testGetTable(Feed feed, String dataPath) {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        CatalogTable table = FeedHelper.getTable(feedCluster, feed);
+        Assert.assertEquals(table.getUri(), dataPath);
+    }
+
+    private static final String UNIFORM_TABLE = "${hcatNode}/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+    private static final String OVERRIDETBL = "${hcatNode}/default/clicks-summary/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}";
+
+
+    @DataProvider (name = "uniformFeedStorageDataProvider")
+    private Object[][] createUniformFeedStorageDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM, "${nameNode}/projects/falcon/clicks"},
+            {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM, "${nameNode}/projects/falcon/clicks"},
+            {tableFeedWithUniformStorage, Storage.TYPE.TABLE, UNIFORM_TABLE},
+            {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE, OVERRIDETBL},
+        };
+    }
+
+    @Test (dataProvider = "uniformFeedStorageDataProvider")
+    public void testCreateStorageWithFeed(Feed feed, Storage.TYPE storageType,
+                                            String dataLocation) throws Exception {
+        Storage storage = FeedHelper.createStorage(feed);
+        Assert.assertEquals(storage.getType(), storageType);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+
+        if (storageType == Storage.TYPE.TABLE) {
+            Assert.assertEquals(((CatalogStorage) storage).getDatabase(), "default");
+        }
+    }
+
+    @DataProvider (name = "overriddenFeedStorageDataProvider")
+    private Object[][] createFeedStorageDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM, "/projects/falcon/clicks"},
+            {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM, "/testCluster/projects/falcon/clicks"},
+            {tableFeedWithUniformStorage, Storage.TYPE.TABLE, "/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+            {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE, OVERRIDE_TBL_LOC},
+        };
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithFeedAndClusterEntity(Feed feed, Storage.TYPE storageType,
+                                                          String dataLocation) throws Exception {
+        Storage storage = FeedHelper.createStorage(clusterEntity, feed);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithFeedAndClusterName(Feed feed, Storage.TYPE storageType,
+                                                        String dataLocation) throws Exception {
+        Storage storage = FeedHelper.createStorage(clusterEntity.getName(), feed);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithFeedAndFeedCluster(Feed feed, Storage.TYPE storageType,
+                                                        String dataLocation) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        Storage storage = FeedHelper.createStorage(feedCluster, feed);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateStorageWithAll(Feed feed, Storage.TYPE storageType,
+                                         String dataLocation) throws Exception {
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        Storage storage = FeedHelper.createStorage(feedCluster, feed, clusterEntity);
+        Assert.assertEquals(storage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @Test (dataProvider = "overriddenFeedStorageDataProvider")
+    public void testCreateReadOnlyStorage(Feed feed, Storage.TYPE storageType,
+                                          String dataLocation) throws Exception {
+        Storage readOnlyStorage = FeedHelper.createReadOnlyStorage(clusterEntity, feed);
+        Assert.assertEquals(readOnlyStorage.getType(), storageType);
+
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            dataLocation = ClusterHelper.getReadOnlyStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            dataLocation =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+        }
+
+        Assert.assertEquals(readOnlyStorage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @DataProvider (name = "uriTemplateDataProvider")
+    private Object[][] createUriTemplateDataProvider() {
+        return new Object[][] {
+            {Storage.TYPE.FILESYSTEM, "/projects/falcon/clicks"},
+            {Storage.TYPE.FILESYSTEM, "/testCluster/projects/falcon/clicks"},
+            {Storage.TYPE.TABLE, "/default/clicks/ds=${YEAR}-${MONTH}-${DAY}-${HOUR}"},
+            {Storage.TYPE.TABLE, OVERRIDE_TBL_LOC},
+        };
+    }
+
+    @Test (dataProvider = "uriTemplateDataProvider")
+    public void testCreateStorageWithUriTemplate(Storage.TYPE storageType,
+                                                 String dataLocation) throws Exception {
+        String uriTemplate = null;
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            uriTemplate = "DATA=" + ClusterHelper.getStorageUrl(clusterEntity) + dataLocation + "#";
+            dataLocation = ClusterHelper.getStorageUrl(clusterEntity) + dataLocation;
+        } else if (storageType == Storage.TYPE.TABLE) {
+            uriTemplate =
+                    ClusterHelper.getInterface(clusterEntity, Interfacetype.REGISTRY).getEndpoint() + dataLocation;
+            dataLocation = uriTemplate;
+        }
+
+        Storage storage = FeedHelper.createStorage(storageType.name(), uriTemplate);
+        Assert.assertEquals(storage.getType(), storageType);
+        Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), dataLocation);
+    }
+
+    @DataProvider (name = "storageTypeDataProvider")
+    private Object[][] createStorageTypeDataProvider() {
+        return new Object[][] {
+            {fsFeedWithUniformStorage, Storage.TYPE.FILESYSTEM},
+            {fsFeedWithOverriddenStorage, Storage.TYPE.FILESYSTEM},
+            {tableFeedWithUniformStorage, Storage.TYPE.TABLE},
+            {tableFeedWithOverriddenStorage, Storage.TYPE.TABLE},
+        };
+    }
+
+    @Test (dataProvider = "storageTypeDataProvider")
+    public void testGetStorageTypeWithFeed(Feed feed, Storage.TYPE expectedStorageType) throws Exception {
+        Storage.TYPE actualStorageType = FeedHelper.getStorageType(feed);
+        Assert.assertEquals(actualStorageType, expectedStorageType);
+
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                FeedHelper.getCluster(feed, clusterEntity.getName());
+        actualStorageType = FeedHelper.getStorageType(feed, feedCluster);
+        Assert.assertEquals(actualStorageType, expectedStorageType);
+
+        actualStorageType = FeedHelper.getStorageType(feed, clusterEntity);
+        Assert.assertEquals(actualStorageType, expectedStorageType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/common/src/test/resources/config/feed/feed-0.2.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/feed-0.2.xml b/common/src/test/resources/config/feed/feed-0.2.xml
index a3f4f4d..8bf7d20 100644
--- a/common/src/test/resources/config/feed/feed-0.2.xml
+++ b/common/src/test/resources/config/feed/feed-0.2.xml
@@ -34,11 +34,21 @@
             <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
             <retention limit="hours(48)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <locations>
+                <location type="data" path="/testCluster/projects/falcon/clicks"/>
+                <location type="stats" path="/testCluster/projects/falcon/clicksStats"/>
+                <location type="meta" path="/testCluster/projects/falcon/clicksMetaData"/>
+            </locations>
         </cluster>
         <cluster name="backupCluster" type="target">
             <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
             <retention limit="hours(6)" action="archive"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <locations>
+                <location type="data" path="/backupCluster/projects/falcon/clicks"/>
+                <location type="stats" path="/backupCluster/projects/falcon/clicksStats"/>
+                <location type="meta" path="/backupCluster/projects/falcon/clicksMetaData"/>
+            </locations>
         </cluster>
     </clusters>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/common/src/test/resources/config/feed/hive-table-feed-out.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/feed/hive-table-feed-out.xml b/common/src/test/resources/config/feed/hive-table-feed-out.xml
index 1760388..17eb954 100644
--- a/common/src/test/resources/config/feed/hive-table-feed-out.xml
+++ b/common/src/test/resources/config/feed/hive-table-feed-out.xml
@@ -28,11 +28,13 @@
             <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
             <retention limit="hours(48)" action="delete"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <table uri="catalog:testCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
         <cluster name="backupCluster" type="target">
             <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
             <retention limit="hours(6)" action="archive"/>
             <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <table uri="catalog:backupCluster:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
         </cluster>
     </clusters>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki
index 7abc70f..f2cac4a 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -637,6 +637,26 @@ This defines the workflow engine to be pig and the pig script is defined at
 Feeds with Hive table storage will send one more parameter apart from the general ones:
 <verbatim>$input_filter</verbatim>
 
+---+++++ Hive
+
+Falcon also adds the Hive engine as part of its Hive integration, which enables users to embed a Hive script as a process.
+This enables users to create materialized queries in a declarative way.
+
+Example:
+<verbatim>
+<process name="sample-process">
+...
+    <workflow engine="hive" path="/projects/bootcamp/hive-script.hql"/>
+...
+</process>
+</verbatim>
+
+This defines the workflow engine to be hive and the hive script is defined at
+/projects/bootcamp/hive-script.hql.
+
+Feeds with Hive table storage will send one more parameter apart from the general ones:
+<verbatim>$input_filter</verbatim>
+
 ---++++ Retry
 Retry policy defines how the workflow failures should be handled. Two retry policies are defined: backoff and exp-backoff(exponential backoff). Depending on the delay and number of attempts, the workflow is re-tried after specific intervals.
 Syntax:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
index e32f5d9..db69a7c 100644
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
@@ -511,8 +511,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                     FeedHelper.getStagingDir(srcCluster, getEntity(), sourceStorage, Tag.REPLICATION)
                     + "/" + sourceDatedPartitionKey
                     + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
-            props.put("falconSourceStagingDir", sourceStagingDir);
-            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL);
+            props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
 
             // create staging dirs for import at target & set it as distcpTargetPaths
             String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
@@ -520,8 +519,7 @@ public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
                     FeedHelper.getStagingDir(trgCluster, getEntity(), targetStorage, Tag.REPLICATION)
                     + "/" + targetDatedPartitionKey
                     + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
-            props.put("falconTargetStagingDir", targetStagingDir);
-            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL);
+            props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
 
             props.put("sourceRelativePaths", "IGNORE"); // this will not be used for Table storage.
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 22ee6a7..bc4ba48 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -67,7 +67,7 @@
             <job-tracker>${falconSourceJobTracker}</job-tracker>
             <name-node>${falconSourceNameNode}</name-node>
             <prepare>
-                <delete path="${falconSourceStagingDir}/${nominalTime}"/>
+                <delete path="${distcpSourcePaths}"/>
             </prepare>
             <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
             <configuration>
@@ -84,7 +84,7 @@
             <param>falconSourceDatabase=${falconSourceDatabase}</param>
             <param>falconSourceTable=${falconSourceTable}</param>
             <param>falconSourcePartition=${falconSourcePartition}</param>
-            <param>falconSourceStagingDir=${falconSourceStagingDir}/${nominalTime}</param>
+            <param>falconSourceStagingDir=${distcpSourcePaths}</param>
         </hive>
         <ok to="replication"/>
         <error to="fail"/>
@@ -149,7 +149,7 @@
             <param>falconTargetDatabase=${falconTargetDatabase}</param>
             <param>falconTargetTable=${falconTargetTable}</param>
             <param>falconTargetPartition=${falconTargetPartition}</param>
-            <param>falconTargetStagingDir=${falconTargetStagingDir}/${nominalTime}</param>
+            <param>falconTargetStagingDir=${distcpTargetPaths}</param>
         </hive>
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index d3c9dff..41f74b5 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -147,6 +147,23 @@
                         </goals>
                         <configuration>
                             <!-- <generateDirectory>src/main/java</generateDirectory> -->
+                            <generatePackage>org.apache.falcon.oozie.hive</generatePackage>
+                            <includeSchemas>
+                                <includeSchema>hive-action-0.2.xsd</includeSchema>
+                            </includeSchemas>
+                            <excludeBindings>
+                                <excludeBinding>jaxb-binding.xjb</excludeBinding>
+                            </excludeBindings>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>4</id>
+                        <!-- <phase>generate-sources</phase> -->
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <!-- <generateDirectory>src/main/java</generateDirectory> -->
                             <generatePackage>org.apache.falcon.oozie.bundle</generatePackage>
                             <includeSchemas>
                                 <includeSchema>oozie-bundle-0.1.xsd</includeSchema>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index 5438b6f..ad095fd 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -74,6 +74,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
     protected static final JAXBContext WORKFLOW_JAXB_CONTEXT;
     protected static final JAXBContext COORD_JAXB_CONTEXT;
     protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
+    protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
 
     protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
         @Override
@@ -96,6 +97,8 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
             WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
             COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
             BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
+            HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+                    org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
         } catch (JAXBException e) {
             throw new RuntimeException("Unable to create JAXB context", e);
         }
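
Creating a JAXBContext from a package name, as HIVE_ACTION_JAXB_CONTEXT does above,
requires that package to contain a JAXB-generated ObjectFactory (or a jaxb.index
file); the new xjc execution in oozie/pom.xml generates org.apache.falcon.oozie.hive
from hive-action-0.2.xsd to satisfy this. A hedged equivalent of the call above,
not part of the commit:

    JAXBContext hiveContext = JAXBContext.newInstance("org.apache.falcon.oozie.hive");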

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/oozie/src/main/resources/hive-action-0.2.xsd
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/hive-action-0.2.xsd b/oozie/src/main/resources/hive-action-0.2.xsd
new file mode 100644
index 0000000..884bd5f
--- /dev/null
+++ b/oozie/src/main/resources/hive-action-0.2.xsd
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+           xmlns:hive="uri:oozie:hive-action:0.2" elementFormDefault="qualified"
+           targetNamespace="uri:oozie:hive-action:0.2">
+
+    <xs:element name="hive" type="hive:ACTION"/>
+
+    <xs:complexType name="ACTION">
+        <xs:sequence>
+            <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="prepare" type="hive:PREPARE" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="configuration" type="hive:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="CONFIGURATION">
+        <xs:sequence>
+            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+                <xs:complexType>
+                    <xs:sequence>
+                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+                    </xs:sequence>
+                </xs:complexType>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="PREPARE">
+        <xs:sequence>
+            <xs:element name="delete" type="hive:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="mkdir" type="hive:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="DELETE">
+        <xs:attribute name="path" type="xs:string" use="required"/>
+    </xs:complexType>
+
+    <xs:complexType name="MKDIR">
+        <xs:attribute name="path" type="xs:string" use="required"/>
+    </xs:complexType>
+
+</xs:schema>
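
To sanity-check a hand-written hive action against this schema, one could use the
standard javax.xml.validation API; a minimal sketch, where the schema and instance
file paths are illustrative assumptions:

    import java.io.File;
    import javax.xml.XMLConstants;
    import javax.xml.transform.stream.StreamSource;
    import javax.xml.validation.Schema;
    import javax.xml.validation.SchemaFactory;
    import javax.xml.validation.Validator;

    public final class HiveActionSchemaCheck {
        public static void main(String[] args) throws Exception {
            SchemaFactory factory =
                    SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(
                    new File("oozie/src/main/resources/hive-action-0.2.xsd"));
            Validator validator = schema.newValidator();
            // hive-action.xml is a hypothetical instance document rooted at
            // <hive xmlns="uri:oozie:hive-action:0.2">
            validator.validate(new StreamSource(new File("hive-action.xml")));
            System.out.println("valid against uri:oozie:hive-action:0.2");
        }
    }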

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index ee7167d..3460d95 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -32,6 +32,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.EngineType;
@@ -61,7 +62,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-
+import org.apache.xerces.dom.ElementNSImpl;
+import org.w3c.dom.Document;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMResult;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -447,42 +455,74 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
                 action.getSubWorkflow().setAppPath(storagePath);
             } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
                 decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig(), wfPath);
+            } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
+                decorateHiveAction(cluster, process, processWorkflow, storagePath, action, wfPath);
             }
         }
 
         marshal(cluster, wfApp, wfPath);
     }
 
-    private void decoratePIGAction(Cluster cluster, Process process, Workflow processWorkflow,
-                                   String storagePath, PIG pigAction, Path wfPath) throws FalconException {
+    private void decoratePIGAction(Cluster cluster, Process process,
+                                   Workflow processWorkflow, String storagePath,
+                                   PIG pigAction, Path wfPath) throws FalconException {
         pigAction.setScript(storagePath);
 
-        addPrepareDeleteOutputPath(cluster, process, pigAction);
+        addPrepareDeleteOutputPath(process, pigAction);
 
-        addInputFeedsAsParams(pigAction, process, cluster);
-        addOutputFeedsAsParams(pigAction, process, cluster);
+        final List<String> paramList = pigAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
 
         propagateProcessProperties(pigAction, process);
 
-        setupHiveConfiguration(cluster, process, wfPath);
+        Storage.TYPE storageType = getStorageType(cluster, process);
+        if (Storage.TYPE.TABLE == storageType) {
+            // adds hive-site.xml in pig classpath
+            setupHiveConfiguration(cluster, wfPath, ""); // DO NOT ADD PREFIX!!!
+            pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+        }
+
+        addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive());
+    }
+
+    private void decorateHiveAction(Cluster cluster, Process process,
+                                    Workflow processWorkflow, String storagePath,
+                                    ACTION wfAction, Path wfPath) throws FalconException {
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        hiveAction.setScript(storagePath);
+
+        addPrepareDeleteOutputPath(process, hiveAction);
+
+        final List<String> paramList = hiveAction.getParam();
+        addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
+        addOutputFeedsAsParams(paramList, process, cluster);
+
+        propagateProcessProperties(hiveAction, process);
 
-        addArchiveForCustomJars(cluster, processWorkflow, pigAction);
+        setupHiveConfiguration(cluster, wfPath, "falcon-");
+
+        addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive());
+
+        marshalHiveAction(wfAction, actionJaxbElement);
     }
 
-    private void addPrepareDeleteOutputPath(Cluster cluster, Process process,
+    private void addPrepareDeleteOutputPath(Process process,
                                             PIG pigAction) throws FalconException {
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
         final PREPARE prepare = new PREPARE();
         final List<DELETE> deleteList = prepare.getDelete();
-        for (Output output : process.getOutputs().getOutputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-            Storage storage = FeedHelper.createStorage(cluster, feed);
-
-            if (storage.getType() == Storage.TYPE.TABLE) {
-                continue; // prepare delete only applies to FileSystem storage
-            }
 
+        for (String deletePath : deleteOutputPathList) {
             final DELETE delete = new DELETE();
-            delete.setPath("${wf:conf('" + output.getName() + "')}");
+            delete.setPath(deletePath);
             deleteList.add(delete);
         }
 
@@ -491,41 +531,79 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         }
     }
 
-    private void addInputFeedsAsParams(PIG pigAction, Process process,
-                                       Cluster cluster) throws FalconException {
-        final List<String> paramList = pigAction.getParam();
+    private void addPrepareDeleteOutputPath(Process process,
+                                            org.apache.falcon.oozie.hive.ACTION hiveAction)
+        throws FalconException {
+
+        List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+        if (deleteOutputPathList.isEmpty()) {
+            return;
+        }
+
+        org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+        List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+        for (String deletePath : deleteOutputPathList) {
+            org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+            delete.setPath(deletePath);
+            deleteList.add(delete);
+        }
+
+        if (!deleteList.isEmpty()) {
+            hiveAction.setPrepare(prepare);
+        }
+    }
+
+    private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
+        final List<String> deleteList = new ArrayList<String>();
+        for (Output output : process.getOutputs().getOutputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+            if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+                continue; // prepare delete only applies to FileSystem storage
+            }
+
+            deleteList.add("${wf:conf('" + output.getName() + "')}");
+        }
+
+        return deleteList;
+    }
+
+    private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
+                                       String engineType) throws FalconException {
         for (Input input : process.getInputs().getInputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
 
-            final String inputName = "falcon_" + input.getName();
+            final String inputName = input.getName();
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                paramList.add(inputName + "=${" + inputName + "}");
+                paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
             } else if (storage.getType() == Storage.TYPE.TABLE) {
+                final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
                 Map<String, String> props = new HashMap<String, String>();
-                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, inputName);
+                propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
                 for (String key : props.keySet()) {
                     paramList.add(key + "=${wf:conf('" + key + "')}");
                 }
-                // finally add the pig filter for this input feed
-                paramList.add(inputName + "_filter=${wf:conf('" + inputName + "_partition_filter_pig')}");
+
+                paramList.add(paramName + "_filter=${wf:conf('"
+                        + paramName + "_partition_filter_" + engineType + "')}");
             }
         }
     }
 
-    private void addOutputFeedsAsParams(PIG pigAction, Process process,
+    private void addOutputFeedsAsParams(List<String> paramList, Process process,
                                         Cluster cluster) throws FalconException {
-        final List<String> paramList = pigAction.getParam();
         for (Output output : process.getOutputs().getOutputs()) {
             Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
             Storage storage = FeedHelper.createStorage(cluster, feed);
 
             if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                final String outputName = "falcon_" + output.getName();
+                final String outputName = output.getName();  // no prefix for backwards compatibility
                 paramList.add(outputName + "=${" + outputName + "}");
             } else if (storage.getType() == Storage.TYPE.TABLE) {
                 Map<String, String> props = new HashMap<String, String>();
-                propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+                propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
                 for (String key : props.keySet()) {
                     paramList.add(key + "=${wf:conf('" + key + "')}");
                 }
@@ -542,43 +620,79 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         // Propagate user defined properties to job configuration
         final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
                 pigAction.getConfiguration().getProperty();
+
+        // Propagate user defined properties to pig script as macros
+        // passed as parameters -p name=value that can be accessed as $name
+        final List<String> paramList = pigAction.getParam();
+
         for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
             org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
                     new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
             configProperty.setName(property.getName());
             configProperty.setValue(property.getValue());
             configuration.add(configProperty);
+
+            paramList.add(property.getName() + "=" + property.getValue());
         }
+    }
+
+    private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
+        org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+        if (processProperties == null) {
+            return;
+        }
+
+        // Propagate user defined properties to job configuration
+        final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
+                hiveAction.getConfiguration().getProperty();
 
         // Propagate user defined properties to pig script as macros
         // passed as parameters -p name=value that can be accessed as $name
-        final List<String> paramList = pigAction.getParam();
+        final List<String> paramList = hiveAction.getParam();
+
         for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+            org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
+                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+            configProperty.setName(property.getName());
+            configProperty.setValue(property.getValue());
+            configuration.add(configProperty);
+
             paramList.add(property.getName() + "=" + property.getValue());
         }
     }
 
-    // adds hive-site.xml in pig classpath
-    private void setupHiveConfiguration(Cluster cluster, Process process,
-                                        Path wfPath) throws FalconException {
-        Input input = process.getInputs().getInputs().get(0); // at least one input should exist
-        Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-        Storage storage = FeedHelper.createStorage(cluster, feed);
-        if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-            return;
+    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+        if (process.getInputs() == null) {
+            return storageType;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            storageType = FeedHelper.getStorageType(feed, cluster);
+            if (Storage.TYPE.TABLE == storageType) {
+                break;
+            }
         }
 
+        return storageType;
+    }
+
+    // creates hive-site.xml configuration in conf dir.
+    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
+                                        String prefix) throws FalconException {
+        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
         try {
             FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
             Path confPath = new Path(wfPath, "conf");
-            createHiveConf(fs, confPath, ((CatalogStorage) storage).getCatalogUrl(), ""); // DO NOT ADD PREFIX!!!
+            createHiveConf(fs, confPath, catalogUrl, prefix);
         } catch (IOException e) {
             throw new FalconException(e);
         }
     }
 
     private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
-                                         PIG pigAction) throws FalconException {
+                                         List<String> archiveList) throws FalconException {
         String processWorkflowLib = processWorkflow.getLib();
         if (processWorkflowLib == null) {
             return;
@@ -588,7 +702,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
         try {
             final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
             if (fs.isFile(libPath)) {  // File, not a Dir
-                pigAction.getArchive().add(processWorkflowLib);
+                archiveList.add(processWorkflowLib);
                 return;
             }
 
@@ -605,10 +719,34 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
             });
 
             for (FileStatus fileStatus : fileStatuses) {
-                pigAction.getArchive().add(fileStatus.getPath().toString());
+                archiveList.add(fileStatus.getPath().toString());
             }
         } catch (IOException e) {
             throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
         }
     }
+
+    @SuppressWarnings("unchecked")
+    protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
+        try {
+            Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+            return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
+                    unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to unmarshal hive action.", e);
+        }
+    }
+
+    protected void marshalHiveAction(ACTION wfAction,
+                                     JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
+        try {
+            DOMResult hiveActionDOM = new DOMResult();
+            Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(actionjaxbElement, hiveActionDOM);
+            wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to marshal hive action.", e);
+        }
+    }
 }
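
The marshal/unmarshal pair above is how the mapper plants a typed hive action into the workflow action's xs:any slot and later reads it back. A minimal, self-contained sketch of the same round-trip, using a toy @XmlRootElement stand-in (HiveActionStub) in place of Falcon's generated org.apache.falcon.oozie.hive.ACTION; assumes only JAXB (javax.xml.bind) on the classpath:

    import javax.xml.bind.JAXBContext;
    import javax.xml.bind.JAXBElement;
    import javax.xml.bind.Unmarshaller;
    import javax.xml.bind.annotation.XmlRootElement;
    import javax.xml.transform.dom.DOMResult;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;

    @XmlRootElement(name = "hive", namespace = "uri:oozie:hive-action:0.2")
    class HiveActionStub {          // toy stand-in for the generated ACTION type
        public String script;
    }

    public final class EmbeddedActionRoundTrip {
        public static void main(String[] args) throws Exception {
            JAXBContext ctx = JAXBContext.newInstance(HiveActionStub.class);

            HiveActionStub hive = new HiveActionStub();
            hive.script = "${nameNode}/apps/hive/script.hql";

            // marshalHiveAction: marshal into a DOM tree, then plant the root
            // element where wfAction.setAny(...) would put it.
            DOMResult dom = new DOMResult();
            ctx.createMarshaller().marshal(hive, dom);
            Element planted = ((Document) dom.getNode()).getDocumentElement();

            // unMarshalHiveAction: read the embedded element back as a typed object.
            Unmarshaller u = ctx.createUnmarshaller();
            JAXBElement<HiveActionStub> restored = u.unmarshal(planted, HiveActionStub.class);
            System.out.println(restored.getValue().script);
        }
    }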

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 048191c..4668ce3 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -62,6 +62,9 @@
             <case to="user-pig-job">
                 ${userWorkflowEngine=="pig"}
             </case>
+            <case to="user-hive-job">
+                ${userWorkflowEngine=="hive"}
+            </case>
             <default to="user-oozie-workflow"/>
         </switch>
     </decision>
@@ -88,11 +91,30 @@
                 </property>
             </configuration>
             <script>#USER_WF_PATH#</script>
-            <file>${wf:appPath()}/conf/hive-site.xml</file>
         </pig>
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>
+    <action name="user-hive-job">
+        <hive xmlns="uri:oozie:hive-action:0.2">
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <job-xml>${wf:appPath()}/conf/falcon-hive-site.xml</job-xml>
+            <configuration>
+                <property>
+                    <name>mapred.job.queue.name</name>
+                    <value>${queueName}</value>
+                </property>
+                <property>
+                    <name>oozie.launcher.mapred.job.priority</name>
+                    <value>${jobPriority}</value>
+                </property>
+            </configuration>
+            <script>#USER_WF_PATH#</script>
+        </hive>
+        <ok to="succeeded-post-processing"/>
+        <error to="failed-post-processing"/>
+    </action>
     <action name='user-oozie-workflow'>
         <sub-workflow>
             <app-path>#USER_WF_PATH#</app-path>
@@ -101,7 +123,6 @@
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>
-
     <action name='succeeded-post-processing'>
         <java>
             <job-tracker>${jobTracker}</job-tracker>

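The new user-hive-job action is declared in the uri:oozie:hive-action:0.2 namespace, and the patch bundles a matching hive-action-0.2.xsd under oozie/src/main/resources. A rough sketch of validating an extracted <hive> element against that schema (both file paths are illustrative):

    import javax.xml.XMLConstants;
    import javax.xml.transform.stream.StreamSource;
    import javax.xml.validation.Schema;
    import javax.xml.validation.SchemaFactory;
    import javax.xml.validation.Validator;

    import java.io.File;

    public final class ValidateHiveAction {
        public static void main(String[] args) throws Exception {
            SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = sf.newSchema(new File("oozie/src/main/resources/hive-action-0.2.xsd"));
            Validator validator = schema.newValidator();
            // the <hive> element, extracted from a generated workflow.xml
            validator.validate(new StreamSource(new File("/tmp/hive-action.xml")));
            System.out.println("hive action validates against hive-action-0.2.xsd");
        }
    }
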
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 5330b9f..aef0850 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -18,26 +18,6 @@
 
 package org.apache.falcon.converter;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
@@ -65,17 +45,37 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.PIG;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 /**
  * Test for the Falcon entities mapping into Oozie artifacts.
  */
@@ -206,17 +206,82 @@ public class OozieProcessMapperTest extends AbstractTestBase {
 
         List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
 
-        ACTION pigAction = (ACTION) decisionOrForkOrJoin.get(3);
-        Assert.assertEquals("user-pig-job", pigAction.getName());
-        Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getPig().getScript());
-        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getPig().getArchive());
+        ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
+        Assert.assertEquals("user-pig-job", pigActionNode.getName());
 
-        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(4);
+        final PIG pigAction = pigActionNode.getPig();
+        Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getScript());
+        Assert.assertNotNull(pigAction.getPrepare());
+        Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
+        Assert.assertFalse(pigAction.getParam().isEmpty());
+        Assert.assertEquals(5, pigAction.getParam().size());
+        Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
+        Assert.assertTrue(pigAction.getFile().size() > 0);
+
+        ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
         Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
         Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
     }
 
     @Test
+    public void testHiveProcessMapper() throws Exception {
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+        resource = this.getClass().getResource("/config/process/hive-process.xml");
+        Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+        ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+        OozieProcessMapper mapper = new OozieProcessMapper(process);
+        Path bundlePath = new Path("/tmp/seetharam", EntityUtil.getStagingPath(process));
+        mapper.map(cluster, bundlePath);
+        assertTrue(fs.exists(bundlePath));
+
+        BUNDLEAPP bundle = getBundle(bundlePath);
+        assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+        assertEquals(1, bundle.getCoordinator().size());
+        assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+                bundle.getCoordinator().get(0).getName());
+        String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+        COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+        CONFIGURATION conf = coord.getAction().getWorkflow().getConfiguration();
+        List<Property> properties = conf.getProperty();
+
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+
+        for (Property property : properties) {
+            if (expected.containsKey(property.getName())) {
+                Assert.assertEquals(property.getValue(), expected.get(property.getName()));
+            }
+        }
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+        testParentWorkflow(process, parentWorkflow);
+
+        List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+        ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+        Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+        JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
+        org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+        Assert.assertEquals("${nameNode}/apps/hive/script.hql", hiveAction.getScript());
+        Assert.assertNull(hiveAction.getPrepare());
+        Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+        Assert.assertFalse(hiveAction.getParam().isEmpty());
+        Assert.assertEquals(10, hiveAction.getParam().size());
+    }
+
+    @Test
     public void testProcessMapperForTableStorage() throws Exception {
         URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
         Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
@@ -226,7 +291,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.FEED, outFeed);
 
-        resource = this.getClass().getResource("/config/process/process-table.xml");
+        resource = this.getClass().getResource("/config/process/pig-process-table.xml");
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.PROCESS, process);
 
@@ -289,6 +354,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         }
     }
 
+    @SuppressWarnings("unchecked")
     private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
@@ -346,8 +412,11 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
         Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
         Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+        Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+        Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
     }
 
     private COORDINATORAPP getCoordinator(Path path) throws Exception {
@@ -362,16 +431,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
         return jaxbBundle.getValue();
     }
 
+    @SuppressWarnings("unchecked")
     private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
         String workflow = readFile(new Path(path, "workflow.xml"));
 
-        Unmarshaller unmarshaller = JAXBContext.newInstance(WORKFLOWAPP.class).createUnmarshaller();
-        SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
-        Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-workflow-0.3.xsd"));
-        unmarshaller.setSchema(schema);
-        JAXBElement<WORKFLOWAPP> jaxbWorkflow = unmarshaller.unmarshal(
-                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())), WORKFLOWAPP.class);
-        return jaxbWorkflow.getValue();
+        JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+        Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
+        return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+                new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
     }
 
     private BUNDLEAPP getBundle(Path path) throws Exception {
@@ -397,7 +464,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
     }
 
     @Override
-    @AfterClass
+    @AfterMethod
     public void cleanup() throws Exception {
         super.cleanup();
         ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");

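Note that getParentWorkflow no longer sets a Schema on the Unmarshaller, presumably so the foreign-namespace hive element planted in the workflow's any slot cannot trip validation during the read-back. A sketch of the same schema-less pattern, with a toy root type (WorkflowStub stands in for the generated WORKFLOWAPP):

    import javax.xml.bind.JAXBContext;
    import javax.xml.bind.JAXBElement;
    import javax.xml.bind.Unmarshaller;
    import javax.xml.bind.annotation.XmlRootElement;
    import javax.xml.transform.stream.StreamSource;

    import java.io.ByteArrayInputStream;

    @XmlRootElement(name = "workflow-app", namespace = "uri:oozie:workflow:0.3")
    class WorkflowStub {
    }

    public final class UnmarshalWithoutSchema {
        public static void main(String[] args) throws Exception {
            String xml = "<workflow-app xmlns='uri:oozie:workflow:0.3'/>";
            Unmarshaller u = JAXBContext.newInstance(WorkflowStub.class).createUnmarshaller();
            // No unmarshaller.setSchema(...): content the workflow XSD does not
            // know about no longer fails the parse.
            JAXBElement<WorkflowStub> wf = u.unmarshal(
                    new StreamSource(new ByteArrayInputStream(xml.getBytes())),
                    WorkflowStub.class);
            System.out.println("parsed " + wf.getName());
        }
    }
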
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/process/src/test/resources/config/process/hive-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process.xml b/process/src/test/resources/config/process/hive-process.xml
new file mode 100644
index 0000000..4dac8e9
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

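This is the process definition testHiveProcessMapper publishes; engine="hive" is what routes the parent workflow's decision node to user-hive-job. Loading it follows the same unmarshaller pattern the test uses; a minimal sketch, assuming the falcon client classes are on the classpath (the getEngine() accessor on the generated Workflow type is an assumption, not shown in this diff):

    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.process.Process;

    import java.net.URL;

    public final class LoadHiveProcess {
        public static void main(String[] args) throws Exception {
            URL resource = LoadHiveProcess.class.getResource("/config/process/hive-process.xml");
            Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
            // engine="hive" selects the user-hive-job branch in the parent workflow
            System.out.println(process.getName() + " -> " + process.getWorkflow().getEngine());
        }
    }
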
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/process/src/test/resources/config/process/pig-process-table.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/pig-process-table.xml b/process/src/test/resources/config/process/pig-process-table.xml
new file mode 100644
index 0000000..37aca10
--- /dev/null
+++ b/process/src/test/resources/config/process/pig-process-table.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<process name="table-process" xmlns="uri:falcon:process:0.1">
+    <!-- where -->
+    <clusters>
+        <cluster name="corp">
+            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+        </cluster>
+    </clusters>
+
+    <!-- when -->
+    <parallel>1</parallel>
+    <order>LIFO</order>
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+
+    <!-- what -->
+    <inputs>
+        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+    </inputs>
+
+    <outputs>
+        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+    </outputs>
+
+    <!-- how -->
+    <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/process/src/test/resources/config/process/process-table.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-table.xml b/process/src/test/resources/config/process/process-table.xml
deleted file mode 100644
index 37aca10..0000000
--- a/process/src/test/resources/config/process/process-table.xml
+++ /dev/null
@@ -1,46 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-<process name="table-process" xmlns="uri:falcon:process:0.1">
-    <!-- where -->
-    <clusters>
-        <cluster name="corp">
-            <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
-        </cluster>
-    </clusters>
-
-    <!-- when -->
-    <parallel>1</parallel>
-    <order>LIFO</order>
-    <frequency>hours(1)</frequency>
-    <timezone>UTC</timezone>
-
-    <!-- what -->
-    <inputs>
-        <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
-    </inputs>
-
-    <outputs>
-        <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
-    </outputs>
-
-    <!-- how -->
-    <workflow engine="pig" path="/apps/pig/id.pig"/>
-
-    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
-</process>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 0c7b7b7..dab809b 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -70,6 +70,7 @@ public class LateDataHandler extends Configured implements Tool {
 
         opt = new Option("falconFeedStorageType", true, "Feed storage type: FileSystem or Table");
         opt.setRequired(true);
+        options.addOption(opt);
 
         return new GnuParser().parse(options, args);
     }

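The added options.addOption(opt) fixes a real bug: the Option was constructed and marked required but never registered, so the parser had no knowledge of -falconFeedStorageType and would reject it as unrecognized. A minimal commons-cli sketch of the corrected registration (CliOptionDemo is an illustrative name):

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;

    public final class CliOptionDemo {
        public static void main(String[] args) throws Exception {
            Options options = new Options();
            Option opt = new Option("falconFeedStorageType", true,
                    "Feed storage type: FileSystem or Table");
            opt.setRequired(true);
            options.addOption(opt);   // the line this patch adds

            CommandLine cmd = new GnuParser().parse(options,
                    new String[]{"-falconFeedStorageType", "TABLE"});
            System.out.println(cmd.getOptionValue("falconFeedStorageType"));
        }
    }
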
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/502990c9/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 492fc93..02c0458 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -249,15 +249,6 @@
                                     <outputDirectory>${project.build.directory}/webapps</outputDirectory>
                                     <destFileName>hadoop.war</destFileName>
                                 </artifactItem>
-                                <artifactItem>
-                                    <groupId>org.apache.pig</groupId>
-                                    <artifactId>pig</artifactId>
-                                    <version>0.11.1</version>
-                                    <type>jar</type>
-                                    <overWrite>false</overWrite>
-                                    <outputDirectory>${project.build.directory}/sharelib</outputDirectory>
-                                    <destFileName>pig.jar</destFileName>
-                                </artifactItem>
                                 <!-- this is only used in integration-tests against external clusters -->
                                 <artifactItem>
                                     <groupId>org.apache.activemq</groupId>

