hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1412373 - in /hbase/branches/0.89-fb: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/master/ src/t...
Date Thu, 22 Nov 2012 00:03:34 GMT
Author: liyin
Date: Thu Nov 22 00:03:31 2012
New Revision: 1412373

URL: http://svn.apache.org/viewvc?rev=1412373&view=rev
Log:
[HBASE-6981] Multiple hlog per region server

Author: liyintang

Summary:
In order to take full advantage of the aggregate throughput from multiple disks, hbase can
keep multiple hlog files open and let each region append/sync to one of them.

From the initial benchmark on a single region server, this diff improves the
write throughput by 3X, from 100MB/sec to 300MB/sec.

Test Plan: Tested on the dev cluster

Reviewers: kannan, aaiyer

Reviewed By: kannan

CC: hbase-eng@, mycnyc

Differential Revision: https://phabricator.fb.com/D604539

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMultipleHLogs.java
Modified:
    hbase/branches/0.89-fb/pom.xml
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/branches/0.89-fb/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/pom.xml?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/pom.xml (original)
+++ hbase/branches/0.89-fb/pom.xml Thu Nov 22 00:03:31 2012
@@ -464,6 +464,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.7</version>
         <executions>
           <execution>
             <id>add-jspc-source</id>

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Nov 22 00:03:31 2012
@@ -591,7 +591,13 @@ public final class HConstants {
   /** Configuration key suffix for Thrift server port */
   public static final String THRIFT_PORT_SUFFIX = "port";
 
-  /**
+  /** The number of HLogs for each region server */
+  public static final String HLOG_CNT_PER_SERVER = "hbase.regionserver.hlog.cnt.perserver";
+  
+  public static final String HLOG_FORMAT_BACKWARD_COMPATIBILITY =
+      "hbase.regionserver.hlog.format.backward.compatibility";
+  
+  /** 
    * The byte array represents for NO_NEXT_INDEXED_KEY;
    * The actual value is irrelevant because this is always compared by reference.
    */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Nov 22 00:03:31 2012
@@ -272,10 +272,14 @@ public class HRegionServer implements HR
    */
   Chore majorCompactionChecker;
 
-  // HLog and HLog roller.  log is protected rather than private to avoid
+  // An array of HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  protected volatile HLog hlog;
-  LogRoller hlogRoller;
+  protected volatile HLog[] hlogs;  
+  protected LogRoller[] hlogRollers;
+  
+  private volatile int currentHLogIndex = 0;
+  private Map<String, Integer> regionNameToHLogIDMap =
+      new ConcurrentHashMap<String, Integer>();
 
   // flag set after we're done setting up server threads (used for testing)
   protected volatile boolean isOnline;
@@ -522,7 +526,15 @@ public class HRegionServer implements HR
     this.compactSplitThread = new CompactSplitThread(this);
 
     // Log rolling thread
-    this.hlogRoller = new LogRoller(this);
+    int hlogCntPerServer = this.conf.getInt(HConstants.HLOG_CNT_PER_SERVER, 2);
+    if (conf.getBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, true)) {
+      hlogCntPerServer = 1;
+      LOG.warn("Override HLOG_CNT_PER_SERVER as 1 due to HLOG_FORMAT_BACKWARD_COMPATIBILITY");
+    }
+    this.hlogRollers = new LogRoller[hlogCntPerServer];  
+    for (int i = 0; i < hlogCntPerServer; i++) { 
+      this.hlogRollers[i] = new LogRoller(this, i); 
+    }
 
     // Background thread to check for major compactions; needed if region
     // has not gotten updates in a while.  Make it run at a lesser frequency.
@@ -798,18 +810,20 @@ public class HRegionServer implements HR
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
     compactSplitThread.interruptIfNecessary();
