hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r559993 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ conf/ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Date Thu, 26 Jul 2007 21:30:56 GMT
Author: stack
Date: Thu Jul 26 14:30:55 2007
New Revision: 559993

URL: http://svn.apache.org/viewvc?view=rev&rev=559993
Log:
HADOOP-1646 RegionServer OOME's under sustained, substantial loading by
10 concurrent clients

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Jul 26 14:30:55 2007
@@ -75,3 +75,5 @@
      TestScanner2 (Izaak Rubin via Stack)
  48. HADOOP-1516 HClient fails to readjust when ROOT or META redeployed on new
      region server
+ 49. HADOOP-1646 RegionServer OOME's under sustained, substantial loading by
+     10 concurrent clients

Modified: lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml Thu Jul 26 14:30:55 2007
@@ -102,7 +102,7 @@
   </property>
   <property>
     <name>hbase.regionserver.msginterval</name>
-    <value>15000</value>
+    <value>10000</value>
     <description>Interval between messages from the RegionServer to HMaster
     in milliseconds.  Default is 15. Set this value low if you want unit
     tests to be responsive.
@@ -111,24 +111,60 @@
   <property>
     <name>hbase.regionserver.maxlogentries</name>
     <value>30000</value>
-    <description>Rotate the logs when count of entries exceeds this value.
-    Default: 30,000
+    <description>Rotate the HRegion HLogs when count of entries exceeds this
+    value.  Default: 30,000.  Value is checked by a thread that runs every
+    hbase.server.thread.wakefrequency.
     </description>
   </property>
   <property>
-    <name>hbase.hregion.maxunflushed</name>
-    <value>10000</value>
+    <name>hbase.hregion.memcache.flush.size</name>
+    <value>16777216</value>
+    <description>
+    A HRegion memcache will be flushed to disk if size of the memcache
+    exceeds this number of bytes.  Value is checked by a thread that runs
+    every hbase.server.thread.wakefrequency.  
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.memcache.block.multiplier</name>
+    <value>2</value>
     <description>
-    Memcache will be flushed to disk if number of Memcache writes
-    are in excess of this number.
+    Block updates if memcache has hbase.hregion.block.memcache
+    time hbase.hregion.flush.size bytes.  Useful preventing
+    runaway memcache during spikes in update traffic.  Without an
+    upper-bound, memcache fills such that when it flushes the
+    resultant flush files take a long time to compact or split, or
+    worse, we OOME.
     </description>
   </property>
   <property>
     <name>hbase.hregion.max.filesize</name>
-    <value>134217728</value>
+    <value>67108864</value>
     <description>
     Maximum desired file size for an HRegion.  If filesize exceeds
-    value + (value / 2), the HRegion is split in two.  Default: 128M.
+    value + (value / 2), the HRegion is split in two.  Default: 64M.
+    If too large, splits will take so long, clients timeout.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.compactionThreshold</name>
+    <value>3</value>
+    <description>
+    If more than this number of HStoreFiles in any one HStore
+    (one HStoreFile is written per flush of memcache) then a compaction
+    is run to rewrite all HStoreFiles files as one.  Larger numbers
+    put off compaction but when it runs, it takes longer to complete.
+    During a compaction, updates cannot be flushed to disk.  Long
+    compactions require memory sufficient to carry the logging of
+    all updates across the duration of the compaction.
+    
+    If too large, clients timeout during compaction.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
+    <value>15000</value>
+    <description>How often a region server runs the split/compaction check.
     </description>
   </property>
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java Thu Jul 26 14:30:55 2007
@@ -81,7 +81,7 @@
   static final String HREGION_OLDLOGFILE_NAME = "oldlogfile.log";
   
   /** Default maximum file size */
-  static final long DEFAULT_MAX_FILE_SIZE = 128 * 1024 * 1024;        // 128MB
+  static final long DEFAULT_MAX_FILE_SIZE = 64 * 1024 * 1024;        // 64MB
 
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java Thu Jul 26 14:30:55 2007
@@ -49,7 +49,7 @@
   }
 
   /**
-   * Caller needs the nonexclusive read-lock
+   * Caller needs the no-nexclusive read-lock
    */
   public void obtainReadLock() {
     synchronized(mutex) {
@@ -57,6 +57,7 @@
         try {
           mutex.wait();
         } catch(InterruptedException ie) {
+          // continue
         }
       }
       lockers.incrementAndGet();
@@ -65,7 +66,7 @@
   }
 
   /**
-   * Caller is finished with the nonexclusive read-lock
+   * Caller is finished with the non-exclusive read-lock
    */
   public void releaseReadLock() {
     synchronized(mutex) {
@@ -85,6 +86,7 @@
         try {
           mutex.wait();
         } catch (InterruptedException ie) {
+          // continue
         }
       }
       mutex.notifyAll();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Thu Jul 26 14:30:55 2007
@@ -27,9 +27,8 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
 
@@ -38,30 +37,36 @@
  * wrapper around a TreeMap that helps us when staging the Memcache out to disk.
  */
 public class HMemcache {
-  private final Log LOG = LogFactory.getLog(this.getClass().getName());
-  
   TreeMap<HStoreKey, byte []> memcache =
     new TreeMap<HStoreKey, byte []>();
-  
-  Vector<TreeMap<HStoreKey, byte []>> history
+  final Vector<TreeMap<HStoreKey, byte []>> history
     = new Vector<TreeMap<HStoreKey, byte []>>();
-  
   TreeMap<HStoreKey, byte []> snapshot = null;
 
   final HLocking lock = new HLocking();
+  
+  /*
+   * Approximate size in bytes of the payload carried by this memcache.
+   */
+  private AtomicLong size = new AtomicLong(0);
+
 
-  /** constructor */
+  /**
+   * Constructor
+   */
   public HMemcache() {
     super();
   }
 
   /** represents the state of the memcache at a specified point in time */
-  public static class Snapshot {
-    TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
-    long sequenceId = 0;
+  static class Snapshot {
+    final TreeMap<HStoreKey, byte []> memcacheSnapshot;
+    final long sequenceId;
     
-    Snapshot() {
+    Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) {
       super();
+      this.memcacheSnapshot = memcache;
+      this.sequenceId = i;
     }
   }
   
@@ -79,36 +84,22 @@
    * @return frozen HMemcache TreeMap and HLog sequence number.
    */
   Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
-    Snapshot retval = new Snapshot();
-
     this.lock.obtainWriteLock();
     try {
       if(snapshot != null) {
         throw new IOException("Snapshot in progress!");
       }
+      // If no entries in memcache.
       if(memcache.size() == 0) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("memcache empty. Skipping snapshot");
-        }
-        return retval;
-      }
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("starting memcache snapshot");
+        return null;
       }
-      
-      retval.memcacheSnapshot = memcache;
+      Snapshot retval = new Snapshot(memcache, log.startCacheFlush());
       this.snapshot = memcache;
       history.add(memcache);
       memcache = new TreeMap<HStoreKey, byte []>();
-      retval.sequenceId = log.startCacheFlush();
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("memcache snapshot complete");
-      }
-      
+      // Reset size of this memcache.
+      this.size.set(0);
       return retval;
-      
     } finally {
       this.lock.releaseWriteLock();
     }
@@ -127,12 +118,8 @@
       if(snapshot == null) {
         throw new IOException("Snapshot not present!");
       }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("deleting snapshot");
-      }
-      
-      for(Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator(); 
-          it.hasNext(); ) {
+      for (Iterator<TreeMap<HStoreKey, byte []>> it = history.iterator(); 
+          it.hasNext();) {
         TreeMap<HStoreKey, byte []> cur = it.next();
         if (snapshot == cur) {
           it.remove();
@@ -140,9 +127,6 @@
         }
       }
       this.snapshot = null;
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("snapshot deleted");
-      }
     } finally {
       this.lock.releaseWriteLock();
     }
@@ -161,18 +145,28 @@
     try {
       for (Map.Entry<Text, byte []> es: columns.entrySet()) {
         HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
-        memcache.put(key, es.getValue());
+        byte [] value = es.getValue();
+        this.size.addAndGet(key.getSize());
+        this.size.addAndGet(((value == null)? 0: value.length));
+        memcache.put(key, value);
       }
     } finally {
       this.lock.releaseWriteLock();
     }
   }
