falcon-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peeyu...@apache.org
Subject falcon git commit: FALCON-2280 Unable to create mirror on WASB target using extensions
Date Tue, 28 Feb 2017 23:34:55 GMT
Repository: falcon
Updated Branches:
  refs/heads/master 3dc7efe50 -> 9e9cb7a21


FALCON-2280 Unable to create mirror on WASB target using extensions

Author: Sowmya Ramesh <sramesh@hortonworks.com>

Reviewers: Peeyush <peeyushb@apache.org>, Venkat <n.r.v@live.com>

Closes #365 from sowmyaramesh/FALCON-2280


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9e9cb7a2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9e9cb7a2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9e9cb7a2

Branch: refs/heads/master
Commit: 9e9cb7a21d138e7489198472f3296ec237aac969
Parents: 3dc7efe
Author: Sowmya Ramesh <sramesh@hortonworks.com>
Authored: Wed Mar 1 05:04:43 2017 +0530
Committer: peeyush b <pbishnoi@hortonworks.com>
Committed: Wed Mar 1 05:04:43 2017 +0530

----------------------------------------------------------------------
 .../runtime/hdfs-mirroring-workflow.xml         |   2 +-
 .../falcon/hive/mapreduce/CopyMapper.java       |   2 +-
 .../falcon/hive/mapreduce/CopyReducer.java      |   8 +-
 .../java/org/apache/falcon/util/FSDRUtils.java  |  69 +++++++++
 .../org/apache/falcon/util/FSDRUtilsTest.java   |  61 ++++++++
 .../mirroring/hdfs/HdfsMirroringExtension.java  | 147 ++++++++++++++-----
 .../hdfs/HdfsMirroringExtensionProperties.java  |   8 +-
 .../apache/falcon/extensions/ExtensionTest.java |   5 +-
 .../apache/falcon/unit/FalconUnitClient.java    |   2 +-
 9 files changed, 253 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
index 7929dd7..1287333 100644
--- a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
+++ b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
@@ -92,7 +92,7 @@
             <arg>-sourcePaths</arg>
             <arg>${sourceDir}</arg>
             <arg>-targetPath</arg>
-            <arg>${targetClusterFS}${targetDir}</arg>
+            <arg>${targetDir}</arg>
             <arg>-falconFeedStorageType</arg>
             <arg>FILESYSTEM</arg>
             <arg>-availabilityFlag</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
