falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject falcon git commit: FALCON-1836 Import from database to HCatalog
Date Mon, 14 Mar 2016 21:34:50 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 9ec4c23a1 -> aec6084e7


FALCON-1836 Import from database to HCatalog

Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>

Reviewers: "Balu Vellanki <bvellanki@hortonworks.com>, Ajay Yadava <ajayyadava@apache.org>, Peeyush Bishnoi <bpeeyush@yahoo.co.in>"

Closes #61 from vramachan/master


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aec6084e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aec6084e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aec6084e

Branch: refs/heads/master
Commit: aec6084e7df82574c0b41063ab0f4a4115cbf25d
Parents: 9ec4c23
Author: Venkatesan Ramachandran <vramachandran@hortonworks.com>
Authored: Mon Mar 14 14:34:28 2016 -0700
Committer: bvellanki <bvellanki@hortonworks.com>
Committed: Mon Mar 14 14:34:28 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/falcon/entity/HiveUtil.java |   4 +-
 .../org/apache/falcon/entity/HiveUtilTest.java  |   4 +-
 .../engine/oozie/utils/OozieBuilderUtils.java   |   4 +-
 oozie/pom.xml                                   |  19 +++
 .../oozie/DatabaseExportWorkflowBuilder.java    | 116 ++++++++++++++-----
 .../oozie/DatabaseImportWorkflowBuilder.java    | 115 ++++++++++++++----
 .../falcon/oozie/ExportWorkflowBuilder.java     |   5 +-
 .../oozie/FeedExportCoordinatorBuilder.java     |  14 +--
 .../apache/falcon/oozie/ImportExportCommon.java |  46 +++++++-
 .../falcon/oozie/ImportWorkflowBuilder.java     |   5 +-
 .../OozieOrchestrationWorkflowBuilder.java      |   6 +-
 .../java/org/apache/falcon/util/OozieUtils.java |  28 +++++
 .../apache/falcon/lifecycle/FeedExportIT.java   | 115 ++++++++++++++++++
 .../apache/falcon/lifecycle/FeedImportIT.java   |  56 ++++++++-
 .../falcon/resource/AbstractTestContext.java    |   3 +-
 .../test/resources/feed-export-template6.xml    |  56 +++++++++
 webapp/src/test/resources/feed-template5.xml    |  55 +++++++++
 17 files changed, 575 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