+  
+  /**
+   * @return Approximate size in bytes of payload carried by this memcache.
+   */
+  public long getSize() {
+    return this.size.get();
+  }
 
   /**
    * Look back through all the backlog TreeMaps to find the target.
    * @param key
    * @param numVersions
-   * @return An array of byte arrays orderded by timestamp.
+   * @return An array of byte arrays ordered by timestamp.
    */
   public byte [][] get(final HStoreKey key, final int numVersions) {
     List<byte []> results = new ArrayList<byte[]>();
@@ -348,7 +342,7 @@
     }
     
     /**
-     * Get the next value from the specified iterater.
+     * Get the next value from the specified iterator.
      * 
      * @param i Which iterator to fetch next value from
      * @return true if there is more data available

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Thu Jul 26 14:30:55 2007
@@ -24,12 +24,12 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -75,10 +76,10 @@
   static String SPLITDIR = "splits";
   static String MERGEDIR = "merges";
   static String TMPREGION_PREFIX = "tmpregion_";
-  static int MIN_COMMITS_FOR_COMPACTION = 10;
   static Random rand = new Random();
-
   static final Log LOG = LogFactory.getLog(HRegion.class);
+  final AtomicBoolean closed = new AtomicBoolean(false);
+  private long noFlushCount = 0;
 
   /**
    * Deletes all the files for a HRegion
@@ -90,7 +91,7 @@
    */
   static void deleteRegion(FileSystem fs, Path baseDirectory,
       Text regionName) throws IOException {
-    LOG.debug("Deleting region " + regionName);
+    LOG.info("Deleting region " + regionName);
     fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
   }
   
@@ -263,7 +264,7 @@
   Map<Long, TreeMap<Text, byte []>> targetColumns 
     = new HashMap<Long, TreeMap<Text, byte []>>();
   
-  HMemcache memcache;
+  final HMemcache memcache;
 
   Path rootDir;
   HLog log;
@@ -275,19 +276,16 @@
   static class WriteState {
     volatile boolean writesOngoing;
     volatile boolean writesEnabled;
-    volatile boolean closed;
     WriteState() {
       this.writesOngoing = true;
       this.writesEnabled = true;
-      this.closed = false;
     }
   }
   
   volatile WriteState writestate = new WriteState();
-  int recentCommits = 0;
-  volatile int commitsSinceFlush = 0;
 
-  int maxUnflushedEntries = 0;
+  final int memcacheFlushSize;
+  final int blockingMemcacheSize;
   int compactionThreshold = 0;
   private final HLocking lock = new HLocking();
   private long desiredMaxFileSize;
@@ -330,7 +328,6 @@
 
     this.writestate.writesOngoing = true;
     this.writestate.writesEnabled = true;
-    this.writestate.closed = false;
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
@@ -349,7 +346,7 @@
         this.regionInfo.tableDesc.families().entrySet()) {
       Text colFamily = HStoreKey.extractFamily(e.getKey());
       stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
-          e.getValue(), fs, oldLogFile, conf));
+        e.getValue(), fs, oldLogFile, conf));
     }
 
     // Get rid of any splits or merges that were lost in-progress
@@ -362,14 +359,16 @@
       fs.delete(merges);
     }
 
-    // By default, we flush the cache after 10,000 commits
-    
-    this.maxUnflushedEntries = conf.getInt("hbase.hregion.maxunflushed", 10000);
-    
-    // By default, we compact the region if an HStore has more than 10 map files
-    
-    this.compactionThreshold =
-      conf.getInt("hbase.hregion.compactionThreshold", 10);
+    // By default, we flush the cache when 32M.
+    this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
+      1024*1024*16);
+    this.blockingMemcacheSize = this.memcacheFlushSize *
+      conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
+    
+    // By default, we compact the region if an HStore has more than
+    // MIN_COMMITS_FOR_COMPACTION map files
+    this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold",
+      3);
     
     // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
     this.desiredMaxFileSize =
@@ -387,22 +386,20 @@
 
   /** returns true if region is closed */
   boolean isClosed() {
-    boolean closed = false;
-    synchronized(writestate) {
-      closed = writestate.closed;
-    }
-    return closed;
+    return this.closed.get();
   }
   
   /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't 
    * service any more calls.
    *
-   * This method could take some time to execute, so don't call it from a 
+   * <p>This method could take some time to execute, so don't call it from a 
    * time-sensitive thread.
    * 
    * @return Vector of all the storage files that the HRegion's component 
-   * HStores make use of.  It's a list of HStoreFile objects.
+   * HStores make use of.  It's a list of HStoreFile objects.  Returns empty
+   * vector if already closed and null if it is judged that it should not
+   * close.
    * 
    * @throws IOException
    */
@@ -424,14 +421,14 @@
    * @throws IOException
    */
   Vector<HStoreFile> close(boolean abort) throws IOException {
+    if (isClosed()) {
+      LOG.info("region " + this.regionInfo.regionName + " already closed");
+      return new Vector<HStoreFile>();
+    }
     lock.obtainWriteLock();
     try {
       boolean shouldClose = false;
       synchronized(writestate) {
-        if(writestate.closed) {
-          LOG.info("region " + this.regionInfo.regionName + " closed");
-          return new Vector<HStoreFile>();
-        }
         while(writestate.writesOngoing) {
           try {
             writestate.wait();
@@ -443,10 +440,16 @@
         shouldClose = true;
       }
 
-      if(! shouldClose) {
+      if(!shouldClose) {
         return null;
       }
       LOG.info("closing region " + this.regionInfo.regionName);
+      
+      // Write lock means no more row locks can be given out.  Wait on
+      // outstanding row locks to come in before we close so we do not drop
+      // outstanding updates.
+      waitOnRowLocks();
+
       Vector<HStoreFile> allHStoreFiles = null;
       if (!abort) {
         // Don't flush the cache if we are aborting during a test.
@@ -459,9 +462,9 @@
         return allHStoreFiles;
       } finally {
         synchronized (writestate) {
-          writestate.closed = true;
           writestate.writesOngoing = false;
         }
+        this.closed.set(true);
         LOG.info("region " + this.regionInfo.regionName + " closed");
       }
     } finally {
@@ -476,8 +479,7 @@
    * Returns two brand-new (and open) HRegions
    */
   HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
-      throws IOException {
-    
+  throws IOException {
     if(((regionInfo.startKey.getLength() != 0)
         && (regionInfo.startKey.compareTo(midKey) > 0))
         || ((regionInfo.endKey.getLength() != 0)
@@ -486,8 +488,7 @@
         "boundaries.");
     }
 
-    LOG.info("Splitting region " + this.regionInfo.regionName);
-
+    long startTime = System.currentTimeMillis();
     Path splits = new Path(regiondir, SPLITDIR);
     if(! fs.exists(splits)) {
       fs.mkdirs(splits);
@@ -495,47 +496,44 @@
     
     long regionAId = Math.abs(rand.nextLong());
     HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc, 
-        regionInfo.startKey, midKey);
-        
+      regionInfo.startKey, midKey);
     long regionBId = Math.abs(rand.nextLong());
-    HRegionInfo regionBInfo
-      = new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
-    
+    HRegionInfo regionBInfo =
+      new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
     Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
     Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
-
     if(fs.exists(dirA) || fs.exists(dirB)) {
-      throw new IOException("Cannot split; target file collision at " + dirA 
-          + " or " + dirB);
+      throw new IOException("Cannot split; target file collision at " + dirA +
+        " or " + dirB);
     }
+
+    // We just copied most of the data. Now get whatever updates are up in
+    // the memcache (after shutting down new updates).
     
-    TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
+    // Notify the caller that we are about to close the region. This moves
+    // us ot the 'retiring' queue. Means no more updates coming in -- just
+    // whatever is outstanding.
+    listener.closing(this.getRegionName());
+
+    // Wait on the last row updates to come in.
+    LOG.debug("Starting wait on row locks.");
+    waitOnRowLocks();
 
     // Flush this HRegion out to storage, and turn off flushes
     // or compactions until close() is called.
-    
-    // TODO: flushcache can come back null if it can't do the flush. FIX.
+    LOG.debug("Calling flushcache inside closeAndSplit");
     Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
+    if (hstoreFilesToSplit == null) {
+      // It should always return a list of hstore files even if memcache is
+      // empty.  It will return null if concurrent compaction or splits which
+      // should not happen.
+      throw new NullPointerException("Flushcache did not return any files");
+    }
+    TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
     for(HStoreFile hsf: hstoreFilesToSplit) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Splitting HStore " + hsf.getRegionName() + "/" +
-          hsf.getColFamily() + "/" + hsf.fileId());
-      }
-      HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, 
-        hsf.getColFamily(), Math.abs(rand.nextLong()));
-      HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, 
-        hsf.getColFamily(), Math.abs(rand.nextLong()));
-      hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
-      alreadySplit.add(hsf);
+      alreadySplit.add(splitStoreFile(hsf, splits, regionAInfo,
+        regionBInfo, midKey));
     }
-
-    // We just copied most of the data.
-    
-    // Notify the caller that we are about to close the region
-    listener.closing(this.getRegionName());
-    
-    // Wait on the last row updates to come in.
-    waitOnRowLocks();
     
     // Now close the HRegion
     hstoreFilesToSplit = close();
@@ -546,43 +544,46 @@
     
     // Copy the small remainder
     for(HStoreFile hsf: hstoreFilesToSplit) {
-      if(! alreadySplit.contains(hsf)) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Splitting HStore " + hsf.getRegionName() + "/"
-              + hsf.getColFamily() + "/" + hsf.fileId());
-        }
-
-        HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName, 
-            hsf.getColFamily(), Math.abs(rand.nextLong()));
-        
-        HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName, 
-            hsf.getColFamily(), Math.abs(rand.nextLong()));
-        
-        hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
+      if(!alreadySplit.contains(hsf)) {
+        splitStoreFile(hsf, splits, regionAInfo, regionBInfo, midKey);
       }
     }
 
     // Done
