falcon-commits mailing list archives

From venkat...@apache.org
Subject [1/4] git commit: FALCON-390 falcon HCatProcess tests are failing in secure clusters. Contributed by Venkatesh Seetharam
Date Thu, 01 May 2014 18:21:00 GMT
Repository: incubator-falcon
Updated Branches:
  refs/heads/master 0728d91a5 -> c6bfca909


FALCON-390 falcon HCatProcess tests are failing in secure clusters. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/49593f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/49593f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/49593f19

Branch: refs/heads/master
Commit: 49593f198bbf7d5ce77774f4a5aa139423e9e54d
Parents: 0728d91
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu May 1 11:15:35 2014 -0700
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Thu May 1 11:15:35 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   5 +-
 .../org/apache/falcon/entity/ClusterHelper.java |  15 ++
 .../org/apache/falcon/entity/ProcessHelper.java |  22 +++
 .../workflow/OozieFeedWorkflowBuilder.java      | 159 ++++++++++++---
 .../config/workflow/replication-workflow.xml    |   4 -
 .../config/workflow/retention-workflow.xml      |   5 -
 .../converter/OozieFeedWorkflowBuilderTest.java | 185 +++++++++++++++--
 .../falcon/workflow/OozieWorkflowBuilder.java   | 197 +++++++++++++++++--
 .../workflow/OozieProcessWorkflowBuilder.java   |  96 +++++----
 .../config/workflow/process-parent-workflow.xml |  11 +-
 .../OozieProcessWorkflowBuilderTest.java        |  91 +++++++--
 11 files changed, 644 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a53322..8b19e29 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -116,7 +116,10 @@ Trunk (Unreleased)
     FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
 
   BUG FIXES
-    FALCON-284 Hcatalog based feed retention doesn't work when partition filter spans across 
+    FALCON-390 falcon HCatProcess tests are failing in secure clusters
+    (Venkatesh Seetharam)
+
+    FALCON-284 Hcatalog based feed retention doesn't work when partition filter spans across
     multiple partition keys. (Satish Mittal via Shwetha GS)
 
     FALCON-409 Not able to create a package. (Raju Bairishetti via Shwetha GS)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index c0f3ee2..3bf9d95 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,9 +18,13 @@
 
 package org.apache.falcon.entity;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.*;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -32,6 +36,12 @@ public final class ClusterHelper {
     private ClusterHelper() {
     }
 
+    public static FileSystem getFileSystem(String cluster) throws FalconException {
+        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+        Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
+        return HadoopClientFactory.get().createProxiedFileSystem(conf);
+    }
+
     public static Configuration getConfiguration(Cluster cluster) {
         Configuration conf = new Configuration();
 
@@ -67,6 +77,11 @@ public final class ClusterHelper {
         return getInterface(cluster, Interfacetype.EXECUTE).getEndpoint();
     }
 
+    public static String getRegistryEndPoint(Cluster cluster) {
+        final Interface catalogInterface = getInterface(cluster, Interfacetype.REGISTRY);
+        return catalogInterface == null ? null : catalogInterface.getEndpoint();
+    }
+
     public static String getMessageBrokerUrl(Cluster cluster) {
         return getInterface(cluster, Interfacetype.MESSAGING).getEndpoint();
     }
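
The two new helpers give callers a one-line way to obtain a FileSystem by cluster name and to look up the registry (HCatalog) endpoint, which is optional on a cluster. A minimal usage sketch, assuming a registered cluster entity named "corp-cluster" (the name is hypothetical):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.hadoop.fs.FileSystem;

    public final class ClusterHelperUsage {
        public static void main(String[] args) throws FalconException {
            // Resolves the cluster entity from the ConfigurationStore and
            // returns a proxied FileSystem for that cluster.
            FileSystem fs = ClusterHelper.getFileSystem("corp-cluster");
            System.out.println(fs.getUri());

            // The registry endpoint is null when the cluster defines no
            // REGISTRY interface, so callers must handle the absent case.
            Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp-cluster");
            String metastoreUri = ClusterHelper.getRegistryEndPoint(cluster);
            System.out.println(metastoreUri == null ? "no registry interface" : metastoreUri);
        }
    }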

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index 46e7384..a0a74e4 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -19,7 +19,11 @@
 package org.apache.falcon.entity;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 
 /**
@@ -41,4 +45,22 @@ public final class ProcessHelper {
     public static String getProcessWorkflowName(String workflowName, String processName) {
         return StringUtils.isEmpty(workflowName) ? processName + "-workflow" : workflowName;
     }
+
+    public static Storage.TYPE getStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                              Process process) throws FalconException {
+        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+        if (process.getInputs() == null) {
+            return storageType;
+        }
+
+        for (Input input : process.getInputs().getInputs()) {
+            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+            storageType = FeedHelper.getStorageType(feed, cluster);
+            if (Storage.TYPE.TABLE == storageType) {
+                break;
+            }
+        }
+
+        return storageType;
+    }
 }
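
ProcessHelper.getStorageType scans the process inputs and short-circuits to TABLE as soon as any input feed resolves to table storage on the given cluster; a process with no inputs defaults to FILESYSTEM. A small sketch of the intended call site (class and method names are illustrative):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ProcessHelper;
    import org.apache.falcon.entity.Storage;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.entity.v0.process.Process;

    final class StorageTypeCheck {
        private StorageTypeCheck() {
        }

        // True when any input feed of the process is table (HCatalog) backed,
        // i.e. the workflow needs hive-site.xml and, in secure mode, HCat credentials.
        static boolean needsHiveSetup(Cluster cluster, Process process) throws FalconException {
            return Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process);
        }
    }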

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 3c8d2f2..f83f90c 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -141,17 +141,22 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
     private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
         throws FalconException {
         List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
-
         if (FeedHelper.getCluster(entity, targetCluster.getName()).getType() == ClusterType.TARGET) {
-            String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
-            Path basePath = getCoordPath(bundlePath, coordName);
-            replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
-
             for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
                 if (feedCluster.getType() == ClusterType.SOURCE) {
-                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(entity,
-                        (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
-                        targetCluster, bundlePath);
+                    String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+                    Path basePath = getCoordPath(bundlePath, coordName);
+                    Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName());
+
+                    // workflow is serialized to a specific dir
+                    Path sourceSpecificWfPath = new Path(basePath, srcCluster.getName());
+
+                    // Different workflow for each source since hive credentials vary for each cluster
+                    replicationMapper.createReplicationWorkflow(
+                            targetCluster, srcCluster, sourceSpecificWfPath, coordName);
+
+                    COORDINATORAPP coord = replicationMapper.createAndGetCoord(
+                            entity, srcCluster, targetCluster, sourceSpecificWfPath);
 
                     if (coord != null) {
                         replicationCoords.add(coord);
@@ -221,10 +226,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
 
             props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
             props.put(ARG.feedNames.getPropName(), entity.getName());
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+            props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
 
             props.put("falconInputFeeds", entity.getName());
-            props.put("falconInPaths", "IGNORE");
+            props.put("falconInPaths", IGNORE);
 
             propagateUserWorkflowProperties(props, "eviction");
 
@@ -239,11 +244,46 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
                 retWfApp.setName(wfName);
                 addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
                 addOozieRetries(retWfApp);
+
+                if (isTableStorageType(cluster, entity)) {
+                    setupHiveCredentials(cluster, wfPath, retWfApp);
+                }
+
                 marshal(cluster, retWfApp, wfPath);
             } catch(IOException e) {
                 throw new FalconException("Unable to create retention workflow", e);
             }
         }
+
+        private void setupHiveCredentials(Cluster cluster, Path wfPath,
+                                          WORKFLOWAPP workflowApp) throws FalconException {
+            if (isSecurityEnabled) {
+                // add hcatalog credentials for secure mode and add a reference to each action
+                addHCatalogCredentials(workflowApp, cluster, HIVE_CREDENTIAL_NAME);
+            }
+
+            // create hive-site.xml file so actions can use it in the classpath
+            createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance
+
+            for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+                if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                    continue;
+                }
+
+                org.apache.falcon.oozie.workflow.ACTION action =
+                        (org.apache.falcon.oozie.workflow.ACTION) object;
+                String actionName = action.getName();
+                if ("eviction".equals(actionName)) {
+                    // add reference to hive-site conf to each action
+                    action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+                    if (isSecurityEnabled) {
+                        // add a reference to credential in the action
+                        action.setCred(HIVE_CREDENTIAL_NAME);
+                    }
+                }
+            }
+        }
     }
 
     private class ReplicationOozieWorkflowMapper {
@@ -258,22 +298,79 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
         private static final String TIMEOUT = "timeout";
         private static final String PARALLEL = "parallel";
 
-        private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
-            throws FalconException {
+        private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
+        private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+
+        /**
+         * Called once per source cluster: serializes one replication workflow per
+         * source per target. Additionally, hive credentials are recorded in the workflow definition.
+         *
+         * @param targetCluster target cluster
+         * @param sourceCluster source cluster
+         * @param wfPath workflow path
+         * @param wfName workflow name
+         * @throws FalconException
+         */
+        private void createReplicationWorkflow(Cluster targetCluster, Cluster sourceCluster,
+                                               Path wfPath, String wfName) throws FalconException {
+            WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+            repWFapp.setName(wfName);
+
             try {
-                WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
-                repWFapp.setName(wfName);
-                addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
-                addOozieRetries(repWFapp);
-                marshal(cluster, repWFapp, wfPath);
-            } catch(IOException e) {
-                throw new FalconException("Unable to create replication workflow", e);
+                addLibExtensionsToWorkflow(targetCluster, repWFapp, EntityType.FEED, "replication");
+            } catch (IOException e) {
+                throw new FalconException("Unable to add lib extensions to workflow", e);
+            }
+
+            addOozieRetries(repWFapp);
+
+            if (isTableStorageType(targetCluster, entity)) {
+                setupHiveCredentials(targetCluster, sourceCluster, repWFapp);
             }
 
+            marshal(targetCluster, repWFapp, wfPath);
+        }
+
+        private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster,
+                                          WORKFLOWAPP workflowApp) {
+            if (isSecurityEnabled) {
+                // add hcatalog credentials for secure mode and add a reference to each action
+                addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
+                addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
+            }
+
+            // hive-site.xml is created later during coordinator initialization,
+            // but the actions are pointed at it here
+
+            for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+                if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+                    continue;
+                }
+
+                org.apache.falcon.oozie.workflow.ACTION action =
+                        (org.apache.falcon.oozie.workflow.ACTION) object;
+                String actionName = action.getName();
+                if ("recordsize".equals(actionName)) {
+                    // add reference to hive-site conf to each action
+                    action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
+
+                    if (isSecurityEnabled) { // add a reference to credential in the action
+                        action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+                    }
+                } else if ("table-export".equals(actionName)) {
+                    if (isSecurityEnabled) { // add a reference to credential in the action
+                        action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+                    }
+                } else if ("table-import".equals(actionName)) {
+                    if (isSecurityEnabled) { // add a reference to credential in the action
+                        action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
+                    }
+                }
+            }
         }
 
         private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
-            Path bundlePath) throws FalconException {
+                                                 Path wfPath) throws FalconException {
             long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
             Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
             Date sourceEndDate = getEndDate(feed, srcCluster);
@@ -311,7 +408,6 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
             final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
             initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
 
-            Path wfPath = getCoordPath(bundlePath, coordName);
             ACTION replicationWorkflowAction = getReplicationWorkflowAction(
                 srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
             replicationCoord.setAction(replicationWorkflowAction);
@@ -436,7 +532,8 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
         }
 
         private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
-            String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
+                                                    String wfName, Storage sourceStorage,
+                                                    Storage targetStorage) throws FalconException {
             ACTION replicationAction = new ACTION();
             WORKFLOW replicationWF = new WORKFLOW();
 
@@ -469,7 +566,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
                 propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
                 propagateTableCopyProperties(srcCluster, sourceTableStorage,
                     trgCluster, targetTableStorage, props);
-                setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
+                setupHiveConfiguration(srcCluster, trgCluster, wfPath);
             }
 
             propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
@@ -527,10 +624,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
             props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
         }
 
-        private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
-            Cluster trgCluster, CatalogStorage targetStorage, Path wfPath) throws FalconException {
+        private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
+                                            Path wfPath) throws FalconException {
             Configuration conf = ClusterHelper.getConfiguration(trgCluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
 
             try {
                 // copy import export scripts to stagingDir
@@ -540,10 +637,10 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
 
                 // create hive conf to stagingDir
                 Path confPath = new Path(wfPath + "/conf");
-                createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
-                createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
-            } catch(IOException e) {
-                throw new FalconException(e);
+                persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
+                persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
+            } catch (IOException e) {
+                throw new FalconException("Unable to create hive conf files", e);
             }
         }
 
@@ -580,7 +677,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
                     + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
             props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
 
-            props.put("sourceRelativePaths", "IGNORE"); // this will not be used for Table storage.
+            props.put("sourceRelativePaths", IGNORE); // this will not be used for Table storage.
         }
 
         private void propagateLateDataProperties(Feed feed, String instancePaths,
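
The net effect of the replication changes above: instead of serializing one replication workflow per target under REPLICATION, the builder now serializes one workflow per source cluster under a source-named subdirectory, so each copy can carry that source's hive credentials. With a bundle path of /projects/falcon (as in the tests below), the staging layout is roughly:

    /projects/falcon/RETENTION/                           # one retention workflow per cluster
    /projects/falcon/REPLICATION/<source-cluster-name>/   # one workflow per source cluster
        workflow.xml
        conf/falcon-source-hive-site.xml                  # written only for table storage
        conf/falcon-target-hive-site.xml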

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 205beb2..6f94dd7 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -40,10 +40,6 @@
                 </property>
                 <!-- HCatalog jars -->
                 <property>
-                    <name>oozie.use.system.libpath</name>
-                    <value>true</value>
-                </property>
-                <property>
                     <name>oozie.action.sharelib.for.java</name>
                     <value>hcatalog</value>
                 </property>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/feed/src/main/resources/config/workflow/retention-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/retention-workflow.xml b/feed/src/main/resources/config/workflow/retention-workflow.xml
index 1e10aae..5138865 100644
--- a/feed/src/main/resources/config/workflow/retention-workflow.xml
+++ b/feed/src/main/resources/config/workflow/retention-workflow.xml
@@ -32,10 +32,6 @@
                 </property>
                 <!-- HCatalog jars -->
                 <property>
-                    <name>oozie.use.system.libpath</name>
-                    <value>true</value>
-                </property>
-                <property>
                     <name>oozie.action.sharelib.for.java</name>
                     <value>hcatalog</value>
                 </property>
@@ -59,7 +55,6 @@
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>
-
     <action name='succeeded-post-processing'>
         <java>
             <job-tracker>${jobTracker}</job-tracker>
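
The oozie.use.system.libpath property removed from these two workflow definitions is not lost: the builder already sets OozieClient.USE_SYSTEM_LIBPATH once on the Oozie job properties (visible as context in the OozieWorkflowBuilder diff below), which makes the per-workflow <property> block redundant. A sketch of the equivalent, under that assumption:

    import java.util.Properties;
    import org.apache.oozie.client.OozieClient;

    final class SharelibProps {
        private SharelibProps() {
        }

        // Set oozie.use.system.libpath=true once on the job properties instead
        // of repeating it inside each workflow action's <configuration>.
        static Properties withSystemLibpath(Properties props) {
            props.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
            return props;
        }
    }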

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index b0bb83b..77a89c0 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -41,6 +41,8 @@ import org.apache.falcon.oozie.workflow.DECISION;
 import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.OozieFeedWorkflowBuilder;
 import org.apache.falcon.workflow.OozieWorkflowBuilder;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBContext;
@@ -95,8 +98,17 @@ public class OozieFeedWorkflowBuilderTest {
 
         cleanupStore();
 
+        org.apache.falcon.entity.v0.cluster.Property property =
+                new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName(OozieWorkflowBuilder.METASTORE_KERBEROS_PRINCIPAL);
+        property.setValue("hive/_HOST");
+
         srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
+        srcCluster.getProperties().getProperties().add(property);
+
         trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
+        trgCluster.getProperties().getProperties().add(property);
+
         alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
         betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
 
@@ -139,7 +151,7 @@ public class OozieFeedWorkflowBuilderTest {
 
     @Test
     public void testReplicationCoordsForFSStorage() throws Exception {
-        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+        OozieFeedWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
         List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));
         //Assert retention coord
         COORDINATORAPP coord = coords.get(0);
@@ -148,8 +160,7 @@ public class OozieFeedWorkflowBuilderTest {
         //Assert replication coord
         coord = coords.get(1);
         Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
-        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
-                .getAction().getWorkflow().getAppPath());
+        Assert.assertEquals(getWorkflowAppPath(), coord.getAction().getWorkflow().getAppPath());
         Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
                 + srcCluster.getName(), coord.getName());
         Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
@@ -217,11 +228,21 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals(props.get("mapBandwidthKB"), "102400");
 
         assertLibExtensions(coord, "replication");
-        assertWorkflowRetries(coord);
+        WORKFLOWAPP wf = getWorkflowapp(coord);
+        assertWorkflowRetries(wf);
+
+        Assert.assertFalse(Storage.TYPE.TABLE == FeedHelper.getStorageType(feed, trgCluster));
+    }
+
+    private String getWorkflowAppPath() {
+        return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName();
     }
 
     private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
-        WORKFLOWAPP wf = getWorkflowapp(coord);
+        assertWorkflowRetries(getWorkflowapp(coord));
+    }
+
+    private void assertWorkflowRetries(WORKFLOWAPP wf) throws JAXBException, IOException {
         List<Object> actions = wf.getDecisionOrForkOrJoin();
         for (Object obj : actions) {
             if (!(obj instanceof ACTION)) {
@@ -269,7 +290,7 @@ public class OozieFeedWorkflowBuilderTest {
 
     @Test
     public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
-        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(fsReplFeed);
+        OozieFeedWorkflowBuilder builder = new OozieFeedWorkflowBuilder(fsReplFeed);
 
         List<COORDINATORAPP> alphaCoords = builder.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
         final COORDINATORAPP alphaCoord = alphaCoords.get(0);
@@ -297,11 +318,9 @@ public class OozieFeedWorkflowBuilderTest {
                 FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
         targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
 
-        StringBuilder pathsWithPartitions = new StringBuilder();
-        pathsWithPartitions.append("${coord:dataIn('input')}/")
-                .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
-        String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+        String pathsWithPartitions = "${coord:dataIn('input')}/"
+                + FeedHelper.normalizePartitionExpression(srcPart, targetPart);
+        String parts = pathsWithPartitions.replaceAll("//+", "/");
         parts = StringUtils.stripEnd(parts, "/");
         return parts;
     }
@@ -357,15 +376,25 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
     }
 
-    @Test
-    public void testReplicationCoordsForTableStorage() throws Exception {
-        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(tableFeed);
+    @DataProvider(name = "secureOptions")
+    private Object[][] createOptions() {
+        return new Object[][] {
+            {"simple"},
+            {"kerberos"},
+        };
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testReplicationCoordsForTableStorage(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        OozieFeedWorkflowBuilder builder = new OozieFeedWorkflowBuilder(tableFeed);
         List<COORDINATORAPP> coords = builder.getCoordinators(
                 trgCluster, new Path("/projects/falcon/"));
         COORDINATORAPP coord = coords.get(0);
 
         Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
-        Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
+        Assert.assertEquals(getWorkflowAppPath(),
                 coord.getAction().getWorkflow().getAppPath());
         Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
                 + srcCluster.getName(), coord.getName());
@@ -449,6 +478,53 @@ public class OozieFeedWorkflowBuilderTest {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
+        assertReplicationHCatCredentials(getWorkflowapp(coord),
+                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+    }
+
+    private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
+        FileSystem fs = trgMiniDFS.getFileSystem();
+
+        Path hiveConfPath = new Path(wfPath, "conf/falcon-source-hive-site.xml");
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        hiveConfPath = new Path(wfPath, "conf/falcon-target-hive-site.xml");
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled();
+        if (isSecurityEnabled) {
+            Assert.assertNotNull(wf.getCredentials());
+            Assert.assertEquals(2, wf.getCredentials().getCredential().size());
+        }
+
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+
+            if (!isSecurityEnabled) {
+                Assert.assertNull(action.getCred());
+            }
+
+            if ("recordsize".equals(actionName)) {
+                Assert.assertEquals(action.getJava().getJobXml(), "${wf:appPath()}/conf/falcon-source-hive-site.xml");
+                if (isSecurityEnabled) {
+                    Assert.assertNotNull(action.getCred());
+                    Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");
+                }
+            } else if ("table-export".equals(actionName) && isSecurityEnabled) {
+                Assert.assertNotNull(action.getCred());
+                Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");
+            } else if ("table-import".equals(actionName) && isSecurityEnabled) {
+                Assert.assertNotNull(action.getCred());
+                Assert.assertEquals(action.getCred(), "falconTargetHiveAuth");
+            }
+        }
     }
 
     private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
@@ -469,7 +545,7 @@ public class OozieFeedWorkflowBuilderTest {
         instance.roll(Calendar.YEAR, 1);
         cluster.getValidity().setEnd(instance.getTime());
 
-        OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+        OozieFeedWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
         List<COORDINATORAPP> coords = builder.getCoordinators(srcCluster, new Path("/projects/falcon/"));
         COORDINATORAPP coord = coords.get(0);
 
@@ -504,4 +580,81 @@ public class OozieFeedWorkflowBuilderTest {
 
         assertWorkflowRetries(coord);
     }
+
+    @Test (dataProvider = "secureOptions")
+    public void testRetentionCoordsForTable(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
+        org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(tableFeed, trgCluster.getName());
+        final Calendar instance = Calendar.getInstance();
+        instance.roll(Calendar.YEAR, 1);
+        cluster.getValidity().setEnd(instance.getTime());
+
+        OozieFeedWorkflowBuilder builder = new OozieFeedWorkflowBuilder(tableFeed);
+        List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));
+        COORDINATORAPP coord = coords.get(0);
+
+        Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+        Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
+        Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+        HashMap<String, String> props = new HashMap<String, String>();
+        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+            props.put(prop.getName(), prop.getValue());
+        }
+
+        String feedDataPath = props.get("feedDataPath");
+        String storageType = props.get("falconFeedStorageType");
+
+        // verify the param that feed evictor depends on
+        Assert.assertEquals(storageType, Storage.TYPE.TABLE.name());
+
+        final Storage storage = FeedHelper.createStorage(cluster, tableFeed);
+        if (feedDataPath != null) {
+            Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+                    .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+        }
+
+        if (storageType != null) {
+            Assert.assertEquals(storageType, storage.getType().name());
+        }
+
+        // verify the post processing params
+        Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+        Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+
+        assertWorkflowRetries(coord);
+
+        Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
+        assertHCatCredentials(getWorkflowapp(coord),
+                coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
+    }
+
+    private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
+        Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
+        FileSystem fs = trgMiniDFS.getFileSystem();
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            Assert.assertNotNull(wf.getCredentials());
+            Assert.assertEquals(1, wf.getCredentials().getCredential().size());
+        }
+
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+            ACTION action = (ACTION) obj;
+            String actionName = action.getName();
+
+            if ("eviction".equals(actionName)) {
+                Assert.assertEquals(action.getJava().getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
+                if (SecurityUtil.isSecurityEnabled()) {
+                    Assert.assertNotNull(action.getCred());
+                    Assert.assertEquals(action.getCred(), "falconHiveAuth");
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index 990fdc5..19fdce8 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -26,11 +26,16 @@ import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.ExternalId;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
 import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -38,6 +43,8 @@ import org.apache.falcon.oozie.bundle.COORDINATOR;
 import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.ObjectFactory;
 import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CREDENTIAL;
+import org.apache.falcon.oozie.workflow.CREDENTIALS;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.service.FalconPathFilter;
@@ -52,8 +59,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.oozie.client.OozieClient;
 
@@ -93,6 +98,14 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
     protected static final String MR_QUEUE_NAME = "queueName";
     protected static final String MR_JOB_PRIORITY = "jobPriority";
 
+    protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+
+    public static final String METASTOREURIS = "hive.metastore.uris";
+    public static final String METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
+    public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled";
+
+    protected static final String IGNORE = "IGNORE";
+
     public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
         Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
 
@@ -112,8 +125,11 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         }
     };
 
+    protected final boolean isSecurityEnabled;
+
     protected OozieWorkflowBuilder(T entity) {
         super(entity);
+        isSecurityEnabled = SecurityUtil.isSecurityEnabled();
     }
 
     protected Path getCoordPath(Path bundlePath, String coordName) {
@@ -189,7 +205,8 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
     protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
         throws IOException, FalconException {
         String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+        FileSystem fs = HadoopClientFactory.get().createFileSystem(
+                ClusterHelper.getConfiguration(cluster));
         addExtensionJars(fs, new Path(libext), wf);
         addExtensionJars(fs, new Path(libext, type.name()), wf);
         if (StringUtils.isNotEmpty(lifecycle)) {
@@ -387,18 +404,32 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         }
     }
 
-    protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
-        Cluster cluster, String prefix) throws IOException {
-        Configuration hiveConf = new Configuration(false);
-        hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
-        hiveConf.set("hive.metastore.local", "false");
+    // creates hive-site.xml in the workflow's conf dir on the given cluster's file system.
+    protected void createHiveConfiguration(Cluster cluster, Path workflowPath,
+                                           String prefix) throws FalconException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+
+        try {
+            Configuration conf = ClusterHelper.getConfiguration(cluster);
+            FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
 
-        if (UserGroupInformation.isSecurityEnabled()) {
-            hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
-                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
-            hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+            // create hive conf to stagingDir
+            Path confPath = new Path(workflowPath + "/conf");
+
+            persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+        } catch (IOException e) {
+            throw new FalconException("Unable to create hive site", e);
         }
+    }
 
+    protected void persistHiveConfiguration(FileSystem fs, Path confPath,
+                                            Cluster cluster, String prefix) throws IOException {
+        Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+        persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+    }
+
+    private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+                                          String prefix) throws IOException {
         OutputStream out = null;
         try {
             out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
@@ -408,6 +439,129 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         }
     }
 
+    private Configuration getHiveCredentialsAsConf(Cluster cluster) {
+        Map<String, String> hiveCredentials = getHiveCredentials(cluster);
+
+        Configuration hiveConf = new Configuration(false);
+        for (Entry<String, String> entry : hiveCredentials.entrySet()) {
+            hiveConf.set(entry.getKey(), entry.getValue());
+        }
+
+        return hiveConf;
+    }
+
+    private Map<String, String> getHiveCredentials(Cluster cluster) {
+        Map<String, String> hiveCredentials = new HashMap<String, String>();
+
+        String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+        if (metaStoreUrl == null) {
+            throw new IllegalStateException(
+                    "Registry interface is not defined in cluster: " + cluster.getName());
+        }
+
+        hiveCredentials.put(METASTOREURIS, metaStoreUrl);
+        hiveCredentials.put("hive.metastore.execute.setugi", "true");
+        hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));
+        hiveCredentials.put("hcat.metastore.uri", metaStoreUrl);
+
+        if (isSecurityEnabled) {
+            String principal = ClusterHelper
+                    .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+            hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
+            hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
+            hiveCredentials.put("hcat.metastore.principal", principal);
+        }
+
+        return hiveCredentials;
+    }
+
+    /**
+     * This is only necessary if table storage is involved and security is enabled.
+     *
+     * @param cluster        cluster entity
+     * @param credentialName credential name
+     * @return CREDENTIAL object
+     */
+    protected CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
+        final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+
+        CREDENTIAL credential = new CREDENTIAL();
+        credential.setName(credentialName);
+        credential.setType("hcat");
+
+        credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
+        credential.getProperty().add(createProperty("hcat.metastore.principal",
+                ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
+
+        return credential;
+    }
+
+    private CREDENTIAL.Property createProperty(String name, String value) {
+        CREDENTIAL.Property property = new CREDENTIAL.Property();
+        property.setName(name);
+        property.setValue(value);
+        return property;
+    }
+
+    /**
+     * This is only necessary if table storage is involved and security is enabled.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster     cluster entity
+     * @param credentialName credential name
+     */
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
+                                          String credentialName) {
+        CREDENTIALS credentials = workflowApp.getCredentials();
+        if (credentials == null) {
+            credentials = new CREDENTIALS();
+        }
+
+        credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
+
+        // add credential for workflow
+        workflowApp.setCredentials(credentials);
+    }
+
+    /**
+     * This is only necessary if table storage is involved and security is enabled.
+     *
+     * @param workflowApp workflow xml
+     * @param cluster     cluster entity
+     * @param credentialName credential name
+     * @param actions     names of the actions to add the credential reference to
+     */
+    protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
+                                          String credentialName, Set<String> actions) {
+        addHCatalogCredentials(workflowApp, cluster, credentialName);
+
+        // add credential to each action
+        for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+            if (!(object instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) object;
+            String actionName = action.getName();
+            if (actions.contains(actionName)) {
+                action.setCred(credentialName);
+            }
+        }
+    }
+
+    private boolean isTableStorageType(Cluster cluster, T entity) throws FalconException {
+        return entity.getEntityType() == EntityType.PROCESS
+                ? isTableStorageType(cluster, (Process) entity)
+                : isTableStorageType(cluster, (Feed) entity);
+    }
+
+    protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
+        Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+        return Storage.TYPE.TABLE == storageType;
+    }
+
+    protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+        Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+        return Storage.TYPE.TABLE == storageType;
+    }
+
     protected void decorateWithOozieRetries(ACTION action) {
         Properties props = RuntimeProperties.get();
         action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
@@ -429,6 +583,11 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         properties.setProperty(OozieClient.USER_NAME, user);
         properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
         properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
+
+        if (isTableStorageType(cluster, entity)) {
+            propagateHiveCredentials(cluster, properties);
+        }
+
         LOG.info("Cluster: " + cluster.getName() + ", PROPS: " + properties);
         return properties;
     }
@@ -439,5 +598,19 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         }
     }
 
+    /**
+     * Propagates hive credentials so the coordinator can authenticate against hive
+     * for data availability triggers.
+     *
+     * @param cluster cluster entity
+     * @param properties property object
+     */
+    private void propagateHiveCredentials(Cluster cluster, Properties properties) {
+        Map<String, String> hiveCredentials = getHiveCredentials(cluster);
+        for (Entry<String, String> entry : hiveCredentials.entrySet()) {
+            properties.setProperty(entry.getKey(), entry.getValue());
+        }
+    }
+
     public abstract Date getNextStartTime(T entity, String cluster, Date now) throws FalconException;
 }
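
In secure mode, createHCatalogCredential and addHCatalogCredentials above surface in the serialized workflow.xml as a standard Oozie hcat credentials section, with each table-touching action referencing a credential by name. A rough sketch of the generated shape (the endpoint and principal values are illustrative):

    <workflow-app name="FALCON_FEED_REPLICATION_...">
        <credentials>
            <credential name="falconTargetHiveAuth" type="hcat">
                <property>
                    <name>hcat.metastore.uri</name>
                    <value>thrift://metastore.example.com:9083</value>
                </property>
                <property>
                    <name>hcat.metastore.principal</name>
                    <value>hive/_HOST@EXAMPLE.COM</value>
                </property>
            </credential>
        </credentials>
        ...
        <action name="table-import" cred="falconTargetHiveAuth">
            ...
        </action>
    </workflow-app>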

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index c31842b..edfe5a8 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -32,7 +32,6 @@ import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.EngineType;
@@ -73,12 +72,15 @@ import org.apache.oozie.client.OozieClient;
 import javax.xml.bind.JAXBElement;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * Oozie workflow builder for falcon entities.
@@ -86,6 +88,9 @@ import java.util.Properties;
 public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     private static final Logger LOG = Logger.getLogger(OozieProcessWorkflowBuilder.class);
 
+    private static final Set<String> FALCON_PROCESS_HIVE_ACTIONS = new HashSet<String>(
+            Arrays.asList(new String[]{"recordsize", "user-oozie-workflow", "user-pig-job", "user-hive-job", }));
+
     public OozieProcessWorkflowBuilder(Process entity) {
         super(entity);
     }
@@ -253,7 +258,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     /**
      * Creates default oozie coordinator.
      *
-     * @param cluster    - Cluster for which the coordiantor app need to be created
+     * @param cluster    - Cluster for which the coordinator app need to be created
      * @param bundlePath - bundle path
      * @return COORDINATORAPP
      * @throws FalconException on Error
@@ -338,7 +343,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         Map<String, String> props) throws FalconException {
         if (process.getInputs() == null) {
             props.put("falconInputFeeds", "NONE");
-            props.put("falconInPaths", "IGNORE");
+            props.put("falconInPaths", IGNORE);
             return;
         }
 
@@ -382,7 +387,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
-        List<String> inputFeedStorageTypes, Map<String, String> props) {
+                                             List<String> inputFeedStorageTypes, Map<String, String> props) {
         // populate late data handler - should-record action
         props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
         props.put("falconInPaths", join(inputPaths.iterator(), '#'));
@@ -393,10 +398,10 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
-        Map<String, String> props) throws FalconException {
+                                       Map<String, String> props) throws FalconException {
         if (process.getOutputs() == null) {
             props.put(ARG.feedNames.getPropName(), "NONE");
-            props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+            props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
             return;
         }
 
@@ -542,7 +547,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
-        Map<String, String> props) {
+                                                 Map<String, String> props) {
         String prefix = "falcon_" + output.getName();
 
         propagateCommonCatalogTableProperties(tableStorage, props, prefix);
@@ -588,7 +593,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
-        String wfName, Path parentWfPath) throws FalconException {
+                                  String wfName, Path parentWfPath) throws FalconException {
         WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
         wfApp.setName(wfName);
         try {
@@ -597,6 +602,11 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
             throw new FalconException("Failed to add library extensions for the workflow", e);
         }
 
+        final boolean isTableStorageType = isTableStorageType(cluster, process);
+        if (isTableStorageType) {
+            setupHiveCredentials(cluster, parentWfPath, wfApp);
+        }
+
         String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
         EngineType engineType = processWorkflow.getEngine();
         for (Object object : wfApp.getDecisionOrForkOrJoin()) {
@@ -609,11 +619,15 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
             if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
                 action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
             } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
-                decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
+                decoratePIGAction(cluster, process, action.getPig(), parentWfPath, isTableStorageType);
             } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
                 decorateHiveAction(cluster, process, action, parentWfPath);
             } else if (FALCON_ACTIONS.contains(actionName)) {
                 decorateWithOozieRetries(action);
+                if (isTableStorageType && actionName.equals("recordsize")) {
+                    // adds hive-site.xml in actions classpath
+                    action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+                }
             }
         }
 
@@ -621,8 +635,19 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         marshal(cluster, wfApp, parentWfPath);
     }
 
-    private void decoratePIGAction(Cluster cluster, Process process,
-        PIG pigAction, Path parentWfPath) throws FalconException {
+    private void setupHiveCredentials(Cluster cluster, Path parentWfPath,
+                                      WORKFLOWAPP wfApp) throws FalconException {
+        // create the hive-site.xml file so actions can pick it up from the classpath
+        createHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
+
+        if (isSecurityEnabled) {
+            // add hcatalog credentials for secure mode and add a reference to each action
+            addHCatalogCredentials(wfApp, cluster, HIVE_CREDENTIAL_NAME, FALCON_PROCESS_HIVE_ACTIONS);
+        }
+    }
+
+    private void decoratePIGAction(Cluster cluster, Process process, PIG pigAction,
+                                   Path parentWfPath, boolean isTableStorageType) throws FalconException {
         Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
         pigAction.setScript("${nameNode}" + userWfPath.toString());
 
@@ -634,10 +659,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
 
         propagateProcessProperties(pigAction, process);
 
-        Storage.TYPE storageType = getStorageType(cluster, process);
-        if (Storage.TYPE.TABLE == storageType) {
-            // adds hive-site.xml in pig classpath
-            setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
+        if (isTableStorageType) { // adds hive-site.xml to the Pig action's classpath
             pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
         }
 
@@ -646,7 +668,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
-        Path parentWfPath) throws FalconException {
+                                    Path parentWfPath) throws FalconException {
 
         JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(wfAction);
         org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -662,7 +684,8 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
 
         propagateProcessProperties(hiveAction, process);
 
-        setupHiveConfiguration(cluster, parentWfPath, "falcon-");
+        // adds hive-site.xml to the Hive action's classpath
+        hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
 
         addArchiveForCustomJars(cluster, hiveAction.getArchive(),
             getUserLibPath(cluster, parentWfPath.getParent()));
@@ -671,7 +694,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void addPrepareDeleteOutputPath(Process process,
-        PIG pigAction) throws FalconException {
+                                            PIG pigAction) throws FalconException {
         List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
         if (deleteOutputPathList.isEmpty()) {
             return;
@@ -691,8 +714,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         }
     }
 
-    private void addPrepareDeleteOutputPath(Process process,
-        org.apache.falcon.oozie.hive.ACTION hiveAction)
+    private void addPrepareDeleteOutputPath(Process process, org.apache.falcon.oozie.hive.ACTION hiveAction)
         throws FalconException {
 
         List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
@@ -734,7 +756,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
-        String engineType) throws FalconException {
+                                       String engineType) throws FalconException {
         if (process.getInputs() == null) {
             return;
         }
@@ -761,7 +783,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
     }
 
     private void addOutputFeedsAsParams(List<String> paramList, Process process,
-        Cluster cluster) throws FalconException {
+                                        Cluster cluster) throws FalconException {
         if (process.getOutputs() == null) {
             return;
         }
@@ -833,36 +855,6 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         }
     }
 
-    private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
-        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
-        if (process.getInputs() == null) {
-            return storageType;
-        }
-
-        for (Input input : process.getInputs().getInputs()) {
-            Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-            storageType = FeedHelper.getStorageType(feed, cluster);
-            if (Storage.TYPE.TABLE == storageType) {
-                break;
-            }
-        }
-
-        return storageType;
-    }
-
-    // creates hive-site.xml configuration in conf dir.
-    private void setupHiveConfiguration(Cluster cluster, Path wfPath,
-        String prefix) throws FalconException {
-        String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
-        try {
-            FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
-            Path confPath = new Path(wfPath, "conf");
-            createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
     private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
         Path libPath) throws FalconException {
         if (libPath == null) {

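For reference, the hcatalog credential that addHCatalogCredentials wires in here uses Oozie's built-in "hcat" credential type. On a secure cluster the generated parent workflow would carry a credentials section along these lines (a sketch, not the literal builder output: the credential name falconHiveAuth matches the test assertions below, while the endpoint and principal values are illustrative, borrowed from the test setup):

    <credentials>
        <credential name='falconHiveAuth' type='hcat'>
            <property>
                <name>hcat.metastore.uri</name>
                <value>thrift://localhost:49083</value>
            </property>
            <property>
                <name>hcat.metastore.principal</name>
                <value>hive/_HOST</value>
            </property>
        </credential>
    </credentials>

Each Hive-touching action (user-pig-job, user-hive-job, user-oozie-workflow and recordsize, the set the new assertHCatCredentials test checks) then references the credential via a cred='falconHiveAuth' attribute on its <action> element.
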
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index 30496a9..4a2495c 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -40,10 +40,6 @@
                 </property>
                 <!-- HCatalog jars -->
                 <property>
-                    <name>oozie.use.system.libpath</name>
-                    <value>true</value>
-                </property>
-                <property>
                     <name>oozie.action.sharelib.for.java</name>
                     <value>hcatalog</value>
                 </property>
@@ -62,7 +58,6 @@
         <ok to="user-workflow"/>
         <error to="failed-post-processing"/>
     </action>
-
     <decision name='user-workflow'>
         <switch>
             <case to="user-oozie-workflow">
@@ -91,10 +86,6 @@
                     <value>${jobPriority}</value>
                 </property>
                 <property>
-                    <name>oozie.use.system.libpath</name>
-                    <value>true</value>
-                </property>
-                <property>
                     <name>oozie.action.sharelib.for.pig</name>
                     <value>pig,hcatalog</value>
                 </property>
@@ -108,7 +99,7 @@
         <hive xmlns="uri:oozie:hive-action:0.2">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
-            <job-xml>${wf:appPath()}/conf/falcon-hive-site.xml</job-xml>
+            <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml>
             <configuration>
                 <property>
                     <name>mapred.job.queue.name</name>

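With the falcon- prefix dropped, every consumer now points at a single conf/hive-site.xml staged beside the workflow. A minimal sketch of what that staged file would contain (property names are standard Hive configuration; the values mirror the test setup, and the kerberos entries apply only on secure clusters):

    <?xml version="1.0"?>
    <configuration>
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://localhost:49083</value>
        </property>
        <property>
            <name>hive.metastore.sasl.enabled</name>
            <value>true</value>
        </property>
        <property>
            <name>hive.metastore.kerberos.principal</name>
            <value>hive/_HOST</value>
        </property>
    </configuration>
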
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/49593f19/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 44f5d80..54c1809 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.CatalogStorage;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
@@ -49,9 +50,11 @@ import org.apache.falcon.oozie.workflow.DECISION;
 import org.apache.falcon.oozie.workflow.PIG;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.OozieProcessWorkflowBuilder;
+import org.apache.falcon.workflow.OozieWorkflowBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -59,6 +62,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import javax.xml.bind.JAXBContext;
@@ -88,13 +92,13 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
     private String hdfsUrl;
     private FileSystem fs;
+    private Cluster cluster;
 
     @BeforeClass
     public void setUpDFS() throws Exception {
         CurrentUser.authenticate("falcon");
 
-        EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
-        Configuration conf = cluster.getConf();
+        Configuration conf = EmbeddedCluster.newCluster("testCluster").getConf();
         hdfsUrl = conf.get("fs.default.name");
     }
 
@@ -103,7 +107,13 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         super.setup();
 
         ConfigurationStore store = ConfigurationStore.get();
-        Cluster cluster = store.get(EntityType.CLUSTER, "corp");
+        cluster = store.get(EntityType.CLUSTER, "corp");
+        org.apache.falcon.entity.v0.cluster.Property property =
+                new org.apache.falcon.entity.v0.cluster.Property();
+        property.setName(OozieWorkflowBuilder.METASTORE_KERBEROS_PRINCIPAL);
+        property.setValue("hive/_HOST");
+        cluster.getProperties().getProperties().add(property);
+
         ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
         ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
         fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
@@ -230,8 +240,18 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
     }
 
-    @Test
-    public void testHiveProcessMapper() throws Exception {
+    @DataProvider(name = "secureOptions")
+    private Object[][] createOptions() {
+        return new Object[][] {
+            {"simple"},
+            {"kerberos"},
+        };
+    }
+
+    @Test (dataProvider = "secureOptions")
+    public void testHiveProcessMapper(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
         URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
         Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.FEED, inFeed);
@@ -244,7 +264,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.PROCESS, process);
 
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         prepare(process);
         OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
         Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
@@ -265,7 +284,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         }
 
         // verify table props
-        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
         for (Map.Entry<String, String> entry : props.entrySet()) {
             if (expected.containsKey(entry.getKey())) {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
@@ -286,10 +305,47 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
         Assert.assertEquals(hiveAction.getScript(),
                 "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+        Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
         Assert.assertNull(hiveAction.getPrepare());
         Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
         Assert.assertFalse(hiveAction.getParam().isEmpty());
         Assert.assertEquals(11, hiveAction.getParam().size());
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
+
+        ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
+    }
+
+    private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
+        Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
+        Assert.assertTrue(fs.exists(hiveConfPath));
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            Assert.assertNotNull(wf.getCredentials());
+            Assert.assertEquals(1, wf.getCredentials().getCredential().size());
+        }
+
+        List<Object> actions = wf.getDecisionOrForkOrJoin();
+        for (Object obj : actions) {
+            if (!(obj instanceof ACTION)) {
+                continue;
+            }
+
+            ACTION action = (ACTION) obj;
+
+            if (!SecurityUtil.isSecurityEnabled()) {
+                Assert.assertNull(action.getCred());
+                continue; // check every action, not just the first
+            }
+
+            String actionName = action.getName();
+            if ("user-hive-job".equals(actionName) || "user-pig-job".equals(actionName)
+                    || "user-oozie-workflow".equals(actionName) || "recordsize".equals(actionName)) {
+                Assert.assertNotNull(action.getCred());
+                Assert.assertEquals(action.getCred(), "falconHiveAuth");
+            }
+        }
     }
 
     private void prepare(Process process) throws IOException {
@@ -298,8 +354,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         fs.create(wf).close();
     }
 
-    @Test
-    public void testProcessMapperForTableStorage() throws Exception {
+    @Test (dataProvider = "secureOptions")
+    public void testProcessMapperForTableStorage(String secureOption) throws Exception {
+        StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
+
         URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
         Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.FEED, inFeed);
@@ -312,7 +370,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.PROCESS, process);
 
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
         Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
         builder.map(cluster, bundlePath);
@@ -332,7 +389,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         }
 
         // verify table props
-        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+        Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
         for (Map.Entry<String, String> entry : props.entrySet()) {
             if (expected.containsKey(entry.getKey())) {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
@@ -347,10 +404,16 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
         Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+        String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+        WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+
+        Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
+        assertHCatCredentials(parentWorkflow, wfPath);
     }
 
-    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
-                                                      Cluster cluster) throws FalconException {
+    private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed,
+                                                      Process process) throws FalconException {
         Map<String, String> expected = new HashMap<String, String>();
         for (Input input : process.getInputs().getInputs()) {
             CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
@@ -419,7 +482,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
     private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
         throws Exception {
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
         Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
         builder.map(cluster, bundlePath);
@@ -517,7 +579,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
 
     @Test
     public void testProcessWithNoInputsAndOutputs() throws Exception {
-        Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
         ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
 
         URL resource = this.getClass().getResource("/config/process/dumb-process.xml");


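The secureOptions data provider exercises both authentication modes by flipping the same switch a deployment would set in startup.properties (a sketch; the key shown is assumed from Falcon's startup configuration conventions, and is what SecurityUtil.AUTHENTICATION_TYPE resolves to):

    # simple (the default): no credentials are attached to actions
    *.falcon.authentication.type=simple

    # kerberos: SecurityUtil.isSecurityEnabled() returns true and the builder
    # attaches the falconHiveAuth hcat credential to each Hive-touching action
    *.falcon.authentication.type=kerberos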