index f4029e4..f8eaebb 100644
--- a/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/HiveUtil.java
@@ -29,7 +29,7 @@ import java.util.Properties;
  */
 public final class HiveUtil {
     public static final String METASTOREURIS = "hive.metastore.uris";
-    public static final String METASTROE_URI = "hcat.metastore.uri";
+    public static final String METASTORE_URI = "hcat.metastore.uri";
     public static final String NODE = "hcatNode";
     public static final String METASTORE_UGI = "hive.metastore.execute.setugi";
 
@@ -48,7 +48,7 @@ public final class HiveUtil {
         hiveCredentials.put(METASTOREURIS, metaStoreUrl);
         hiveCredentials.put(METASTORE_UGI, "true");
         hiveCredentials.put(NODE, metaStoreUrl.replace("thrift", "hcat"));
-        hiveCredentials.put(METASTROE_URI, metaStoreUrl);
+        hiveCredentials.put(METASTORE_URI, metaStoreUrl);
 
         if (SecurityUtil.isSecurityEnabled()) {
             String principal = ClusterHelper

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
index c37cebd..7f890f3 100644
--- a/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/HiveUtilTest.java
@@ -55,7 +55,7 @@ public class HiveUtilTest {
         Properties expected = new Properties();
         expected.put(HiveUtil.METASTORE_UGI, "true");
         expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
-        expected.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+        expected.put(HiveUtil.METASTORE_URI, metaStoreUrl);
         expected.put(HiveUtil.METASTOREURIS, metaStoreUrl);
 
         Properties actual = HiveUtil.getHiveCredentials(cluster);
@@ -91,7 +91,7 @@ public class HiveUtilTest {
         expected.put(SecurityUtil.METASTORE_PRINCIPAL, principal);
         expected.put(HiveUtil.METASTORE_UGI, "true");
         expected.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
-        expected.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+        expected.put(HiveUtil.METASTORE_URI, metaStoreUrl);
         expected.put(HiveUtil.METASTOREURIS, metaStoreUrl);
 
         Properties actual = HiveUtil.getHiveCredentials(cluster);

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
index 732a9e7..8f1b53b 100644
--- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
+++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/utils/OozieBuilderUtils.java
@@ -417,7 +417,7 @@ public final class OozieBuilderUtils {
         credential.setName(credentialName);
         credential.setType("hcat");
 
-        credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
+        credential.getProperty().add(createProperty(HiveUtil.METASTORE_URI, metaStoreUrl));
         credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
                 ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));
 
@@ -441,7 +441,7 @@ public final class OozieBuilderUtils {
         hiveCredentials.put(HiveUtil.METASTOREURIS, metaStoreUrl);
         hiveCredentials.put(HiveUtil.METASTORE_UGI, "true");
         hiveCredentials.put(HiveUtil.NODE, metaStoreUrl.replace("thrift", "hcat"));
-        hiveCredentials.put(HiveUtil.METASTROE_URI, metaStoreUrl);
+        hiveCredentials.put(HiveUtil.METASTORE_URI, metaStoreUrl);
 
         if (SecurityUtil.isSecurityEnabled()) {
             String principal = ClusterHelper

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 4623d8b..c53d33c 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -186,6 +186,25 @@
                         </configuration>
                     </execution>
                     <execution>
+                        <id>sqoop-gen</id>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <forceRegenerate>true</forceRegenerate>
+                            <generatePackage>org.apache.falcon.oozie.sqoop</generatePackage>
+                            <schemas>
+                                <schema>
+                                    <dependencyResource>
+                                        <groupId>org.apache.oozie</groupId>
+                                        <artifactId>oozie-client</artifactId>
+                                        <resource>sqoop-action-0.3.xsd</resource>
+                                    </dependencyResource>
+                                </schema>
+                            </schemas>
+                        </configuration>
+                    </execution>
+                    <execution>
                         <id>bundle-gen</id>
                         <goals>
                             <goal>generate</goal>

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
index d69611b..284c4a3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseExportWorkflowBuilder.java
@@ -18,18 +18,25 @@
 
 package org.apache.falcon.oozie;
 
+import com.google.common.base.Splitter;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.DatasourceHelper;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LoadMethod;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.fs.Path;
 
+import javax.xml.bind.JAXBElement;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
@@ -49,22 +56,27 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
     }
 
     @Override
-    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException {
+    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+        throws FalconException {
 
-        addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT);
+        ACTION action = unmarshalAction(EXPORT_SQOOP_ACTION_TEMPLATE);
+        JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action);
+        org.apache.falcon.oozie.sqoop.ACTION sqoopExport = actionJaxbElement.getValue();
+
+        Properties props = new Properties();
+        ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath);
+        sqoopExport.getJobXml().add("${wf:appPath()}/conf/hive-site.xml");
+        OozieUtils.marshalSqoopAction(action, actionJaxbElement);
 
-        ACTION sqoopExport = unmarshalAction(EXPORT_SQOOP_ACTION_TEMPLATE);
-        addTransition(sqoopExport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(sqoopExport);
+        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(action);
 
         //Add post-processing actions
         ACTION success = getSuccessPostProcessAction();
-        // delete addHDFSServersConfig(success, src, target);
         addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(success);
 
         ACTION fail = getFailPostProcessAction();
-        // delete addHDFSServersConfig(fail, src, target);
         addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(fail);
 
@@ -74,7 +86,6 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
         // build the sqoop command and put it in the properties
         String sqoopCmd = buildSqoopCommand(cluster, entity);
         LOG.info("SQOOP EXPORT COMMAND : " + sqoopCmd);
-        Properties props = new Properties();
         props.put("sqoopCommand", sqoopCmd);
         return props;
     }
@@ -86,28 +97,21 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
 
         buildConnectArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
         buildTableArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
-        ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, cluster, entity)
+        Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName(
+                FeedHelper.getCluster(entity, cluster.getName())));
+        ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, datasource)
                 .append(ImportExportCommon.ARG_SEPARATOR);
         buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
-        buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
+        buildArguments(sqoopArgs, extraArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);
         buildLoadType(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
-        buildExportDirArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
+        buildExportArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);
 
-        StringBuffer sqoopCmd = new StringBuffer();
+        StringBuilder sqoopCmd = new StringBuilder();
         return sqoopCmd.append("export").append(ImportExportCommon.ARG_SEPARATOR)
                 .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
                 .append(sqoopArgs).toString();
     }
 
-    private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
-        Datasource db = DatasourceHelper.getDatasource(FeedHelper.getExportDatasourceName(feedCluster));
-        if ((db.getDriver() != null) && (db.getDriver().getClazz() != null)) {
-            builder.append("--driver").append(ImportExportCommon.ARG_SEPARATOR).append(db.getDriver().getClazz());
-        }
-        return builder;
-    }
-
     private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         return builder.append("--connect").append(ImportExportCommon.ARG_SEPARATOR)
@@ -132,18 +136,32 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
         return builder.append(modeType);
     }
 
+    private StringBuilder buildExportArg(StringBuilder builder, Feed feed, Cluster cluster)
+        throws FalconException {
+        Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
+        if (feedStorageType == Storage.TYPE.TABLE) {
+            return buildExportTableArg(builder, feed.getTable());
+        } else {
+            return buildExportDirArg(builder, cluster);
+        }
+    }
+
     private StringBuilder buildExportDirArg(StringBuilder builder, Cluster cluster)
         throws FalconException {
         return builder.append("--export-dir").append(ImportExportCommon.ARG_SEPARATOR)
-                .append(String.format("${coord:dataIn('%s')}",
-                        FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME));
+            .append(String.format("${coord:dataIn('%s')}",
+                FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME));
     }
 
-    private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
-        throws FalconException {
+    private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs, Feed feed,
+        Cluster cluster) throws FalconException {
+        Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
         for(Map.Entry<String, String> e : extraArgs.entrySet()) {
+            if ((feedStorageType == Storage.TYPE.TABLE) && (e.getKey().equals("--update-key"))) {
+                continue;
+            }
             builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue())
-                    .append(ImportExportCommon.ARG_SEPARATOR);
+                .append(ImportExportCommon.ARG_SEPARATOR);
         }
         return builder;
     }
@@ -169,4 +187,50 @@ public class DatabaseExportWorkflowBuilder extends ExportWorkflowBuilder {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         return FeedHelper.getExportArguments(feedCluster);
     }