-
     HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
-    
     HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
 
     // Cleanup
-
-    fs.delete(splits);                          // Get rid of splits directory
-    fs.delete(regiondir);                       // and the directory for the old region
-    
+    fs.delete(splits);    // Get rid of splits directory
+    fs.delete(regiondir); // and the directory for the old region
     HRegion regions[] = new HRegion[2];
     regions[0] = regionA;
     regions[1] = regionB;
-    
     LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
       "New regions are: " + regions[0].getRegionName() + ", " +
-      regions[1].getRegionName());
-    
+      regions[1].getRegionName() + ". Took " +
+      StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
     return regions;
   }
+  
+  private HStoreFile splitStoreFile(final HStoreFile hsf, final Path splits,
+      final HRegionInfo a, final HRegionInfo b, final Text midKey)
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Started splitting HStore " + hsf.getRegionName() + "/" +
+        hsf.getColFamily() + "/" + hsf.fileId());
+    }
+    HStoreFile dstA = new HStoreFile(conf, splits, a.regionName, 
+      hsf.getColFamily(), Math.abs(rand.nextLong()));
+    HStoreFile dstB = new HStoreFile(conf, splits, b.regionName, 
+      hsf.getColFamily(), Math.abs(rand.nextLong()));
+    hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished splitting HStore " + hsf.getRegionName() + "/" +
+        hsf.getColFamily() + "/" + hsf.fileId());
+    }
+    return hsf;
+  }
 
   //////////////////////////////////////////////////////////////////////////////
   // HRegion accessors
@@ -646,10 +647,10 @@
   //////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Iterates through all the HStores and finds the one with the largest MapFile
-   * size. If the size is greater than the (currently hard-coded) threshold,
-   * returns true indicating that the region should be split. The midKey for the
-   * largest MapFile is returned through the midKey parameter.
+   * Iterates through all the HStores and finds the one with the largest
+   * MapFile size. If the size is greater than the (currently hard-coded)
+   * threshold, returns true indicating that the region should be split. The
+   * midKey for the largest MapFile is returned through the midKey parameter.
    * 
    * @param midKey      - (return value) midKey of the largest MapFile
    * @return            - true if the region should be split
@@ -659,16 +660,27 @@
     try {
       Text key = new Text();
       long maxSize = 0;
+      long aggregateSize = 0;
       for(HStore store: stores.values()) {
         long size = store.getLargestFileSize(key);
+        aggregateSize += size;
         if(size > maxSize) {                      // Largest so far
           maxSize = size;
           midKey.set(key);
         }
       }
-
-      return (maxSize >
-        (this.desiredMaxFileSize + (this.desiredMaxFileSize / 2)));
+      long triggerSize =
+        this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
+      boolean split = (maxSize >= triggerSize || aggregateSize >= triggerSize);
+      if (split) {
+        LOG.info("Splitting " + getRegionName().toString() +
+          " because largest file is " + StringUtils.humanReadableInt(maxSize) +
+          ", aggregate size is " +
+          StringUtils.humanReadableInt(aggregateSize) +
+          " and desired size is " +
+          StringUtils.humanReadableInt(this.desiredMaxFileSize));
+      }
+      return split;
     } finally {
       lock.releaseReadLock();
     }
@@ -706,6 +718,9 @@
       for(HStore store: stores.values()) {
         if(store.getNMaps() > this.compactionThreshold) {
           needsCompaction = true;
+          LOG.info(getRegionName().toString() + " needs compaction because " +
+            store.getNMaps() + " store files present and threshold is " +
+            this.compactionThreshold);
           break;
         }
       }
@@ -719,46 +734,50 @@
    * Compact all the stores.  This should be called periodically to make sure 
    * the stores are kept manageable.  
    *
-   * This operation could block for a long time, so don't call it from a 
+   * <p>This operation could block for a long time, so don't call it from a 
    * time-sensitive thread.
    *
-   * If it returns TRUE, the compaction has completed.
-   * 
-   * If it returns FALSE, the compaction was not carried out, because the 
-   * HRegion is busy doing something else storage-intensive (like flushing the 
-   * cache).  The caller should check back later.
+   * @return Returns TRUE if the compaction has completed.  FALSE, if the
+   * compaction was not carried out, because the HRegion is busy doing
+   * something else storage-intensive (like flushing the cache). The caller
+   * should check back later.
    */
   boolean compactStores() throws IOException {
     boolean shouldCompact = false;
+    if (this.closed.get()) {
+      return shouldCompact;
+    }
     lock.obtainReadLock();
     try {
       synchronized (writestate) {
         if ((!writestate.writesOngoing) &&
-            writestate.writesEnabled &&
-            (!writestate.closed) &&
-            recentCommits > MIN_COMMITS_FOR_COMPACTION) {
+            writestate.writesEnabled) {
           writestate.writesOngoing = true;
           shouldCompact = true;
         }
       }
 
       if (!shouldCompact) {
-        LOG.info("not compacting region " + this.regionInfo);
+        LOG.info("NOT compacting region " +
+          this.regionInfo.getRegionName().toString());
         return false;
       }
 
-      LOG.info("starting compaction on region " + this.regionInfo);
+      long startTime = System.currentTimeMillis();
+      LOG.info("starting compaction on region " +
+        this.regionInfo.getRegionName().toString());
       for (HStore store : stores.values()) {
         store.compact();
       }
-      LOG.info("compaction completed on region " + this.regionInfo);
+      LOG.info("compaction completed on region " +
+        this.regionInfo.getRegionName().toString() + ". Took " +
+        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
       return true;
       
     } finally {
       lock.releaseReadLock();
       synchronized (writestate) {
         writestate.writesOngoing = false;
-        recentCommits = 0;
         writestate.notifyAll();
       }
     }
@@ -769,50 +788,64 @@
    * only take if there have been a lot of uncommitted writes.
    */
   void optionallyFlush() throws IOException {
-    if(commitsSinceFlush > maxUnflushedEntries) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Flushing cache. Number of commits is: " + commitsSinceFlush);
-      }
+    if(this.memcache.getSize() > this.memcacheFlushSize) {
+      flushcache(false);
+    } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
+      LOG.info("Optional flush called " + this.noFlushCount +
+        " times when data present without flushing.  Forcing one.");
       flushcache(false);
+      if (this.memcache.getSize() > 0) {
+        // Only increment if something in the cache.
+        // Gets zero'd when a flushcache is called.
+        this.noFlushCount++;
+      }
     }
   }
 
   /**
-   * Flush the cache.  This is called periodically to minimize the amount of log
-   * processing needed upon startup.
+   * Flush the cache.  This is called periodically to minimize the amount of
+   * log processing needed upon startup.
    * 
-   * The returned Vector is a list of all the files used by the component HStores.
-   * It is a list of HStoreFile objects.  If the returned value is NULL, then the
-   * flush could not be executed, because the HRegion is busy doing something
-   * else storage-intensive.  The caller should check back later.
+   * <p>The returned Vector is a list of all the files used by the component
+   * HStores. It is a list of HStoreFile objects.  If the returned value is
+   * NULL, then the flush could not be executed, because the HRegion is busy
+   * doing something else storage-intensive.  The caller should check back
+   * later.
    *
-   * The 'disableFutureWrites' boolean indicates that the caller intends to 
+   * <p>This method may block for some time, so it should not be called from a 
+   * time-sensitive thread.
+   * 
+   * @param disableFutureWrites indicates that the caller intends to 
    * close() the HRegion shortly, so the HRegion should not take on any new and 
-   * potentially long-lasting disk operations.  This flush() should be the final
+   * potentially long-lasting disk operations. This flush() should be the final
    * pre-close() disk operation.
-   *
-   * This method may block for some time, so it should not be called from a 
-   * time-sensitive thread.
+   * 
+   * @return List of store files including new flushes, if any.  If no flushes
+   * because  memcache is null, returns all current store files.  Returns
+   * null if no flush (Writes are going on elsewhere -- concurrently we are
+   * compacting or splitting).
    */
