falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkat...@apache.org
Subject [7/8] incubator-falcon git commit: FALCON-892 HCatReplication fails in secure setup. Contributed by Venkatesh Seetharam
Date Fri, 14 Nov 2014 02:54:52 GMT
FALCON-892 HCatReplication fails in secure setup. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e8b1d11e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e8b1d11e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e8b1d11e

Branch: refs/heads/master
Commit: e8b1d11eb074e947959cb8d8aaa92a98acecd827
Parents: 60d33b6
Author: Venkatesh Seetharam <venkatesh@apache.org>
Authored: Thu Nov 13 18:28:54 2014 -0800
Committer: Venkatesh Seetharam <venkatesh@apache.org>
Committed: Thu Nov 13 18:55:19 2014 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../org/apache/falcon/entity/FeedHelper.java    | 17 +++++++----
 .../apache/falcon/oozie/OozieBundleBuilder.java | 18 +++++++++---
 .../feed/FeedReplicationCoordinatorBuilder.java | 11 +++++--
 .../feed/HCatReplicationWorkflowBuilder.java    | 11 +++++++
 .../main/resources/action/feed/table-export.xml |  7 +++--
 .../feed/OozieFeedWorkflowBuilderTest.java      | 30 ++++++++++++++++++--
 7 files changed, 78 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f20ef0d..fda0338 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -144,6 +144,8 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-892 HCatReplication fails in secure setup (Venkatesh Seetharam)
+
    FALCON-889 Windows azure replication fails with "wasb" as the scheme to an
    HDFS file system (Chris Nauroth via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index b5dd5c3..ca31f95 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -291,9 +291,10 @@ public final class FeedHelper {
         return expHelp.evaluateFullExpression(exp, String.class);
     }
 
-    public static String getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
-                                       Feed feed, CatalogStorage storage, Tag tag, String suffix) {
-        String stagingDirPath = getStagingDir(clusterEntity, feed, storage, tag);
+    public static String getStagingPath(boolean isSource,
+                                        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+                                        Feed feed, CatalogStorage storage, Tag tag, String suffix) {
+        String stagingDirPath = getStagingDir(isSource, clusterEntity, feed, storage, tag);
 
         String datedPartitionKey = storage.getDatedPartitionKeys().get(0);
         String datedPartitionKeySuffix = datedPartitionKey + "=${coord:dataOutPartitionValue('output',"
@@ -304,13 +305,17 @@ public final class FeedHelper {
                 + "data";
     }
 
-    public static String getStagingDir(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
+    public static String getStagingDir(boolean isSource,
+                                       org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
                                        Feed feed, CatalogStorage storage, Tag tag) {
         String workflowName = EntityUtil.getWorkflowName(
                 tag, Arrays.asList(clusterEntity.getName()), feed).toString();
 
-        // log path is created at scheduling wf and has 777 perms
-        return ClusterHelper.getStorageUrl(clusterEntity)
+        // log path is created at scheduling wf
+        final String storageUri = isSource
+                ? ClusterHelper.getReadOnlyStorageUrl(clusterEntity) // read interface
+                : ClusterHelper.getStorageUrl(clusterEntity);        // write interface
+        return storageUri
                 + EntityUtil.getLogPath(clusterEntity, feed) + "/"
                 + workflowName + "/"
                 + storage.getDatabase() + "/"

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 957300a..c73401a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
@@ -77,9 +78,10 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
             // add the coordinator to the bundle
             COORDINATOR coord = new COORDINATOR();
             String coordPath = coordProps.getProperty(OozieEntityBuilder.ENTITY_PATH);
-            coord.setName(coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME));
+            final String coordName = coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME);
+            coord.setName(coordName);
             coord.setAppPath(getStoragePath(coordPath));
-            Properties appProps = createAppProperties(cluster, buildPath);
+            Properties appProps = createAppProperties(cluster, buildPath, coordName);
             appProps.putAll(coordProps);
             coord.setConfiguration(getConfig(appProps));
             bundle.getCoordinator().add(coord);
@@ -104,7 +106,8 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         return conf;
     }
 
-    protected Properties createAppProperties(Cluster cluster, Path buildPath) throws FalconException {
+    protected Properties createAppProperties(Cluster cluster, Path buildPath,
+                                             String coordName) throws FalconException {
         Properties properties = getEntityProperties(cluster);
         properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster));
         properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster));