+
+    private StringBuilder buildExportTableArg(StringBuilder builder, CatalogTable catalog) throws FalconException {
+
+        LOG.info("Catalog URI {}", catalog.getUri());
+        builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR);
+        Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator();
+        String dbTable = itr.next();
+        String partitions = itr.next();
+        Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
+        itrDbTable.next();
+        String db = itrDbTable.next();
+        String table = itrDbTable.next();
+        LOG.debug("Target database {}, table {}", db, table);
+        builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR)
+                .append(String.format("${coord:databaseIn('%s')}", FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME))
+                .append(ImportExportCommon.ARG_SEPARATOR);
+
+        builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR)
+                .append(String.format("${coord:tableIn('%s')}", FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME))
+                .append(ImportExportCommon.ARG_SEPARATOR);
+
+        Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions);
+        if (partitionsMap.size() > 0) {
+            StringBuilder partitionKeys = new StringBuilder();
+            StringBuilder partitionValues = new StringBuilder();
+            for (Map.Entry<String, String> e : partitionsMap.entrySet()) {
+                partitionKeys.append(e.getKey());
+                partitionKeys.append(',');
+                partitionValues.append(String.format("${coord:dataInPartitionMin('%s','%s')}",
+                        FeedExportCoordinatorBuilder.EXPORT_DATAIN_NAME,
+                        e.getKey()));
+                partitionValues.append(',');
+            }
+            if (partitionsMap.size() > 0) {
+                partitionKeys.setLength(partitionKeys.length()-1);
+                partitionValues.setLength(partitionValues.length()-1);
+            }
+            LOG.debug("partitionKeys {} and partitionValue {}", partitionKeys.toString(), partitionValues.toString());
+            builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR)
+                    .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+            builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR)
+                    .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+        }
+        return builder;
+    }
 }
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
index 66bfa9b..3e24428 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/DatabaseImportWorkflowBuilder.java
@@ -18,25 +18,35 @@
 
 package org.apache.falcon.oozie;
 
+import com.google.common.base.Splitter;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.DatasourceHelper;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.datasource.Datasource;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+
 /**
  * Builds Datasource import workflow for Oozie.
  */
 
 public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
+
     protected static final String IMPORT_SQOOP_ACTION_TEMPLATE = "/action/feed/import-sqoop-database-action.xml";
     protected static final String IMPORT_ACTION_NAME="db-import-sqoop";
 
@@ -48,23 +58,27 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
     }
 
     @Override
-    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException {
+    protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+        throws FalconException {
 
-        addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);
+        ACTION action = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
+        JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionJaxbElement = OozieUtils.unMarshalSqoopAction(action);
+        org.apache.falcon.oozie.sqoop.ACTION sqoopImport = actionJaxbElement.getValue();
+
+        Properties props = new Properties();
+        ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath);
+        sqoopImport.getJobXml().add("${wf:appPath()}/conf/hive-site.xml");
+        OozieUtils.marshalSqoopAction(action, actionJaxbElement);
 
-        ACTION sqoopImport = unmarshalAction(IMPORT_SQOOP_ACTION_TEMPLATE);
-        // delete addHDFSServersConfig(sqoopImport, src, target);
-        addTransition(sqoopImport, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
-        workflow.getDecisionOrForkOrJoin().add(sqoopImport);
+        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+        workflow.getDecisionOrForkOrJoin().add(action);
 
         //Add post-processing actions
         ACTION success = getSuccessPostProcessAction();
-        // delete addHDFSServersConfig(success, src, target);
         addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(success);
 
         ACTION fail = getFailPostProcessAction();
-        // delete addHDFSServersConfig(fail, src, target);
         addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(fail);
 
@@ -73,8 +87,7 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
 
         // build the sqoop command and put it in the properties
         String sqoopCmd = buildSqoopCommand(cluster, entity);
-        LOG.info("SQOOP COMMAND : " + sqoopCmd);
-        Properties props = new Properties();
+        LOG.info("SQOOP IMPORT COMMAND : " + sqoopCmd);
         props.put("sqoopCommand", sqoopCmd);
         return props;
     }
@@ -86,16 +99,18 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
         buildDriverArgs(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
         buildConnectArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
         buildTableArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
-        ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, cluster, entity)
-                .append(ImportExportCommon.ARG_SEPARATOR);
+        Datasource datasource = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(
+            FeedHelper.getCluster(entity, cluster.getName())));
+        ImportExportCommon.buildUserPasswordArg(sqoopArgs, sqoopOptions, datasource)
+            .append(ImportExportCommon.ARG_SEPARATOR);
         buildNumMappers(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
         buildArguments(sqoopArgs, extraArgs).append(ImportExportCommon.ARG_SEPARATOR);
-        buildTargetDirArg(sqoopArgs, cluster).append(ImportExportCommon.ARG_SEPARATOR);
+        buildTargetArg(sqoopArgs, feed, cluster).append(ImportExportCommon.ARG_SEPARATOR);
 
-        StringBuffer sqoopCmd = new StringBuffer();
+        StringBuilder sqoopCmd = new StringBuilder();
         return sqoopCmd.append("import").append(ImportExportCommon.ARG_SEPARATOR)
-                .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
-                .append(sqoopArgs).toString();
+            .append(sqoopOptions).append(ImportExportCommon.ARG_SEPARATOR)
+            .append(sqoopArgs).toString();
     }
 
     private StringBuilder buildDriverArgs(StringBuilder builder, Cluster cluster) throws FalconException {
@@ -110,29 +125,40 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
     private StringBuilder buildConnectArg(StringBuilder builder, Cluster cluster) throws FalconException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         return builder.append("--connect").append(ImportExportCommon.ARG_SEPARATOR)
-                .append(DatasourceHelper.getReadOnlyEndpoint(
-                        DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster))));
+            .append(DatasourceHelper.getReadOnlyEndpoint(
+                DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster))));
     }
 
     private StringBuilder buildTableArg(StringBuilder builder, Cluster cluster) throws FalconException {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         return builder.append("--table").append(ImportExportCommon.ARG_SEPARATOR)
-                                    .append(FeedHelper.getImportDataSourceTableName(feedCluster));
+            .append(FeedHelper.getImportDataSourceTableName(feedCluster));
     }
 
