hbase-commits mailing list archives

From: li...@apache.org
Subject: svn commit: r1403627 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver: HRegionServer.java MemStoreFlusher.java wal/HLog.java
Date: Tue, 30 Oct 2012 06:14:45 GMT
Author: liyin
Date: Tue Oct 30 06:14:45 2012
New Revision: 1403627

URL: http://svn.apache.org/viewvc?rev=1403627&view=rev
Log:
HBASE-6980: Parallel Flushing of Memstores

Author: kannan

Summary:
For write-dominated workloads, single-threaded memstore flushing is an unnecessary bottleneck.
With a single flusher thread, we are basically not set up to take advantage of the aggregate
throughput that multi-disk nodes provide.

* For puts with WAL enabled, the bottleneck is more likely the "single" WAL per region server,
so this particular fix may not buy us as much until we remove that bottleneck with multiple
commit logs per region server (topic for a separate JIRA: HBASE-6981).

* But for puts with WAL disabled (e.g., when using HBASE-5783 style fast bulk imports), we
should be able to support much better ingest rates with parallel flushing of memstores.

----

For the purposes of seeing some best-case benefits of this change, in my setup I:
1) had one Region Server (with 20 regions)
2) disabled the WAL
3) disabled all compactions; just flushing
4) set the number of flusher threads to 6 via the new config knob (sketch below). I have left
the default at 2, though, because we need to account for WAL and compaction activity as well
in normal setups.
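
For reference, a minimal sketch of the new knob this patch adds ("hbase.regionserver.flusher.count",
default 2). The class and scaffolding here are illustrative only; in a real deployment you would
set the value in hbase-site.xml:

import org.apache.hadoop.conf.Configuration;

public class FlusherCountSketch {
  public static void main(String[] args) {
    // "hbase.regionserver.flusher.count" and its default of 2 come from this
    // patch (see the MemStoreFlusher constructor below); the value 6 matches
    // the benchmark setup above.
    Configuration conf = new Configuration();
    conf.setInt("hbase.regionserver.flusher.count", 6);
    int handlerCount = conf.getInt("hbase.regionserver.flusher.count", 2);
    System.out.println("memstore flusher threads = " + handlerCount);
  }
}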

The command line for the load test I ran was:

bin/hbase org.apache.hadoop.hbase.util.LoadTestTool -num_keys 2000000 -write 1:1000000:50 -tn loadtest -zk hbasedev134.ash3.facebook.com -compression NONE

[So essentially writing really big KVs (1MB values) from 50 concurrent threads.]

----

Baseline (without this change):
-------------------------------
* Without parallel flushing, the maximum throughput we could push was about 100MB/s.
* iostat clearly revealed that at any one time only a single disk was busy.
* There were often periods where the memstore size would climb too high and start blocking updates.

With parallel flushing (6 threads):

* Was able to get about 375-400MB/s sustained.
* iostat now often showed 3-4 disks busy at any one time.

-----

The initial implementation didn't get true concurrency because of the cacheFlushLock, which
ended up serializing the flushes anyway. I have made this an R/W lock, and jstack dumps
confirmed that several threads were actually in the flush logic concurrently.
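
A minimal sketch of the resulting locking scheme (simplified from the HLog change below; the
class name and Runnable parameters are illustrative, and the real code spans the lock across
startCacheFlush/completeCacheFlush): flushes share the read side and so run in parallel, while
a log roll takes the write side and still excludes all in-flight flushes.

import java.util.concurrent.locks.ReentrantReadWriteLock;

class CacheFlushLockSketch {
  // In the patch this is HLog.cacheFlushLock, changed from ReentrantLock.
  private final ReentrantReadWriteLock cacheFlushLock = new ReentrantReadWriteLock();

  void flushOneRegion(Runnable doFlush) {
    cacheFlushLock.readLock().lock();   // many flusher threads can hold this at once
    try {
      doFlush.run();                    // flush a region's memstore
    } finally {
      cacheFlushLock.readLock().unlock();
    }
  }

  void rollLog(Runnable doRoll) {
    cacheFlushLock.writeLock().lock();  // blocks until all in-flight flushes finish
    try {
      doRoll.run();                     // roll the WAL; no flush can start meanwhile
    } finally {
      cacheFlushLock.writeLock().unlock();
    }
  }
}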

----

I had an earlier version of the implementation, but it came to my attention that there was
a similar uncommitted patch for trunk by Chenhui (HBASE-6466). So this patch follows Chenhui's
implementation closely, but for 0.89-fb; it is not identical because there are other differences
in the trunk code base.

--------

Test Plan: Going to run the full suite of tests, and will post an update when that run completes.

Reviewers: liyintang, kranganathan

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D605336

Task ID: 1803690

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1403627&r1=1403626&r2=1403627&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 30 06:14:45 2012
@@ -1455,8 +1455,7 @@ public class HRegionServer implements HR
     };
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
         handler);
-    Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
-      handler);
+    this.cacheFlusher.start(n, handler);
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     Threads.setDaemonThreadRunning(this.majorCompactionChecker,
         n + ".majorCompactionChecker", handler);
@@ -1660,7 +1659,7 @@ public class HRegionServer implements HR
   protected void join() {
     Threads.shutdown(this.majorCompactionChecker);
     Threads.shutdown(this.workerThread);
-    Threads.shutdown(this.cacheFlusher);
+    this.cacheFlusher.join();
     Threads.shutdown(this.hlogRoller);
     this.compactSplitThread.join();
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1403627&r1=1403626&r2=1403627&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Oct 30 06:14:45 2012
@@ -26,9 +26,11 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
@@ -39,7 +41,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Thread that flushes cache on request
@@ -50,7 +52,7 @@ import java.util.concurrent.locks.Reentr
  *
  * @see FlushRequester
  */
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -61,7 +63,7 @@ class MemStoreFlusher extends HasThread 
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantLock lock = new ReentrantLock();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -75,6 +77,9 @@ class MemStoreFlusher extends HasThread 
   private long blockingStoreFilesNumber;
   private long blockingWaitTime;
 
+  private FlushHandler[] flushHandlers = null;
+  private int handlerCount;
+
   /**
    * @param conf
    * @param server
@@ -103,6 +108,10 @@ class MemStoreFlusher extends HasThread 
     }
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
+
+    // number of "memstore flusher" threads per region server
+    this.handlerCount = conf.getInt("hbase.regionserver.flusher.count", 2);
+
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -134,32 +143,38 @@ class MemStoreFlusher extends HasThread 
     return (long)(max * limit);
   }
 
-  @Override
-  public void run() {
-    while (!this.server.isStopRequested()) {
-      FlushQueueEntry fqe = null;
-      try {
-        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null) {
+  private class FlushHandler extends HasThread {
+
+    FlushHandler(String threadName) {
+      this.setDaemon(true);
+      this.setName(threadName);
+    }
+
+    @Override
+    public void run() {
+      while (!server.isStopRequested()) {
+        FlushQueueEntry fqe = null;
+        try {
+          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (fqe == null) {
+            continue;
+          }
+          if (!flushRegion(fqe, getName())) {
+            LOG.warn("Failed to flush " + fqe.region);
+          }
+        } catch (InterruptedException ex) {
           continue;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        } catch (Exception ex) {
+          LOG.error("Cache flush failed" +
+              (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
+              ex);
+          server.checkFileSystem();
         }
-        if (!flushRegion(fqe)) {
-          LOG.warn("Failed to flush " + fqe.region);
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (ConcurrentModificationException ex) {
-        continue;
-      } catch (Exception ex) {
-        LOG.error("Cache flush failed" +
-          (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
-          ex);
-        server.checkFileSystem();
       }
+      LOG.info(getName() + " exiting");
     }
-    this.regionsInQueue.clear();
-    this.flushQueue.clear();
-    LOG.info(getName() + " exiting");
   }
 
   public void request(HRegion r) {
@@ -178,11 +193,49 @@ class MemStoreFlusher extends HasThread 
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.lock();
+    lock.writeLock().lock();
     try {
-      this.interrupt();
+      for (FlushHandler flushHandler : flushHandlers) {
+        if (flushHandler != null)
+          flushHandler.interrupt();
+      }
     } finally {
-      lock.unlock();
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Start the flusher threads.
+   *
+   * @param rsThreadName prefix for thread name (since there might be multiple
+   *                     region servers running within the same JVM.
+   * @param eh
+   */
+  void start(String rsThreadName, UncaughtExceptionHandler eh) {
+    flushHandlers = new FlushHandler[handlerCount];
+    for (int i = 0; i < flushHandlers.length; i++) {
+      flushHandlers[i] = new FlushHandler(rsThreadName + ".cacheFlusher." + i);
+      if (eh != null) {
+        flushHandlers[i].setUncaughtExceptionHandler(eh);
+      }
+      flushHandlers[i].start();
+    }
+  }
+
+  boolean isAlive() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null && flushHander.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void join() {
+    for (FlushHandler flushHandler : flushHandlers) {
+      if (flushHandler != null) {
+        Threads.shutdown(flushHandler.getThread());
+      }
     }
   }
 
@@ -194,7 +247,7 @@ class MemStoreFlusher extends HasThread 
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final FlushQueueEntry fqe) {
+  private boolean flushRegion(final FlushQueueEntry fqe, String why) {
     HRegion region = fqe.region;
     if (!fqe.region.getRegionInfo().isMetaRegion() &&
         isTooManyStoreFiles(region)) {
@@ -220,7 +273,7 @@ class MemStoreFlusher extends HasThread 
          */
         if (!this.server.compactSplitThread.requestSplit(region)
             || region.hasReferences()) {
-          this.server.compactSplitThread.requestCompaction(region, getName());
+          this.server.compactSplitThread.requestCompaction(region, why);
         }
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
@@ -229,7 +282,7 @@ class MemStoreFlusher extends HasThread 
         return true;
       }
     }
-    return flushRegion(region, false);
+    return flushRegion(region, why, false);
   }
 
   /*
@@ -244,7 +297,9 @@ class MemStoreFlusher extends HasThread 
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+  private boolean flushRegion(final HRegion region, String why,
+    final boolean emergencyFlush) {
+
     synchronized (this.regionsInQueue) {
       FlushQueueEntry fqe = this.regionsInQueue.remove(region);
       if (fqe != null && emergencyFlush) {
@@ -252,11 +307,11 @@ class MemStoreFlusher extends HasThread 
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
      }
-     lock.lock();
+     lock.readLock().lock();
     }
     try {
       if (region.flushcache()) {
-        server.compactSplitThread.requestCompaction(region, getName());
+        server.compactSplitThread.requestCompaction(region, why);
       }
       server.getMetrics().addFlush(region.getRecentFlushInfo());
     } catch (IOException ex) {
@@ -267,7 +322,7 @@ class MemStoreFlusher extends HasThread 
       server.checkFileSystem();
       return false;
     } finally {
-      lock.unlock();
+      lock.readLock().unlock();
     }
     return true;
   }
@@ -324,14 +379,14 @@ class MemStoreFlusher extends HasThread 
         " exceeded; currently " +
         StringUtils.humanReadableInt(globalMemStoreSize) + " and flushing till " +
         StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-      if (!flushRegion(biggestMemStoreRegion, true)) {
+      if (!flushRegion(biggestMemStoreRegion, "emergencyFlush", true)) {
         LOG.warn("Flush failed");
         break;
       }
       regionsToCompact.add(biggestMemStoreRegion);
     }
     for (HRegion region : regionsToCompact) {
-      server.compactSplitThread.requestCompaction(region, getName());
+      server.compactSplitThread.requestCompaction(region, "emergencyFlush");
     }
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1403627&r1=1403626&r2=1403627&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Tue Oct 30 06:14:45 2012
@@ -55,8 +55,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -232,7 +232,7 @@ public class HLog implements Syncable {
 
   // This lock prevents starting a log roll during a cache flush.
   // synchronized is insufficient because a cache flush spans two method calls.
-  private final Lock cacheFlushLock = new ReentrantLock();
+  private final ReentrantReadWriteLock cacheFlushLock = new ReentrantReadWriteLock();
 
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
@@ -505,7 +505,7 @@ public class HLog implements Syncable {
       return null;
     }
     byte [][] regionsToFlush = null;
-    this.cacheFlushLock.lock();
+    this.cacheFlushLock.writeLock().lock();
     long t0 = 0;
     long t1 = 0;
     try {
@@ -602,7 +602,7 @@ public class HLog implements Syncable {
         }
       }
     } finally {
-      this.cacheFlushLock.unlock();
+      this.cacheFlushLock.writeLock().unlock();
     }
     return regionsToFlush;
   }
@@ -884,7 +884,7 @@ public class HLog implements Syncable {
       LOG.error("Exception while waiting for syncer thread to die", e);
     }
 
-    cacheFlushLock.lock();
+    cacheFlushLock.writeLock().lock();
     try {
       synchronized (updateLock) {
         this.closed = true;
@@ -894,7 +894,7 @@ public class HLog implements Syncable {
         cleanupCurrentWriter(-1);
       }
     } finally {
-      cacheFlushLock.unlock();
+      cacheFlushLock.writeLock().unlock();
     }
   }
 
@@ -1287,7 +1287,7 @@ public class HLog implements Syncable {
    * @see #abortCacheFlush()
    */
   public long startCacheFlush(final byte [] regionName) {
-    this.cacheFlushLock.lock();
+    this.cacheFlushLock.readLock().lock();
     Long seq = this.lastSeqWritten.remove(regionName);
     // seq is the lsn of the oldest edit associated with this region. If a
     // snapshot already exists - because the last flush failed - then seq will
@@ -1330,7 +1330,6 @@ public class HLog implements Syncable {
           return;
         }
         try {
-          long now = System.currentTimeMillis();
           WALEdit edit = completeCacheFlushLogEdit();
           HLogKey key = makeKey(regionName, tableName, logSeqId,
               System.currentTimeMillis());
@@ -1347,7 +1346,7 @@ public class HLog implements Syncable {
       // Cleaning up of lastSeqWritten is in the finally clause because we
       // don't want to confuse getOldestOutstandingSeqNum()
       this.lastSeqWritten.remove(regionName);
-      this.cacheFlushLock.unlock();
+      this.cacheFlushLock.readLock().unlock();
     }
   }
 
@@ -1386,7 +1385,7 @@ public class HLog implements Syncable {
             current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq, new Throwable());
       }
     }
-    this.cacheFlushLock.unlock();
+    this.cacheFlushLock.readLock().unlock();
   }
 
   /**