index 5cd7e74..fabc9ed 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
@@ -42,7 +42,7 @@ public class CopyMapper extends Mapper<LongWritable, Text, Text, Text>
{
 
     private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
     private EventUtils eventUtils;
-    ScheduledThreadPoolExecutor timer;
+    private ScheduledThreadPoolExecutor timer;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
index 7c415c3..c894b13 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
@@ -48,10 +48,10 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text>
{
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
         Configuration conf = context.getConfiguration();
-        FileSystem fs= FileSystem.get(
+        FileSystem fs = FileSystem.get(
                 FileUtils.getConfiguration(context.getConfiguration(),
-                conf.get(HiveDRArgs.TARGET_NN.getName()),
-                conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
+                        conf.get(HiveDRArgs.TARGET_NN.getName()),
+                        conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
         hiveDRStore = new HiveDRStatusStore(fs);
     }
 
@@ -67,7 +67,7 @@ public class CopyReducer extends Reducer<Text, Text, Text, Text> {
 
     @Override
     protected void reduce(Text key, Iterable<Text> values, final Context context)
-            throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
         List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
         ReplicationStatus rs;
         timer = new ScheduledThreadPoolExecutor(1);

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/common/src/main/java/org/apache/falcon/util/FSDRUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/FSDRUtils.java b/common/src/main/java/org/apache/falcon/util/FSDRUtils.java
new file mode 100644
index 0000000..17da193
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/FSDRUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Util class for FS DR.
+ */
+public final class FSDRUtils {
+    private FSDRUtils() {
+    }
+
+    private static final List<String> HDFS_SCHEME_PREFIXES =
+            Arrays.asList("file", "hdfs", "hftp", "hsftp", "webhdfs", "swebhdfs");
+
+    private static Configuration defaultConf = new Configuration();
+
+    public static Configuration getDefaultConf() {
+        return defaultConf;
+    }
+
+    public static void setDefaultConf(Configuration conf) {
+        defaultConf = conf;
+    }
+
+    public static boolean isHCFS(Path filePath) throws FalconException {
+        if (filePath == null) {
+            throw new FalconException("filePath cannot be empty");
+        }
+
+        String scheme;
+        try {
+            FileSystem f = FileSystem.get(filePath.toUri(), getDefaultConf());
+            scheme = f.getScheme();
+            if (StringUtils.isBlank(scheme)) {
+                throw new FalconException("Cannot get valid scheme for " + filePath);
+            }
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+
+        return HDFS_SCHEME_PREFIXES.contains(scheme.toLowerCase().trim()) ? false : true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java b/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java
new file mode 100644
index 0000000..7f63cda
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/FSDRUtilsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests FSDRUtils.
+ */
+public final class FSDRUtilsTest {
+
+    @BeforeClass
+    private void setup() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("fs.s3n.awsAccessKeyId", "testS3KeyId");
+        conf.set("fs.s3n.awsSecretAccessKey", "testS3AccessKey");
+        conf.set("fs.azure.account.key.mystorage.blob.core.windows.net", "dGVzdEF6dXJlQWNjZXNzS2V5");
+        FSDRUtils.setDefaultConf(conf);
+    }
+
+    @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = "filePath
cannot be empty")
+    public void testIsHCFSEmptyPath() throws Exception {
+        FSDRUtils.isHCFS(null);
+    }
+
+    @Test
+    public void testIsHCFS() throws Exception {
+        boolean isHCFSPath = FSDRUtils.isHCFS(new Path("/apps/dr"));
+        Assert.assertFalse(isHCFSPath);
+
+        isHCFSPath = FSDRUtils.isHCFS(new Path("hdfs://localhost:54136/apps/dr"));
+        Assert.assertFalse(isHCFSPath);
+
+        isHCFSPath = FSDRUtils.isHCFS(new Path("hftp://localhost:54136/apps/dr"));
+        Assert.assertFalse(isHCFSPath);
+
+        isHCFSPath = FSDRUtils.isHCFS(new Path("s3n://testBucket/apps/dr"));
+        Assert.assertTrue(isHCFSPath);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
index 74d217c..fd823cf 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
@@ -23,6 +23,8 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.util.FSDRUtils;
+import org.apache.hadoop.fs.Path;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -48,11 +50,30 @@ public class HdfsMirroringExtension extends AbstractExtension {
                 throw new FalconException("Missing extension property: " + option.getName());
             }
         }
+
+        String srcPaths = extensionProperties.getProperty(HdfsMirroringExtensionProperties
+                .SOURCE_DIR.getName());
+        if (!isHCFSPath(srcPaths)) {
+            if (extensionProperties.getProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName())
== null) {
+                throw new FalconException("Missing extension property: " + HdfsMirroringExtensionProperties.
+                        SOURCE_CLUSTER.getName());
+            }
+        }
+        String targetDir = extensionProperties.getProperty(HdfsMirroringExtensionProperties
+                .TARGET_DIR.getName());
+
+        if (!FSDRUtils.isHCFS(new Path(targetDir.trim()))) {
+            if (extensionProperties.getProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName())
== null) {
+                throw new FalconException("Missing extension property: " + HdfsMirroringExtensionProperties.
+                        TARGET_CLUSTER.getName());
+            }
+        }
     }
 
     @Override
     public Properties getAdditionalProperties(final Properties extensionProperties) throws
FalconException {
         Properties additionalProperties = new Properties();
+        boolean isHCFSDR = false;
 
         // Add default properties if not passed
         String distcpMaxMaps = extensionProperties.getProperty(
@@ -67,23 +88,31 @@ public class HdfsMirroringExtension extends AbstractExtension {
             additionalProperties.put(HdfsMirroringExtensionProperties.DISTCP_MAP_BANDWIDTH_IN_MB.getName(),
"100");
         }
 
-        // Construct fully qualified hdfs src path
         String srcPaths = extensionProperties.getProperty(HdfsMirroringExtensionProperties
                 .SOURCE_DIR.getName());
-        StringBuilder absoluteSrcPaths = new StringBuilder();
-        String sourceClusterName = extensionProperties.getProperty(
-                HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName());
-
-        // Since source cluster get read interface
-        Cluster srcCluster = ClusterHelper.getCluster(sourceClusterName);
-        if (srcCluster == null) {
-            throw new FalconException("Cluster entity " + sourceClusterName + " not found");
-        }
-        String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(srcCluster);
+        String sourceClusterFS = "";
+        if (isHCFSPath(srcPaths)) {
+            // Make sure path is fully qualified
+            // For HCFS only one path
+            URI pathUri = new Path(srcPaths).toUri();
+            if (pathUri.getAuthority() == null) {
+                throw new FalconException("getAdditionalProperties: " + srcPaths + " is not
fully qualified path");
+            }
+            isHCFSDR = true;
+        } else {
+            StringBuilder absoluteSrcPaths = new StringBuilder();
+            String sourceClusterName = extensionProperties.getProperty(
+                    HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName());
+
+            // Since source cluster get read interface
+            Cluster srcCluster = ClusterHelper.getCluster(sourceClusterName.trim());
+            if (srcCluster == null) {
+                throw new FalconException("Source Cluster entity " + sourceClusterName +
" not found");
+            }
+            String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(srcCluster);
 
-        if (StringUtils.isNotBlank(srcPaths)) {
+            // Construct fully qualified hdfs src path
             String[] paths = srcPaths.split(COMMA_SEPARATOR);
-
             URI pathUri;
             for (String path : paths) {
                 try {
@@ -92,48 +121,78 @@ public class HdfsMirroringExtension extends AbstractExtension {
                     throw new FalconException(e);
                 }
                 String authority = pathUri.getAuthority();
-                StringBuilder srcpath = new StringBuilder();
+                StringBuilder srcpath;
                 if (authority == null) {
-                    srcpath.append(srcClusterEndPoint);
+                    srcpath = new StringBuilder(srcClusterEndPoint);
+                } else {
+                    srcpath = new StringBuilder();
                 }
-
                 srcpath.append(path.trim());
                 srcpath.append(COMMA_SEPARATOR);
                 absoluteSrcPaths.append(srcpath);
             }
+            additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
+                    StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR));
+            sourceClusterFS = ClusterHelper.getReadOnlyStorageUrl(srcCluster);
         }
-        additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
-                StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR));
 
-        // Target dir shouldn't have the namenode
+
         String targetDir = extensionProperties.getProperty(HdfsMirroringExtensionProperties
                 .TARGET_DIR.getName());
+        String targetClusterFS = "";
+        if (FSDRUtils.isHCFS(new Path(targetDir.trim()))) {
+            // Make sure path is fully qualified
+            URI pathUri = new Path(targetDir).toUri();
+            if (pathUri.getAuthority() == null) {
+                throw new FalconException("getAdditionalProperties: " + targetDir + " is
not fully qualified path");
+            }
+            isHCFSDR = true;
+        } else {
+            String targetClusterName = extensionProperties.getProperty(
+                    HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName());
+
+            Cluster targetCluster = ClusterHelper.getCluster(targetClusterName.trim());
+            if (targetCluster == null) {
+                throw new FalconException("Target Cluster entity " + targetClusterName +
" not found");
+            }
 
-        URI targetPathUri;
-        try {
-            targetPathUri = new URI(targetDir.trim());
-        } catch (URISyntaxException e) {
-            throw new FalconException(e);
-        }
+            targetClusterFS = ClusterHelper.getStorageUrl(targetCluster);
 
-        if (targetPathUri.getScheme() != null) {
-            additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_DIR.getName(),
-                    targetPathUri.getPath());
-        }
+            // Construct fully qualified hdfs target path
+            URI pathUri;
+            try {
+                pathUri = new URI(targetDir.trim());
+            } catch (URISyntaxException e) {
+                throw new FalconException(e);
+            }
 
-        // add sourceClusterFS and targetClusterFS
-        additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName(),
-                ClusterHelper.getStorageUrl(srcCluster));
+            StringBuilder targetPath;
+            String authority = pathUri.getAuthority();
+            if (authority == null) {
+                targetPath = new StringBuilder(targetClusterFS);
+            } else {
+                targetPath = new StringBuilder();
+            }
+            targetPath.append(targetDir.trim());
 
-        String targetClusterName = extensionProperties.getProperty(
-                HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName());
+            additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_DIR.getName(),
+                    targetPath.toString());
+        }
 
-        Cluster targetCluster = ClusterHelper.getCluster(targetClusterName);
-        if (targetCluster == null) {
-            throw new FalconException("Cluster entity " + targetClusterName + " not found");
+        // Oozie doesn't take null or empty string for arg in the WF. For HCFS pass the source
FS as its not used
+        if (isHCFSDR) {
+            if (StringUtils.isBlank(sourceClusterFS)) {
+                sourceClusterFS = targetClusterFS;
+            } else if (StringUtils.isBlank(targetClusterFS)) {
+                targetClusterFS = sourceClusterFS;
+            }
         }
+        // Add sourceClusterFS
+        additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_CLUSTER_FS_READ_ENDPOINT.getName(),
+                sourceClusterFS);
+        // Add targetClusterFS
         additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_CLUSTER_FS_WRITE_ENDPOINT.getName(),
-                ClusterHelper.getStorageUrl(targetCluster));
+                targetClusterFS);
 
         if (StringUtils.isBlank(
                 extensionProperties.getProperty(HdfsMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName())))
{
@@ -144,4 +203,16 @@ public class HdfsMirroringExtension extends AbstractExtension {
         return additionalProperties;
     }
 
+    private static boolean isHCFSPath(String srcPaths) throws FalconException {
+        if (StringUtils.isNotBlank(srcPaths)) {
+            String[] paths = srcPaths.split(COMMA_SEPARATOR);
+
+            // We expect all paths to be of same type, hence verify the first path
+            for (String path : paths) {
+                return FSDRUtils.isHCFS(new Path(path.trim()));
+            }
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
index 52ae0c0..47ebca0 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
@@ -23,13 +23,13 @@ package org.apache.falcon.extensions.mirroring.hdfs;
  */
 public enum HdfsMirroringExtensionProperties {
     SOURCE_DIR("sourceDir", "Location of source data to replicate"),
-    SOURCE_CLUSTER("sourceCluster", "Source cluster"),
-    SOURCE_CLUSTER_FS_WRITE_ENDPOINT("sourceClusterFS", "Source cluster end point", false),
+    SOURCE_CLUSTER("sourceCluster", "Source cluster", false),
+    SOURCE_CLUSTER_FS_READ_ENDPOINT("sourceClusterFS", "Source cluster end point", false),
     TARGET_DIR("targetDir", "Location on target cluster for replication"),
-    TARGET_CLUSTER("targetCluster", "Target cluster"),
+    TARGET_CLUSTER("targetCluster", "Target cluster", false),
     TARGET_CLUSTER_FS_WRITE_ENDPOINT("targetClusterFS", "Target cluster end point", false),
     DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication", false),
-    DISTCP_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper
during replication",
+    DISTCP_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s usCOed by each mapper
during replication",
             false),
     TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled",
false);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
index 4763db8..f6bfe17 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -213,7 +213,7 @@ public class ExtensionTest extends AbstractTestExtensionStore {
         Assert.assertEquals(extensionStorePath + "/hdfs-mirroring/libs",
                 processEntity.getWorkflow().getLib());
         Assert.assertEquals(extensionStorePath
-                + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
+                        + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
                 processEntity.getWorkflow().getPath());
 
         Properties props = EntityUtil.getEntityProperties(processEntity);
@@ -221,7 +221,8 @@ public class ExtensionTest extends AbstractTestExtensionStore {
         String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(ClusterHelper.getCluster(SOURCE_CLUSTER));
         Assert.assertEquals(srcClusterEndPoint + SOURCEDIR, props.getProperty("sourceDir"));
         Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster"));
-        Assert.assertEquals(TARGETDIR, props.getProperty("targetDir"));
+        String tgtClusterEndPoint = ClusterHelper.getStorageUrl(ClusterHelper.getCluster(TARGET_CLUSTER));
+        Assert.assertEquals(tgtClusterEndPoint + TARGETDIR, props.getProperty("targetDir"));
         Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster"));
 
         //retry

http://git-wip-us.apache.org/repos/asf/falcon/blob/9e9cb7a2/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index 6148979..b0c6edf 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -333,7 +333,7 @@ public class FalconUnitClient extends AbstractFalconClient {
             entities = ExtensionHandler.loadAndPrepare(extensionName, jobName, configStream,
                     packagePath);
         } catch (FalconException | IOException | URISyntaxException e) {
-            throw new FalconCLIException("Failed in generating entities for job:" + jobName);
+            throw new FalconCLIException("Failed in generating entities for job:" + jobName,
e);
         }
         return entities;
     }


Mime
View raw message