-    private StringBuilder buildTargetDirArg(StringBuilder builder, Cluster cluster)
+    private StringBuilder buildTargetArg(StringBuilder builder, Feed feed, Cluster cluster)
+        throws FalconException {
+        Storage.TYPE feedStorageType = FeedHelper.getStorageType(feed, cluster);
+        if (feedStorageType == Storage.TYPE.TABLE) {
+            return buildTargetTableArg(builder, feed.getTable());
+
+        } else {
+            return buildTargetDirArg(builder);
+        }
+    }
+
+    private StringBuilder buildTargetDirArg(StringBuilder builder)
         throws FalconException {
         return builder.append("--delete-target-dir").append(ImportExportCommon.ARG_SEPARATOR)
                 .append("--target-dir").append(ImportExportCommon.ARG_SEPARATOR)
                 .append(String.format("${coord:dataOut('%s')}",
-                        FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
+                    FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME));
     }
 
     private StringBuilder buildArguments(StringBuilder builder, Map<String, String> extraArgs)
         throws FalconException {
         for(Map.Entry<String, String> e : extraArgs.entrySet()) {
             builder.append(e.getKey()).append(ImportExportCommon.ARG_SEPARATOR).append(e.getValue())
-                    .append(ImportExportCommon.ARG_SEPARATOR);
+                .append(ImportExportCommon.ARG_SEPARATOR);
         }
         return builder;
     }
@@ -158,4 +184,49 @@ public class DatabaseImportWorkflowBuilder extends ImportWorkflowBuilder {
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         return FeedHelper.getImportArguments(feedCluster);
     }
