From: stack@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Subject: svn commit: r956182 - in /hbase/branches/0.20: CHANGES.txt src/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java src/java/org/apache/hadoop/hbase/regionserver/Store.java
Date: Sat, 19 Jun 2010 05:21:02 -0000
Message-Id: <20100619052102.E109F23888EA@eris.apache.org>
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Mailer: svnmailer-1.0.8

Author: stack
Date: Sat Jun 19 05:21:01 2010
New Revision: 956182

URL: http://svn.apache.org/viewvc?rev=956182&view=rev
Log:
HBASE-2752 Don't retry forever when waiting on too many store files

Modified:
    hbase/branches/0.20/CHANGES.txt
    hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java

Modified: hbase/branches/0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.20/CHANGES.txt?rev=956182&r1=956181&r2=956182&view=diff
==============================================================================
--- hbase/branches/0.20/CHANGES.txt (original)
+++ hbase/branches/0.20/CHANGES.txt Sat Jun 19 05:21:01 2010
@@ -32,6 +32,7 @@ Release 0.20.5 - Unreleased
               asking for first row
    HBASE-2740  NPE in ReadWriteConsistencyControl
    HBASE-2728  Support for HADOOP-4829
+   HBASE-2752  Don't retry forever when waiting on too many store files
 
   IMPROVEMENTS
    HBASE-2567  [stargate] minimize differences between 0.20 branch and trunk

Modified: hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=956182&r1=956181&r2=956182&view=diff
==============================================================================
--- hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Jun 19 05:21:01 2010
@@ -1062,7 +1062,7 @@ public class HRegion implements HConstan
     if (LOG.isDebugEnabled()) {
       long now = System.currentTimeMillis();
LOG.debug("Finished memstore flush of ~" + + LOG.info("Finished memstore flush of ~" + StringUtils.humanReadableInt(currentMemStoreSize) + " for region " + this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId + ", compaction requested=" + compactionRequested); Modified: hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java URL: http://svn.apache.org/viewvc/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=956182&r1=956181&r2=956182&view=diff ============================================================================== --- hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original) +++ hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Sat Jun 19 05:21:01 2010 @@ -23,10 +23,12 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.ConcurrentModificationException; -import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; import java.util.SortedMap; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -50,10 +52,13 @@ import org.apache.hadoop.util.StringUtil */ class MemStoreFlusher extends Thread implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); - private final BlockingQueue flushQueue = - new LinkedBlockingQueue(); - - private final HashSet regionsInQueue = new HashSet(); + + // These two data members go together. Any entry in the one must have + // a corresponding entry in the other. + private final BlockingQueue flushQueue = + new DelayQueue(); + private final Map regionsInQueue = + new HashMap(); private final long threadWakeFrequency; private final HRegionServer server; @@ -98,7 +103,7 @@ class MemStoreFlusher extends Thread imp conf.getInt("hbase.hstore.compactionThreshold", 3); } this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", - 90000); // default of 180 seconds + 90000); LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + @@ -140,13 +145,13 @@ class MemStoreFlusher extends Thread imp } } while (!server.isStopRequested()) { - HRegion r = null; + FlushQueueEntry fqe = null; try { - r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (r == null) { + fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if (fqe == null) { continue; } - if (!flushRegion(r, false)) { + if (!flushRegion(fqe)) { break; } } catch (InterruptedException ex) { @@ -155,7 +160,7 @@ class MemStoreFlusher extends Thread imp continue; } catch (Exception ex) { LOG.error("Cache flush failed" + - (r != null ? (" for region " + Bytes.toString(r.getRegionName())) : ""), + (fqe != null ? (" for region " + Bytes.toString(fqe.region.getRegionName())) : ""), ex); if (!server.checkFileSystem()) { break; @@ -169,9 +174,12 @@ class MemStoreFlusher extends Thread imp public void request(HRegion r) { synchronized (regionsInQueue) { - if (!regionsInQueue.contains(r)) { - regionsInQueue.add(r); - flushQueue.add(r); + if (!regionsInQueue.containsKey(r)) { + // This entry has no delay so it will be added at the top of the flush + // queue. It'll come out near immediately. 
+        FlushQueueEntry fqe = new FlushQueueEntry(r);
+        this.regionsInQueue.put(r, fqe);
+        this.flushQueue.add(fqe);
       }
     }
   }
@@ -187,79 +195,67 @@ class MemStoreFlusher extends Thread imp
       lock.unlock();
     }
   }
-
+
+  /*
+   * A flushRegion that checks store file count.  If too many, puts the flush
+   * on delay queue to retry later.
+   * @param fqe
+   * @return true if the region was successfully flushed, false otherwise. If
+   * false, there will be accompanying log messages explaining why the log was
+   * not flushed.
+   */
+  private boolean flushRegion(final FlushQueueEntry fqe) {
+    HRegion region = fqe.region;
+    if (!fqe.region.getRegionInfo().isMetaRegion() &&
+        isTooManyStoreFiles(region)) {
+      if (fqe.isMaximumWait(this.blockingWaitTime)) {
+        LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
+          "ms on a compaction to clean up 'too many store files'; waited " +
+          "long enough... proceeding with flush of " +
+          region.getRegionNameAsString());
+      } else {
+        // If this is first time we've been put off, then emit a log message.
+        if (fqe.getRequeueCount() <= 0) {
+          // Note: We don't impose blockingStoreFiles constraint on meta regions
+          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
+            "store files; delaying flush up to " + this.blockingWaitTime + "ms");
+        }
+        this.server.compactSplitThread.compactionRequested(region, getName());
+        // Put back on the queue.  Have it come back out of the queue
+        // after a delay of this.blockingWaitTime / 100 ms.
+        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
+        // Tell a lie, it's not flushed but it's ok
+        return true;
+      }
+    }
+    return flushRegion(region, false);
+  }
+
   /*
    * Flush a region.
    *
-   * @param region the region to be flushed
-   * @param removeFromQueue True if the region needs to be removed from the
-   * flush queue. False if called from the main flusher run loop and true if
-   * called from flushSomeRegions to relieve memory pressure from the region
-   * server.  If true, we are in a state of emergency; we are not
-   * taking on updates regionserver-wide, not until memory is flushed.  In this
-   * case, do not let a compaction run inline with blocked updates. Compactions
-   * can take a long time. Stopping compactions, there is a danger that number
-   * of flushes will overwhelm compaction on a busy server; we'll have to see.
-   * That compactions do not run when called out of flushSomeRegions means that
-   * compactions can be reported by the historian without danger of deadlock
-   * (HBASE-670).
-   *
-   * In the main run loop, regions have already been removed from the flush
-   * queue, and if this method is called for the relief of memory pressure,
-   * this may not be necessarily true. We want to avoid trying to remove
-   * region from the queue because if it has already been removed, it requires a
-   * sequential scan of the queue to determine that it is not in the queue.
-   *
-   * If called from flushSomeRegions, the region may be in the queue but
-   * it may have been determined that the region had a significant amount of
-   * memory in use and needed to be flushed to relieve memory pressure.  In this
-   * case, its flush may preempt the pending request in the queue, and if so,
-   * it needs to be removed from the queue to avoid flushing the region
-   * multiple times.
+   * @param region Region to flush.
+   * @param emergencyFlush Set if we are being force flushed.  If true the region
+   * needs to be removed from the flush queue.  If false, when we were called
+   * from the main flusher run loop and we got the entry to flush by calling
+   * poll on the flush queue (which removed it).
    *
    * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the log was
   * not flushed.
   */
-  private boolean flushRegion(HRegion region, boolean removeFromQueue) {
-    // if removeFromQueue, then we come from flushSomeRegions and we need
-    // to block if there's too many store files. Else, we don't want to hang
-    // the main flushing thread so we'll just the region at the end of the
-    // queue if there's too many files.
-    if (removeFromQueue) {
-      checkStoreFileCount(region);
-    } else if ((!region.getRegionInfo().isMetaRegion()) &&
-        isTooManyStoreFiles(region)) {
-      // Note: We don't impose blockingStoreFiles constraint on meta regions
-
-      LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
-        "store files, putting it back at the end of the flush queue.");
-      server.compactSplitThread.compactionRequested(region, getName());
-      // If there's only this item in the queue or they are all in this
-      // situation, we will loop at lot.  Sleep a bit.
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) { } // just continue
-      flushQueue.add(region);
-      // Tell a lie, it's not flushed but it's ok
-      return true;
-    }
-    synchronized (regionsInQueue) {
-      // See comment above for removeFromQueue on why we do not
-      // take the region out of the set. If removeFromQueue is true, remove it
-      // from the queue too if it is there.  This didn't used to be a
-      // constraint, but now that HBASE-512 is in play, we need to try and
-      // limit double-flushing of regions.
-      if (regionsInQueue.remove(region) && removeFromQueue) {
-        flushQueue.remove(region);
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+    synchronized (this.regionsInQueue) {
+      FlushQueueEntry fqe = this.regionsInQueue.remove(region);
+      if (fqe != null && emergencyFlush) {
+        // Need to remove from region from delay queue.  When NOT an
+        // emergencyFlush, then item was removed via a flushQueue.poll.
+        flushQueue.remove(fqe);
       }
       lock.lock();
     }
     try {
-      // See comment above for removeFromQueue on why we do not
-      // compact if removeFromQueue is true. Note that region.flushCache()
-      // only returns true if a flush is done and if a compaction is needed.
-      if (region.flushcache() && !removeFromQueue) {
+      if (region.flushcache()) {
         server.compactSplitThread.compactionRequested(region, getName());
       }
     } catch (DroppedSnapshotException ex) {
@@ -272,65 +268,18 @@ class MemStoreFlusher extends Thread imp
       server.abort();
       return false;
     } catch (IOException ex) {
-      LOG.error("Cache flush failed"
-        + (region != null ? (" for region " + Bytes.toString(region.getRegionName())) : ""),
-        RemoteExceptionHandler.checkIOException(ex));
+      LOG.error("Cache flush failed" +
(" for region " + Bytes.toString(region.getRegionName())) : ""), + RemoteExceptionHandler.checkIOException(ex)); if (!server.checkFileSystem()) { return false; } } finally { lock.unlock(); } - return true; } - /* - * If too many store files already, schedule a compaction and pause a while - * before going on with compaction. - * @param region Region to check. - */ - private void checkStoreFileCount(final HRegion region) { - // If catalog region, do not ever hold up writes (isMetaRegion returns - // true if ROOT or META region). - if (region.getRegionInfo().isMetaRegion()) return; - - int count = 0; - boolean triggered = false; - boolean finished = false; - while (count++ < (blockingWaitTime / 500)) { - finished = true; - for (Store hstore: region.stores.values()) { - if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { - // only log once - if (!triggered) { - LOG.info("Too many store files for region " + region + ": " + - hstore.getStorefilesCount() + ", requesting compaction and " + - "waiting"); - this.server.compactSplitThread.compactionRequested(region, getName()); - triggered = true; - } - // pending compaction, not finished - finished = false; - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // ignore - } - } - } - if (triggered && finished) { - LOG.info("Compaction has completed, we waited " + (count * 500) + "ms, " - + "finishing flush of region " + region); - break; - } - } - if (triggered && !finished) { - LOG.warn("Tried to hold up flushing for compactions of region " + region + - " but have waited longer than " + blockingWaitTime + "ms, continuing"); - } - } - private boolean isTooManyStoreFiles(HRegion region) { for (Store hstore: region.stores.values()) { if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { @@ -389,4 +338,65 @@ class MemStoreFlusher extends Thread imp server.compactSplitThread.compactionRequested(region, getName()); } } + + /** + * Datastructure used in the flush queue. Holds region and retry count. + * Keeps tabs on how old this object is. Implements {@link Delayed}. On + * construction, the delay is zero. When added to a delay queue, we'll come + * out near immediately. Call {@link #requeue(long)} passing delay in + * milliseconds before readding to delay queue if you want it to stay there + * a while. + */ + static class FlushQueueEntry implements Delayed { + private final HRegion region; + private final long createTime; + private long whenToExpire; + private int requeueCount = 0; + + FlushQueueEntry(final HRegion r) { + this.region = r; + this.createTime = System.currentTimeMillis(); + this.whenToExpire = this.createTime; + } + + /** + * @param maximumWait + * @return True if we have been delayed > maximumWait milliseconds. + */ + public boolean isMaximumWait(final long maximumWait) { + return (System.currentTimeMillis() - this.createTime) > maximumWait; + } + + /** + * @return Count of times {@link #resetDelay()} was called; i.e this is + * number of times we've been requeued. + */ + public int getRequeueCount() { + return this.requeueCount; + } + + /** + * @param when When to expire, when to come up out of the queue. + * Specify in milliseconds. This method adds System.currentTimeMillis() + * to whatever you pass. + * @return This. 
+     */
+    public FlushQueueEntry requeue(final long when) {
+      this.whenToExpire = System.currentTimeMillis() + when;
+      this.requeueCount++;
+      return this;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.whenToExpire - System.currentTimeMillis(),
+        TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed other) {
+      return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
+        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
+    }
+  }
 }

Modified: hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=956182&r1=956181&r2=956182&view=diff
==============================================================================
--- hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Jun 19 05:21:01 2010
@@ -558,8 +558,8 @@ public class Store implements HConstants
         this.conf, this.inMemory);
       Reader r = sf.getReader();
       this.storeSize += r.length();
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Added " + sf + ", entries=" + r.getEntries() +
+      if(LOG.isInfoEnabled()) {
+        LOG.info("Added " + sf + ", entries=" + r.getEntries() +
           ", sequenceid=" + logCacheFlushId +
           ", memsize=" + StringUtils.humanReadableInt(flushed) +
           ", filesize=" + StringUtils.humanReadableInt(r.length()) +
@@ -758,15 +758,17 @@ public class Store implements HConstants
     }
 
     // Ready to go.  Have list of files to compact.
-    LOG.debug("Started compaction of " + filesToCompact.size() + " file(s)" +
+    LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
+      this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
       (references? ", hasReferences=true,": " ") + " into " +
       FSUtils.getPath(this.regionCompactionDir) + ", seqid=" + maxId);
     HFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
     // Move the compaction into place.
     StoreFile sf = completeCompaction(filesToCompact, writer);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Completed" + (majorcompaction? " major ": " ") +
-        "compaction of " + this.storeNameStr +
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Completed" + (majorcompaction? " major ": " ") +
+        "compaction of " + filesToCompact.size() + " file(s) in " +
+        this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
         "; new storefile is " + (sf == null? "none": sf.toString()) +
         "; store size is " + StringUtils.humanReadableInt(storeSize));
     }
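
The substance of this patch is the switch from the removed checkStoreFileCount() sleep-and-retry loop to a java.util.concurrent.DelayQueue: a region whose flush is blocked by too many store files gets a compaction request and is requeued with a short delay (blockingWaitTime / 100 ms), and once isMaximumWait(blockingWaitTime) reports that the entry has waited longer than hbase.hstore.blockingWaitTime the flush proceeds regardless. The standalone sketch below shows the same Delayed/requeue pattern outside HBase; it is illustrative only, and the class and variable names (RequeueDemo, Task) are hypothetical rather than taken from the patch.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class RequeueDemo {

  // A Delayed wrapper that starts out immediately available and can be
  // requeued with a back-off, mirroring FlushQueueEntry.
  static class Task implements Delayed {
    final String name;
    final long createTime = System.currentTimeMillis();
    long whenToExpire = createTime;   // zero delay on construction
    int requeueCount = 0;

    Task(String name) { this.name = name; }

    Task requeue(long delayMillis) {  // push expiry into the future
      this.whenToExpire = System.currentTimeMillis() + delayMillis;
      this.requeueCount++;
      return this;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(whenToExpire - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.MILLISECONDS),
          other.getDelay(TimeUnit.MILLISECONDS));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<Task> queue = new DelayQueue<Task>();
    long blockingWaitTime = 3000;     // analogous to hbase.hstore.blockingWaitTime

    queue.add(new Task("region-A"));  // no delay: comes out almost immediately
    while (true) {
      Task t = queue.poll(100, TimeUnit.MILLISECONDS);
      if (t == null) {
        continue;                     // nothing ready yet; keep polling
      }
      // Stand-in for the "too many store files" check: pretend the task stays
      // blocked until blockingWaitTime has elapsed since it was created.
      boolean blocked = System.currentTimeMillis() - t.createTime < blockingWaitTime;
      if (blocked) {
        // Not ready: requeue with a small delay instead of sleeping in place.
        queue.add(t.requeue(blockingWaitTime / 10));
      } else {
        System.out.println(t.name + " processed after "
            + t.requeueCount + " requeue(s)");
        break;
      }
    }
  }
}

Compared with the old loop, which slept inside the single flusher thread, the delay queue leaves that thread free to service other regions while a blocked region waits out its back-off.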