hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1375451 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/fs/ main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/wal/ test/java/org/apache/hadoop/...
Date Tue, 21 Aug 2012 09:47:06 GMT
Author: nkeywal
Date: Tue Aug 21 09:47:05 2012
New Revision: 1375451

URL: http://svn.apache.org/viewvc?rev=1375451&view=rev
Log:
HBASE-6435 Reading WAL files after a recovery leads to time lost in HDFS timeouts when using
dead datanodes

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java Tue Aug
21 09:47:05 2012
@@ -301,6 +301,16 @@ public class ServerName implements Compa
       new ServerName(str, NON_STARTCODE);
   }
 
+
+  /**
+   * @return true if the String follows the pattern of {@link ServerName#toString()}, false
+   *  otherwise.
+   */
+  public static boolean isFullServerName(final String str){
+    if (str == null ||str.isEmpty()) return false;
+    return SERVERNAME_PATTERN.matcher(str).matches();
+  }
+
   /**
    * Get a ServerName from the passed in data bytes.
    * @param data Data with a serialize server name in it; can handle the old style

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java Tue
Aug 21 09:47:05 2012
@@ -21,16 +21,29 @@
 package org.apache.hadoop.hbase.fs;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
 import java.net.URI;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Methods;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Progressable;
 
@@ -42,6 +55,7 @@ import org.apache.hadoop.util.Progressab
  * this is the place to make it happen.
  */
 public class HFileSystem extends FilterFileSystem {
+  public static final Log LOG = LogFactory.getLog(HFileSystem.class);
 
   private final FileSystem noChecksumFs;   // read hfile data from storage
   private final boolean useHBaseChecksum;
@@ -78,6 +92,7 @@ public class HFileSystem extends FilterF
     } else {
       this.noChecksumFs = fs;
     }
