hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r956183 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Date Sat, 19 Jun 2010 05:39:47 GMT
Author: stack
Date: Sat Jun 19 05:39:47 2010
New Revision: 956183

URL: http://svn.apache.org/viewvc?rev=956183&view=rev
Log:
HBASE-2752 Don't retry forever when waiting on too many store files

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=956183&r1=956182&r2=956183&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sat Jun 19 05:39:47 2010
@@ -395,6 +395,7 @@ Release 0.21.0 - Unreleased
                same timestamp in MemStore
    HBASE-2725  Shutdown hook management is gone in trunk; restore
    HBASE-2740  NPE in ReadWriteConsistencyControl
+   HBASE-2752  Don't retry forever when waiting on too many store files
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=956183&r1=956182&r2=956183&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Jun 19 05:39:47 2010
@@ -1080,7 +1080,7 @@ public class HRegion implements HeapSize
 
     if (LOG.isDebugEnabled()) {
       long now = EnvironmentEdgeManager.currentTimeMillis();
-      LOG.debug("Finished memstore flush of ~" +
+      LOG.info("Finished memstore flush of ~" +
         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
         this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId +
         ", compaction requested=" + compactionRequested);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=956183&r1=956182&r2=956183&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Sat Jun 19 05:39:47 2010
@@ -32,10 +32,12 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -50,10 +52,12 @@ import java.util.concurrent.locks.Reentr
  */
 class MemStoreFlusher extends Thread implements FlushRequester {
   static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
-  private final BlockingQueue<HRegion> flushQueue =
-    new LinkedBlockingQueue<HRegion>();
-
-  private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+  // These two data members go together.  Any entry in the one must have
+  // a corresponding entry in the other.
+  private final BlockingQueue<FlushQueueEntry> flushQueue =
+    new DelayQueue<FlushQueueEntry>();
+  private final Map<HRegion, FlushQueueEntry> regionsInQueue =
+    new HashMap<HRegion, FlushQueueEntry>();
 
   private final long threadWakeFrequency;
   private final HRegionServer server;
@@ -98,7 +102,7 @@ class MemStoreFlusher extends Thread imp
         conf.getInt("hbase.hstore.compactionThreshold", 3);
     }
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
-      90000); // default of 180 seconds
+      90000);
     LOG.info("globalMemStoreLimit=" +
       StringUtils.humanReadableInt(this.globalMemStoreLimit) +
       ", globalMemStoreLimitLowMark=" +
@@ -133,13 +137,13 @@ class MemStoreFlusher extends Thread imp
   @Override
   public void run() {
     while (!this.server.isStopRequested()) {
-      HRegion r = null;
+      FlushQueueEntry fqe = null;
       try {
-        r = this.flushQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
-        if (r == null) {
+        fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        if (fqe == null) {
           continue;
         }
-        if (!flushRegion(r, false)) {
+        if (!flushRegion(fqe)) {
           break;
         }
       } catch (InterruptedException ex) {
@@ -148,7 +152,7 @@ class MemStoreFlusher extends Thread imp
         continue;
       } catch (Exception ex) {
         LOG.error("Cache flush failed" +
-          (r != null ? (" for region " + Bytes.toString(r.getRegionName())) : ""),
+          (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""),
           ex);
         if (!server.checkFileSystem()) {
           break;
@@ -162,9 +166,12 @@ class MemStoreFlusher extends Thread imp
 
   public void request(HRegion r) {
     synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r)) {
-        regionsInQueue.add(r);
-        flushQueue.add(r);
+      if (!regionsInQueue.containsKey(r)) {
+        // This entry has no delay so it will be added at the top of the flush
+        // queue.  It'll come out near immediately.
+        FlushQueueEntry fqe = new FlushQueueEntry(r);
+        this.regionsInQueue.put(r, fqe);
+        this.flushQueue.add(fqe);
       }
     }
   }
@@ -182,77 +189,64 @@ class MemStoreFlusher extends Thread imp
   }
 
   /*
+   * A flushRegion that checks store file count.  If too many, puts the flush
+   * on delay queue to retry later.
+   * @param fqe
+   * @return true if the region was successfully flushed, false otherwise. If 
+   * false, there will be accompanying log messages explaining why the log was
+   * not flushed.
+   */
+  private boolean flushRegion(final FlushQueueEntry fqe) {
+    HRegion region = fqe.region;
+    if (!fqe.region.getRegionInfo().isMetaRegion() &&
+        isTooManyStoreFiles(region)) {
+      if (fqe.isMaximumWait(this.blockingWaitTime)) {
+        LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
+          "ms on a compaction to clean up 'too many store files'; waited " +
+          "long enough... proceeding with flush of " +
+          region.getRegionNameAsString());
+      } else {
+        // If this is first time we've been put off, then emit a log message.
+        if (fqe.getRequeueCount() <= 0) {
+          // Note: We don't impose blockingStoreFiles constraint on meta regions
+          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
+            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
+        }
+        this.server.compactSplitThread.compactionRequested(region, getName());
+        // Put back on the queue.  Have it come back out of the queue
+        // after a delay of this.blockingWaitTime / 100 ms.
+        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
+        // Tell a lie, it's not flushed but it's ok
+        return true;
+      }
+    }
+    return flushRegion(region, false);
+  }
+
+  /*
    * Flush a region.
-   *
-   * @param region the region to be flushed
-   * @param removeFromQueue True if the region needs to be removed from the
-   * flush queue. False if called from the main flusher run loop and true if
-   * called from flushSomeRegions to relieve memory pressure from the region
-   * server.  If <code>true</code>, we are in a state of emergency; we are not
-   * taking on updates regionserver-wide, not until memory is flushed.  In this
-   * case, do not let a compaction run inline with blocked updates. Compactions
-   * can take a long time. Stopping compactions, there is a danger that number
-   * of flushes will overwhelm compaction on a busy server; we'll have to see.
-   * That compactions do not run when called out of flushSomeRegions means that
-   * compactions can be reported by the historian without danger of deadlock
-   * (HBASE-670).
-   *
-   * <p>In the main run loop, regions have already been removed from the flush
-   * queue, and if this method is called for the relief of memory pressure,
-   * this may not be necessarily true. We want to avoid trying to remove
-   * region from the queue because if it has already been removed, it requires a
-   * sequential scan of the queue to determine that it is not in the queue.
-   *
-   * <p>If called from flushSomeRegions, the region may be in the queue but
-   * it may have been determined that the region had a significant amount of
-   * memory in use and needed to be flushed to relieve memory pressure. In this
-   * case, its flush may preempt the pending request in the queue, and if so,
-   * it needs to be removed from the queue to avoid flushing the region
-   * multiple times.
+   * @param region Region to flush.
+   * @param emergencyFlush Set if we are being force flushed. If true the region
+   * needs to be removed from the flush queue. If false, when we were called
+   * from the main flusher run loop and we got the entry to flush by calling
+   * poll on the flush queue (which removed it).
    *
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(HRegion region, boolean removeFromQueue) {
-    // if removeFromQueue, then we come from flushSomeRegions and we need
-    // to block if there's too many store files. Else, we don't want to hang
-    // the main flushing thread so we'll just the region at the end of the
-    // queue if there's too many files.
-    if (removeFromQueue) {
-      checkStoreFileCount(region);
-    } else if ((!region.getRegionInfo().isMetaRegion()) &&
-               isTooManyStoreFiles(region)) {
-      // Note: We don't impose blockingStoreFiles constraint on meta regions
-
-      LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
-          "store files, putting it back at the end of the flush queue.");
-      server.compactSplitThread.compactionRequested(region, getName());
-      // If there's only this item in the queue or they are all in this
-      // situation, we will loop at lot. Sleep a bit.
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) { } // just continue
-      flushQueue.add(region);
-      // Tell a lie, it's not flushed but it's ok
-      return true;
-    }
-    synchronized (regionsInQueue) {
-      // See comment above for removeFromQueue on why we do not
-      // take the region out of the set. If removeFromQueue is true, remove it
-      // from the queue too if it is there. This didn't used to be a
-      // constraint, but now that HBASE-512 is in play, we need to try and
-      // limit double-flushing of regions.
-      if (regionsInQueue.remove(region) && removeFromQueue) {
-        flushQueue.remove(region);
-      }
-      lock.lock();
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+    synchronized (this.regionsInQueue) {
+      FlushQueueEntry fqe = this.regionsInQueue.remove(region);
+      if (fqe != null && emergencyFlush) {
+        // Need to remove from region from delay queue.  When NOT an
+        // emergencyFlush, then item was removed via a flushQueue.poll.
+        flushQueue.remove(fqe);
+     }
+     lock.lock();
     }
     try {
-      // See comment above for removeFromQueue on why we do not
-      // compact if removeFromQueue is true. Note that region.flushCache()
-      // only returns true if a flush is done and if a compaction is needed.
-      if (region.flushcache() && !removeFromQueue) {
+      if (region.flushcache()) {
         server.compactSplitThread.compactionRequested(region, getName());
       }
     } catch (DroppedSnapshotException ex) {
@@ -264,65 +258,18 @@ class MemStoreFlusher extends Thread imp
       server.abort("Replay of HLog required. Forcing server shutdown", ex);
       return false;
     } catch (IOException ex) {
-      LOG.error("Cache flush failed"
-          + (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""),
-          RemoteExceptionHandler.checkIOException(ex));
+      LOG.error("Cache flush failed" +
+        (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""),
+        RemoteExceptionHandler.checkIOException(ex));
       if (!server.checkFileSystem()) {
         return false;
       }
     } finally {
       lock.unlock();
     }
-
     return true;
   }
 
-  /*
-   * If too many store files already, schedule a compaction and pause a while
-   * before going on with compaction.
-   * @param region Region to check.
-   */
-  private void checkStoreFileCount(final HRegion region) {
-    // If catalog region, do not ever hold up writes (isMetaRegion returns
-    // true if ROOT or META region).
-    if (region.getRegionInfo().isMetaRegion()) return;
-
-    int count = 0;
-    boolean triggered = false;
-    boolean finished = false;
-    while (count++ < (blockingWaitTime / 500)) {
-      finished = true;
-      for (Store hstore: region.stores.values()) {
-        if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
-          // only log once
-          if (!triggered) {
-            LOG.info("Too many store files for region " + region + ": " +
-              hstore.getStorefilesCount() + ", requesting compaction and " +
-              "waiting");
-            this.server.compactSplitThread.compactionRequested(region, getName());
-            triggered = true;
-          }
-          // pending compaction, not finished
-          finished = false;
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException e) {
-            // ignore
-          }
-        }
-      }
-      if (triggered && finished) {
-        LOG.info("Compaction has completed, we waited " + (count * 500) + "ms, "
-            + "finishing flush of region " + region);
-        break;
-      }
-    }
-    if (triggered && !finished) {
-      LOG.warn("Tried to hold up flushing for compactions of region " + region +
-          " but have waited longer than " + blockingWaitTime + "ms, continuing");
-    }
-  }
-
   private boolean isTooManyStoreFiles(HRegion region) {
     for (Store hstore: region.stores.values()) {
       if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
@@ -381,4 +328,65 @@ class MemStoreFlusher extends Thread imp
       server.compactSplitThread.compactionRequested(region, getName());
     }
   }
+
+  /**
+   * Datastructure used in the flush queue.  Holds region and retry count.
+   * Keeps tabs on how old this object is.  Implements {@link Delayed}.  On
+   * construction, the delay is zero. When added to a delay queue, we'll come
+   * out near immediately.  Call {@link #requeue(long)} passing delay in
+   * milliseconds before readding to delay queue if you want it to stay there
+   * a while.
+   */
+  static class FlushQueueEntry implements Delayed {
+    private final HRegion region;
+    private final long createTime;
+    private long whenToExpire;
+    private int requeueCount = 0;
+
+    FlushQueueEntry(final HRegion r) {
+      this.region = r;
+      this.createTime = System.currentTimeMillis();
+      this.whenToExpire = this.createTime;
+    }
+
+    /**
+     * @param maximumWait
+     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
+     */
+    public boolean isMaximumWait(final long maximumWait) {
+      return (System.currentTimeMillis() - this.createTime) > maximumWait;
+    }
+
+    /**
+     * @return Count of times {@link #resetDelay()} was called; i.e this is
+     * number of times we've been requeued.
+     */
+    public int getRequeueCount() {
+      return this.requeueCount;
+    }
+ 
+    /**
+     * @param when When to expire, when to come up out of the queue.
+     * Specify in milliseconds.  This method adds System.currentTimeMillis()
+     * to whatever you pass.
+     * @return This.
+     */
+    public FlushQueueEntry requeue(final long when) {
+      this.whenToExpire = System.currentTimeMillis() + when;
+      this.requeueCount++;
+      return this;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.whenToExpire - System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
+        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    }
+  }
 }
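
For background (not part of the commit itself): the rewrite above swaps the old sleep-and-retry loop for a java.util.concurrent.DelayQueue whose entries implement Delayed, so a flush that is put off because of "too many store files" simply reappears after a bounded delay instead of tying up the flusher thread. Below is a rough, standalone sketch of that pattern. The class and method names are illustrative only (they are not the HBase code), the 2-second cap stands in for hbase.hstore.blockingWaitTime, and compareTo uses Long.compare rather than the subtraction in the committed FlushQueueEntry.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedRetryExample {

  // Illustrative stand-in for FlushQueueEntry: zero delay on construction,
  // so a fresh entry is available from the DelayQueue immediately.
  static class RetryEntry implements Delayed {
    final String name;
    final long createTime = System.currentTimeMillis();
    private long whenToExpire = createTime;
    private int requeueCount = 0;

    RetryEntry(final String name) { this.name = name; }

    // Push the expiry out by delayMs before re-adding to the queue.
    RetryEntry requeue(final long delayMs) {
      this.whenToExpire = System.currentTimeMillis() + delayMs;
      this.requeueCount++;
      return this;
    }

    // True once we have been waiting longer than maximumWaitMs in total.
    boolean isMaximumWait(final long maximumWaitMs) {
      return System.currentTimeMillis() - this.createTime > maximumWaitMs;
    }

    int getRequeueCount() { return this.requeueCount; }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.MILLISECONDS),
          other.getDelay(TimeUnit.MILLISECONDS));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final long blockingWaitMs = 2000; // stand-in for hbase.hstore.blockingWaitTime
    DelayQueue<RetryEntry> queue = new DelayQueue<RetryEntry>();
    queue.add(new RetryEntry("region-x")); // zero delay: comes out right away

    while (true) {
      RetryEntry e = queue.poll(250, TimeUnit.MILLISECONDS);
      if (e == null) continue;
      // Pretend the "too many store files" condition persists the whole time.
      if (!e.isMaximumWait(blockingWaitMs)) {
        // Requeue with a short delay instead of spinning or sleeping inline.
        queue.add(e.requeue(blockingWaitMs / 10));
        System.out.println("requeued " + e.name + ", attempt " + e.getRequeueCount());
      } else {
        System.out.println("waited long enough; proceeding with " + e.name);
        break;
      }
    }
  }
}

The point of the Delayed contract here is that poll() never hands back an entry before its delay has expired, which is what lets the flusher keep servicing other regions while a blocked one waits its turn.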

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=956183&r1=956182&r2=956183&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Jun 19 05:39:47 2010
@@ -629,8 +629,8 @@ public class Store implements HeapSize {
       this.conf, this.family.getBloomFilterType(), this.inMemory);
     Reader r = sf.createReader();
     this.storeSize += r.length();
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Added " + sf + ", entries=" + r.getEntries() +
+    if(LOG.isInfoEnabled()) {
+      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
         ", memsize=" + StringUtils.humanReadableInt(flushed) +
         ", filesize=" + StringUtils.humanReadableInt(r.length()) +
@@ -822,15 +822,17 @@ public class Store implements HeapSize {
       }
 
       // Ready to go.  Have list of files to compact.
-      LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+      LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
+          this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         (references? ", hasReferences=true,": " ") + " into " +
           FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
       HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Completed" + (majorcompaction? " major ": " ") +
-          "compaction of " + this.storeNameStr +
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Completed" + (majorcompaction? " major ": " ") +
+          "compaction of " + filesToCompact.size() + " file(s) in " +
+          this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
           "; new storefile is " + (sf == null? "none": sf.toString()) +
           "; store size is " + StringUtils.humanReadableInt(storeSize));
       }


