hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1575590 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
Date Sat, 08 Mar 2014 18:48:56 GMT
Author: tedyu
Date: Sat Mar  8 18:48:55 2014
New Revision: 1575590

URL: http://svn.apache.org/r1575590
Log:
HBASE-8304 Bulkload fails to remove files if fs.default.name / fs.defaultFS is configured
without default port (Haosdent)


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java?rev=1575590&r1=1575589&r2=1575590&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java Sat Mar  8 18:48:55 2014
@@ -49,8 +49,8 @@ import org.apache.hadoop.hbase.backup.HF
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
 
 /**
  * View to an on-disk Region.
@@ -403,7 +403,7 @@ public class HRegionFileSystem {
     // We can't compare FileSystem instances as equals() includes UGI instance
     // as part of the comparison and won't work when doing SecureBulkLoad
     // TODO deal with viewFS
-    if (!srcFs.getUri().equals(desFs.getUri())) {
+    if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
       LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
           "the destination store. Copying file over to destination filesystem.");
       Path tmpPath = createTempName();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1575590&r1=1575589&r2=1575590&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Sat Mar  8 18:48:55 2014
@@ -22,7 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,6 +47,82 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceStability.Evolving
 public class FSHDFSUtils extends FSUtils {
   private static final Log LOG = LogFactory.getLog(FSHDFSUtils.class);
+  private static Class dfsUtilClazz;
+  private static Method getNNAddressesMethod;
+
+  /**
+   * @param fs
+   * @param conf
+   * @return A set containing all namenode addresses of fs
+   */
+  private static Set<InetSocketAddress> getNNAddresses(DistributedFileSystem fs,
+                                                      Configuration conf) {
+    Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>();
+    String serviceName = fs.getCanonicalServiceName();
+
+    if (serviceName.startsWith("ha-hdfs")) {
+      try {
+        if (dfsUtilClazz == null) {
+          dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+        }
+        if (getNNAddressesMethod == null) {
+          getNNAddressesMethod =
+                  dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+        }
+
+        Map<String, Map<String, InetSocketAddress>> addressMap =
+                (Map<String, Map<String, InetSocketAddress>>) getNNAddressesMethod
+                        .invoke(null, conf);
+        for (Map.Entry<String, Map<String, InetSocketAddress>> entry : addressMap.entrySet()) {
+          Map<String, InetSocketAddress> nnMap = entry.getValue();
+          for (Map.Entry<String, InetSocketAddress> e2 : nnMap.entrySet()) {
+            InetSocketAddress addr = e2.getValue();
+            addresses.add(addr);
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("DFSUtil.getNNServiceRpcAddresses failed. serviceName=" + serviceName, e);
+      }
+    } else {
+      URI uri = fs.getUri();
+      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), uri.getPort());
+      addresses.add(addr);
+    }
+
+    return addresses;
+  }
+
+  /**
+   * @param conf the Configuration of HBase
+   * @param srcFs
+   * @param desFs
+   * @return Whether srcFs and desFs are on same hdfs or not
+   */
+  public static boolean isSameHdfs(Configuration conf, FileSystem srcFs, FileSystem desFs) {
+    // By getCanonicalServiceName, we could make sure both srcFs and desFs
+    // show a unified format which contains scheme, host and port.
+    String srcServiceName = srcFs.getCanonicalServiceName();
+    String desServiceName = desFs.getCanonicalServiceName();
+
+    if (srcServiceName == null || desServiceName == null) {
+      return false;
+    }
+    if (srcServiceName.equals(desServiceName)) {
+      return true;
+    }
+    if (srcFs instanceof DistributedFileSystem && desFs instanceof DistributedFileSystem) {
+      //If one serviceName is an HA format while the other is a non-HA format,
+      // maybe they refer to the same FileSystem.
+      //For example, srcFs is "ha-hdfs://nameservices" and desFs is "hdfs://activeNamenode:port"
+      Set<InetSocketAddress> srcAddrs = getNNAddresses((DistributedFileSystem) srcFs, conf);
+      Set<InetSocketAddress> desAddrs = getNNAddresses((DistributedFileSystem) desFs, conf);
+      if (Sets.intersection(srcAddrs, desAddrs).size() > 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
 
   /**
    * Recover the lease from HDFS, retrying multiple times.

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java?rev=1575590&r1=1575589&r2=1575590&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java Sat Mar  8 18:48:55 2014
@@ -21,8 +21,12 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -36,6 +40,7 @@ import org.mockito.Mockito;
  */
 @Category(MediumTests.class)
 public class TestFSHDFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestFSHDFSUtils.class);
   private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
   static {
     Configuration conf = HTU.getConfiguration();
@@ -94,6 +99,51 @@ public class TestFSHDFSUtils {
     Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
   }
 
+  @Test
+  public void testIsSameHdfs() throws IOException {
+    try {
+      Class dfsUtilClazz = Class.forName("org.apache.hadoop.hdfs.DFSUtil");
+      dfsUtilClazz.getMethod("getNNServiceRpcAddresses", Configuration.class);
+    } catch (Exception e) {
+      LOG.info("Skip testIsSameHdfs test case because of the no-HA hadoop version.");
+      return;
+    }
+
+    Configuration conf = HBaseConfiguration.create();
+    Path srcPath = new Path("hdfs://localhost:8020/");
+    Path desPath = new Path("hdfs://127.0.0.1/");
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileSystem desFs = desPath.getFileSystem(conf);
+
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.0.1:8070/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    desPath = new Path("hdfs://127.0.1.1:8020/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("fs.defaultFS", "hdfs://haosong-hadoop");
+    conf.set("dfs.nameservices", "haosong-hadoop");
+    conf.set("dfs.ha.namenodes.haosong-hadoop", "nn1,nn2");
+    conf.set("dfs.client.failover.proxy.provider.haosong-hadoop",
+            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.0.0.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.10.2.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn1", "127.10.2.1:8020");
+    conf.set("dfs.namenode.rpc-address.haosong-hadoop.nn2", "127.0.0.1:8000");
+    desPath = new Path("/");
+    desFs = desPath.getFileSystem(conf);
+    assertTrue(!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs));
+  }
+
   /**
    * Version of DFS that has HDFS-4525 in it.
    */



Mime
View raw message