-  Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
+  Vector<HStoreFile> flushcache(boolean disableFutureWrites)
+  throws IOException {
+    if (this.closed.get()) {
+      return null;
+    }
+    this.noFlushCount = 0;
     boolean shouldFlush = false;
     synchronized(writestate) {
-      if((! writestate.writesOngoing)
-          && writestate.writesEnabled
-          && (! writestate.closed)) {
-        
+      if((!writestate.writesOngoing) &&
+          writestate.writesEnabled) {
         writestate.writesOngoing = true;
         shouldFlush = true;
-        
         if(disableFutureWrites) {
           writestate.writesEnabled = false;
         }
       }
     }
-    
-    if(! shouldFlush) {
+
+    if(!shouldFlush) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("not flushing cache for region " +
+        LOG.debug("NOT flushing memcache for region " +
           this.regionInfo.regionName);
       }
       return null;  
@@ -820,7 +853,6 @@
     
     try {
       return internalFlushcache();
-
     } finally {
       synchronized (writestate) {
         writestate.writesOngoing = false;
@@ -831,73 +863,69 @@
 
   /**
    * Flushing the cache is a little tricky. We have a lot of updates in the
-   * HMemcache, all of which have also been written to the log. We need to write
-   * those updates in the HMemcache out to disk, while being able to process
-   * reads/writes as much as possible during the flush operation. Also, the log
-   * has to state clearly the point in time at which the HMemcache was flushed.
-   * (That way, during recovery, we know when we can rely on the on-disk flushed
-   * structures and when we have to recover the HMemcache from the log.)
-   * 
-   * So, we have a three-step process:
+   * HMemcache, all of which have also been written to the log. We need to
+   * write those updates in the HMemcache out to disk, while being able to
+   * process reads/writes as much as possible during the flush operation. Also,
+   * the log has to state clearly the point in time at which the HMemcache was
+   * flushed. (That way, during recovery, we know when we can rely on the
+   * on-disk flushed structures and when we have to recover the HMemcache from
+   * the log.)
+   * 
+   * <p>So, we have a three-step process:
+   * 
+   * <ul><li>A. Flush the memcache to the on-disk stores, noting the current
+   * sequence ID for the log.<li>
+   * 
+   * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
+   * ID that was current at the time of memcache-flush.</li>
+   * 
+   * <li>C. Get rid of the memcache structures that are now redundant, as
+   * they've been flushed to the on-disk HStores.</li>
+   * </ul>
+   * <p>This method is protected, but can be accessed via several public
+   * routes.
    * 
-   * A. Flush the memcache to the on-disk stores, noting the current sequence ID
-   * for the log.
+   * <p> This method may block for some time.
    * 
-   * B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence ID
-   * that was current at the time of memcache-flush.
-   * 
-   * C. Get rid of the memcache structures that are now redundant, as they've
-   * been flushed to the on-disk HStores.
-   * 
-   * This method is protected, but can be accessed via several public routes.
-   * 
-   * This method may block for some time.
+   * @return List of store files including just-made new flushes per-store. If
+   * not flush, returns list of all store files.
    */
   Vector<HStoreFile> internalFlushcache() throws IOException {
-    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
-    
+
+    long startTime = -1;
     if(LOG.isDebugEnabled()) {
-      LOG.debug("flushing cache for region " + this.regionInfo.regionName);
+      startTime = System.currentTimeMillis();
+      LOG.debug("Started memcache flush for region " +
+        this.regionInfo.regionName + ". Size " +
+        StringUtils.humanReadableInt(this.memcache.getSize()));
     }
 
-    // We pass the log to the HMemcache, so we can lock down 
-    // both simultaneously.  We only have to do this for a moment:
-    // we need the HMemcache state at the time of a known log sequence
-    // number.  Since multiple HRegions may write to a single HLog,
-    // the sequence numbers may zoom past unless we lock it.
+    // We pass the log to the HMemcache, so we can lock down both
+    // simultaneously.  We only have to do this for a moment: we need the
+    // HMemcache state at the time of a known log sequence number. Since
+    // multiple HRegions may write to a single HLog, the sequence numbers may
+    // zoom past unless we lock it.
     //
-    // When execution returns from snapshotMemcacheForLog()
-    // with a non-NULL value, the HMemcache will have a snapshot 
-    // object stored that must be explicitly cleaned up using
-    // a call to deleteSnapshot().
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("starting memcache snapshot");
-    }
-    
+    // When execution returns from snapshotMemcacheForLog() with a non-NULL
+    // value, the HMemcache will have a snapshot object stored that must be
+    // explicitly cleaned up using a call to deleteSnapshot().
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
-    TreeMap<HStoreKey, byte []> memcacheSnapshot = retval.memcacheSnapshot;
-    if(memcacheSnapshot == null) {
-      for(HStore hstore: stores.values()) {
-        Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
-        allHStoreFiles.addAll(0, hstoreFiles);
-      }
-      return allHStoreFiles;
+    if(retval == null || retval.memcacheSnapshot == null) {
+      return getAllStoreFiles();
     }
-    
     long logCacheFlushId = retval.sequenceId;
-
-    // A.  Flush memcache to all the HStores.
-    
     if(LOG.isDebugEnabled()) {
-      LOG.debug("flushing memcache to HStores");
+      LOG.debug("Snapshotted memcache for region " +
+        this.regionInfo.regionName + ". Sequence id " + retval.sequenceId);
     }
-    
-    for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
-      HStore hstore = it.next();
-      Vector<HStoreFile> hstoreFiles 
-        = hstore.flushCache(memcacheSnapshot, logCacheFlushId);
-      
+
+    // A.  Flush memcache to all the HStores.
+    // Keep running vector of all store files that includes both old and the
+    // just-made new flush store file.
+    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
+    for(HStore hstore: stores.values()) {
+      Vector<HStoreFile> hstoreFiles
+        = hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
       allHStoreFiles.addAll(0, hstoreFiles);
     }
 
@@ -906,27 +934,32 @@
     //     and that all updates to the log for this regionName that have lower 
     //     log-sequence-ids can be safely ignored.
     
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("writing flush cache complete to log");
-    }
-    
     log.completeCacheFlush(this.regionInfo.regionName,
-        regionInfo.tableDesc.getName(), logCacheFlushId);
+      regionInfo.tableDesc.getName(), logCacheFlushId);
 
     // C. Delete the now-irrelevant memcache snapshot; its contents have been 
     //    dumped to disk-based HStores.
-    
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("deleting memcache snapshot");
-    }
-    
     memcache.deleteSnapshot();
 
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("cache flush complete for region " + this.regionInfo.regionName);
+    // D. Finally notify anyone waiting on memcache to clear:
+    // e.g. checkResources().
+    synchronized(this) {
+      notifyAll();
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished memcache flush for region " +
+        this.regionInfo.regionName + " in " +
+          (System.currentTimeMillis() - startTime) + "ms");
+    }
+    return allHStoreFiles;
+  }
+  
+  private Vector<HStoreFile> getAllStoreFiles() {
+    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
+    for(HStore hstore: stores.values()) {
+      Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
+      allHStoreFiles.addAll(0, hstoreFiles);
     }
-    
-    this.commitsSinceFlush = 0;
     return allHStoreFiles;
   }
 
@@ -947,9 +980,10 @@
 
   /** Fetch multiple versions of a single data item, with timestamp. */
   byte [][] get(Text row, Text column, long timestamp, int numVersions) 
-  throws IOException {  
-    if(writestate.closed) {
-      throw new IOException("HRegion is closed.");
+  throws IOException {
+    if (this.closed.get()) {
+      throw new IOException("Region " + this.getRegionName().toString() +
+        " closed");
     }
 
     // Make sure this is a valid row and valid column
@@ -1065,17 +1099,75 @@
    * after a specified quiet period.
    * 
    * @param row Row to update
-   * @return lockid
+   * @return lock id
    * @throws IOException
    * @see #put(long, Text, byte[])
    */
   public long startUpdate(Text row) throws IOException {
-    // We obtain a per-row lock, so other clients will block while one client
-    // performs an update.  The read lock is released by the client calling
-    // #commit or #abort or if the HRegionServer lease on the lock expires.
-    // See HRegionServer#RegionListener for how the expire on HRegionServer
-    // invokes a HRegion#abort.
-    return obtainRowLock(row);
+    // Do a rough check that we have resources to accept a write.  The check is
+    // 'rough' in that between the resource check and the call to obtain a 
+    // read lock, resources may run out.  For now, the thought is that this
+    // will be extremely rare; we'll deal with it when it happens.
+    checkResources();
+
+    if (this.closed.get()) {
+      throw new IOException("Region " + this.getRegionName().toString() +
+        " closed");
+    }
+
+    // Get a read lock. We will not be able to get one if we are closing or
+    // if this region is being split.  In neither case should we be allowing
+    // updates.
+    this.lock.obtainReadLock();
+    try {
+      // We obtain a per-row lock, so other clients will block while one client
+      // performs an update. The read lock is released by the client calling
+      // #commit or #abort or if the HRegionServer lease on the lock expires.
+      // See HRegionServer#RegionListener for how the expire on HRegionServer
+      // invokes a HRegion#abort.
+      return obtainRowLock(row);
+    } finally {
+      this.lock.releaseReadLock();
+    }
+  }
+  
+  /*
+   * Check if resources to support an update.
+   * 
+   * For now, just checks memcache saturation.
+   * 
+   * Here we synchronize on HRegion, a broad scoped lock.  Its appropriate
+   * given we're figuring in here whether this region is able to take on
+   * writes.  This is only method with a synchronize (at time of writing),
+   * this and the synchronize on 'this' inside in internalFlushCache to send
+   * the notify.
+   */
+  private synchronized void checkResources() {
+    if (checkCommitsSinceFlush()) {
+      return;
+    }
+    
+    LOG.warn("Blocking updates for '" + Thread.currentThread().getName() +
+      "': Memcache size " +
+      StringUtils.humanReadableInt(this.memcache.getSize()) +
+      " is >= than blocking " +
+      StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
+    while (!checkCommitsSinceFlush()) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        // continue;
+      }
+    }
+    LOG.warn("Unblocking updates for '" + Thread.currentThread().getName() +
+      "'");
+  }
+  
+  /*
+   * @return True if commits since flush is under the blocking threshold.
+   */
+  private boolean checkCommitsSinceFlush() {
+    return this.memcache.getSize() < this.blockingMemcacheSize;
   }
 
   /**
@@ -1142,11 +1234,11 @@
         throw new LockException("Locking error: put operation on lock " +
             lockid + " unexpected aborted by another thread");
       }
-      
-      TreeMap<Text, byte []> targets = this.targetColumns.get(lockid);
+      Long lid = Long.valueOf(lockid);
+      TreeMap<Text, byte []> targets = this.targetColumns.get(lid);
       if (targets == null) {
         targets = new TreeMap<Text, byte []>();
-        this.targetColumns.put(lockid, targets);
+        this.targetColumns.put(lid, targets);
       }
       targets.put(targetCol, val);
     }
@@ -1207,18 +1299,17 @@
     // hasn't aborted/committed the write-operation
     synchronized(row) {
       // Add updates to the log and add values to the memcache.
-      TreeMap<Text, byte []> columns =  this.targetColumns.get(lockid);
+      Long lid = Long.valueOf(lockid);
+      TreeMap<Text, byte []> columns =  this.targetColumns.get(lid);
       if (columns != null && columns.size() > 0) {
         log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
           row, columns, timestamp);
         memcache.add(row, columns, timestamp);
         // OK, all done!
       }
-      targetColumns.remove(lockid);
+      targetColumns.remove(lid);
       releaseRowLock(row);
     }
-    recentCommits++;
-    this.commitsSinceFlush++;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1284,11 +1375,11 @@
         }
       }
       
-      long lockid = Math.abs(rand.nextLong());
-      rowsToLocks.put(row, lockid);
-      locksToRows.put(lockid, row);
+      Long lid = Long.valueOf(Math.abs(rand.nextLong()));
+      rowsToLocks.put(row, lid);
+      locksToRows.put(lid, row);
       rowsToLocks.notifyAll();
-      return lockid;
+      return lid.longValue();
     }
   }
   
@@ -1451,7 +1542,6 @@
               // If we are doing a wild card match or there are multiple
               // matchers per column, we need to scan all the older versions of 
               // this row to pick up the rest of the family members
-
               if (!wildcardMatch
                   && !multipleMatchers
                   && (keys[i].getTimestamp() != chosenTimestamp)) {
@@ -1467,7 +1557,6 @@
               // but this had the effect of overwriting newer
               // values with older ones. So now we only insert
               // a result if the map does not contain the key.
-
               for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
                 if (!filtered && moreToFollow &&
                     !results.containsKey(e.getKey())) {
@@ -1516,7 +1605,9 @@
           }
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
+          if (this.dataFilter != null) {
+            LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
+          }
         }
       }
       
@@ -1640,7 +1731,10 @@
     client.put(lockid, COL_STARTCODE,
       String.valueOf(startCode).getBytes(UTF8_ENCODING));
     client.commit(lockid);
-    LOG.info("Added region " + region.getRegionName() + " to table " + table);
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Added region " + region.getRegionName() + " to table " +
+        table);
+    }
   }
   
   /**
@@ -1659,7 +1753,9 @@
     client.delete(lockid, COL_SERVER);
     client.delete(lockid, COL_STARTCODE);
     client.commit(lockid);
-    LOG.info("Removed " + regionName + " from table " + table);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removed " + regionName + " from table " + table);
+    }
   }
   
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Jul 26 14:30:55 2007
@@ -114,7 +114,8 @@
         // regions.
         retiringRegions.put(regionName, onlineRegions.remove(regionName));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding " + regionName + " to retiringRegions");
+          LOG.debug(regionName.toString() + "closing (" +
+            "Adding to retiringRegions)");
         }
       } finally {
         lock.writeLock().unlock();
@@ -129,7 +130,7 @@
       try {
         retiringRegions.remove(regionName);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Removing " + regionName + " from retiringRegions");
+          LOG.debug(regionName.toString() + " closed");
         }
       } finally {
         lock.writeLock().unlock();
@@ -140,7 +141,7 @@
      * {@inheritDoc}
      */
     public void run() {
-      while(! stopRequested) {
+      while(!stopRequested) {
         long startTime = System.currentTimeMillis();
         synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
           // Grab a list of regions to check
@@ -151,21 +152,19 @@
           } finally {
             lock.readLock().unlock();
           }
-
           try {
             for(HRegion cur: regionsToCheck) {
               if(cur.isClosed()) {
                 continue;                               // Skip if closed
               }
-
-              if(cur.needsCompaction()) {
-                // Best time to split a region is right after compaction
-                if(cur.compactStores()) {
-                  Text midKey = new Text();
-                  if(cur.needsSplit(midKey)) {
-                    split(cur, midKey);
-                  }
-                }
+              if (cur.needsCompaction()) {
+                cur.compactStores();
+              }
+              // After compaction, it probably needs splitting.  May also need
+              // splitting just because one of the memcache flushes was big.
+              Text midKey = new Text();
+              if (cur.needsSplit(midKey)) {
+                split(cur, midKey);
               }
             }
           } catch(IOException e) {
@@ -196,23 +195,21 @@
     throws IOException {
       final HRegionInfo oldRegionInfo = region.getRegionInfo();
       final HRegion[] newRegions = region.closeAndSplit(midKey, this);
-
+      
       // When a region is split, the META table needs to updated if we're
       // splitting a 'normal' region, and the ROOT table needs to be
       // updated if we are splitting a META region.
-      
       final Text tableToUpdate =
         region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
             ROOT_TABLE_NAME : META_TABLE_NAME;
       LOG.info("Updating " + tableToUpdate + " with region split info");
 
       // Remove old region from META
-      
       for (int tries = 0; tries < numRetries; tries++) {
         try {
           HRegion.removeRegionFromMETA(client, tableToUpdate,
               region.getRegionName());
-          
+          break;
         } catch (IOException e) {
           if(tries == numRetries - 1) {
             if(e instanceof RemoteException) {
@@ -224,13 +221,12 @@
       }
       
       // Add new regions to META
-      
       for (int i = 0; i < newRegions.length; i++) {
         for (int tries = 0; tries < numRetries; tries ++) {
           try {
             HRegion.addRegionToMETA(client, tableToUpdate, newRegions[i],
                 serverInfo.getServerAddress(), serverInfo.getStartCode());
-
+            break;
           } catch(IOException e) {
             if(tries == numRetries - 1) {
               if(e instanceof RemoteException) {
@@ -243,7 +239,6 @@
       }
           
       // Now tell the master about the new regions
-      
       if (LOG.isDebugEnabled()) {
         LOG.debug("Reporting region split to master");
       }
@@ -253,9 +248,8 @@
           " successful. Old region=" + oldRegionInfo.getRegionName() +
           ", new regions: " + newRegions[0].getRegionName() + ", " +
           newRegions[1].getRegionName());
-
-      // Finally, start serving the new regions
       
+      // Finally, start serving the new regions
       lock.writeLock().lock();
       try {
         onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
@@ -271,7 +265,13 @@
   private final Flusher cacheFlusher;
   private final Thread cacheFlusherThread;
   protected final Integer cacheFlusherLock = new Integer(0);
-  /** Runs periodically to flush the memcache */
+  
+  /* Runs periodically to flush memcache.
+   * 
+   * Memcache flush is also called just before compaction and just before
+   * split so memcache is best prepared for the the long trip across
+   * compactions/splits during which it will not be able to flush to disk.
+   */
   class Flusher implements Runnable {
     /**
      * {@inheritDoc}
@@ -293,7 +293,6 @@
           }
 
           // Flush them, if necessary
-
           for(HRegion cur: toFlush) {
             if(cur.isClosed()) {                // Skip if closed
               continue;
@@ -305,7 +304,6 @@
               if (iex instanceof RemoteException) {
                 try {
                   iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
-                  
                 } catch (IOException x) {
                   iex = x;
                 }
@@ -316,9 +314,8 @@
         }
         
         // Sleep
-        long waitTime = stopRequested ? 0
-            : threadWakeFrequency - (System.currentTimeMillis() - startTime);
-        
+        long waitTime = stopRequested? 0:
+          threadWakeFrequency - (System.currentTimeMillis() - startTime);
         if(waitTime > 0) {
           try {
             Thread.sleep(waitTime);
@@ -358,9 +355,7 @@
           int nEntries = log.getNumEntries();
           if(nEntries > this.maxLogEntries) {
             try {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Rolling log. Number of entries is: " + nEntries);
-              }
+              LOG.info("Rolling hlog. Number of entries: " + nEntries);
               log.rollWriter();
             } catch (IOException iex) {
               if (iex instanceof RemoteException) {
@@ -439,7 +434,7 @@
       15 * 1000);
     this.splitOrCompactCheckFrequency =
       conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
-      60 * 1000);
+      30 * 1000);
 
     // Cache flushing
     this.cacheFlusher = new Flusher();
@@ -465,20 +460,21 @@
       // rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it.
       String realIP = DNS.getDefaultIP(
         conf.get("dfs.datanode.dns.interface","default"));
-
       this.serverInfo = new HServerInfo(new HServerAddress(
-          new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
-          this.rand.nextLong());
-
+        new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
+        this.rand.nextLong());
       Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" +
-          this.serverInfo.getServerAddress().getPort());
+        this.serverInfo.getServerAddress().getPort());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Log dir " + logdir);
+      }
       
       // Logging
       this.fs = FileSystem.get(conf);
       if(fs.exists(logdir)) {
-        throw new RegionServerRunningException("region server already running at " +
-          this.serverInfo.getServerAddress().toString() + " because logdir " +
-          logdir.toString() + " exists");
+        throw new RegionServerRunningException("region server already " +
+          "running at " + this.serverInfo.getServerAddress().toString() +
+          " because logdir " + logdir.toString() + " exists");
       }
       
       this.log = new HLog(fs, logdir, conf);
@@ -579,7 +575,8 @@
 
     try {
       this.server.start();
-      LOG.info("HRegionServer started at: " + serverInfo.getServerAddress().toString());
+      LOG.info("HRegionServer started at: " +
+        serverInfo.getServerAddress().toString());
     } catch(IOException e) {
       stopRequested = true;
       if (e instanceof RemoteException) {
@@ -759,8 +756,8 @@
             region.getRegionInfo());
         }
 
-        LOG.info("telling master that region server is shutting down at: "
-            + serverInfo.getServerAddress().toString());
+        LOG.info("telling master that region server is shutting down at: " +
+          serverInfo.getServerAddress().toString());
         hbaseMaster.regionServerReport(serverInfo, exitMsg);
       } catch (IOException e) {
         if (e instanceof RemoteException) {
@@ -947,7 +944,6 @@
       }
       try {
         region.close(abortRequested);
-        LOG.debug("region closed " + region.getRegionName());
       } catch (IOException e) {
         if (e instanceof RemoteException) {
           try {
@@ -978,7 +974,8 @@
   /**
    * {@inheritDoc}
    */
-  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException {
+  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
+  throws IOException {
     for(Map.Entry<Text, ArrayList<BatchOperation>> e: b) {
       Text row = e.getKey();
       long clientid = rand.nextLong();
@@ -1027,7 +1024,8 @@
   /**
    * {@inheritDoc}
    */
-  public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
+  public KeyedData[] getRow(final Text regionName, final Text row)
+  throws IOException {
     HRegion region = getRegion(regionName);
     TreeMap<Text, byte[]> map = region.getFull(row);
     KeyedData result[] = new KeyedData[map.size()];
@@ -1066,13 +1064,7 @@
         byte [] val = e.getValue();
         if (DELETE_BYTES.compareTo(val) == 0) {
           // Column value is deleted. Don't return it.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("skipping deleted value for key: " + k.toString());
-          }
           continue;
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("adding value for key: " + k.toString());
         }
         values.add(new KeyedData(k, val));
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Thu Jul 26 14:30:55 2007
@@ -45,6 +45,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.StringUtils;
 
 import org.onelab.filter.*;
 
@@ -78,6 +79,7 @@
   Path loginfodir;
   Path filterDir;
   Filter bloomFilter;
+  private String storeName;
 
   Integer compactLock = new Integer(0);
   Integer flushLock = new Integer(0);
@@ -129,6 +131,8 @@
     this.family = family;
     this.familyName = HStoreKey.extractFamily(this.family.getName());
     this.compression = SequenceFile.CompressionType.NONE;
+    this.storeName = this.regionName.toString() + "/" +
+      this.familyName.toString();
     
     if(family.getCompression() != HColumnDescriptor.CompressionType.NONE) {
       if(family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
@@ -161,7 +165,7 @@
     }
 
     if(LOG.isDebugEnabled()) {
-      LOG.debug("starting HStore for " + regionName + "/"+ familyName);
+      LOG.debug("Starting HStore for " + this.storeName);
     }
     
     // Either restart or get rid of any leftover compaction work.  Either way, 
@@ -216,16 +220,11 @@
 
     // Finally, start up all the map readers! (There should be just one at this 
     // point, as we've compacted them all.)
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("starting map readers");
-    }
     for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
       // TODO - is this really necessary?  Don't I do this inside compact()?
       maps.put(e.getKey(),
         getMapFileReader(e.getValue().getMapFilePath().toString()));
     }
-    
-    LOG.info("HStore online for " + this.regionName + "/" + this.familyName);
   }
   
   /*
@@ -239,9 +238,6 @@
   private void doReconstructionLog(final Path reconstructionLog,
       final long maxSeqID)
   throws UnsupportedEncodingException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("reading reconstructionLog");
-    }
     if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
       return;
     }
@@ -306,7 +302,7 @@
     Path filterFile = new Path(filterDir, BLOOMFILTER_FILE_NAME);
     if(fs.exists(filterFile)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("loading bloom filter for " + family.getName());
+        LOG.debug("loading bloom filter for " + this.storeName);
       }
 
       switch(family.bloomFilter.filterType) {
@@ -328,7 +324,7 @@
       
     } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("creating bloom filter for " + family.getName());
+        LOG.debug("creating bloom filter for " + this.storeName);
       }
 
       switch(family.bloomFilter.filterType) {
@@ -357,7 +353,7 @@
    */
   private void flushBloomFilter() throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("flushing bloom filter for " + family.getName());
+      LOG.debug("flushing bloom filter for " + this.storeName);
     }
     FSDataOutputStream out =
       fs.create(new Path(filterDir, BLOOMFILTER_FILE_NAME));
@@ -365,7 +361,7 @@
     bloomFilter.write(out);
     out.close();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("flushed bloom filter for " + family.getName());
+      LOG.debug("flushed bloom filter for " + this.storeName);
     }
   }
 
@@ -494,7 +490,9 @@
    * @throws IOException
    */
   void close() throws IOException {
-    LOG.info("closing HStore for " + this.regionName + "/" + this.familyName);
+    if (LOG.isDebugEnabled()) {
+      LOG.info("closing HStore for " + this.storeName);
+    }
     this.lock.obtainWriteLock();
     try {
       for (MapFile.Reader map: maps.values()) {
@@ -503,7 +501,7 @@
       maps.clear();
       mapFiles.clear();
       
-      LOG.info("HStore closed for " + this.regionName + "/" + this.familyName);
+      LOG.info("HStore closed for " + this.storeName);
     } finally {
       this.lock.releaseWriteLock();
     }
@@ -524,13 +522,13 @@
    *
    * Return the entire list of HStoreFiles currently used by the HStore.
    *
-   * @param inputCache          - memcache to flush
-   * @param logCacheFlushId     - flush sequence number
-   * @return - Vector of all the HStoreFiles in use
+   * @param inputCache memcache to flush
+   * @param logCacheFlushId flush sequence number
+   * @return Vector of all the HStoreFiles in use
    * @throws IOException
    */
-  Vector<HStoreFile> flushCache(TreeMap<HStoreKey, byte []> inputCache,
-      long logCacheFlushId)
+  Vector<HStoreFile> flushCache(final TreeMap<HStoreKey, byte []> inputCache,
+    final long logCacheFlushId)
   throws IOException {
     return flushCacheHelper(inputCache, logCacheFlushId, true);
   }
@@ -538,64 +536,48 @@
   Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, byte []> inputCache,
       long logCacheFlushId, boolean addToAvailableMaps)
   throws IOException {
-    
     synchronized(flushLock) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("flushing HStore " + this.regionName + "/" + this.familyName);
-      }
-      
       // A. Write the TreeMap out to the disk
-
-      HStoreFile flushedFile 
-        = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
-      
+      HStoreFile flushedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
+        regionName, familyName, fs);
       Path mapfile = flushedFile.getMapFilePath();
       if(LOG.isDebugEnabled()) {
-        LOG.debug("map file is: " + mapfile.toString());
+        LOG.debug("Flushing to " + mapfile.toString());
       }
-      
       MapFile.Writer out = getMapFileWriter(mapfile.toString());
       try {
         for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
           HStoreKey curkey = es.getKey();
-          if (this.familyName.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+          if (this.familyName.
+              equals(HStoreKey.extractFamily(curkey.getColumn()))) {
             out.append(curkey, new ImmutableBytesWritable(es.getValue()));
           }
         }
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("HStore " + this.regionName + "/" + this.familyName + " flushed");
-        }
-        
       } finally {
         out.close();
       }
 
       // B. Write out the log sequence number that corresponds to this output
       // MapFile.  The MapFile is current up to and including the log seq num.
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("writing log cache flush id");
-      }
       flushedFile.writeInfo(fs, logCacheFlushId);
       
       // C. Flush the bloom filter if any
-      
       if(bloomFilter != null) {
         flushBloomFilter();
       }
 
       // D. Finally, make the new MapFile available.
-
       if(addToAvailableMaps) {
         this.lock.obtainWriteLock();
-        
         try {
           Long flushid = Long.valueOf(logCacheFlushId);
           maps.put(flushid, getMapFileReader(mapfile.toString()));
           mapFiles.put(flushid, flushedFile);
           if(LOG.isDebugEnabled()) {
-            LOG.debug("HStore available for " + this.regionName + "/"
-                + this.familyName + " flush id=" + logCacheFlushId);
+            LOG.debug("Added " + mapfile.toString() +
+                " with flush id " + logCacheFlushId + " and size " +
+              StringUtils.humanReadableInt(mapfile.getFileSystem(this.conf).
+                getContentLength(mapfile)));
           }
         } finally {
           this.lock.releaseWriteLock();
@@ -626,7 +608,7 @@
    * Compact the back-HStores.  This method may take some time, so the calling 
    * thread must be able to block for long periods.
    * 
-   * During this time, the HStore can work as usual, getting values from
+   * <p>During this time, the HStore can work as usual, getting values from
    * MapFiles and writing new MapFiles from given memcaches.
    * 
    * Existing MapFiles are not destroyed until the new compacted TreeMap is 
@@ -646,28 +628,25 @@
   
   void compactHelper(boolean deleteSequenceInfo) throws IOException {
     synchronized(compactLock) {
+      Path curCompactStore =
+        HStoreFile.getHStoreDir(compactdir, regionName, familyName);
+      fs.mkdirs(curCompactStore);
       if(LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + this.regionName + "/" + this.familyName);
+        LOG.debug("started compaction of " + mapFiles.size() + " files in " +
+          curCompactStore.toString());
       }
-      
-      Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, familyName);
-      fs.mkdirs(curCompactStore);
-      
       try {
-        
         // Grab a list of files to compact.
-        
         Vector<HStoreFile> toCompactFiles = null;
         this.lock.obtainWriteLock();
         try {
           toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
-          
         } finally {
           this.lock.releaseWriteLock();
         }
 
-        // Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
-
+        // Compute the max-sequenceID seen in any of the to-be-compacted
+        // TreeMaps
         long maxSeenSeqID = -1;
         for (HStoreFile hsf: toCompactFiles) {
           long seqid = hsf.loadInfo(fs);
@@ -677,18 +656,13 @@
             }
           }
         }
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("max sequence id: " + maxSeenSeqID);
-        }
-        
+
         HStoreFile compactedOutputFile 
           = new HStoreFile(conf, compactdir, regionName, familyName, -1);
-        
         if(toCompactFiles.size() == 1) {
           if(LOG.isDebugEnabled()) {
-            LOG.debug("nothing to compact for " + this.regionName + "/" + this.familyName);
+            LOG.debug("nothing to compact for " + this.storeName);
           }
-          
           HStoreFile hsf = toCompactFiles.elementAt(0);
           if(hsf.loadInfo(fs) == -1) {
             return;
@@ -699,7 +673,6 @@
         MapFile.Writer compactedOut =
           getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
         try {
-
           // We create a new set of MapFile.Reader objects so we don't screw up 
           // the caching associated with the currently-loaded ones.
           //
@@ -711,15 +684,13 @@
           // lowest-ranked one.  Updates to a single row/column will appear 
           // ranked by timestamp.  This allows us to throw out deleted values or
           // obsolete versions.
-
           MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
           HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
           ImmutableBytesWritable[] vals =
             new ImmutableBytesWritable[toCompactFiles.size()];
           boolean[] done = new boolean[toCompactFiles.size()];
           int pos = 0;
-          for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
-            HStoreFile hsf = it.next();
+          for(HStoreFile hsf: toCompactFiles) {
             readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
             keys[pos] = new HStoreKey();
             vals[pos] = new ImmutableBytesWritable();
@@ -729,11 +700,6 @@
 
           // Now, advance through the readers in order.  This will have the
           // effect of a run-time sort of the entire dataset.
-
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("processing HStoreFile readers");
-          }
-          
           int numDone = 0;
           for(int i = 0; i < readers.length; i++) {
             readers[i].reset();
@@ -747,9 +713,7 @@
           Text lastRow = new Text();
           Text lastColumn = new Text();
           while(numDone < done.length) {
-
             // Find the reader with the smallest key
-
             int smallestKey = -1;
             for(int i = 0; i < readers.length; i++) {
               if(done[i]) {
@@ -758,7 +722,6 @@
               
               if(smallestKey < 0) {
                 smallestKey = i;
-              
               } else {
                 if(keys[i].compareTo(keys[smallestKey]) < 0) {
                   smallestKey = i;
@@ -767,74 +730,60 @@
             }
 
             // Reflect the current key/val in the output
-
             HStoreKey sk = keys[smallestKey];
             if(lastRow.equals(sk.getRow())
                 && lastColumn.equals(sk.getColumn())) {
-              
               timesSeen++;
-              
             } else {
               timesSeen = 1;
             }
             
             if(timesSeen <= family.getMaxVersions()) {
-
               // Keep old versions until we have maxVersions worth.
               // Then just skip them.
-
               if(sk.getRow().getLength() != 0
                   && sk.getColumn().getLength() != 0) {
-                
-                // Only write out objects which have a non-zero length key and value
-
+                // Only write out objects which have a non-zero length key and
+                // value
                 compactedOut.append(sk, vals[smallestKey]);
               }
-              
             }
 
-            //TODO: I don't know what to do about deleted values.  I currently 
+            // TODO: I don't know what to do about deleted values.  I currently 
             // include the fact that the item was deleted as a legitimate 
-            // "version" of the data.  Maybe it should just drop the deleted val?
+            // "version" of the data.  Maybe it should just drop the deleted
+            // val?
 
             // Update last-seen items
-
             lastRow.set(sk.getRow());
             lastColumn.set(sk.getColumn());
 
             // Advance the smallest key.  If that reader's all finished, then 
             // mark it as done.
-
-            if(! readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
+            if(! readers[smallestKey].next(keys[smallestKey],
+                vals[smallestKey])) {
               done[smallestKey] = true;
               readers[smallestKey].close();
               numDone++;
             }
           }
-          
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("all HStores processed");
-          }
-          
         } finally {
           compactedOut.close();
         }
 
         if(LOG.isDebugEnabled()) {
-          LOG.debug("writing new compacted HStore");
+          LOG.debug("writing new compacted HStore to " +
+            compactedOutputFile.getMapFilePath().toString());
         }
 
         // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-
         if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
           compactedOutputFile.writeInfo(fs, maxSeenSeqID);
-          
         } else {
           compactedOutputFile.writeInfo(fs, -1);
         }
 
         // Write out a list of data files that we're replacing
-
         Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
         DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
         try {
@@ -848,18 +797,11 @@
         }
 
         // Indicate that we're done.
-
         Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
         (new DataOutputStream(fs.create(doneFile))).close();
 
         // Move the compaction into place.
-
         processReadyCompaction();
-        
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("compaction complete for " + this.regionName + "/" + this.familyName);
-        }
-
       } finally {
         fs.delete(compactdir);
       }
@@ -872,8 +814,8 @@
    *
    * It works by processing a compaction that's been written to disk.
    * 
-   * It is usually invoked at the end of a compaction, but might also be invoked
-   * at HStore startup, if the prior execution died midway through.
+   * It is usually invoked at the end of a compaction, but might also be
+   * invoked at HStore startup, if the prior execution died midway through.
    */
   void processReadyCompaction() throws IOException {
 
@@ -890,22 +832,22 @@
     // 1. Acquiring the write-lock
 
 
-    Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, familyName);
+    Path curCompactStore =
+      HStoreFile.getHStoreDir(compactdir, regionName, familyName);
     this.lock.obtainWriteLock();
     try {
       Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-      if(! fs.exists(doneFile)) {
+      if(!fs.exists(doneFile)) {
         
         // The last execution didn't finish the compaction, so there's nothing 
         // we can do.  We'll just have to redo it. Abandon it and return.
-        
+        LOG.warn("Redoing a failed compaction");
         return;
       }
 
       // OK, there's actually compaction work that needs to be put into place.
-
       if(LOG.isDebugEnabled()) {
-        LOG.debug("compaction starting");
+        LOG.debug("Process ready compaction starting");
       }
       
       // 2. Load in the files to be deleted.
@@ -927,13 +869,14 @@
       }
 
       if(LOG.isDebugEnabled()) {
-        LOG.debug("loaded files to be deleted");
+        LOG.debug("loaded " + toCompactFiles.size() +
+          " file(s) to be deleted");
       }
       
       // 3. Unload all the replaced MapFiles.
-      
       Iterator<HStoreFile> it2 = mapFiles.values().iterator();
-      for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
+      for(Iterator<MapFile.Reader> it = maps.values().iterator();
+          it.hasNext(); ) {
         MapFile.Reader curReader = it.next();
         HStoreFile curMapFile = it2.next();
         if(toCompactFiles.contains(curMapFile)) {
@@ -948,24 +891,18 @@
           it.remove();
         }
       }
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("unloaded existing MapFiles");
-      }
       
       // What if we crash at this point?  No big deal; we will restart
       // processReadyCompaction(), and nothing has been lost.
 
       // 4. Delete all the old files, no longer needed
-      
-      for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
-        HStoreFile hsf = it.next();
+      for(HStoreFile hsf: toCompactFiles) {
         fs.delete(hsf.getMapFilePath());
         fs.delete(hsf.getInfoFilePath());
       }
 
       if(LOG.isDebugEnabled()) {
-        LOG.debug("old files deleted");
+        LOG.debug("old file(s) deleted");
       }
       
       // What if we fail now?  The above deletes will fail silently. We'd better
@@ -973,41 +910,32 @@
       // something we delete, though.
 
       // 5. Moving the new MapFile into place
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("moving new MapFile into place");
-      }
-      
       HStoreFile compactedFile 
         = new HStoreFile(conf, compactdir, regionName, familyName, -1);
-      
       HStoreFile finalCompactedFile 
         = HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("moving " + compactedFile.getMapFilePath().toString() +
+          " to " + finalCompactedFile.getMapFilePath().toString());
+      }
       
-      fs.rename(compactedFile.getMapFilePath(), finalCompactedFile.getMapFilePath());
+      fs.rename(compactedFile.getMapFilePath(),
+        finalCompactedFile.getMapFilePath());
       
       // Fail here?  No problem.
-      
-      fs.rename(compactedFile.getInfoFilePath(), finalCompactedFile.getInfoFilePath());
+      fs.rename(compactedFile.getInfoFilePath(),
+        finalCompactedFile.getInfoFilePath());
 
       // Fail here?  No worries.
-      
       Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
 
       // 6. Loading the new TreeMap.
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("loading new TreeMap");
-      }
-      
       mapFiles.put(orderVal, finalCompactedFile);
       maps.put(orderVal, getMapFileReader(
-          finalCompactedFile.getMapFilePath().toString()));
-      
+        finalCompactedFile.getMapFilePath().toString()));
     } finally {
       
       // 7. Releasing the write-lock
-      
       this.lock.releaseWriteLock();
     }
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java Thu Jul 26 14:30:55 2007
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
@@ -36,6 +38,7 @@
  * This class handles all that path-building stuff for you.
  ******************************************************************************/
 public class HStoreFile implements HConstants, WritableComparable {
+  private static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
   static final byte INFO_SEQ_NUM = 0;
   static final String HSTORE_DATFILE_PREFIX = "mapfile.dat.";
   static final String HSTORE_INFOFILE_PREFIX = "mapfile.info.";
@@ -211,29 +214,34 @@
    * brand-new HRegions.
    */
   void splitStoreFile(Text midKey, HStoreFile dstA, HStoreFile dstB,
-      FileSystem fs, Configuration conf) throws IOException {
-
+      FileSystem fs, Configuration c)
+  throws IOException {
     // Copy the appropriate tuples to one MapFile or the other.
-
-    MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), conf);
+    MapFile.Reader in = new MapFile.Reader(fs, getMapFilePath().toString(), c);
     try {
-      MapFile.Writer outA = new MapFile.Writer(conf, fs, 
+      MapFile.Writer outA = new MapFile.Writer(c, fs, 
         dstA.getMapFilePath().toString(), HStoreKey.class,
         ImmutableBytesWritable.class);
       try {
-        MapFile.Writer outB = new MapFile.Writer(conf, fs, 
+        MapFile.Writer outB = new MapFile.Writer(c, fs, 
           dstB.getMapFilePath().toString(), HStoreKey.class,
           ImmutableBytesWritable.class);
         try {
+          long count = 0;
           HStoreKey readkey = new HStoreKey();
           ImmutableBytesWritable readval = new ImmutableBytesWritable();
           while(in.next(readkey, readval)) {
-            Text key = readkey.getRow();
-            if(key.compareTo(midKey) < 0) {
+            if(readkey.getRow().compareTo(midKey) < 0) {
               outA.append(readkey, readval);
             } else {
               outB.append(readkey, readval);
             }
+            if (LOG.isDebugEnabled()) {
+              count++;
+              if ((count % 10000) == 0) {
+                LOG.debug("Write " + count + " records");
+              }
+            }
           }
         } finally {
           outB.close();
@@ -300,15 +308,12 @@
   long loadInfo(FileSystem fs) throws IOException {
     Path p = getInfoFilePath();
     DataInputStream in = new DataInputStream(fs.open(p));
-    
     try {
       byte flag = in.readByte();
       if(flag == INFO_SEQ_NUM) {
         return in.readLong();
-        
       }
       throw new IOException("Cannot process log file: " + p);
-      
     } finally {
       in.close();
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Thu Jul 26 14:30:55 2007
@@ -95,11 +95,10 @@
   Text column;
   long timestamp;
 
+
   /** Default constructor used in conjunction with Writable interface */
   public HStoreKey() {
-    this.row = new Text();
-    this.column = new Text();
-    this.timestamp = Long.MAX_VALUE;
+    this(new Text());
   }
   
   /**
@@ -110,9 +109,7 @@
    * @param row - row key
    */
   public HStoreKey(Text row) {
-    this.row = new Text(row);
-    this.column = new Text();
-    this.timestamp = Long.MAX_VALUE;
+    this(row, Long.MAX_VALUE);
   }
   
   /**
@@ -123,9 +120,7 @@
    * @param timestamp timestamp value
    */
   public HStoreKey(Text row, long timestamp) {
-    this.row = new Text(row);
-    this.column = new Text();
-    this.timestamp = timestamp;
+    this(row, new Text(), timestamp);
   }
   
   /**
@@ -136,9 +131,7 @@
    * @param column column key
    */
   public HStoreKey(Text row, Text column) {
-    this.row = new Text(row);
-    this.column = new Text(column);
-    this.timestamp = Long.MAX_VALUE;
+    this(row, column, Long.MAX_VALUE);
   }
   
   /**
@@ -155,15 +148,20 @@
   }
   
   /**
+   * @return Approximate size in bytes of this key.
+   */
+  public long getSize() {
+    return this.row.getLength() + this.column.getLength() +
+      8 /* There is no sizeof in java. Presume long is 8 (64bit machine)*/;
+  }
+  
+  /**
    * Construct a new HStoreKey from another
    * 
    * @param other the source key
    */
   public HStoreKey(HStoreKey other) {
-    this();
-    this.row.set(other.row);
-    this.column.set(other.column);
-    this.timestamp = other.timestamp;
+    this(other.row, other.column, other.timestamp);
   }
   
   /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java?view=diff&rev=559993&r1=559992&r2=559993
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java Thu Jul 26 14:30:55 2007
@@ -51,16 +51,10 @@
   @Override
   protected void setUp() throws Exception {
     super.setUp();
-
     this.hmemcache = new HMemcache();
-
     // Set up a configuration that has configuration for a file
     // filesystem implementation.
     this.conf = new HBaseConfiguration();
-    // The test hadoop-site.xml doesn't have a default file fs
-    // implementation. Remove below when gets added.
-    this.conf.set("fs.file.impl",
-        "org.apache.hadoop.fs.LocalFileSystem");
   }
 
   /* (non-Javadoc)
@@ -140,11 +134,14 @@
     // Add some rows, run a snapshot. Do it a few times.
     for (int i = 0; i < snapshotCount; i++) {
       addRows(this.hmemcache);
+      int historyInitialSize = this.hmemcache.history.size();
       Snapshot s = runSnapshot(this.hmemcache, log);
       log.completeCacheFlush(new Text(Integer.toString(i)),
           tableName, s.sequenceId);
       // Clean up snapshot now we are done with it.
       this.hmemcache.deleteSnapshot();
+      assertTrue("History not being cleared",
+        historyInitialSize == this.hmemcache.history.size());
     }
     log.closeAndDelete();
   }



Mime
View raw message