-    hlogRoller.interruptIfNecessary();
+    for (int i = 0; i < hlogRollers.length; i++) {  
+      hlogRollers[i].interruptIfNecessary();  
+    }
     this.majorCompactionChecker.interrupt();
 
     if (killed) {
       // Just skip out w/o closing regions.
-      hlog.kill();
+      this.killAllHLogs();
     } else if (abortRequested) {
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         try {
-          if (this.hlog != null) {
-            this.hlog.close();
+          if (this.hlogs != null) {
+            this.closeAllHLogs();
             LOG.info("On abort, closed hlog");
           }
         } catch (Throwable e) {
@@ -824,8 +838,8 @@ public class HRegionServer implements HR
       ArrayList<HRegion> regionsClosed = closeAllRegions();
       if (numRegionsToClose == regionsClosed.size()) {
         try {
-          if (this.hlog != null) {
-            hlog.closeAndDelete();
+          if (this.hlogs != null) { 
+            this.closeAndDeleteAllHLogs();
           }
         } catch (Throwable e) {
           LOG.error("Close and delete failed",
@@ -965,10 +979,29 @@ public class HRegionServer implements HR
       // accessors will be going against wrong filesystem (unless all is set
       // to defaults).
       this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
-      // Get fs instance used by this RS
+
+      // Check the log directory:
       this.fs = FileSystem.get(this.conf);
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
-      this.hlog = setupHLog();
+      Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("HLog dir " + logdir);
+      }
+      if (!fs.exists(logdir)) {
+        fs.mkdirs(logdir);
+      } else {
+        throw new RegionServerRunningException("region server already " +
+            "running at " + this.serverInfo.getServerName() +
+            " because logdir " + logdir.toString() + " exists");
+      }
+      // Check the old log directory
+      final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+      if (!fs.exists(oldLogDir)) {
+        fs.mkdirs(oldLogDir);
+      }
+      // Initialize the HLogs
+      setupHLog(logdir, oldLogDir, this.hlogRollers.length);
+      
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
       this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
@@ -1257,30 +1290,31 @@ public class HRegionServer implements HR
     return isOnline;
   }
 
-  private HLog setupHLog() throws IOException {
-    final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
-    Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Log dir " + logdir);
-    }
-    if (fs.exists(logdir)) {
-      throw new RegionServerRunningException("region server already " +
-        "running at " + this.serverInfo.getServerName() +
-        " because logdir " + logdir.toString() + " exists");
-    }
-    HLog log = instantiateHLog(logdir, oldLogDir);
-    return log;
-  }
-
-  // instantiate
-  protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
-    return new HLog(this.fs, logdir, oldLogDir, this.conf, this.hlogRoller,
-        null, this.serverInfo.getServerAddress().toString());
-  }
-
-
-  protected LogRoller getLogRoller() {
-    return hlogRoller;
+  private void setupHLog(Path logDir, Path oldLogDir, int totalHLogCnt) throws IOException
{
+    hlogs = new HLog[totalHLogCnt];
+    for (int i = 0; i < totalHLogCnt; i++) { 
+      hlogs[i] = new HLog(this.fs, logDir, oldLogDir, this.conf, this.hlogRollers[i], 
+          null, (this.serverInfo.getServerAddress().toString()), i, totalHLogCnt);  
+    }
+    LOG.info("Initialized " + totalHLogCnt + " HLogs");
+  }
+
+  private void killAllHLogs() { 
+    for (int i = 0; i < this.hlogs.length; i++) { 
+      hlogs[i].kill();  
+    } 
+  } 
+    
+  private void closeAllHLogs() throws IOException { 
+    for (int i = 0; i < this.hlogs.length; i++) { 
+      hlogs[i].close(); 
+    } 
+  } 
+    
+  private void closeAndDeleteAllHLogs() throws IOException {  
+    for (int i = 0; i < this.hlogs.length; i++) { 
+      hlogs[i].closeAndDelete();  
+    } 
   }
 
   /*
@@ -1448,9 +1482,13 @@ public class HRegionServer implements HR
         abort("Uncaught exception in service thread " + t.getName(), e);
       }
     };
-    Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
-        handler);
     this.cacheFlusher.start(n, handler);
+    
+    // Initialize the hlog roller threads
+    for (int i = 0; i < this.hlogRollers.length; i++) { 
+      Threads.setDaemonThreadRunning(this.hlogRollers[i], n + ".logRoller-" + i, handler);
 
+    }
+    
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     Threads.setDaemonThreadRunning(this.majorCompactionChecker,
         n + ".majorCompactionChecker", handler);
@@ -1522,13 +1560,21 @@ public class HRegionServer implements HR
 
     // Verify that all threads are alive
     if (!(leases.isAlive() &&
-        cacheFlusher.isAlive() && hlogRoller.isAlive() &&
+        cacheFlusher.isAlive() && isAllHLogRollerAlive() &&
         workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
       stop("One or more threads are no longer alive");
       return false;
     }
     return true;
   }
+  
+  private boolean isAllHLogRollerAlive() {  
+    boolean res = true; 
+    for (int i = 0; i < this.hlogRollers.length; i++) { 
+      res = res && this.hlogRollers[i].isAlive(); 
+    } 
+    return res; 
+  }
 
   /*
    * Run some housekeeping tasks.
@@ -1558,22 +1604,31 @@ public class HRegionServer implements HR
     }
   }
 
-  /** @return the HLog */
-  public HLog getLog() {
-    return this.hlog;
-  }
+  @Override
+  public List<String> getHLogsList(boolean rollCurrentHLog) throws IOException {
+    List <String> allHLogsList = new ArrayList<String>();
+  
+    for (int i = 0; i < hlogs.length; i++) {
+      if (rollCurrentHLog) {
+        this.hlogs[i].rollWriter();
+      }
+      allHLogsList.addAll(this.hlogs[i].getHLogsList());
+    }
 
+    return allHLogsList;
+  }
+  
   /**
-   * @param rollCurrentHLog if true, the current HLog is rolled and will be
-   * included in the list returned
-   * @return list of HLog files
+   * Return the i th HLog in this region server
    */
-  @Override
-  public List<String> getHLogsList(boolean rollCurrentHLog) throws IOException {
-    if (rollCurrentHLog) this.hlog.rollWriter();
-    return this.hlog.getHLogsList();
+  public HLog getLog(int i) {
+    return this.hlogs[i];
   }
 
+  public int getTotalHLogCnt() {
+    return this.hlogs.length;
+  }
+  
   /**
    * Sets a flag that will cause all the HRegionServer threads to shut down
    * in an orderly fashion.  Used by unit tests.
@@ -1655,7 +1710,9 @@ public class HRegionServer implements HR
     Threads.shutdown(this.majorCompactionChecker);
     Threads.shutdown(this.workerThread);
     this.cacheFlusher.join();
-    Threads.shutdown(this.hlogRoller);
+    for (int i = 0; i < this.hlogRollers.length; i++) {  
+      Threads.shutdown(this.hlogRollers[i]);  
+    } 
     this.compactSplitThread.join();
   }
 
@@ -1943,8 +2000,21 @@ public class HRegionServer implements HR
     if (region == null) {
       try {
         zkUpdater.startRegionOpenEvent(null, true);
-        region = instantiateRegion(regionInfo, this.hlog);
+        
+        // Assign one of the HLogs to the new opening region.
+        // If the region has been opened before, assign the previous HLog instance to that
region.
+        Integer hLogIndex = null;
+        if ((hLogIndex = regionNameToHLogIDMap.get(regionInfo.getRegionNameAsString())) ==
null) {
+          hLogIndex = Integer.valueOf((this.currentHLogIndex++) % (this.hlogs.length));
+          this.regionNameToHLogIDMap.put(regionInfo.getRegionNameAsString(), hLogIndex);
+        }
+        region = instantiateRegion(regionInfo, this.hlogs[hLogIndex.intValue()]);
+        LOG.info("Initiate the region: " + regionInfo.getRegionNameAsString() + " with HLog
#" + 
+            hLogIndex);
+        
+        // Set up the favorite nodes for all the HFile for that region
         setFavoredNodes(region, favoredNodes);
+       
         // Startup a compaction early if one is needed, if store has references
         // or has too many store files
         for (Store s : region.getStores().values()) {
@@ -2012,11 +2082,11 @@ public class HRegionServer implements HR
    * @return
    * @throws IOException
    */
-  protected HRegion instantiateRegion(final HRegionInfo regionInfo, final HLog wal)
+  protected HRegion instantiateRegion(final HRegionInfo regionInfo, final HLog hlog)
   throws IOException {
     Path dir =
       HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName());
-    HRegion r = HRegion.newHRegion(dir, this.hlog, this.fs, conf, regionInfo,
+    HRegion r = HRegion.newHRegion(dir, hlog, this.fs, conf, regionInfo,
       this.cacheFlusher);
     long seqid = r.initialize(new Progressable() {
       @Override
@@ -2025,8 +2095,8 @@ public class HRegionServer implements HR
       }
     });
     // If a wal and its seqid is < that of new region, use new regions seqid.
-    if (wal != null) {
-      if (seqid > wal.getSequenceNumber()) wal.setSequenceNumber(seqid);
+    if (hlog != null) {
+      if (seqid > hlog.getSequenceNumber()) hlog.setSequenceNumber(seqid);
     }
     return r;
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java Thu Nov 22 00:03:31 2012
@@ -48,13 +48,17 @@ class LogRoller extends HasThread implem
   private volatile long lastrolltime = System.currentTimeMillis();
   // Period to roll log.
   private final long rollperiod;
+  private final int hlogIndexID;
+  private final String logRollerName;
 
   /** @param server */
-  public LogRoller(final HRegionServer server) {
+  public LogRoller(final HRegionServer server, int hlogIndexID) {
     super();
     this.server = server;
     this.rollperiod =
       this.server.conf.getLong("hbase.regionserver.logroll.period", 3600000);
+    this.hlogIndexID = hlogIndexID;
+    this.logRollerName = "HLogRoller-" + hlogIndexID + " ";
   }
 
   @Override
@@ -82,15 +86,15 @@ class LogRoller extends HasThread implem
         // Time for periodic roll
         if (LOG.isDebugEnabled()) {
           if (modifiedRollPeriod == this.rollperiod) {
-            LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
+            LOG.debug(logRollerName + "roll period " + this.rollperiod + "ms elapsed");
           }
         }
       }
       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
       try {
-        byte [][] regionsToFlush = server.getLog().rollWriter();
+        byte [][] regionsToFlush = server.getLog(this.hlogIndexID).rollWriter();
         if (status != null) {
-          status.markComplete("Log rolling succeeded after " + retried +
+          status.markComplete(logRollerName + "Log rolling succeeded after " + retried +
               "retires.");
           retried = -1;
           status = null;
@@ -100,10 +104,10 @@ class LogRoller extends HasThread implem
         }
       } catch (IOException ex) {
         retried++;
-        String msg = "log roll failed." +
+        String msg = logRollerName + "log roll failed." +
             " retried=" + retried + ", " + StringUtils.stringifyException(ex);
         if (status == null) {
-          LOG.warn("Log rolling failed with ioe. Will retry." +
+          LOG.warn(logRollerName + "Log rolling failed with ioe. Will retry." +
               " Will update status with exceptionif retry fails " +
               RemoteExceptionHandler.checkIOException(ex));
           status = TaskMonitor.get().createStatus(msg);
@@ -112,7 +116,7 @@ class LogRoller extends HasThread implem
         }
         server.checkFileSystem();
       } catch (Exception ex) {
-        LOG.fatal("Log rolling failed, unexpected exception. Force Aborting",
+        LOG.fatal(logRollerName + "Log rolling failed, unexpected exception. Force Aborting",
             ex);
         server.forceAbort();
       } finally {
@@ -122,9 +126,9 @@ class LogRoller extends HasThread implem
       }
     }
     if (status != null) {
-      status.abort("LogRoller exiting while log was unstable and roll pending");
+      status.abort(logRollerName + "LogRoller exiting while log was unstable and roll pending");
     }
-    LOG.info("LogRoller exiting.");
+    LOG.info(logRollerName + "exiting.");
   }
 
   private void scheduleFlush(final byte [] region) {
@@ -139,7 +143,7 @@ class LogRoller extends HasThread implem
       }
     }
     if (!scheduled) {
-    LOG.warn("Failed to schedule flush of " +
+    LOG.warn(logRollerName + "Failed to schedule flush of " +
       Bytes.toString(region) + "r=" + r + ", requester=" + requester);
     }
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Nov 22 00:03:31 2012
@@ -252,6 +252,9 @@ public class HLog implements Syncable {
    */
   private final int maxLogs;
 
+  private final int hlogIndexID;
+  private final String hlogName;
+
   /**
    * Thread that handles group commit
    */
@@ -391,7 +394,7 @@ public class HLog implements Syncable {
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
               final Configuration conf, final LogRollListener listener)
   throws IOException {
-    this(fs, dir, oldLogDir, conf, listener, null, null);
+    this(fs, dir, oldLogDir, conf, listener, null, null, 0, 1);
   }
 
   /**
@@ -410,17 +413,19 @@ public class HLog implements Syncable {
    * @param prefix should always be hostname and port in distributed env and
    *        it will be URL encoded before being used.
    *        If prefix is null, "hlog" will be used
+   * @param hlogIndexID the index ID for the current HLog
+   * @param totalHLogCnt the total number of HLog in the current region server
    * @throws IOException
    */
   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
               final Configuration conf, final LogRollListener listener,
-              final LogActionsListener actionListener, final String prefix)
+              final LogActionsListener actionListener, final String prefix, int hlogIndexID,
+              int totalHLogCnt)
   throws IOException {
     super();
     syncFailureAbortStrategy = conf.getBoolean("hbase.hlog.sync.failure.abort.process", true)
?
         RuntimeHaltAbortStrategy.INSTANCE : RuntimeExceptionAbortStrategy.INSTANCE;
     this.fs = fs;
-    this.dir = dir;
     this.conf = conf;
     this.listener = listener;
     this.flushlogentries =
@@ -432,14 +437,20 @@ public class HLog implements Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-    if (fs.exists(dir)) {
-      throw new IOException("Target HLog directory already exists: " + dir);
+    
+    if (!fs.exists(oldLogDir)) {
+      fs.mkdirs(oldLogDir);
     }
-    fs.mkdirs(dir);
     this.oldLogDir = oldLogDir;
-    if (!fs.exists(oldLogDir)) {
-      fs.mkdirs(this.oldLogDir);
+    
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir);
     }
+    this.dir = dir;
+    
+    this.hlogIndexID = hlogIndexID;
+    this.hlogName = "HLog-" + this.hlogIndexID + " ";
+
     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
     LOG.info("HLog configuration: blocksize=" + this.blocksize +
@@ -450,10 +461,19 @@ public class HLog implements Syncable {
     if (actionListener != null) {
       addLogActionsListerner(actionListener);
     }
-    // If prefix is null||empty then just name it hlog
-    this.prefix = prefix == null || prefix.isEmpty() ?
-        "hlog" : URLEncoder.encode(prefix, "UTF8");
-    // rollWriter sets this.hdfs_out if it can.
+    
+    // If prefix is null||empty, then just name it hlog.
+    if (conf.getBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, true)) {
+      this.prefix = prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix,
"UTF8");
+      LOG.warn("Still using old hlog prefix due to HLOG_FORMAT_BACK_COMPATIBILITY: " + this.prefix);
+    } else {
+      // Also append the current hlogIndexId-totalHLogCnt to the prefix.
+      this.prefix = (prefix == null || prefix.isEmpty() ? 
+          "hlog" : URLEncoder.encode(prefix, "UTF8"))
+          + "." + hlogIndexID + "-" + totalHLogCnt;
+      LOG.info("HLog prefix is " + this.prefix);
+    }
+
     rollWriter();
 
     // handle the reflection necessary to call getNumCurrentReplicas()
@@ -475,10 +495,10 @@ public class HLog implements Syncable {
     } else {
       LOG.info("getNumCurrentReplicas--HDFS-826 not available" );
     }
-
+    
     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
     Threads.setDaemonThreadRunning(logSyncerThread.getThread(),
-        Thread.currentThread().getName() + ".logSyncer");
+        Thread.currentThread().getName() + ".logSyncer-" + hlogIndexID);
   }
 
   /**
@@ -501,7 +521,7 @@ public class HLog implements Syncable {
         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
       // This could spin on occasion but better the occasional spin than locking
       // every increment of sequence number.
-      LOG.debug("Change sequence number from " + logSeqNum + " to " + newvalue);
+      LOG.debug(hlogName + "Change sequence number from " + logSeqNum + " to " + newvalue);
     }
   }
 
@@ -621,7 +641,7 @@ public class HLog implements Syncable {
             + FSUtils.getPath(oldFile), e);
       }
 
-      LOG.info((oldFile != null ? "Roll " + FSUtils.getPath(oldFile)
+      LOG.info(hlogName + (oldFile != null ? "Roll " + FSUtils.getPath(oldFile)
           + ", entries=" + oldNumEntries + ", filesize="
           + this.fs.getFileStatus(oldFile).getLen() + ". " : "")
           + "New hlog " + FSUtils.getPath(newPath)
@@ -637,7 +657,7 @@ public class HLog implements Syncable {
         if (this.firstSeqWrittenInCurrentMemstore.size() <= 0
             && this.firstSeqWrittenInSnapshotMemstore.size() <= 0) {
           LOG.debug("Last sequence written is empty. Deleting all old hlogs");
-          // If so, then no new writes have come in since all regions were
+         // If so, then no new writes have come in since all regions were
           // flushed (and removed from the firstSeqWrittenInXXX maps). Means can
           // remove all but currently open log file.
           TreeSet<Long> tempSet = new TreeSet<Long>(outputfiles.keySet());
@@ -728,7 +748,7 @@ public class HLog implements Syncable {
       if (LOG.isDebugEnabled()) {
         // Find associated region; helps debugging.
         byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
-        LOG.debug("Found " + logsToRemove + " hlogs to remove" +
+        LOG.debug(hlogName + "Found " + logsToRemove + " hlogs to remove" +
           " out of total " + this.outputfiles.size() + ";" +
           " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
           " from region " + Bytes.toString(oldestRegion));
@@ -753,7 +773,7 @@ public class HLog implements Syncable {
           if (i > 0) sb.append(", ");
           sb.append(Bytes.toStringBinary(regions[i]));
         }
-        LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
+        LOG.info(hlogName + ": Too many hlogs: logs=" + logCount + ", maxlogs=" +
             this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
             sb.toString());
       }
@@ -856,7 +876,7 @@ public class HLog implements Syncable {
       try {
           writer.sync();
       } catch (IOException ioe) {
-        syncFailureAbortStrategy.abort("log sync failed when trying to close "
+        syncFailureAbortStrategy.abort(hlogName + " log sync failed when trying to close
"
             + writer, ioe);
       }
     }
@@ -875,7 +895,7 @@ public class HLog implements Syncable {
         Path fname = computeFilename(filenum);
         if (!tryRecoverFileLease(fs, fname, conf)) {
           IOException ioe2 =
-              new IOException("lease recovery pending for " + fname, ioe);
+              new IOException(hlogName + "lease recovery pending for " + fname, ioe);
           throw ioe2;
         }
       }
@@ -890,7 +910,7 @@ public class HLog implements Syncable {
 
   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
     Path newPath = getHLogArchivePath(this.oldLogDir, p);
-    LOG.info("moving old hlog file " + FSUtils.getPath(p) +
+    LOG.info(hlogName + "moving old hlog file " + FSUtils.getPath(p) +
       " whose highest sequence/edit id is " + seqno + " to " +
       FSUtils.getPath(newPath));
     this.fs.rename(p, newPath);
@@ -934,7 +954,7 @@ public class HLog implements Syncable {
       fs.rename(file.getPath(),
           getHLogArchivePath(this.oldLogDir, file.getPath()));
     }
-    LOG.debug("Moved " + files.length + " log files to " +
+    LOG.debug(hlogName + "Moved " + files.length + " log files to " +
         FSUtils.getPath(this.oldLogDir));
     fs.delete(dir, true);
   }
@@ -956,7 +976,7 @@ public class HLog implements Syncable {
         logSyncerThread.interrupt();
       }
     } catch (InterruptedException e) {
-      LOG.error("Exception while waiting for syncer thread to die", e);
+      LOG.error(hlogName + "Exception while waiting for syncer thread to die", e);
     }
     
     if (LOG.isDebugEnabled()) {
@@ -1225,7 +1245,7 @@ public class HLog implements Syncable {
         try {
           this.syncTillHere += this.logBuffer.appendAndSync();
         } catch (IOException e) {
-          syncFailureAbortStrategy.abort("Could not sync hlog. Aborting", e);
+          syncFailureAbortStrategy.abort(hlogName + "Could not sync hlog. Aborting", e);
         }
       }
       
@@ -1366,10 +1386,10 @@ public class HLog implements Syncable {
    * by the failure gets restored to the memstore.
    */
   public void abortCacheFlush(byte[] regionName) {
-    LOG.debug("Aborting cache flush of region " +
+    LOG.debug(hlogName + "Aborting cache flush of region " +
               Bytes.toString(regionName));
     // Let us leave the old Seq number in this.firstSeqWrittenInPrevMemstore
-    this.cacheFlushLock.readLock().unlock();
+   this.cacheFlushLock.readLock().unlock();
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Thu Nov 22 00:03:31 2012
@@ -74,11 +74,15 @@ public class TestDistributedLogSplitting
   }
 
   final int NUM_RS = 6;
+  final int HLOG_CNT_PER_SERVER = 2;
 
   MiniHBaseCluster cluster;
   HMaster master;
   Configuration conf;
   HBaseTestingUtility TEST_UTIL;
+  byte[] table = Bytes.toBytes("table");
+  byte[] family = Bytes.toBytes("family");
+  byte[] value = Bytes.toBytes("value");
 
   @Before
   public void before() throws Exception {
@@ -92,6 +96,8 @@ public class TestDistributedLogSplitting
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat("hbase.regions.slop", (float)100.0); // no load balancing
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
+    conf.setInt(HConstants.HLOG_CNT_PER_SERVER, HLOG_CNT_PER_SERVER);
+    conf.setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
     TEST_UTIL = new HBaseTestingUtility(conf);
     cluster = TEST_UTIL.startMiniCluster(num_rs);
     int live_rs;
@@ -120,7 +126,7 @@ public class TestDistributedLogSplitting
     startCluster(NUM_RS);
 
 
-    HTable ht = installTable("table", "family", NUM_REGIONS_TO_CREATE);
+    HTable ht = installTable(table, family, NUM_REGIONS_TO_CREATE);
     populateDataInTable(NUM_ROWS_PER_REGION, "family");
 
 
@@ -155,17 +161,18 @@ public class TestDistributedLogSplitting
     LOG.info("testRecoveredEdits");
     startCluster(NUM_RS);
     final int NUM_LOG_LINES = 1000;
+    final int NUM_REGIONS = 40;
     final SplitLogManager slm = master.getSplitLogManager();
     FileSystem fs = master.getFileSystem();
-
     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+
     HRegionServer hrs = rsts.get(0).getRegionServer();
     Path rootdir = FSUtils.getRootDir(conf);
     final Path logDir = new Path(rootdir,
         HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
 
-    installTable("table", "family", 40);
-    byte[] table = Bytes.toBytes("table");
+    HTable htable = installTable(table, family, NUM_REGIONS);
+    
     Collection<HRegion> regions = new LinkedList<HRegion>(hrs.getOnlineRegions());
     LOG.info("#regions = " + regions.size());
     Iterator<HRegion> it = regions.iterator();
@@ -176,14 +183,18 @@ public class TestDistributedLogSplitting
         it.remove();
       }
     }
-    makeHLog(hrs.getLog(), regions, "table",
-        NUM_LOG_LINES, 100);
-
+    for (HRegion region : regions) {
+      for (int i = 0; i < NUM_LOG_LINES; i++) {
+        Put p = new Put(region.getStartKey());
+        p.add(family, Bytes.toBytes("cf"+i), i, value);
+        htable.put(p);
+      }
+    }
+    
     slm.splitLogDistributed(logDir);
 
-    int count = 0;
     for (HRegion rgn : regions) {
-
+      int count = 0;
       Path tdir = HTableDescriptor.getTableDir(rootdir, table);
       Path editsdir =
         HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
@@ -194,15 +205,15 @@ public class TestDistributedLogSplitting
       int c = countHLog(files[0].getPath(), fs, conf);
       count += c;
       LOG.info(c + " edits in " + files[0].getPath());
+      assertEquals(NUM_LOG_LINES, count);
     }
-    assertEquals(NUM_LOG_LINES, count);
   }
 
   @Test
   public void testWorkerAbort() throws Exception {
     LOG.info("testWorkerAbort");
     startCluster(1);
-    final int NUM_LOG_LINES = 10000;
+    final int NUM_LOG_LINES = 1000;
     final SplitLogManager slm = master.getSplitLogManager();
     FileSystem fs = master.getFileSystem();
 
@@ -212,10 +223,15 @@ public class TestDistributedLogSplitting
     final Path logDir = new Path(rootdir,
         HLog.getHLogDirectoryName(hrs.getServerInfo().getServerName()));
 
-    installTable("table", "family", 40);
-    byte[] table = Bytes.toBytes("table");
-    makeHLog(hrs.getLog(), hrs.getOnlineRegions(), "table",
-        NUM_LOG_LINES, 100);
+    HTable htable = installTable(table, family, 40);
+
+    for (HRegion region : hrs.getOnlineRegions()) {
+      for (int i = 0; i < NUM_LOG_LINES; i++) {
+        Put p = new Put(region.getStartKey());
+        p.add(family, Bytes.toBytes("cf"+i), i, value);
+        htable.put(p);
+      }
+    }
 
     new Thread() {
       public void run() {
@@ -287,10 +303,7 @@ public class TestDistributedLogSplitting
     t.join();
   }
 
-  HTable installTable(String tname, String fname, int nrs ) throws Exception {
-    // Create a table with regions
-    byte [] table = Bytes.toBytes(tname);
-    byte [] family = Bytes.toBytes(fname);
+  HTable installTable(byte [] tname, byte [] fname, int nrs ) throws Exception {
     LOG.info("Creating table with " + nrs + " regions");
     HTable ht = TEST_UTIL.createTable(table, new byte[][]{family},
         3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), nrs);
@@ -321,43 +334,6 @@ public class TestDistributedLogSplitting
     }
   }
 
-  public void makeHLog(HLog log,
-      Collection<HRegion> rgns, String tname,
-      int num_edits, int edit_size) throws IOException {
-
-    List<HRegion> regions = new ArrayList<HRegion>(rgns);
-    byte[] table = Bytes.toBytes(tname);
-    byte[] value = new byte[edit_size];
-    for (int i = 0; i < edit_size; i++) {
-      value[i] = (byte)('a' + (i % 26));
-    }
-    int n = regions.size();
-    int[] counts = new int[n];
-    int j = 0;
-    for (int i = 0; i < num_edits; i += 1) {
-      WALEdit e = new WALEdit();
-      byte [] row = Bytes.toBytes("r" + Integer.toString(i));
-      byte [] family = Bytes.toBytes("f");
-      byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i));
-      e.add(new KeyValue(row, family, qualifier,
-          System.currentTimeMillis(), value));
-      // LOG.info("Region " + i + ": " + e);
-      j++;
-      log.append(regions.get(j % n).getRegionInfo(), table, e, System.currentTimeMillis());
-      counts[j % n] += 1;
-      // if ((i % 8096) == 0) {
-        // log.sync();
-      //  }
-    }
-    log.sync();
-    log.close();
-    for (int i = 0; i < n; i++) {
-      LOG.info("region " + regions.get(i).getRegionNameAsString() +
-          " has " + counts[i] + " edits");
-    }
-    return;
-  }
-
   private int countHLog(Path log, FileSystem fs, Configuration conf)
   throws IOException {
     int count = 0;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu Nov 22 00:03:31 2012
@@ -113,6 +113,7 @@ public class TestHLog  {
         .setInt("ipc.client.connect.max.retries", 1);
     TEST_UTIL.getConfiguration().setInt(
         "dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
     TEST_UTIL.startMiniCluster(3);
 
     conf = TEST_UTIL.getConfiguration();

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Thu Nov 22 00:03:31 2012
@@ -117,7 +117,7 @@ public class TestHLogSplit {
     TEST_UTIL.getConfiguration().
             setClass("hbase.regionserver.hlog.writer.impl",
                 InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
-
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
     TEST_UTIL.startMiniDFSCluster(2);
   }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogActionsListener.java Thu Nov 22 00:03:31 2012
@@ -84,7 +84,7 @@ public class TestLogActionsListener {
   public void testActionListener() throws Exception {
     DummyLogActionsListener list = new DummyLogActionsListener();
     DummyLogActionsListener laterList = new DummyLogActionsListener();
-    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, list, null);
+    HLog hlog = new HLog(fs, logDir, oldLogDir, conf, null, list, null, 0 ,1);
     HRegionInfo hri = new HRegionInfo(new HTableDescriptor(SOME_BYTES),
         SOME_BYTES, SOME_BYTES, false);
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1412373&r1=1412372&r2=1412373&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Thu Nov 22 00:03:31 2012
@@ -127,7 +127,7 @@ public class TestLogRolling extends HBas
    // the namenode might still try to choose the recently-dead datanode 
    // for a pipeline, so try to a new pipeline multiple times
    conf.setInt("dfs.client.block.write.retries", 30);
-   
+   conf.setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
    super.setUp();
   }
 
@@ -135,7 +135,7 @@ public class TestLogRolling extends HBas
     // When the META table can be opened, the region servers are running
     new HTable(conf, HConstants.META_TABLE_NAME);
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
-    this.log = server.getLog();
+    this.log = server.getLog(0);
 
     // Create the test table and open it
     HTableDescriptor desc = new HTableDescriptor(tableName);
@@ -216,7 +216,7 @@ public class TestLogRolling extends HBas
     // When the META table can be opened, the region servers are running
     new HTable(conf, HConstants.META_TABLE_NAME);
     this.server = cluster.getRegionServer(0);
-    this.log = server.getLog();
+    this.log = server.getLog(0);
     
     assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
     // don't run this test without append support (HDFS-200 & HDFS-142)

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMultipleHLogs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMultipleHLogs.java?rev=1412373&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMultipleHLogs.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMultipleHLogs.java Thu Nov 22 00:03:31 2012
@@ -0,0 +1,103 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestMultipleHLogs {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static int USER_REGION_NUM = 3;
+  private final static int TOTAL_REGION_NUM = USER_REGION_NUM + 2;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path dir;
+  private static MiniDFSCluster cluster;
+
+  private static Path hbaseDir;
+  private static Path oldLogDir;
+  
+  @Before
+  public void setUp() throws Exception {}
+  
+  @After
+  public void tearDown() throws Exception {}
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000);
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration().setInt("ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
+    
+    TEST_UTIL.getConfiguration().setInt(HConstants.HLOG_CNT_PER_SERVER, TOTAL_REGION_NUM);
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.HLOG_FORMAT_BACKWARD_COMPATIBILITY, false);
+    TEST_UTIL.startMiniCluster(1);
+  }
+  
+  @Test
+  public void testMultipleHLogs() throws IOException, InterruptedException {
+    final byte[] CF = Bytes.toBytes("cf");
+    final byte[] QAULIFIER = Bytes.toBytes("qaulifier");
+    final byte[] VALUE = Bytes.toBytes("VALUE");
+    final int actualStartKey = 0;
+    final int actualEndKey = Integer.MAX_VALUE;
+    final int keysPerRegion = (actualEndKey - actualStartKey) / USER_REGION_NUM;
+    final int splitStartKey = actualStartKey + keysPerRegion;
+    final int splitEndKey = actualEndKey - keysPerRegion;
+    final String keyFormat = "%08x";
+    final HTable table = TEST_UTIL.createTable(Bytes.toBytes("testMultipleHLogs"),
+        new byte[][]{CF},
+        1,
+        Bytes.toBytes(String.format(keyFormat, splitStartKey)),
+        Bytes.toBytes(String.format(keyFormat, splitEndKey)),
+        USER_REGION_NUM);
+    // Put some data for each Region
+    for (byte[] row : table.getStartKeys()) {
+      Put p = new Put(row);
+      p.add(CF, QAULIFIER, VALUE);
+      table.put(p);
+      table.flushCommits();
+    }
+    HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    
+    final Path logDir = new Path(FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
+        HLog.getHLogDirectoryName(regionServer.getServerInfo().getServerName()));
+    FileStatus[] files = TEST_UTIL.getDFSCluster().getFileSystem().listStatus(logDir);
+    assertEquals(TOTAL_REGION_NUM, files.length);
+    assertEquals(TOTAL_REGION_NUM, regionServer.getTotalHLogCnt());
+    assertEquals(TOTAL_REGION_NUM, regionServer.getOnlineRegions().size());
+    
+    for (HRegion region : regionServer.getOnlineRegions()) {
+      HLog hlog = region.getLog();
+      hlog.rollWriter();
+      assertEquals(1, hlog.getNumLogFiles());
+    }
+    
+    for (FileStatus fileStatus : files) {
+      assertTrue(HLog.validateHLogFilename(fileStatus.getPath().getName()));
+    }
+  }
+}



Mime
View raw message