+
+    private StringBuilder buildTargetTableArg(StringBuilder builder, CatalogTable catalog) throws FalconException {
+
+        LOG.info("Catalog URI {}", catalog.getUri());
+        builder.append("--skip-dist-cache").append(ImportExportCommon.ARG_SEPARATOR);
+        Iterator<String> itr = Splitter.on("#").split(catalog.getUri()).iterator();
+        String dbTable = itr.next();
+        String partitions = itr.next();
+        Iterator<String> itrDbTable = Splitter.on(":").split(dbTable).iterator();
+        itrDbTable.next();
+        String db = itrDbTable.next();
+        String table = itrDbTable.next();
+        LOG.debug("Target database {}, table {}", db, table);
+        builder.append("--hcatalog-database").append(ImportExportCommon.ARG_SEPARATOR)
+                .append(String.format("${coord:databaseOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME))
+                .append(ImportExportCommon.ARG_SEPARATOR);
+
+        builder.append("--hcatalog-table").append(ImportExportCommon.ARG_SEPARATOR)
+                .append(String.format("${coord:tableOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME))
+                .append(ImportExportCommon.ARG_SEPARATOR);
+
+        Map<String, String> partitionsMap = ImportExportCommon.getPartitionKeyValues(partitions);
+        if (partitionsMap.size() > 0) {
+            StringBuilder partitionKeys = new StringBuilder();
+            StringBuilder partitionValues = new StringBuilder();
+            for (Map.Entry<String, String> e : partitionsMap.entrySet()) {
+                partitionKeys.append(e.getKey());
+                partitionKeys.append(',');
+                partitionValues.append(String.format("${coord:dataOutPartitionValue('%s','%s')}",
+                        FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME,
+                        e.getKey()));
+                partitionValues.append(',');
+            }
+            if (partitionsMap.size() > 0) {
+                partitionKeys.setLength(partitionKeys.length()-1);
+                partitionValues.setLength(partitionValues.length()-1);
+            }
+            LOG.debug("partitionKeys {} and partitionValue {}", partitionKeys.toString(), partitionValues.toString());
+            builder.append("--hcatalog-partition-keys").append(ImportExportCommon.ARG_SEPARATOR)
+                    .append(partitionKeys.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+            builder.append("--hcatalog-partition-values").append(ImportExportCommon.ARG_SEPARATOR)
+                    .append(partitionValues.toString()).append(ImportExportCommon.ARG_SEPARATOR);
+        }
+        return builder;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
index a55656c..af7431a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java
@@ -47,7 +47,7 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu
         WORKFLOWAPP workflow = new WORKFLOWAPP();
         String wfName = EntityUtil.getWorkflowName(Tag.EXPORT, entity).toString();
         workflow.setName(wfName);
-        Properties p = getWorkflow(cluster, workflow);
+        Properties p = getWorkflow(cluster, workflow, buildPath);
         marshal(cluster, workflow, buildPath);
 
         Properties props = FeedHelper.getFeedProperties(entity);
@@ -81,5 +81,6 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu
         return props;
     }
 
-    protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException;
+    protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+        throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
index 1bfacc2..4437d8b 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/FeedExportCoordinatorBuilder.java
@@ -53,7 +53,6 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
     }
 
     public static final String EXPORT_DATASET_NAME = "export-dataset";
-
     public static final String EXPORT_DATAIN_NAME = "export-input";
 
     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(FeedExportCoordinatorBuilder.class);
@@ -63,19 +62,19 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
     public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
 
         LOG.info("Generating Feed EXPORT coordinator.");
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster((Feed) entity, cluster.getName());
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
         if (!FeedHelper.isExportEnabled(feedCluster)) {
             return null;
         }
 
-        if (feedCluster.getValidity().getEnd().before(new Date())) {
+        if ((feedCluster.getValidity() != null) && (feedCluster.getValidity().getEnd().before(new Date()))) {
             LOG.warn("Feed IMPORT is not applicable as Feed's end time for cluster {} is not in the future",
                     cluster.getName());
             return null;
         }
 
         COORDINATORAPP coord = new COORDINATORAPP();
-        initializeCoordAttributes(coord, (Feed) entity, cluster);
+        initializeCoordAttributes(coord, entity, cluster);
         Properties props = createCoordDefaultConfiguration(getEntityName());
         initializeInputPath(coord, cluster, props);
 
@@ -108,8 +107,8 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
             coord.setInputEvents(new INPUTEVENTS());
         }
 
-        Storage storage = FeedHelper.createStorage(cluster, (Feed) entity);
-        SYNCDATASET syncdataset = createDataSet((Feed) entity, cluster, storage,
+        Storage storage = FeedHelper.createStorage(cluster, entity);
+        SYNCDATASET syncdataset = createDataSet(entity, cluster, storage,
                 EXPORT_DATASET_NAME, LocationType.DATA);
 
         if (syncdataset == null) {
@@ -126,6 +125,7 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
         datain.setDataset(EXPORT_DATASET_NAME);
         org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
         datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+        datain.getInstance().add(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
         return datain;
     }
 
@@ -138,7 +138,7 @@ public class FeedExportCoordinatorBuilder extends OozieCoordinatorBuilder<Feed>
      * @param storage
      * @param datasetName
      * @param locationType
-     * @return
+     * @return Sync dataset
      * @throws FalconException
      */
     private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
index 19b567c..52c7820 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportExportCommon.java
@@ -18,15 +18,27 @@
 
 package org.apache.falcon.oozie;
 
+import com.google.common.base.Splitter;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.DatasourceHelper;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.datasource.Credential;
 import org.apache.falcon.entity.v0.datasource.Credentialtype;
 import org.apache.falcon.entity.v0.datasource.Datasource;
 import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import java.net.URI;
 import java.net.URISyntaxException;
 
@@ -38,13 +50,17 @@ public final class ImportExportCommon {
 
     static final String ARG_SEPARATOR = " ";
 
+    public static final Logger LOG = LoggerFactory.getLogger(ImportExportCommon.class);
+
+    private static final Set<String> FALCON_IMPORT_SQOOP_ACTIONS = new HashSet<>(
+            Arrays.asList(new String[]{ OozieOrchestrationWorkflowBuilder.PREPROCESS_ACTION_NAME,
+                                        OozieOrchestrationWorkflowBuilder.USER_ACTION_NAME, }));
+
     private ImportExportCommon() {
     }
 
-    static StringBuilder buildUserPasswordArg(StringBuilder builder, StringBuilder sqoopOpts,
-                                                 Cluster cluster, Feed entity) throws FalconException {
-        org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
-        Datasource db = DatasourceHelper.getDatasource(FeedHelper.getImportDatasourceName(feedCluster));
+    static StringBuilder buildUserPasswordArg(StringBuilder builder, StringBuilder sqoopOpts, Datasource db)
+        throws FalconException {
         Credential cred = DatasourceHelper.getReadPasswordInfo(db);
         builder.append("--username").append(ARG_SEPARATOR)
                 .append(cred.getUserName())
@@ -70,4 +86,57 @@ public final class ImportExportCommon {
         }
         return builder;
     }
+
+    /**
+     * Adds the HCatalog settings a sqoop workflow needs when the feed is stored in a catalog table.
+     *
+     * For feeds with {@code TABLE} storage this creates the hive configuration for {@code buildPath},
+     * sets the sqoop sharelib property and, when security is enabled, adds the hive credential to the
+     * workflow for the actions in {@code FALCON_IMPORT_SQOOP_ACTIONS}. Other storage types are left as is.
+     *
+     * @param props workflow properties that receive the sharelib setting
+     * @param entity feed the workflow is built for
+     * @param cluster cluster the workflow is built for
+     * @param workflow workflow application that receives the credential in secure mode
+     * @param wBuilder builder used to create the hive configuration and the credential
+     * @param buildPath workflow path handed to the hive configuration writer
+     * @throws FalconException if the storage type lookup or a builder call fails
+     */
+    public static void addHCatalogProperties(Properties props, Feed entity, Cluster cluster,
+        WORKFLOWAPP workflow, OozieOrchestrationWorkflowBuilder<Feed> wBuilder, Path buildPath)
+        throws FalconException {
+        if (FeedHelper.getStorageType(entity, cluster) == Storage.TYPE.TABLE) {
+            wBuilder.createHiveConfiguration(cluster, buildPath, "");
+            addHCatalogShareLibs(props);
+            if (SecurityUtil.isSecurityEnabled()) {
+                // add hcatalog credentials for secure mode and add a reference to each action
+                wBuilder.addHCatalogCredentials(workflow, cluster,
+                        OozieOrchestrationWorkflowBuilder.HIVE_CREDENTIAL_NAME, FALCON_IMPORT_SQOOP_ACTIONS);
+            }
+        }
+    }
+
+    /**
+     * Sets {@code oozie.action.sharelib.for.sqoop} to the sqoop, hive and hcatalog sharelibs.
+     *
+     * @param props workflow properties to update
+     */
+    private static void addHCatalogShareLibs(Properties props) {
+        props.put("oozie.action.sharelib.for.sqoop", "sqoop,hive,hcatalog");
+    }
+
+    /**
+     * Parses a partition specification of the form {@code key1=value1;key2=value2} into a map.
+     *
+     * Entries are trimmed and empty entries are skipped, so an empty specification or a trailing
+     * {@code ;} produces no entry instead of failing with an {@link IllegalArgumentException}.
+     *
+     * @param partitionStr partition specification; must not be null
+     * @return unmodifiable map of partition key to partition value, in specification order
+     * @throws IllegalArgumentException if a non-empty entry is not a single key=value pair or a key repeats
+     */
+    public static Map<String, String> getPartitionKeyValues(String partitionStr) {
+        return Splitter.on(";").omitEmptyStrings().trimResults().withKeyValueSeparator("=").split(partitionStr);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
index cae8497..2d93189 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java
@@ -48,7 +48,7 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu
         WORKFLOWAPP workflow = new WORKFLOWAPP();
         String wfName = EntityUtil.getWorkflowName(Tag.IMPORT, entity).toString();
         workflow.setName(wfName);
-        Properties p = getWorkflow(cluster, workflow);
+        Properties p = getWorkflow(cluster, workflow, buildPath);
         marshal(cluster, workflow, buildPath);
 
         Properties props = FeedHelper.getFeedProperties(entity);
@@ -81,5 +81,6 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu
         return props;
     }
 
-    protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow) throws FalconException;
+    protected abstract Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path buildPath)
+        throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index e137e11..181f2d2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -76,7 +76,7 @@ import java.util.Set;
  * @param <T>
  */
 public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
-    protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+    public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
 
     protected static final String USER_ACTION_NAME = "user-action";
     protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
@@ -329,7 +329,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
     }
 
     // creates hive-site.xml configuration in conf dir for the given cluster on the same cluster.
-    protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
+    public void createHiveConfiguration(Cluster cluster, Path workflowPath,
                                            String prefix) throws FalconException {
         Configuration hiveConf = getHiveCredentialsAsConf(cluster);
 
@@ -413,7 +413,7 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         credential.setName(credentialName);
         credential.setType("hcat");
 
-        credential.getProperty().add(createProperty(HiveUtil.METASTROE_URI, metaStoreUrl));
+        credential.getProperty().add(createProperty(HiveUtil.METASTORE_URI, metaStoreUrl));
         credential.getProperty().add(createProperty(SecurityUtil.METASTORE_PRINCIPAL,
                 ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)));
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 149a7e6..708788b 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -46,6 +46,7 @@ public final class OozieUtils {
     public static final JAXBContext BUNDLE_JAXB_CONTEXT;
     public static final JAXBContext CONFIG_JAXB_CONTEXT;
     protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
+    protected static final JAXBContext SQOOP_ACTION_JAXB_CONTEXT;
 
     static {
         try {
@@ -56,6 +57,8 @@ public final class OozieUtils {
             CONFIG_JAXB_CONTEXT = JAXBContext.newInstance(CONFIGURATION.class);
             HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
                 org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
+            SQOOP_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+                    org.apache.falcon.oozie.sqoop.ACTION.class.getPackage().getName());
         } catch (JAXBException e) {
             throw new RuntimeException("Unable to create JAXB context", e);
         }
@@ -97,4 +100,46 @@ public final class OozieUtils {
             throw new RuntimeException("Unable to marshall hive action.", e);
         }
     }
+
+    /**
+     * Unmarshals the sqoop action carried in the {@code any} element of a workflow action.
+     *
+     * The element is cast to {@code ElementNSImpl}; any other node type fails with a
+     * {@link ClassCastException}.
+     *
+     * @param wfAction workflow action that wraps the sqoop action
+     * @return the sqoop action wrapped in its JAXB element
+     * @throws RuntimeException if JAXB cannot unmarshal the element
+     */
+    @SuppressWarnings("unchecked")
+    public static JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> unMarshalSqoopAction(
+        org.apache.falcon.oozie.workflow.ACTION wfAction) {
+        try {
+            Unmarshaller unmarshaller = SQOOP_ACTION_JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+            return (JAXBElement<org.apache.falcon.oozie.sqoop.ACTION>)
+                    unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to unmarshall sqoop action.", e);
+        }
+    }
+
+    /**
+     * Marshals a sqoop action into a DOM document and stores its root element on the workflow action.
+     *
+     * @param wfAction workflow action that receives the marshalled element
+     * @param actionjaxbElement sqoop action to marshal
+     * @throws RuntimeException if JAXB cannot marshal the action
+     */
+    public static void marshalSqoopAction(org.apache.falcon.oozie.workflow.ACTION wfAction,
+                                          JAXBElement<org.apache.falcon.oozie.sqoop.ACTION> actionjaxbElement) {
+        try {
+            DOMResult sqoopActionDOM = new DOMResult();
+            Marshaller marshaller = SQOOP_ACTION_JAXB_CONTEXT.createMarshaller();
+            marshaller.marshal(actionjaxbElement, sqoopActionDOM);
+            wfAction.setAny(((Document) sqoopActionDOM.getNode()).getDocumentElement());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to marshall sqoop action.", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java
new file mode 100644
index 0000000..194f4c7
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedExportIT.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.lifecycle;
+
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.HiveTestUtils;
+import org.apache.falcon.util.HsqldbTestUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test for Feed Export.
+ */
+
+@Test
+public class FeedExportIT {
+    public static final Logger LOG = LoggerFactory.getLogger(FeedExportIT.class);
+
+    private static final String DATASOURCE_NAME_KEY = "datasourcename";
+    private static final String METASTORE_URL = "thrift://localhost:49083";
+    private static final String DATABASE_NAME = "SqoopTestDB";
+    private static final String TABLE_NAME = "SqoopTestTable";
+
+    // NOTE(review): client is assigned in setUp but never read afterwards, and storage is never
+    // assigned or read in this class - confirm whether both fields can be dropped.
+    private HCatClient client;
+    private CatalogStorage storage;
+
+    /**
+     * Starts the HSQL database with the sqoop user and customer table, resets the Falcon test store
+     * and creates the hive database plus a table partitioned by year, month, day and hour.
+     *
+     * @throws Exception if any of the test fixtures cannot be set up
+     */
+    @BeforeClass
+    public void setUp() throws Exception {
+        HsqldbTestUtils.start();
+        HsqldbTestUtils.createSqoopUser("sqoop_user", "sqoop");
+        HsqldbTestUtils.changeSAPassword("sqoop");
+        HsqldbTestUtils.createAndPopulateCustomerTable();
+
+        TestContext.cleanupStore();
+        TestContext.prepare();
+
+        // setup hcat
+        CurrentUser.authenticate(TestContext.REMOTE_USER);
+        client = TestContext.getHCatClient(METASTORE_URL);
+
+        HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+        List<String> partitionKeys = new ArrayList<>();
+        partitionKeys.add("year");
+        partitionKeys.add("month");
+        partitionKeys.add("day");
+        partitionKeys.add("hour");
+        HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
+    }
+
+    /**
+     * Tears down the HSQL fixture and deletes the local "localhost" working directories.
+     *
+     * @throws Exception if the database or the directories cannot be cleaned up
+     */
+    @AfterClass
+    public void tearDown() throws Exception {
+        HsqldbTestUtils.tearDown();
+        FileUtils.deleteDirectory(new File("../localhost/"));
+        FileUtils.deleteDirectory(new File("localhost"));
+    }
+
+    /**
+     * Verifies that the HSQL fixture reports the expected four rows.
+     *
+     * @throws Exception if the row count cannot be read
+     */
+    @Test
+    public void testFeedExportHSql() throws Exception {
+        Assert.assertEquals(4, HsqldbTestUtils.getNumberOfRows());
+    }
+
+    /**
+     * Submits a cluster and a datasource, then submits and schedules the export feed; every CLI
+     * call is expected to return 0.
+     *
+     * @throws Exception if a template cannot be prepared or a command fails unexpectedly
+     */
+    @Test
+    public void testSqoopExport() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        // Put the datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_EXPORT_TEMPLATE6
+        // are populated with the same datasource name
+        String dsName = "datasource-test-1";
+        overlay.put(DATASOURCE_NAME_KEY, dsName);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay);
+        LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type datasource -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_EXPORT_TEMPLATE6, overlay);
+        LOG.info("Submit export feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName,
+            filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
index c34bcfc..2efe4bb 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/FeedImportIT.java
@@ -22,13 +22,17 @@ import junit.framework.Assert;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.HiveTestUtils;
 import org.apache.falcon.util.HsqldbTestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -37,6 +41,8 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -48,6 +54,12 @@ public class FeedImportIT {
     public static final Logger LOG =  LoggerFactory.getLogger(FeedImportIT.class);
 
     private static final String DATASOURCE_NAME_KEY = "datasourcename";
+    private static final String METASTORE_URL = "thrift://localhost:49083";
+    private static final String DATABASE_NAME = "SqoopTestDB";
+    private static final String TABLE_NAME = "SqoopTestTable";
+
+    private HCatClient client;
+    private CatalogStorage storage;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -58,6 +70,18 @@ public class FeedImportIT {
 
         TestContext.cleanupStore();
         TestContext.prepare();
+
+        // setup hcat
+        CurrentUser.authenticate(TestContext.REMOTE_USER);
+        client = TestContext.getHCatClient(METASTORE_URL);
+
+        HiveTestUtils.createDatabase(METASTORE_URL, DATABASE_NAME);
+        List<String> partitionKeys = new ArrayList<>();
+        partitionKeys.add("year");
+        partitionKeys.add("month");
+        partitionKeys.add("day");
+        partitionKeys.add("hour");
+        HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
     }
 
     @AfterClass
@@ -154,7 +178,8 @@ public class FeedImportIT {
         Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
-        LOG.info("Submit FEED entity with datasource {} via entity -submit -type feed -file {}", dsName, filePath);
+        LOG.info("Submit import FEED entity with datasource {} via entity -submit -type feed -file {}",
+            dsName, filePath);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
     }
 
@@ -200,7 +225,8 @@ public class FeedImportIT {
         Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
-        LOG.info("Submit FEED entity with datasource {} via entity -submit -type feed -file {}", dsName, filePath);
+        LOG.info("Submit import FEED entity with datasource {} via entity -submit -type feed -file {}",
+            dsName, filePath);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type feed -file " + filePath));
     }
 
@@ -222,7 +248,37 @@ public class FeedImportIT {
         Assert.assertEquals(TestContext.executeWithURL("entity -submit -type datasource -file " + filePath), 0);
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE3, overlay);
-        LOG.info("Submit feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName, filePath);
+        LOG.info("Submit import feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName,
+            filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+    }
+
+    /**
+     * Submits a cluster and a datasource, then submits and schedules the FEED_TEMPLATE5 import feed;
+     * every CLI call is expected to return 0.
+     *
+     * @throws Exception if a template cannot be prepared or a command fails unexpectedly
+     */
+    @Test
+    public void testSqoopHCatImport() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+        context.setCluster(filePath);
+        LOG.info("entity -submit -type cluster -file " + filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
+
+        // Put the datasource name into the overlay so that DATASOURCE_TEMPLATE1 and FEED_TEMPLATE5
+        // are populated with the same datasource name
+        String dsName = "datasource-test-1";
+        overlay.put(DATASOURCE_NAME_KEY, dsName);
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.DATASOURCE_TEMPLATE1, overlay);
+        LOG.info("Submit datatsource entity {} via entity -submit -type datasource -file {}", dsName, filePath);
+        Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type datasource -file " + filePath));
+
+        filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE5, overlay);
+        LOG.info("Submit import feed with datasource {} via entity -submitAndSchedule -type feed -file {}", dsName,
+            filePath);
         Assert.assertEquals(0, TestContext.executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
index 413dfde..ed27306 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestContext.java
@@ -33,7 +33,8 @@ public abstract class AbstractTestContext {
     public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
     public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
     public static final String FEED_TEMPLATE3 = "/feed-template3.xml";
-
+    public static final String FEED_TEMPLATE5 = "/feed-template5.xml";
+    public static final String FEED_EXPORT_TEMPLATE6 = "/feed-export-template6.xml";
     public static final String PROCESS_TEMPLATE = "/process-template.xml";
 
     protected static void mkdir(FileSystem fileSystem, Path path) throws Exception {

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/resources/feed-export-template6.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-export-template6.xml b/webapp/src/test/resources/feed-export-template6.xml
new file mode 100644
index 0000000..0eb748b
--- /dev/null
+++ b/webapp/src/test/resources/feed-export-template6.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+    <groups>input</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="##cluster##" type="source">
+            <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
+            <retention limit="hours(24)" action="delete"/>
+            <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+            <export>
+                <target name="##datasourcename##" tableName="simple_export">
+                    <load type="allowinsert"/>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </target>
+                <arguments>
+                    <argument name="--update-key" value="id"/>
+                </arguments>
+            </export>
+        </cluster>
+    </clusters>
+
+    <locations>
+        <location type="data" path="/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/"/>
+        <location type="stats" path="/projects/falcon/clicksStats"/>
+        <location type="meta" path="/projects/falcon/clicksMetaData"/>
+    </locations>
+
+    <ACL owner="##user##" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/aec6084e/webapp/src/test/resources/feed-template5.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/feed-template5.xml b/webapp/src/test/resources/feed-template5.xml
new file mode 100644
index 0000000..150ce87
--- /dev/null
+++ b/webapp/src/test/resources/feed-template5.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<feed description="Customer table from RDB" name="##inputFeedName##" xmlns="uri:falcon:feed:0.1">
+    <groups>input</groups>
+
+    <frequency>hours(1)</frequency>
+    <timezone>UTC</timezone>
+    <late-arrival cut-off="hours(6)"/>
+
+    <clusters>
+        <cluster name="##cluster##" type="source">
+            <validity start="2010-01-01T00:00Z" end="2020-04-21T00:00Z"/>
+            <retention limit="hours(24)" action="delete"/>
+            <!-- Limit can be given as a time span or as a number of instances (e.g. 100); action is one of DELETE, ARCHIVE -->
+            <import>
+                <source name="##datasourcename##" tableName="simple">
+                    <extract type="full">
+                        <mergepolicy>snapshot</mergepolicy>
+                    </extract>
+                    <fields>
+                        <includes>
+                            <field>id</field>
+                            <field>name</field>
+                        </includes>
+                    </fields>
+                </source>
+                <arguments>
+                    <argument name="--split-by" value="id"/>
+                    <argument name="--num-mappers" value="2"/>
+                </arguments>
+            </import>
+        </cluster>
+    </clusters>
+
+    <table uri="catalog:SqoopTestDB:SqoopTestTable#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR}"/>
+
+    <ACL owner="##user##" group="group" permission="0x755"/>
+    <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
\ No newline at end of file


Mime
View raw message