@@ -115,7 +118,14 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
         properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
 
         if (EntityUtil.isTableStorageType(cluster, entity)) {
-            properties.putAll(getHiveCredentials(cluster));
+            Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
+            if (tag == Tag.REPLICATION) {
+                // todo: kludge send source hcat creds for coord dependency check to pass
+                String srcClusterName = EntityUtil.getWorkflowNameSuffix(coordName, entity);
+                properties.putAll(getHiveCredentials(ClusterHelper.getCluster(srcClusterName)));
+            } else {
+                properties.putAll(getHiveCredentials(cluster));
+            }
         }
 
         //Add libpath

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 8f7f01a..2963ac9 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -246,15 +246,20 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
     private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
         Cluster trgCluster, CatalogStorage targetStorage, Properties props) {
-        // create staging dirs for export at source & set it as distcpSourcePaths
+        // create staging dirs for copy from source & set it as distcpSourcePaths - Read interface
         String sourceStagingPath =
-            FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION,
+            FeedHelper.getStagingPath(true, srcCluster, entity, sourceStorage, Tag.REPLICATION,
                 NOMINAL_TIME_EL + "/" + trgCluster.getName());
         props.put("distcpSourcePaths", sourceStagingPath);
+        // create staging dirs for export at source which needs to be writable - hence write interface
+        String falconSourceStagingPath =
+            FeedHelper.getStagingPath(false, srcCluster, entity, sourceStorage, Tag.REPLICATION,
+                NOMINAL_TIME_EL + "/" + trgCluster.getName());
+        props.put("falconSourceStagingDir", falconSourceStagingPath);
 
         // create staging dirs for import at target & set it as distcpTargetPaths
         String targetStagingPath =
-            FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION,
+            FeedHelper.getStagingPath(false, trgCluster, entity, targetStorage, Tag.REPLICATION,
                 NOMINAL_TIME_EL + "/" + trgCluster.getName());
         props.put("distcpTargetPaths", targetStagingPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 61739a5..30ca0a8 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -20,10 +20,12 @@ package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 
 import java.util.Arrays;
@@ -127,6 +129,15 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
                 if (isSecurityEnabled) { // add a reference to credential in the action
                     action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
                 }
+            } else if (REPLICATION_ACTION_NAME.equals(actionName)) {
+                if (isSecurityEnabled) {
+                    // this is to ensure that the delegation tokens are checked out for both clusters
+                    CONFIGURATION.Property property = new CONFIGURATION.Property();
+                    property.setName("mapreduce.job.hdfs-servers");
+                    property.setValue(ClusterHelper.getReadOnlyStorageUrl(sourceCluster)
+                            + "," + ClusterHelper.getStorageUrl(targetCluster));
+                    action.getJava().getConfiguration().getProperty().add(property);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/resources/action/feed/table-export.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-export.xml b/oozie/src/main/resources/action/feed/table-export.xml
index f5f7e66..fcf1a1a 100644
--- a/oozie/src/main/resources/action/feed/table-export.xml
+++ b/oozie/src/main/resources/action/feed/table-export.xml
@@ -20,6 +20,9 @@
     <hive xmlns="uri:oozie:hive-action:0.2">
         <job-tracker>${falconSourceJobTracker}</job-tracker>
         <name-node>${falconSourceNameNode}</name-node>
+        <!--
+            falconSourceStagingDir and distcpSourcePaths refer to the same path, but distcpSourcePaths uses the read-only interface while falconSourceStagingDir uses the write interface
+        -->
         <prepare>
             <delete path="${distcpSourcePaths}"/>
         </prepare>
@@ -38,8 +41,8 @@
         <param>falconSourceDatabase=${falconSourceDatabase}</param>
         <param>falconSourceTable=${falconSourceTable}</param>
         <param>falconSourcePartition=${falconSourcePartition}</param>
-        <param>falconSourceStagingDir=${distcpSourcePaths}</param>
+        <param>falconSourceStagingDir=${falconSourceStagingDir}</param>
     </hive>
     <ok to="replication"/>
     <error to="failed-post-processing"/>
-</action>
\ No newline at end of file
+</action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 42c231f..e5588b4 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -43,6 +43,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
 import org.apache.falcon.oozie.coordinator.SYNCDATASET;
 import org.apache.falcon.oozie.process.AbstractTestBase;
 import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.JAVA;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.CurrentUser;
@@ -437,13 +438,26 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
 
         Assert.assertTrue(props.containsKey("distcpSourcePaths"));
-        Assert.assertEquals(props.get("distcpSourcePaths"),
-                FeedHelper.getStagingPath(srcCluster, tableFeed, srcStorage, Tag.REPLICATION,
+        final String distcpSourcePaths = props.get("distcpSourcePaths");
+        Assert.assertEquals(distcpSourcePaths,
+                FeedHelper.getStagingPath(true, srcCluster, tableFeed, srcStorage, Tag.REPLICATION,
                         "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
+        Assert.assertTrue(props.containsKey("falconSourceStagingDir"));
+
+        final String falconSourceStagingDir = props.get("falconSourceStagingDir");
+        Assert.assertEquals(falconSourceStagingDir,
+                FeedHelper.getStagingPath(false, srcCluster, tableFeed, srcStorage, Tag.REPLICATION,
+                        "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
+
+        String exportPath = falconSourceStagingDir.substring(
+                ClusterHelper.getStorageUrl(srcCluster).length(), falconSourceStagingDir.length());
+        String distCPPath = distcpSourcePaths.substring(
+                ClusterHelper.getReadOnlyStorageUrl(srcCluster).length(), distcpSourcePaths.length());
+        Assert.assertEquals(exportPath, distCPPath);
 
         Assert.assertTrue(props.containsKey("distcpTargetPaths"));
         Assert.assertEquals(props.get("distcpTargetPaths"),
-                FeedHelper.getStagingPath(trgCluster, tableFeed, trgStorage, Tag.REPLICATION,
+                FeedHelper.getStagingPath(false, trgCluster, tableFeed, trgStorage, Tag.REPLICATION,
                         "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
 
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
@@ -510,6 +524,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
             } else if ("table-import".equals(actionName) && isSecurityEnabled) {
                 Assert.assertNotNull(action.getCred());
                 Assert.assertEquals(action.getCred(), "falconTargetHiveAuth");
+            } else if ("replication".equals(actionName)) {
+                List<CONFIGURATION.Property> properties =
+                        action.getJava().getConfiguration().getProperty();
+                for (CONFIGURATION.Property property : properties) {
+                    if (property.getName().equals("mapreduce.job.hdfs-servers")) {
+                        Assert.assertEquals(property.getValue(),
+                                ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+                                        + "," + ClusterHelper.getStorageUrl(trgCluster));
+                    }
+                }
             }
         }
     }


Mime
View raw message