+    addLocationsOrderInterceptor(conf);
   }
 
   /**
@@ -159,9 +174,154 @@ public class HFileSystem extends FilterF
     if (fs == null) {
       throw new IOException("No FileSystem for scheme: " + uri.getScheme());
     }
+
     return fs;
   }
 
+  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException
{
+    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
+  }
+
+  /**
+   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
+   * linked to this FileSystem. See HBASE-6435 for the background.
+   * <p/>
+   * There should be no reason, except testing, to create a specific ReorderBlocks.
+   *
+   * @return true if the interceptor was added, false otherwise.
+   */
+  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb)
{
+    LOG.debug("Starting addLocationsOrderInterceptor with class " + lrb.getClass());
+
+    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {  // activated by default
+      LOG.debug("addLocationsOrderInterceptor configured to false");
+      return false;
+    }
+
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      LOG.warn("Can't get the file system from the conf.", e);
+      return false;
+    }
+
+    if (!(fs instanceof DistributedFileSystem)) {
+      LOG.warn("The file system is not a DistributedFileSystem." +
+          "Not adding block location reordering");
+      return false;
+    }
+
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DFSClient dfsc = dfs.getClient();
+    if (dfsc == null) {
+      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location
" +
+          "block reordering interceptor. Continuing, but this is unexpected."
+      );
+      return false;
+    }
+
+    try {
+      Field nf = DFSClient.class.getDeclaredField("namenode");
+      nf.setAccessible(true);
+      Field modifiersField = Field.class.getDeclaredField("modifiers");
+      modifiersField.setAccessible(true);
+      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);
+
+      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
+      if (namenode == null) {
+        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block"
+
+            " reordering interceptor. Continuing, but this is unexpected."
+        );
+        return false;
+      }
+
+      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
+      nf.set(dfsc, cp1);
+      LOG.info("Added intercepting call to namenode#getBlockLocations");
+    } catch (NoSuchFieldException e) {
+      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.",
e);
+      return false;
+    } catch (IllegalAccessException e) {
+      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.",
e);
+      return false;
+    }
+
+    return true;
+  }
+
+  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
+      final ReorderBlocks lrb, final Configuration conf) {
+    return (ClientProtocol) Proxy.newProxyInstance
+        (cp.getClass().getClassLoader(),
+            new Class[]{ClientProtocol.class},
+            new InvocationHandler() {
+              public Object invoke(Object proxy, Method method,
+                                   Object[] args) throws Throwable {
+                Object res = method.invoke(cp, args);
+                if (res != null && args.length == 3 && "getBlockLocations".equals(method.getName())
+                    && res instanceof LocatedBlocks
+                    && args[0] instanceof String
+                    && args[0] != null) {
+                  lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
+                }
+                return res;
+              }
+            });
+  }
+
+  /**
+   * Interface to implement to add a specific reordering logic in hdfs.
+   */
+  static interface ReorderBlocks {
+    /**
+     *
+     * @param conf - the conf to use
+     * @param lbs - the LocatedBlocks to reorder
+     * @param src - the file name currently read
+     * @throws IOException - if something went wrong
+     */
+    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
+  }
+
+  /**
+   * We're putting at lowest priority the hlog files blocks that are on the same datanode
+   * as the original regionserver which created these files. This because we fear that the
+   * datanode is actually dead, so if we use it it will timeout.
+   */
+  static class ReorderWALBlocks implements ReorderBlocks {
+    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
+        throws IOException {
+
+      ServerName sn = HLog.getServerNameFromHLogDirectoryName(conf, src);
+      if (sn == null) {
+        // It's not an HLOG
+        return;
+      }
+
+      // Ok, so it's an HLog
+      String hostName = sn.getHostname();
+      LOG.debug(src + " is an HLog file, so reordering blocks, last hostname will be:" +
hostName);
+
+      // Just check for all blocks
+      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+        DatanodeInfo[] dnis = lb.getLocations();
+        if (dnis != null && dnis.length > 1) {
+          boolean found = false;
+          for (int i = 0; i < dnis.length - 1 && !found; i++) {
+            if (hostName.equals(dnis[i].getHostName())) {
+              // advance the other locations by one and put this one at the last place.
+              DatanodeInfo toLast = dnis[i];
+              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
+              dnis[dnis.length - 1] = toLast;
+              found = true;
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Create a new HFileSystem object, similar to FileSystem.get().
    * This returns a filesystem object that avoids checksum

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
Tue Aug 21 09:47:05 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -45,9 +44,9 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
@@ -118,6 +117,7 @@ public class MasterFileSystem {
     // setup the filesystem variable
     // set up the archived logs path
     this.oldLogDir = createInitialFileSystemLayout();
+    HFileSystem.addLocationsOrderInterceptor(conf);
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Tue Aug 21 09:47:05 2012
@@ -53,6 +53,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -151,6 +152,7 @@ public class HLog implements Syncable {
 
   private WALCoprocessorHost coprocessorHost;
 
+
   static void resetLogReaderClass() {
     HLog.logReaderClass = null;
   }
@@ -1757,6 +1759,62 @@ public class HLog implements Syncable {
     return dirName.toString();
   }
 
+
+  /**
+   * @param path - the path to analyze. Expected format, if it's in hlog directory:
+   *  / [base directory for hbase] / hbase / .logs / ServerName / logfile
+   * @return null if it's not a log file. Returns the ServerName of the region server that
created
+   *  this log file otherwise.
+   */
+  public static ServerName getServerNameFromHLogDirectoryName(Configuration conf, String
path)
+      throws IOException {
+    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
+      return null;
+    }
+
+    if (conf == null) {
+      throw new IllegalArgumentException("parameter conf must be set");
+    }
+
+    final String rootDir = conf.get(HConstants.HBASE_DIR);
+    if (rootDir == null || rootDir.isEmpty()) {
+      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
+    }
+
+    final StringBuilder startPathSB = new StringBuilder(rootDir);
+    if (!rootDir.endsWith("/")) startPathSB.append('/');
+    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
+    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) startPathSB.append('/');
+    final String startPath = startPathSB.toString();
+
+    String fullPath;
+    try {
+      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
+    } catch (IllegalArgumentException e) {
+      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
+      return null;
+    }
+
+    if (!fullPath.startsWith(startPath)) {
+      return null;
+    }
+
+    final String serverNameAndFile = fullPath.substring(startPath.length());
+
+    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
+      // Either it's a file (not a directory) or it's not a ServerName format
+      return null;
+    }
+
+    final String serverName = serverNameAndFile.substring(0, serverNameAndFile.indexOf('/')
- 1);
+
+    if (!ServerName.isFullServerName(serverName)) {
+      return null;
+    }
+
+    return ServerName.parseServerName(serverName);
+  }
+
   /**
    * Get the directory we are making logs in.
    * 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
Tue Aug 21 09:47:05 2012
@@ -439,6 +439,25 @@ public class HBaseTestingUtility {
     return this.dfsCluster;
   }
 
+
+  public MiniDFSCluster startMiniDFSCluster(int servers, final  String racks[], String hosts[])
+      throws Exception {
+    createDirsAndSetProperties();
+    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
+        true, null, racks, hosts, null);
+
+    // Set this just-started cluster as our filesystem.
+    FileSystem fs = this.dfsCluster.getFileSystem();
+    this.conf.set("fs.defaultFS", fs.getUri().toString());
+    // Do old style too just to be safe.
+    this.conf.set("fs.default.name", fs.getUri().toString());
+
+    // Wait for the cluster to be totally up
+    this.dfsCluster.waitClusterUp();
+
+    return this.dfsCluster;
+  }
+
   public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException
{
     createDirsAndSetProperties();
     dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
@@ -637,6 +656,11 @@ public class HBaseTestingUtility {
     return startMiniHBaseCluster(numMasters, numSlaves);
   }
 
+  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
+      throws IOException, InterruptedException{
+    return startMiniHBaseCluster(numMasters, numSlaves, null, null);
+  }
+
   /**
    * Starts up mini hbase cluster.  Usually used after call to
    * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
@@ -649,7 +673,8 @@ public class HBaseTestingUtility {
    * @see {@link #startMiniCluster()}
    */
   public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
-      final int numSlaves)
+        final int numSlaves, Class<? extends HMaster> masterClass,
+        Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws IOException, InterruptedException {
     // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
     createRootDir();
@@ -660,7 +685,8 @@ public class HBaseTestingUtility {
     conf.setInt("hbase.master.wait.on.regionservers.maxtostart", numSlaves);
 
     Configuration c = new Configuration(this.conf);
-    this.hbaseCluster = new MiniHBaseCluster(c, numMasters, numSlaves);
+    this.hbaseCluster =
+        new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
     // Don't leave here till we've done a successful scan of the .META.
     HTable t = new HTable(c, HConstants.META_TABLE_NAME);
     ResultScanner s = t.getScanner(new Scan());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Tue
Aug 21 09:47:05 2012
@@ -76,11 +76,20 @@ public class MiniHBaseCluster {
    * @throws IOException
    */
   public MiniHBaseCluster(Configuration conf, int numMasters,
-      int numRegionServers)
-  throws IOException, InterruptedException {
+                             int numRegionServers)
+      throws IOException, InterruptedException {
+    this.conf = conf;
+    conf.set(HConstants.MASTER_PORT, "0");
+    init(numMasters, numRegionServers, null, null);
+  }
+
+  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
+         Class<? extends HMaster> masterClass,
+         Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      throws IOException, InterruptedException {
     this.conf = conf;
     conf.set(HConstants.MASTER_PORT, "0");
-    init(numMasters, numRegionServers);
+    init(numMasters, numRegionServers, masterClass, regionserverClass);
   }
 
   public Configuration getConfiguration() {
@@ -186,12 +195,21 @@ public class MiniHBaseCluster {
     }
   }
 
-  private void init(final int nMasterNodes, final int nRegionNodes)
+  private void init(final int nMasterNodes, final int nRegionNodes,
+                 Class<? extends HMaster> masterClass,
+                 Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws IOException, InterruptedException {
     try {
+      if (masterClass == null){
+        masterClass =  HMaster.class;
+      }
+      if (regionserverClass == null){
+        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
+      }
+
       // start up a LocalHBaseCluster
       hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
-        HMaster.class, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+          masterClass, regionserverClass);
 
       // manually add the regionservers as other users
       for (int i=0; i<nRegionNodes; i++) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1375451&r1=1375450&r2=1375451&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Tue Aug 21 09:47:05 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -720,6 +721,29 @@ public class TestHLog  {
     }
   }
 
+
+  @Test
+  public void testGetServerNameFromHLogDirectoryName() throws IOException {
+    String hl = conf.get(HConstants.HBASE_DIR) + "/"+
+        HLog.getHLogDirectoryName(new ServerName("hn", 450, 1398).toString());
+
+    // Must not throw exception
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, null));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf,
+        conf.get(HConstants.HBASE_DIR) + "/"));
+    Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, "") );
+    Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, "                  ")
);
+    Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, hl) );
+    Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, hl+"qdf") );
+    Assert.assertNull( HLog.getServerNameFromHLogDirectoryName(conf, "sfqf"+hl+"qdf") );
+
+    Assert.assertNotNull( HLog.getServerNameFromHLogDirectoryName(conf, conf.get(
+        HConstants.HBASE_DIR) +
+        "/.logs/localhost,32984,1343316388997/localhost%2C32984%2C1343316388997.1343316390417"
+        ));
+    Assert.assertNotNull( HLog.getServerNameFromHLogDirectoryName(conf, hl+"/qdf") );
+  }
+
   /**
    * A loaded WAL coprocessor won't break existing HLog test cases.
    */



Mime
View raw message