hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1437591 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Wed, 23 Jan 2013 17:28:09 GMT
Author: tedyu
Date: Wed Jan 23 17:28:09 2013
New Revision: 1437591

URL: http://svn.apache.org/viewvc?rev=1437591&view=rev
Log:
HBASE-6466 Enable multi-thread for memstore flush (Chunhui)


Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1437591&r1=1437590&r2=1437591&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java Wed Jan 23 17:28:09 2013
@@ -208,16 +208,30 @@ public class Threads {
   }
 
   /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads
-   * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads
+   * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
+   * without setting the exception handler.
    */
   public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    return newDaemonThreadFactory(prefix, null);
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
+   * @param prefix name prefix for all threads created from the factory
+   * @param handler unhandles exception handler to set for all threads
+   * @return a thread factory that creates named, daemon threads with
+   *         the supplied exception handler and normal priority
+   */
+  public static ThreadFactory newDaemonThreadFactory(final String prefix,
+      final UncaughtExceptionHandler handler) {
     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
     return new ThreadFactory() {
       @Override
       public Thread newThread(Runnable r) {
         Thread t = namedFactory.newThread(r);
+        if (handler != null) {
+          t.setUncaughtExceptionHandler(handler);
+        }
         if (!t.isDaemon()) {
           t.setDaemon(true);
         }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1437591&r1=1437590&r2=1437591&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jan 23 17:28:09 2013
@@ -1531,8 +1531,7 @@ public class  HRegionServer implements C
 
     Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
         uncaughtExceptionHandler);
-    Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher",
-      uncaughtExceptionHandler);
+    this.cacheFlusher.start(uncaughtExceptionHandler);
     Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n +
       ".compactionChecker", uncaughtExceptionHandler);
     if (this.healthCheckChore != null) {
@@ -1790,7 +1789,7 @@ public class  HRegionServer implements C
    */
   protected void join() {
     Threads.shutdown(this.compactionChecker.getThread());
-    Threads.shutdown(this.cacheFlusher.getThread());
+    this.cacheFlusher.join();
     if (this.healthCheckChore != null) {
       Threads.shutdown(this.healthCheckChore.getThread());
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1437591&r1=1437590&r2=1437591&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Jan 23 17:28:09 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -29,10 +30,10 @@ import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.cliffc.high_scale_lib.Counter;
 
@@ -59,7 +61,7 @@ import com.google.common.base.Preconditi
  * @see FlushRequester
  */
 @InterfaceAudience.Private
-class MemStoreFlusher extends HasThread implements FlushRequester {
+class MemStoreFlusher implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
   // These two data members go together.  Any entry in the one must have
   // a corresponding entry in the other.
@@ -71,8 +73,8 @@ class MemStoreFlusher extends HasThread 
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition flushOccurred = lock.newCondition();
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Object blockSignal = new Object();
 
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
@@ -87,6 +89,9 @@ class MemStoreFlusher extends HasThread 
   private long blockingWaitTime;
   private final Counter updatesBlockedMsHighWater = new Counter();
 
+  private FlushHandler[] flushHandlers = null;
+  private int handlerCount;
+
   /**
    * @param conf
    * @param server
@@ -111,6 +116,7 @@ class MemStoreFlusher extends HasThread 
       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
+    this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -213,64 +219,59 @@ class MemStoreFlusher extends HasThread 
     return true;
   }
 
-  @Override
-  public void run() {
-    while (!this.server.isStopped()) {
-      FlushQueueEntry fqe = null;
-      try {
-        wakeupPending.set(false); // allow someone to wake us up again
-        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (fqe == null || fqe instanceof WakeupFlushThread) {
-          if (isAboveLowWaterMark()) {
-            LOG.debug("Flush thread woke up because memory above low water=" +
-              StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
-            if (!flushOneForGlobalPressure()) {
-              // Wasn't able to flush any region, but we're above low water mark
-              // This is unlikely to happen, but might happen when closing the
-              // entire server - another thread is flushing regions. We'll just
-              // sleep a little bit to avoid spinning, and then pretend that
-              // we flushed one, so anyone blocked will check again
-              lock.lock();
-              try {
+  private class FlushHandler extends HasThread {
+    @Override
+    public void run() {
+      while (!server.isStopped()) {
+        FlushQueueEntry fqe = null;
+        try {
+          wakeupPending.set(false); // allow someone to wake us up again
+          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          if (fqe == null || fqe instanceof WakeupFlushThread) {
+            if (isAboveLowWaterMark()) {
+              LOG.debug("Flush thread woke up because memory above low water="
+                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+              if (!flushOneForGlobalPressure()) {
+                // Wasn't able to flush any region, but we're above low water mark
+                // This is unlikely to happen, but might happen when closing the
+                // entire server - another thread is flushing regions. We'll just
+                // sleep a little bit to avoid spinning, and then pretend that
+                // we flushed one, so anyone blocked will check again
                 Thread.sleep(1000);
-                flushOccurred.signalAll();
-              } finally {
-                lock.unlock();
+                wakeUpIfBlocking();
               }
+              // Enqueue another one of these tokens so we'll wake up again
+              wakeupFlushThread();
             }
-            // Enqueue another one of these tokens so we'll wake up again
-            wakeupFlushThread();
+            continue;
           }
+          FlushRegionEntry fre = (FlushRegionEntry) fqe;
+          if (!flushRegion(fre)) {
+            break;
+          }
+        } catch (InterruptedException ex) {
           continue;
-        }
-        FlushRegionEntry fre = (FlushRegionEntry)fqe;
-        if (!flushRegion(fre)) {
-          break;
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (ConcurrentModificationException ex) {
-        continue;
-      } catch (Exception ex) {
-        LOG.error("Cache flusher failed for entry " + fqe, ex);
-        if (!server.checkFileSystem()) {
-          break;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        } catch (Exception ex) {
+          LOG.error("Cache flusher failed for entry " + fqe, ex);
+          if (!server.checkFileSystem()) {
+            break;
+          }
         }
       }
-    }
-    this.regionsInQueue.clear();
-    this.flushQueue.clear();
+      synchronized (regionsInQueue) {
+        regionsInQueue.clear();
+        flushQueue.clear();
+      }
 
-    // Signal anyone waiting, so they see the close flag
-    lock.lock();
-    try {
-      flushOccurred.signalAll();
-    } finally {
-      lock.unlock();
+      // Signal anyone waiting, so they see the close flag
+      wakeUpIfBlocking();
+      LOG.info(getName() + " exiting");
     }
-    LOG.info(getName() + " exiting");
   }
 
+
   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
       flushQueue.add(new WakeupFlushThread());
@@ -287,6 +288,10 @@ class MemStoreFlusher extends HasThread 
           continue;
         }
 
+        if (region.writestate.flushing || !region.writestate.writesEnabled) {
+          continue;
+        }
+
         if (checkStoreFileCount && isTooManyStoreFiles(region)) {
           continue;
         }
@@ -332,11 +337,41 @@ class MemStoreFlusher extends HasThread 
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    lock.lock();
+    lock.writeLock().lock();
     try {
-      this.interrupt();
+      for (FlushHandler flushHander : flushHandlers) {
+        if (flushHander != null) flushHander.interrupt();
+      }
     } finally {
-      lock.unlock();
+      lock.writeLock().unlock();
+    }
+  }
+
+  synchronized void start(UncaughtExceptionHandler eh) {
+    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
+        server.getServerName().toString() + "-MemStoreFlusher", eh);
+    flushHandlers = new FlushHandler[handlerCount];
+    for (int i = 0; i < flushHandlers.length; i++) {
+      flushHandlers[i] = new FlushHandler();
+      flusherThreadFactory.newThread(flushHandlers[i]);
+      flushHandlers[i].start();
+    }
+  }
+
+  boolean isAlive() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null && flushHander.isAlive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  void join() {
+    for (FlushHandler flushHander : flushHandlers) {
+      if (flushHander != null) {
+        Threads.shutdown(flushHander.getThread());
+      }
     }
   }
 
@@ -365,7 +400,8 @@ class MemStoreFlusher extends HasThread 
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestCompaction(region, getName());
+              this.server.compactSplitThread.requestCompaction(region, Thread
+                  .currentThread().getName());
             } catch (IOException e) {
               LOG.error(
                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -404,8 +440,8 @@ class MemStoreFlusher extends HasThread 
         // emergencyFlush, then item was removed via a flushQueue.poll.
         flushQueue.remove(fqe);
      }
-     lock.lock();
     }
+    lock.readLock().lock();
     try {
       boolean shouldCompact = region.flushcache();
       // We just want to check the size
@@ -413,7 +449,7 @@ class MemStoreFlusher extends HasThread 
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, getName());
+        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {
@@ -432,15 +468,18 @@ class MemStoreFlusher extends HasThread 
         return false;
       }
     } finally {
-      try {
-        flushOccurred.signalAll();
-      } finally {
-        lock.unlock();
-      }
+      lock.readLock().unlock();
+      wakeUpIfBlocking();
     }
     return true;
   }
 
+  private void wakeUpIfBlocking() {
+    synchronized (blockSignal) {
+      blockSignal.notifyAll();
+    }
+  }
+
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore : region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -458,12 +497,12 @@ class MemStoreFlusher extends HasThread 
    */
   public void reclaimMemStoreMemory() {
     if (isAboveHighWaterMark()) {
-      lock.lock();
-      try {
+      long start = System.currentTimeMillis();
+      synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
         while (isAboveHighWaterMark() && !server.isStopped()) {
-          if(!blocked){
+          if (!blocked) {
             startTime = EnvironmentEdgeManager.currentTimeMillis();
             LOG.info("Blocking updates on " + server.toString() +
             ": the global memstore size " +
@@ -476,10 +515,12 @@ class MemStoreFlusher extends HasThread 
           try {
             // we should be able to wait forever, but we've seen a bug where
             // we miss a notify, so put a 5 second bound on it at least.
-            flushOccurred.await(5, TimeUnit.SECONDS);
+            blockSignal.wait(5 * 1000);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
+          long took = System.currentTimeMillis() - start;
+          LOG.warn("Memstore is above high water mark and block " + took + "ms");
         }
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
@@ -488,8 +529,6 @@ class MemStoreFlusher extends HasThread 
           }
           LOG.info("Unblocking updates for server " + server.toString());
         }
-      } finally {
-        lock.unlock();
       }
     } else if (isAboveLowWaterMark()) {
       wakeupFlushThread();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1437591&r1=1437590&r2=1437591&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Wed Jan 23 17:28:09 2013
@@ -247,12 +247,16 @@ public class TestLogRolling  {
       put.add(HConstants.CATALOG_FAMILY, null, value);
       table.put(put);
     }
+    Put tmpPut = new Put(Bytes.toBytes("tmprow"));
+    tmpPut.add(HConstants.CATALOG_FAMILY, null, value);
     long startTime = System.currentTimeMillis();
     long remaining = timeout;
     while (remaining > 0) {
       if (log.isLowReplicationRollEnabled() == expect) {
         break;
       } else {
+        // Trigger calling FSHlog#checkLowReplication()
+        table.put(tmpPut);
         try {
           Thread.sleep(200);
         } catch (InterruptedException e) {
@@ -371,7 +375,8 @@ public class TestLogRolling  {
     assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null);
 
     batchWriteAndWait(table, 3, false, 10000);
-    assertTrue("LowReplication Roller should've been disabled",
+    assertTrue("LowReplication Roller should've been disabled, current replication="
+            + ((FSHLog) log).getLogReplication(),
         !log.isLowReplicationRollEnabled());
 
     dfsCluster



Mime
View raw message