hadoop-common-commits mailing list archives

From: j...@apache.org
Subject: svn commit: r596835 [1/3] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/mapred/
Date: Tue, 20 Nov 2007 21:53:32 GMT
Author: jimk
Date: Tue Nov 20 13:53:30 2007
New Revision: 596835

URL: http://svn.apache.org/viewvc?rev=596835&view=rev
Log:
HADOOP-2139 (phase 1) Increase parallelism in region servers

There are a lot of changes in this patch. The memcache has been changed from a per-region object to a per-column object, and HLocking has been removed since we no longer need to maintain locks across RPC calls.
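For illustration, here is a minimal sketch of the structural difference, with hypothetical names (the committed code keeps the per-family cache inside HStore and keys it by HStoreKey, not String):

    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical sketch, not the committed code: one cache per column
    // family instead of a single region-wide HMemcache behind one lock.
    class PerFamilyCacheSketch {
      private final Map<String, TreeMap<String, byte[]>> caches =
          new TreeMap<String, TreeMap<String, byte[]>>();

      // 'synchronized' keeps the sketch simple; the patch itself uses
      // finer-grained locking per store.
      synchronized void put(String family, String column, byte[] value) {
        TreeMap<String, byte[]> cache = caches.get(family);
        if (cache == null) {
          cache = new TreeMap<String, byte[]>();
          caches.put(family, cache);
        }
        cache.put(column, value);
      }

      // Snapshotting one family for flushing no longer quiesces the others.
      synchronized TreeMap<String, byte[]> snapshot(String family) {
        TreeMap<String, byte[]> old =
            caches.put(family, new TreeMap<String, byte[]>());
        return old == null ? new TreeMap<String, byte[]>() : old;
      }
    }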

This necessitated major changes to HRegion and HStore.

Additionally, many changes were required in the unit tests, since they tend to exploit private interfaces that were never designed to be public. Some of those interfaces changed, so the test cases changed as well.

This patch is the result of extensive analysis of the multiple threads in HBase that contend for shared resources: updates, reads, scanners, cache flushing, compaction and region splitting.
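The resulting locking discipline (spelled out in the new HRegion javadoc in the diff below) can be sketched with a plain ReentrantReadWriteLock; the class here is illustrative only, not the committed code:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Illustrative sketch: row-level operations and scanner construction
    // take the region read lock, so they run in parallel with each other;
    // close() takes the write lock, so it waits for in-flight operations
    // and blocks new ones from starting.
    class RegionLockSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile boolean closed = false;

      void rowOperation(Runnable work) {
        lock.readLock().lock();
        try {
          if (closed) {
            throw new IllegalStateException("region closed");
          }
          work.run();
        } finally {
          lock.readLock().unlock();
        }
      }

      void close() {
        lock.writeLock().lock();
        try {
          closed = true; // cache flush and store cleanup would go here
        } finally {
          lock.writeLock().unlock();
        }
      }
    }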

Many of the tests are timing-sensitive, and since we tend to make "dormant" intervals as short as possible to speed up the Hudson build, we may go through several iterations of getting them right before Hudson is happy. This is especially true since two test cases failed on my dual-CPU Windows machine while running the tests under Ant, but ran fine under Eclipse.

However, now that the tests are passing locally, I believe the changes are doing the right thing, though they may still require some parameter tweaks.

Removed:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLocking.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue Nov 20 13:53:30 2007
@@ -40,6 +40,7 @@
     HADOOP-2126 Use Bob Jenkins' hash for bloom filters
     HADOOP-2157 Make Scanners implement Iterable
     HADOOP-2176 Htable.deleteAll documentation is ambiguous
+    HADOOP-2139 (phase 1) Increase parallelism in region servers.
 
 Release 0.15.1
 Branch 0.15

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Tue Nov 20 13:53:30 2007
@@ -212,6 +212,9 @@
    */
   public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
   throws IOException {
+    if (scannerClosed) {
+      return false;
+    }
     // Find the next row label (and timestamp)
     Text chosenRow = null;
     long chosenTimestamp = -1;
@@ -277,6 +280,7 @@
     return insertedItem;
   }
   
+  /** {@inheritDoc} */
   public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
     throw new UnsupportedOperationException("Unimplemented serverside. " +
       "next(HStoreKey, StortedMap(...) is more efficient");

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java Tue Nov 20 13:53:30 2007
@@ -43,7 +43,8 @@
 
   /** default host address */
   static final String DEFAULT_HOST = "0.0.0.0";
-  
+
+  /** default port that the master listens on */
   static final int DEFAULT_MASTER_PORT = 60000;
   
   /** Default master address */
@@ -164,7 +165,7 @@
    * commit.
    */
   static final long LATEST_TIMESTAMP = Long.MAX_VALUE;
-  
+
   /**
    * Define for 'return-all-versions'.
    */

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Tue Nov 20 13:53:30 2007
@@ -409,13 +409,13 @@
    * @param timestamp
    * @throws IOException
    */
-  synchronized void append(Text regionName, Text tableName, Text row,
-      TreeMap<Text, byte[]> columns, long timestamp)
-  throws IOException {
+  synchronized void append(Text regionName, Text tableName,
+      TreeMap<HStoreKey, byte[]> edits) throws IOException {
+    
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum[] = obtainSeqNum(columns.size());
+    long seqNum[] = obtainSeqNum(edits.size());
     // The 'lastSeqWritten' map holds the sequence number of the oldest
     // write for each region. When the cache is flushed, the entry for the
     // region being flushed is removed if the sequence number of the flush
@@ -424,10 +424,12 @@
       this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
     }
     int counter = 0;
-    for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
+    for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
+      HStoreKey key = es.getKey();
       HLogKey logKey =
-        new HLogKey(regionName, tableName, row, seqNum[counter++]);
-      HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
+        new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
+      HLogEdit logEdit =
+        new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
       this.writer.append(logKey, logEdit);
       this.numEntries++;
     }

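With the new signature above, a caller builds a map keyed by HStoreKey so that every edit carries its own row, column, and timestamp. A hedged sketch of such a caller (the column names and values are placeholders, and the surrounding class is assumed):

    // Assumes the classes touched above (HLog, HStoreKey, Text) are on
    // the classpath; illustrative only.
    void logEdits(HLog log, Text regionName, Text tableName, Text row, long ts)
        throws IOException {
      TreeMap<HStoreKey, byte[]> edits = new TreeMap<HStoreKey, byte[]>();
      edits.put(new HStoreKey(row, new Text("family:a"), ts), "va".getBytes());
      edits.put(new HStoreKey(row, new Text("family:b"), ts), "vb".getBytes());
      // append() now draws one sequence number per entry in the map
      log.append(regionName, tableName, edits);
    }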
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue Nov 20 13:53:30 2007
@@ -47,7 +47,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -2468,10 +2467,12 @@
       // for the table we want to create already exists, then table already
       // created. Throw already-exists exception.
       
-      MetaRegion m = (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
-          onlineMetaRegions.get(newRegion.getRegionName()) :
-            onlineMetaRegions.get(onlineMetaRegions.headMap(
-                newRegion.getTableDesc().getName()).lastKey()));
+      MetaRegion m = (onlineMetaRegions.size() == 1 ?
+          onlineMetaRegions.get(onlineMetaRegions.firstKey()) : 
+            (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
+                onlineMetaRegions.get(newRegion.getRegionName()) :
+                  onlineMetaRegions.get(onlineMetaRegions.headMap(
+                      newRegion.getTableDesc().getName()).lastKey())));
           
       Text metaRegionName = m.getRegionName();
       HRegionInterface server = connection.getHRegionConnection(m.getServer());

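The nested conditional in the hunk above is dense; unrolled as if/else, the equivalent selection logic reads:

    MetaRegion m;
    if (onlineMetaRegions.size() == 1) {
      // Only one meta region is online, so it must hold the row.
      m = onlineMetaRegions.get(onlineMetaRegions.firstKey());
    } else if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
      // A meta region starts exactly at the new region's name.
      m = onlineMetaRegions.get(newRegion.getRegionName());
    } else {
      // Otherwise use the last meta region whose start key sorts before
      // the new table's name.
      m = onlineMetaRegions.get(onlineMetaRegions.headMap(
          newRegion.getTableDesc().getName()).lastKey());
    }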
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java Tue Nov 20 13:53:30 2007
@@ -23,6 +23,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.NoSuchElementException;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -34,6 +35,8 @@
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.Text;
 
+import org.apache.hadoop.hbase.io.BatchUpdate;
+
 /** 
  * A non-instantiable class that has a static method capable of compacting
  * a table by merging adjacent regions that have grown too small.
@@ -41,6 +44,7 @@
 class HMerge implements HConstants {
   static final Log LOG = LogFactory.getLog(HMerge.class);
   static final Text[] META_COLS = {COL_REGIONINFO};
+  static final Random rand = new Random();
   
   private HMerge() {
     // Not instantiable
@@ -366,53 +370,30 @@
           oldRegion2
       };
       for(int r = 0; r < regionsToDelete.length; r++) {
-        long lockid = -1L;
-        try {
-          lockid = root.startUpdate(regionsToDelete[r]);
-          root.delete(lockid, COL_REGIONINFO);
-          root.delete(lockid, COL_SERVER);
-          root.delete(lockid, COL_STARTCODE);
-          root.commit(lockid, System.currentTimeMillis());
-          lockid = -1L;
-
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("updated columns in row: " + regionsToDelete[r]);
-          }
-        } finally {
-          try {
-            if(lockid != -1L) {
-              root.abort(lockid);
-            }
+        long lockid = Math.abs(rand.nextLong());
+        BatchUpdate b = new BatchUpdate(lockid);
+        lockid = b.startUpdate(regionsToDelete[r]);
+        b.delete(lockid, COL_REGIONINFO);
+        b.delete(lockid, COL_SERVER);
+        b.delete(lockid, COL_STARTCODE);
+        root.batchUpdate(System.currentTimeMillis(), b);
+        lockid = -1L;
 
-          } catch(IOException iex) {
-            LOG.error(iex);
-          }
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("updated columns in row: " + regionsToDelete[r]);
         }
       }
       ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
       DataOutputStream s = new DataOutputStream(byteValue);
       newRegion.getRegionInfo().setOffline(true);
       newRegion.getRegionInfo().write(s);
-      long lockid = -1L;
-      try {
-        lockid = root.startUpdate(newRegion.getRegionName());
-        root.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
-        root.commit(lockid, System.currentTimeMillis());
-        lockid = -1L;
-
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("updated columns in row: "
-              + newRegion.getRegionName());
-        }
-      } finally {
-        try {
-          if(lockid != -1L) {
-            root.abort(lockid);
-          }
-
-        } catch(IOException iex) {
-          LOG.error(iex);
-        }
+      long lockid = Math.abs(rand.nextLong());
+      BatchUpdate b = new BatchUpdate(lockid);
+      lockid = b.startUpdate(newRegion.getRegionName());
+      b.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
+      root.batchUpdate(System.currentTimeMillis(), b);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("updated columns in row: " + newRegion.getRegionName());
       }
     }
   }

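The HMerge hunks above illustrate the new client pattern this patch introduces: edits accumulate in a BatchUpdate and are applied with a single batchUpdate() call, instead of being bracketed by startUpdate/commit/abort on the region. Condensed from the diff (here "root" is the region being updated, as in HMerge; "bytes" stands in for the serialized value):

    long lockid = Math.abs(rand.nextLong());      // seed for the batch
    BatchUpdate b = new BatchUpdate(lockid);
    lockid = b.startUpdate(row);                  // lock id for the ops below
    b.delete(lockid, COL_SERVER);                 // queue a delete
    b.put(lockid, COL_REGIONINFO, bytes);         // queue a put
    root.batchUpdate(System.currentTimeMillis(), b); // apply in one call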
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Tue Nov 20 13:53:30 2007
@@ -32,12 +32,17 @@
 import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
@@ -58,8 +63,14 @@
  * startKey for region 1 (if it exists).  The startKey for the
  * first region is null. The endKey for the final region is null.
  *
- * <p>The HStores have no locking built-in.  All row-level locking
- * and row-level atomicity is provided by the HRegion.
+ * <p>Locking at the HRegion level serves only one purpose: preventing the
+ * region from being closed (and consequently split) while other operations
+ * are ongoing. Each row level operation obtains both a row lock and a region
+ * read lock for the duration of the operation. While a scanner is being
+ * constructed, getScanner holds a read lock. If the scanner is successfully
+ * constructed, it holds a read lock until it is closed. A close takes out a
+ * write lock and consequently will block for ongoing operations and will block
+ * new operations from starting while the close is in progress.
  * 
  * <p>An HRegion is defined by its table and its key extent.
  * 
@@ -74,13 +85,13 @@
  * defines the keyspace for this HRegion.
  */
 public class HRegion implements HConstants {
-  static String SPLITDIR = "splits";
-  static String MERGEDIR = "merges";
+  static final String SPLITDIR = "splits";
+  static final String MERGEDIR = "merges";
   static final Random rand = new Random();
   static final Log LOG = LogFactory.getLog(HRegion.class);
   final AtomicBoolean closed = new AtomicBoolean(false);
-  private long noFlushCount = 0;
-  
+  private volatile long noFlushCount = 0;
+
   /**
    * Merge two HRegions.  They must be available on the current
    * HRegionServer. Returns a brand-new active HRegion, also
@@ -88,7 +99,7 @@
    */
   static HRegion closeAndMerge(final HRegion srcA, final HRegion srcB)
   throws IOException {
-    
+
     HRegion a = srcA;
     HRegion b = srcB;
 
@@ -105,7 +116,7 @@
       a = srcB;
       b = srcA;
     }
-    
+
     if (! a.getEndKey().equals(b.getStartKey())) {
       throw new IOException("Cannot merge non-adjacent regions");
     }
@@ -120,17 +131,17 @@
     if(! fs.exists(merges)) {
       fs.mkdirs(merges);
     }
-    
+
     HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
     Path newRegionDir = HRegion.getRegionDir(merges,
         HRegionInfo.encodeRegionName(newRegionInfo.getRegionName()));
     if(fs.exists(newRegionDir)) {
       throw new IOException("Cannot merge; target file collision at " +
-        newRegionDir);
+          newRegionDir);
     }
 
     LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
-      b.getRegionName() + " into new region " + newRegionInfo.toString());
+        b.getRegionName() + " into new region " + newRegionInfo.toString());
 
     Map<Text, Vector<HStoreFile>> byFamily =
       new TreeMap<Text, Vector<HStoreFile>>();
@@ -151,14 +162,14 @@
         newRegionDir);
 
     // Get rid of merges directory
-    
+
     fs.delete(merges);
 
     LOG.info("merge completed. New region is " + dstRegion.getRegionName());
-    
+
     return dstRegion;
   }
-  
+
   /*
    * Fills a map with a vector of store files keyed by column family. 
    * @param byFamily Map to fill.
@@ -182,20 +193,20 @@
   // Members
   //////////////////////////////////////////////////////////////////////////////
 
-  Map<Text, Long> rowsToLocks = new HashMap<Text, Long>();
-  Map<Long, Text> locksToRows = new HashMap<Long, Text>();
-  Map<Text, HStore> stores = new HashMap<Text, HStore>();
-  Map<Long, TreeMap<Text, byte []>> targetColumns 
-    = new HashMap<Long, TreeMap<Text, byte []>>();
-  
-  final HMemcache memcache;
-
-  Path rootDir;
-  HLog log;
-  FileSystem fs;
-  HBaseConfiguration conf;
-  HRegionInfo regionInfo;
-  Path regiondir;
+  volatile Map<Text, Long> rowsToLocks = new HashMap<Text, Long>();
+  volatile Map<Long, Text> locksToRows = new HashMap<Long, Text>();
+  volatile Map<Text, HStore> stores = new HashMap<Text, HStore>();
+  volatile Map<Long, TreeMap<HStoreKey, byte []>> targetColumns =
+    new HashMap<Long, TreeMap<HStoreKey, byte []>>();
+
+  final AtomicLong memcacheSize = new AtomicLong(0);
+
+  final Path rootDir;
+  final HLog log;
+  final FileSystem fs;
+  final HBaseConfiguration conf;
+  final HRegionInfo regionInfo;
+  final Path regiondir;
 
   static class WriteState {
     // Set while a memcache flush is happening.
@@ -206,17 +217,19 @@
     // again.
     volatile boolean writesEnabled = true;
   }
-  
+
   volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
   final int blockingMemcacheSize;
   protected final long threadWakeFrequency;
   protected final int optionalFlushCount;
-  private final HLocking lock = new HLocking();
-  private long desiredMaxFileSize;
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Integer updateLock = new Integer(0);
+  private final long desiredMaxFileSize;
   private final long minSequenceId;
   private final String encodedRegionName;
+  final AtomicInteger activeScannerCount = new AtomicInteger(0);
 
   //////////////////////////////////////////////////////////////////////////////
   // Constructor
@@ -242,8 +255,8 @@
    * @throws IOException
    */
   public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles)
-  throws IOException {
+      HRegionInfo regionInfo, Path initialFiles) throws IOException {
+    
     this.rootDir = rootDir;
     this.log = log;
     this.fs = fs;
@@ -251,7 +264,6 @@
     this.regionInfo = regionInfo;
     this.encodedRegionName =
       HRegionInfo.encodeRegionName(this.regionInfo.getRegionName());
-    this.memcache = new HMemcache();
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.optionalFlushCount =
       conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
@@ -270,14 +282,14 @@
     // Load in all the HStores.
     long maxSeqId = -1;
     for(Map.Entry<Text, HColumnDescriptor> e :
-        this.regionInfo.getTableDesc().families().entrySet()) {
+      this.regionInfo.getTableDesc().families().entrySet()) {
       Text colFamily = HStoreKey.extractFamily(e.getKey());
-      
+
       HStore store = new HStore(rootDir, this.regionInfo.getRegionName(),
           this.encodedRegionName, e.getValue(), fs, oldLogFile, conf); 
-      
+
       stores.put(colFamily, store);
-      
+
       long storeSeqId = store.getMaxSequenceId();
       if (storeSeqId > maxSeqId) {
         maxSeqId = storeSeqId;
@@ -286,7 +298,7 @@
     this.minSequenceId = maxSeqId;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Next sequence id for region " + regionInfo.getRegionName() +
-        " is " + this.minSequenceId);
+          " is " + this.minSequenceId);
     }
 
     // Get rid of any splits or merges that were lost in-progress
@@ -301,10 +313,10 @@
 
     // By default, we flush the cache when 16M.
     this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
-      1024*1024*16);
+        1024*1024*16);
     this.blockingMemcacheSize = this.memcacheFlushSize *
       conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
-    
+
     // By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
     this.desiredMaxFileSize =
       conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);
@@ -368,10 +380,10 @@
       LOG.info("region " + this.regionInfo.getRegionName() + " already closed");
       return null;
     }
-    lock.obtainWriteLock();
+    lock.writeLock().lock();
     try {
-      synchronized(writestate) {
-        while(writestate.compacting || writestate.flushing) {
+      synchronized (writestate) {
+        while (writestate.compacting || writestate.flushing) {
           try {
             writestate.wait();
           } catch (InterruptedException iex) {
@@ -383,14 +395,28 @@
         writestate.writesEnabled = false;
       }
       
+      // Wait for active scanners to finish. The write lock we hold will prevent
+      // new scanners from being created.
+      
+      synchronized (activeScannerCount) {
+        while (activeScannerCount.get() != 0) {
+          try {
+            activeScannerCount.wait();
+            
+          } catch (InterruptedException e) {
+            // continue
+          }
+        }
+      }
+      
       // Write lock means no more row locks can be given out.  Wait on
       // outstanding row locks to come in before we close so we do not drop
       // outstanding updates.
       waitOnRowLocks();
 
+      // Don't flush the cache if we are aborting
       if (!abort) {
-        // Don't flush the cache if we are aborting during a test.
-        internalFlushcache();
+        internalFlushcache(snapshotMemcaches());
       }
 
       Vector<HStoreFile> result = new Vector<HStoreFile>();
@@ -401,24 +427,116 @@
       LOG.info("closed " + this.regionInfo.getRegionName());
       return result;
     } finally {
-      lock.releaseWriteLock();
+      lock.writeLock().unlock();
     }
   }
   
+  //////////////////////////////////////////////////////////////////////////////
+  // HRegion accessors
+  //////////////////////////////////////////////////////////////////////////////
+
+  /** @return start key for region */
+  public Text getStartKey() {
+    return this.regionInfo.getStartKey();
+  }
+
+  /** @return end key for region */
+  public Text getEndKey() {
+    return this.regionInfo.getEndKey();
+  }
+
+  /** @return region id */
+  public long getRegionId() {
+    return this.regionInfo.getRegionId();
+  }
+
+  /** @return region name */
+  public Text getRegionName() {
+    return this.regionInfo.getRegionName();
+  }
+
+  /** @return root directory path */
+  public Path getRootDir() {
+    return rootDir;
+  }
+
+  /** @return HTableDescriptor for this region */
+  public HTableDescriptor getTableDesc() {
+    return this.regionInfo.getTableDesc();
+  }
+
+  /** @return HLog in use for this region */
+  public HLog getLog() {
+    return this.log;
+  }
+
+  /** @return Configuration object */
+  public HBaseConfiguration getConf() {
+    return this.conf;
+  }
+
+  /** @return region directory Path */
+  public Path getRegionDir() {
+    return this.regiondir;
+  }
+
+  /** @return FileSystem being used by this region */
+  public FileSystem getFilesystem() {
+    return this.fs;
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // HRegion maintenance.  
+  //
+  // These methods are meant to be called periodically by the HRegionServer for 
+  // upkeep.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * @return returns size of largest HStore.  Also returns whether store is
+   * splitable or not (Its not splitable if region has a store that has a
+   * reference store file).
+   */
+  HStore.HStoreSize largestHStore(Text midkey) {
+    HStore.HStoreSize biggest = null;
+    boolean splitable = true;
+    for(HStore h: stores.values()) {
+      HStore.HStoreSize size = h.size(midkey);
+      // If we came across a reference down in the store, then propagate
+      // fact that region is not splitable.
+      if (splitable) {
+        splitable = size.splitable;
+      }
+      if (biggest == null) {
+        biggest = size;
+        continue;
+      }
+      if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
+        biggest = size;
+      }
+    }
+    if (biggest != null) {
+      biggest.setSplitable(splitable);
+    }
+    return biggest;
+  }
+  
   /*
    * Split the HRegion to create two brand-new ones.  This also closes
    * current HRegion.  Split should be fast since we don't rewrite store files
    * but instead create new 'reference' store files that read off the top and
    * bottom ranges of parent store files.
-   * @param midKey Row to split on.
    * @param listener May be null.
-   * @return two brand-new (and open) HRegions
+   * @return two brand-new (and open) HRegions or null if a split is not needed
    * @throws IOException
    */
-  HRegion[] closeAndSplit(final Text midKey,
-      final RegionUnavailableListener listener)
-  throws IOException {
-    checkMidKey(midKey);
+  HRegion[] splitRegion(final RegionUnavailableListener listener)
+    throws IOException {
+
+    Text midKey = new Text();
+    if (!needsSplit(midKey)) {
+      return null;
+    }
     long startTime = System.currentTimeMillis();
     Path splits = getSplitsDir();
     HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
@@ -496,89 +614,6 @@
     return regions;
   }
   
-  private void checkMidKey(final Text midKey) throws IOException {
-    if(((this.regionInfo.getStartKey().getLength() != 0)
-        && (this.regionInfo.getStartKey().compareTo(midKey) > 0))
-        || ((this.regionInfo.getEndKey().getLength() != 0)
-            && (this.regionInfo.getEndKey().compareTo(midKey) < 0))) {
-      throw new IOException("Region splitkey must lie within region " +
-        "boundaries.");
-    }
-  }
-  
-  private Path getSplitRegionDir(final Path splits, final String region) {
-    return HRegion.getRegionDir(splits, region);
-  }
-  
-  private Path getSplitsDir() throws IOException {
-    Path splits = new Path(this.regiondir, SPLITDIR);
-    if(!this.fs.exists(splits)) {
-      this.fs.mkdirs(splits);
-    }
-    return splits;
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // HRegion accessors
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** @return start key for region */
-  public Text getStartKey() {
-    return this.regionInfo.getStartKey();
-  }
-
-  /** @return end key for region */
-  public Text getEndKey() {
-    return this.regionInfo.getEndKey();
-  }
-
-  /** @return region id */
-  public long getRegionId() {
-    return this.regionInfo.getRegionId();
-  }
-
-  /** @return region name */
-  public Text getRegionName() {
-    return this.regionInfo.getRegionName();
-  }
-
-  /** @return root directory path */
-  public Path getRootDir() {
-    return rootDir;
-  }
-
-  /** @return HTableDescriptor for this region */
-  public HTableDescriptor getTableDesc() {
-    return this.regionInfo.getTableDesc();
-  }
-
-  /** @return HLog in use for this region */
-  public HLog getLog() {
-    return this.log;
-  }
-
-  /** @return Configuration object */
-  public HBaseConfiguration getConf() {
-    return this.conf;
-  }
-
-  /** @return region directory Path */
-  public Path getRegionDir() {
-    return this.regiondir;
-  }
-
-  /** @return FileSystem being used by this region */
-  public FileSystem getFilesystem() {
-    return this.fs;
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // HRegion maintenance.  
-  //
-  // These methods are meant to be called periodically by the HRegionServer for 
-  // upkeep.
-  //////////////////////////////////////////////////////////////////////////////
-
   /*
    * Iterates through all the HStores and finds the one with the largest
    * MapFile size. If the size is greater than the (currently hard-coded)
@@ -587,95 +622,77 @@
    * It is possible for us to rule the region non-splitable even in excess of
    * configured size.  This happens if region contains a reference file.  If
    * a reference file, the region can not be split.
+   * 
+   * Note that there is no need to do locking in this method because it calls
+   * largestHStore which does the necessary locking.
+   * 
    * @param midKey midKey of the largest MapFile
    * @return true if the region should be split. midKey is set by this method.
    * Check it for a midKey value on return.
    */
   boolean needsSplit(Text midKey) {
-    lock.obtainReadLock();
-    try {
-      HStore.HStoreSize biggest = largestHStore(midKey);
-      if (biggest == null) {
-        return false;
-      }
-      long triggerSize =
-        this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
-      boolean split = (biggest.getAggregate() >= triggerSize);
-      if (split) {
-        if (!biggest.isSplitable()) {
-          LOG.warn("Region " + getRegionName().toString() +
+    HStore.HStoreSize biggest = largestHStore(midKey);
+    if (biggest == null) {
+      return false;
+    }
+    long triggerSize = this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
+    boolean split = (biggest.getAggregate() >= triggerSize);
+    if (split) {
+      if (!biggest.isSplitable()) {
+        LOG.warn("Region " + getRegionName().toString() +
             " is NOT splitable though its aggregate size is " +
             StringUtils.humanReadableInt(biggest.getAggregate()) +
             " and desired size is " +
             StringUtils.humanReadableInt(this.desiredMaxFileSize));
-          split = false;
-        } else {
-          LOG.info("Splitting " + getRegionName().toString() +
+        split = false;
+      } else {
+        LOG.info("Splitting " + getRegionName().toString() +
             " because largest aggregate size is " +
             StringUtils.humanReadableInt(biggest.getAggregate()) +
             " and desired size is " +
             StringUtils.humanReadableInt(this.desiredMaxFileSize));
-        }
       }
-      return split;
-    } finally {
-      lock.releaseReadLock();
     }
+    return split;
   }
   
-  /**
-   * @return returns size of largest HStore.  Also returns whether store is
-   * splitable or not (Its not splitable if region has a store that has a
-   * reference store file).
-   */
-  HStore.HStoreSize largestHStore(final Text midkey) {
-    HStore.HStoreSize biggest = null;
-    boolean splitable = true;
-    lock.obtainReadLock();
-    try {
-      for(HStore h: stores.values()) {
-        HStore.HStoreSize size = h.size(midkey);
-        // If we came across a reference down in the store, then propagate
-        // fact that region is not splitable.
-        if (splitable) {
-          splitable = size.splitable;
-        }
-        if (biggest == null) {
-          biggest = size;
-          continue;
-        }
-        if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
-          biggest = size;
-        }
-      }
-      if (biggest != null) {
-        biggest.setSplitable(splitable);
-      }
-      return biggest;
-      
-    } finally {
-      lock.releaseReadLock();
-    }
+  private Path getSplitRegionDir(final Path splits, final String region) {
+    return HRegion.getRegionDir(splits, region);
   }
   
+  private Path getSplitsDir() throws IOException {
+    Path splits = new Path(this.regiondir, SPLITDIR);
+    if(!this.fs.exists(splits)) {
+      this.fs.mkdirs(splits);
+    }
+    return splits;
+  }
+
   /**
-   * @return true if the region should be compacted.
+   * Only do a compaction if it is necessary
+   * 
+   * @return
+   * @throws IOException
    */
-  boolean needsCompaction() {
+  boolean compactIfNeeded() throws IOException {
     boolean needsCompaction = false;
-    this.lock.obtainReadLock();
-    try {
-      for (HStore store: stores.values()) {
-        if (store.needsCompaction()) {
-          needsCompaction = true;
+    for (HStore store: stores.values()) {
+      if (store.needsCompaction()) {
+        needsCompaction = true;
+        if (LOG.isDebugEnabled()) {
           LOG.debug(store.toString() + " needs compaction");
-          break;
         }
+        break;
       }
-    } finally {
-      this.lock.releaseReadLock();
     }
-    return needsCompaction;
+    if (!needsCompaction) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("region " + regionInfo.getRegionName() +
+        " does not need compaction");
+      }
+      return false;
+    }
+    return compactStores();
   }
   
   /**
@@ -689,41 +706,43 @@
    * compaction was not carried out, because the HRegion is busy doing
    * something else storage-intensive (like flushing the cache). The caller
    * should check back later.
+   * 
+   * Note that no locking is necessary at this level because compaction only
+   * conflicts with a region split, and that cannot happen because the region
+   * server does them sequentially and not in parallel.
    */
   boolean compactStores() throws IOException {
-    boolean shouldCompact = false;
     if (this.closed.get()) {
-      return shouldCompact;
+      return false;
     }
-    lock.obtainReadLock();
     try {
       synchronized (writestate) {
-        if ((!writestate.compacting) &&
-            writestate.writesEnabled) {
+        if (!writestate.compacting && writestate.writesEnabled) {
           writestate.compacting = true;
-          shouldCompact = true;
-        }
-      }
 
-      if (!shouldCompact) {
-        LOG.info("NOT compacting region " +
-          this.regionInfo.getRegionName().toString());
-        return false;
+        } else {
+          LOG.info("NOT compacting region " +
+              this.regionInfo.getRegionName().toString() + ": compacting=" +
+              writestate.compacting + ", writesEnabled=" +
+              writestate.writesEnabled);
+            return false;
+        }
       }
-
       long startTime = System.currentTimeMillis();
       LOG.info("starting compaction on region " +
         this.regionInfo.getRegionName().toString());
+      boolean status = true;
       for (HStore store : stores.values()) {
-        store.compact();
+        if(!store.compact()) {
+          status = false;
+        }
       }
       LOG.info("compaction completed on region " +
         this.regionInfo.getRegionName().toString() + ". Took " +
         StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
-      return true;
+      return status;
       
     } finally {
-      lock.releaseReadLock();
       synchronized (writestate) {
         writestate.compacting = false;
         writestate.notifyAll();
@@ -732,31 +751,8 @@
   }
 
   /**
-   * Each HRegion is given a periodic chance to flush the cache, which it should
-   * only take if there have been a lot of uncommitted writes.
-   * @throws IOException
-   * @throws DroppedSnapshotException Thrown when replay of hlog is required
-   * because a Snapshot was not properly persisted.
-   */
-  void optionallyFlush() throws IOException {
-    if(this.memcache.getSize() > this.memcacheFlushSize) {
-      flushcache(false);
-    } else if (this.memcache.getSize() > 0) {
-      if (this.noFlushCount >= this.optionalFlushCount) {
-        LOG.info("Optional flush called " + this.noFlushCount +
-            " times when data present without flushing.  Forcing one.");
-        flushcache(false);
-      } else {
-        // Only increment if something in the cache.
-        // Gets zero'd when a flushcache is called.
-        this.noFlushCount++;
-      }
-    }
-  }
-
-  /**
-   * Flush the cache.  This is called periodically to minimize the amount of
-   * log processing needed upon startup.
+   * Flush the cache if necessary. This is called periodically to minimize the
+   * amount of log processing needed upon startup.
    * 
    * <p>The returned Vector is a list of all the files used by the component
    * HStores. It is a list of HStoreFile objects.  If the returned value is
@@ -775,39 +771,91 @@
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void flushcache(boolean disableFutureWrites)
-  throws IOException {
-    if (this.closed.get()) {
-      return;
-    }
-    this.noFlushCount = 0;
-    boolean shouldFlush = false;
-    synchronized(writestate) {
-      if((!writestate.flushing) && writestate.writesEnabled) {
-        writestate.flushing = true;
-        shouldFlush = true;
-        if(disableFutureWrites) {
-          writestate.writesEnabled = false;
+  void flushcache() throws IOException {
+    lock.readLock().lock();                      // Prevent splits and closes
+    try {
+      if (this.closed.get()) {
+        return;
+      }
+      boolean needFlush = false;
+      long memcacheSize = this.memcacheSize.get();
+      if(memcacheSize > this.memcacheFlushSize) {
+        needFlush = true;
+      } else if (memcacheSize > 0) {
+        if (this.noFlushCount >= this.optionalFlushCount) {
+          LOG.info("Optional flush called " + this.noFlushCount +
+          " times when data present without flushing.  Forcing one.");
+          needFlush = true;
+        } else {
+          // Only increment if something in the cache.
+          // Gets zero'd when a flushcache is called.
+          this.noFlushCount++;
+        }
+      }
+      if (!needFlush) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cache flush not needed for region " +
+              regionInfo.getRegionName() + ". Cache size=" + memcacheSize +
+              ", cache flush threshold=" + this.memcacheFlushSize);
+        }
+        return;
+      }
+      synchronized (writestate) {
+        if ((!writestate.flushing) && writestate.writesEnabled) {
+          writestate.flushing = true;
+
+        } else {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("NOT flushing memcache for region " +
+                this.regionInfo.getRegionName() + ", flushing=" +
+                writestate.flushing + ", writesEnabled=" +
+                writestate.writesEnabled);
+          }
+          return;  
         }
       }
+      this.noFlushCount = 0;
+      long startTime = -1;
+      synchronized (updateLock) {// Stop updates while we snapshot the memcaches
+        startTime = snapshotMemcaches();
+      }
+      try {
+        internalFlushcache(startTime);
+      } finally {
+        synchronized (writestate) {
+          writestate.flushing = false;
+          writestate.notifyAll();
+        }
+      }
+    } finally {
+      lock.readLock().unlock();
     }
+  }
 
-    if(!shouldFlush) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("NOT flushing memcache for region " +
-          this.regionInfo.getRegionName());
-      }
-      return;  
+  /*
+   * It is assumed that updates are blocked for the duration of this method
+   */
+  long snapshotMemcaches() {
+    if (this.memcacheSize.get() == 0) {
+      return -1;
     }
+    long startTime = System.currentTimeMillis();
     
-    try {
-      internalFlushcache();
-    } finally {
-      synchronized (writestate) {
-        writestate.flushing = false;
-        writestate.notifyAll();
-      }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Started memcache flush for region " +
+        this.regionInfo.getRegionName() + ". Size " +
+        StringUtils.humanReadableInt(this.memcacheSize.get()));
+    }
+
+    // We reset the aggregate memcache size here so that subsequent updates
+    // will add to the unflushed size
+    
+    this.memcacheSize.set(0L);
+    
+    for (HStore hstore: stores.values()) {
+      hstore.snapshotMemcache();
     }
+    return startTime;
   }
 
   /**
@@ -839,13 +887,13 @@
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void internalFlushcache() throws IOException {
-    long startTime = -1;
-    if(LOG.isDebugEnabled()) {
-      startTime = System.currentTimeMillis();
-      LOG.debug("Started memcache flush for region " +
-        this.regionInfo.getRegionName() + ". Size " +
-        StringUtils.humanReadableInt(this.memcache.getSize()));
+  void internalFlushcache(long startTime) throws IOException {
+    if (startTime == -1) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not flushing cache: snapshotMemcaches() determined that " +
+            "there was nothing to do");
+      }
+      return;
     }
 
     // We pass the log to the HMemcache, so we can lock down both
@@ -859,64 +907,47 @@
     // explicitly cleaned up using a call to deleteSnapshot() or by calling
     // abort.
     //
-    HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
-    if(retval == null || retval.memcacheSnapshot == null) {
-      LOG.debug("Finished memcache flush; empty snapshot");
-      return;
-    }
+    long sequenceId = log.startCacheFlush();
 
     // Any failure from here on out will be catastrophic requiring server
     // restart so hlog content can be replayed and put back into the memcache.
     // Otherwise, the snapshot content while backed up in the hlog, it will not
     // be part of the current running servers state.
-    try {
-      long logCacheFlushId = retval.sequenceId;
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Snapshotted memcache for region " +
-            this.regionInfo.getRegionName() + " with sequence id " +
-            retval.sequenceId + " and entries " +
-            retval.memcacheSnapshot.size());
-      }
 
-      try {
-        // A.  Flush memcache to all the HStores.
-        // Keep running vector of all store files that includes both old and the
-        // just-made new flush store file.
-        for (HStore hstore: stores.values()) {
-          hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
-        }
-      } catch (IOException e) {
-        // An exception here means that the snapshot was not persisted.
-        // The hlog needs to be replayed so its content is restored to memcache.
-        // Currently, only a server restart will do this.
-        this.log.abortCacheFlush();
-        throw new DroppedSnapshotException(e.getMessage());
-      }
-
-      // If we get to here, the HStores have been written. If we get an
-      // error in completeCacheFlush it will release the lock it is holding
-
-      // B.  Write a FLUSHCACHE-COMPLETE message to the log.
-      //     This tells future readers that the HStores were emitted correctly,
-      //     and that all updates to the log for this regionName that have lower 
-      //     log-sequence-ids can be safely ignored.
-      this.log.completeCacheFlush(this.regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), logCacheFlushId);
+    long logCacheFlushId = sequenceId;
+    try {
+      // A.  Flush memcache to all the HStores.
+      // Keep running vector of all store files that includes both old and the
+      // just-made new flush store file.
+      for (HStore hstore: stores.values()) {
+        hstore.flushCache(sequenceId);
+      }
+    } catch (IOException e) {
+      // An exception here means that the snapshot was not persisted.
+      // The hlog needs to be replayed so its content is restored to memcache.
+      // Currently, only a server restart will do this.
+      this.log.abortCacheFlush();
+      throw new DroppedSnapshotException(e.getMessage());
+    }
+
+    // If we get to here, the HStores have been written. If we get an
+    // error in completeCacheFlush it will release the lock it is holding
+
+    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
+    //     This tells future readers that the HStores were emitted correctly,
+    //     and that all updates to the log for this regionName that have lower 
+    //     log-sequence-ids can be safely ignored.
+    this.log.completeCacheFlush(this.regionInfo.getRegionName(),
+        regionInfo.getTableDesc().getName(), logCacheFlushId);
 
-    } finally {
-      // C. Delete the now-irrelevant memcache snapshot; its contents have been 
-      //    dumped to disk-based HStores or, if error, clear aborted snapshot.
-      this.memcache.deleteSnapshot();
-    }
-    
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
-    synchronized(this) {
+    synchronized (this) {
       notifyAll();
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Finished memcache flush for region " +
-        this.regionInfo.getRegionName() + " in " +
+          this.regionInfo.getRegionName() + " in " +
           (System.currentTimeMillis() - startTime) + "ms");
     }
   }
@@ -925,20 +956,44 @@
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
 
-  /** Fetch a single data item. */
-  byte [] get(Text row, Text column) throws IOException {
+  /**
+   * Fetch a single data item.
+   * @param row
+   * @param column
+   * @return column value
+   * @throws IOException
+   */
+  public byte [] get(Text row, Text column) throws IOException {
     byte [][] results = get(row, column, Long.MAX_VALUE, 1);
     return (results == null || results.length == 0)? null: results[0];
   }
   
-  /** Fetch multiple versions of a single data item */
-  byte [][] get(Text row, Text column, int numVersions) throws IOException {
+  /**
+   * Fetch multiple versions of a single data item
+   * 
+   * @param row
+   * @param column
+   * @param numVersions
+   * @return array of values one element per version
+   * @throws IOException
+   */
+  public byte [][] get(Text row, Text column, int numVersions) throws IOException {
     return get(row, column, Long.MAX_VALUE, numVersions);
   }
 
-  /** Fetch multiple versions of a single data item, with timestamp. */
-  byte [][] get(Text row, Text column, long timestamp, int numVersions) 
-  throws IOException {
+  /**
+   * Fetch multiple versions of a single data item, with timestamp.
+   *
+   * @param row
+   * @param column
+   * @param timestamp
+   * @param numVersions
+   * @return array of values one element per version that matches the timestamp
+   * @throws IOException
+   */
+  public byte [][] get(Text row, Text column, long timestamp, int numVersions) 
+    throws IOException {
+    
     if (this.closed.get()) {
       throw new IOException("Region " + this.getRegionName().toString() +
         " closed");
@@ -948,61 +1003,11 @@
     checkRow(row);
     checkColumn(column);
 
-    // Obtain the row-lock
-    obtainRowLock(row);
-    try {
-      // Obtain the -col results
-      return get(new HStoreKey(row, column, timestamp), numVersions);
+    // Don't need a row lock for a simple get
     
-    } finally {
-      releaseRowLock(row);
-    }
-  }
-
-  private byte [][] get(final HStoreKey key, final int numVersions)
-  throws IOException {
-    lock.obtainReadLock();
-    try {
-      // Check the memcache
-      byte [][] memcacheResult = this.memcache.get(key, numVersions);
-      // If we got sufficient versions from memcache, return.
-      if (memcacheResult != null && memcacheResult.length == numVersions) {
-        return memcacheResult;
-      }
-
-      // Check hstore for more versions.
-      Text colFamily = HStoreKey.extractFamily(key.getColumn());
-      HStore targetStore = stores.get(colFamily);
-      if(targetStore == null) {
-        // There are no stores.  Return what we got from memcache.
-        return memcacheResult;
-      }
-      
-      // Update the number of versions we need to fetch from the store.
-      int amendedNumVersions = numVersions;
-      if (memcacheResult != null) {
-        amendedNumVersions -= memcacheResult.length;
-      }
-      byte [][] result =
-        targetStore.get(key, amendedNumVersions, this.memcache);
-      if (result == null) {
-        result = memcacheResult;
-      } else if (memcacheResult != null) {
-        // We have results from both memcache and from stores.  Put them
-        // together in an array in the proper order.
-        byte [][] storeResult = result;
-        result = new byte [memcacheResult.length + result.length][];
-        for (int i = 0; i < memcacheResult.length; i++) {
-          result[i] = memcacheResult[i];
-        }
-        for (int i = 0; i < storeResult.length; i++) {
-          result[i + memcacheResult.length] = storeResult[i];
-        }
-      }
-      return result;
-    } finally {
-      lock.releaseReadLock();
-    }
+    HStoreKey key = new HStoreKey(row, column, timestamp);
+    HStore targetStore = stores.get(HStoreKey.extractFamily(column));
+    return targetStore.get(key, numVersions);
   }
 
   /**
@@ -1014,34 +1019,27 @@
    * checking many files needlessly.  A small Bloom for each row would help us 
    * determine which column groups are useful for that row.  That would let us 
    * avoid a bunch of disk activity.
+   *
+   * @param row
+   * @return Map<columnName, byte[]> values
+   * @throws IOException
    */
-  TreeMap<Text, byte []> getFull(Text row) throws IOException {
+  public Map<Text, byte []> getFull(Text row) throws IOException {
     HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
-    lock.obtainReadLock();
+    obtainRowLock(row);
     try {
-      TreeMap<Text, byte []> memResult = memcache.getFull(key);
+      TreeMap<Text, byte []> result = new TreeMap<Text, byte[]>();
       for (Text colFamily: stores.keySet()) {
-        this.stores.get(colFamily).getFull(key, memResult);
+        HStore targetStore = stores.get(colFamily);
+        targetStore.getFull(key, result);
       }
-      return memResult;
+      return result;
     } finally {
-      lock.releaseReadLock();
+      releaseRowLock(row);
     }
   }
 
   /**
-   * Get all keys matching the origin key's row/column/timestamp and those
-   * of an older vintage
-   * Default access so can be accessed out of {@link HRegionServer}.
-   * @param origin Where to start searching.
-   * @return Ordered list of keys going from newest on back.
-   * @throws IOException
-   */
-  List<HStoreKey> getKeys(final HStoreKey origin) throws IOException {
-    return getKeys(origin, ALL_VERSIONS);
-  }
-  
-  /**
    * Get <code>versions</code> keys matching the origin key's
    * row/column/timestamp and those of an older vintage
    * Default access so can be accessed out of {@link HRegionServer}.
@@ -1051,19 +1049,16 @@
    * @return Ordered list of <code>versions</code> keys going from newest back.
    * @throws IOException
    */
-  List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
-  throws IOException {
-    List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
-    if (versions != ALL_VERSIONS && keys.size() >= versions) {
-      return keys;
-    }
-    // Check hstore for more versions.
+  private List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
+    throws IOException {
+    
+    List<HStoreKey> keys = null;
     Text colFamily = HStoreKey.extractFamily(origin.getColumn());
     HStore targetStore = stores.get(colFamily);
     if (targetStore != null) {
       // Pass versions without modification since in the store getKeys, it
       // includes the size of the passed <code>keys</code> array when counting.
-      keys = targetStore.getKeys(origin, keys, versions);
+      keys = targetStore.getKeys(origin, versions);
     }
     return keys;
   }
@@ -1085,10 +1080,13 @@
    * @throws IOException
    */
   public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
-      long timestamp, RowFilterInterface filter)
-  throws IOException {
-    lock.obtainReadLock();
+      long timestamp, RowFilterInterface filter) throws IOException {
+    lock.readLock().lock();
     try {
+      if (this.closed.get()) {
+        throw new IOException("Region " + this.getRegionName().toString() +
+          " closed");
+      }
       TreeSet<Text> families = new TreeSet<Text>();
       for(int i = 0; i < cols.length; i++) {
         families.add(HStoreKey.extractFamily(cols[i]));
@@ -1101,10 +1099,10 @@
         }
         storelist.add(stores.get(family));
       }
-      return new HScanner(cols, firstRow, timestamp, memcache,
+      return new HScanner(cols, firstRow, timestamp,
         storelist.toArray(new HStore [storelist.size()]), filter);
     } finally {
-      lock.releaseReadLock();
+      lock.readLock().unlock();
     }
   }
 
@@ -1113,44 +1111,77 @@
   //////////////////////////////////////////////////////////////////////////////
   
   /**
-   * The caller wants to apply a series of writes to a single row in the
-   * HRegion. The caller will invoke startUpdate(), followed by a series of
-   * calls to put/delete, then finally either abort() or commit().
-   *
-   * <p>Note that we rely on the external caller to properly abort() or
-   * commit() every transaction.  If the caller is a network client, there
-   * should be a lease-system in place that automatically aborts() transactions
-   * after a specified quiet period.
-   * 
-   * @param row Row to update
-   * @return lock id
+   * @param timestamp
+   * @param b
    * @throws IOException
-   * @see #put(long, Text, byte[])
    */
-  public long startUpdate(Text row) throws IOException {
+  public void batchUpdate(long timestamp, BatchUpdate b)
+    throws IOException {
     // Do a rough check that we have resources to accept a write.  The check
     // is 'rough' in that between the resource check and the call to obtain
     // the row lock, resources may run out.  For now, the thought is that
     // this will be extremely rare; we'll deal with it when it happens.
     checkResources();
 
-    // Get a read lock. We will not be able to get one if we are closing or
-    // if this region is being split.  In neither case should we be allowing
-    // updates.
-    this.lock.obtainReadLock();
-    if (this.closed.get()) {
-      throw new IOException("Region " + this.getRegionName().toString() +
-        " closed");
-    }
+    // We obtain a per-row lock, so other clients will block while one client
+    // performs an update. The row lock is held for the duration of this
+    // method and released in the finally block below, whether the batch
+    // commits cleanly or an exception forces an abort.
+    Text row = b.getRow();
+    long lockid = obtainRowLock(row);
+
+    long commitTime =
+      (timestamp == LATEST_TIMESTAMP) ? System.currentTimeMillis() : timestamp;
+      
     try {
-      // We obtain a per-row lock, so other clients will block while one client
-      // performs an update. The read lock is released by the client calling
-      // #commit or #abort or if the HRegionServer lease on the lock expires.
-      // See HRegionServer#RegionListener for how the expire on HRegionServer
-      // invokes a HRegion#abort.
-      return obtainRowLock(row);
+      List<Text> deletes = null;
+      for (BatchOperation op: b) {
+        HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
+        byte[] val = null;
+        switch(op.getOp()) {
+        case PUT:
+          val = op.getValue();
+          if (HLogEdit.isDeleted(val)) {
+            throw new IOException("Cannot insert value: " + new String(val));
+          }
+          break;
+
+        case DELETE:
+          if (timestamp == LATEST_TIMESTAMP) {
+            // Save off these deletes
+            if (deletes == null) {
+              deletes = new ArrayList<Text>();
+            }
+            deletes.add(op.getColumn());
+          } else {
+            val = HLogEdit.deleteBytes.get();
+          }
+          break;
+        }
+        if (val != null) {
+          localput(lockid, key, val);
+        }
+      }
+      TreeMap<HStoreKey, byte[]> edits =
+        this.targetColumns.remove(Long.valueOf(lockid));
+      if (edits != null && edits.size() > 0) {
+        update(edits);
+      }
+      
+      if (deletes != null && deletes.size() > 0) {
+        // We have some LATEST_TIMESTAMP deletes to run.
+        for (Text column: deletes) {
+          deleteMultiple(row, column, LATEST_TIMESTAMP, 1);
+        }
+      }
+
+    } catch (IOException e) {
+      this.targetColumns.remove(Long.valueOf(lockid));
+      throw e;
+      
     } finally {
-      this.lock.releaseReadLock();
+      releaseRowLock(row);
     }
   }
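
The old startUpdate/put/delete/commit sequence collapses into this single
call. A hedged sketch of the caller side; the BatchUpdate builder methods
(the constructor, startUpdate, put, delete) are assumed from client code of
this era and are not shown in this patch:

    BatchUpdate b = new BatchUpdate(rand.nextLong()); // assumed constructor
    long lockid = b.startUpdate(new Text("row1"));    // assumed builder call
    b.put(lockid, new Text("info:a"), "value-a".getBytes());
    b.delete(lockid, new Text("info:b"));
    region.batchUpdate(HConstants.LATEST_TIMESTAMP, b);

With LATEST_TIMESTAMP, puts are stamped with the commit time while deletes
are set aside and applied to the newest matching cell via deleteMultiple,
as the loop above shows.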
   
@@ -1168,11 +1199,11 @@
   private synchronized void checkResources() {
     boolean blocked = false;
     
-    while (!checkCommitsSinceFlush()) {
+    while (this.memcacheSize.get() >= this.blockingMemcacheSize) {
       if (!blocked) {
         LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
             "': Memcache size " +
-            StringUtils.humanReadableInt(this.memcache.getSize()) +
+            StringUtils.humanReadableInt(this.memcacheSize.get()) +
             " is >= than blocking " +
             StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size");
       }
@@ -1190,45 +1221,6 @@
     }
   }
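
The hunk elides the body of the blocking loop. Presumably it parks the
writer until a flush shrinks the memcache; a hedged reconstruction follows
(the wait interval and wake-up plumbing are not visible in this hunk, and
"threadWakeFrequency" is an assumed field name):

    // checkResources is synchronized, so wait() releases the monitor.
    try {
      wait(threadWakeFrequency);   // periodically re-test the threshold
    } catch (InterruptedException e) {
      // fall through; the while loop re-checks memcacheSize
    }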
   
-  /*
-   * @return True if commits since flush is under the blocking threshold.
-   */
-  private boolean checkCommitsSinceFlush() {
-    return this.memcache.getSize() < this.blockingMemcacheSize;
-  }
-
-  /**
-   * Put a cell value into the locked row.  The user indicates the row-lock, the
-   * target column, and the desired value.  This stuff is set into a temporary 
-   * memory area until the user commits the change, at which point it's logged 
-   * and placed into the memcache.
-   *
-   * This method really just tests the input, then calls an internal localput() 
-   * method.
-   *
-   * @param lockid lock id obtained from startUpdate
-   * @param targetCol name of column to be updated
-   * @param val new value for column
-   * @throws IOException
-   */
-  public void put(long lockid, Text targetCol, byte [] val) throws IOException {
-    if (HLogEdit.isDeleted(val)) {
-      throw new IOException("Cannot insert value: " + val);
-    }
-    localput(lockid, targetCol, val);
-  }
-
-  /**
-   * Delete a value or write a value.
-   * This is a just a convenience method for put().
-   * @param lockid lock id obtained from startUpdate
-   * @param targetCol name of column to be deleted
-   * @throws IOException
-   */
-  public void delete(long lockid, Text targetCol) throws IOException {
-    localput(lockid, targetCol, HLogEdit.deleteBytes.get());
-  }
-  
   /**
    * Delete all cells of the same age as the passed timestamp or older.
    * @param row
@@ -1237,43 +1229,43 @@
    * @throws IOException
    */
   public void deleteAll(final Text row, final Text column, final long ts)
-  throws IOException {
-    deleteMultiple(row, column, ts, ALL_VERSIONS);
+    throws IOException {
+    
+    checkColumn(column);
+    obtainRowLock(row);
+    try {
+      deleteMultiple(row, column, ts, ALL_VERSIONS);
+    } finally {
+      releaseRowLock(row);
+    }
   }
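
For example, a caller dropping every version of a cell at or before now
(the "info:count" column is illustrative):

    region.deleteAll(row, new Text("info:count"), System.currentTimeMillis());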
   
   /**
    * Delete one or many cells.
    * Used to support {@link #deleteAll(Text, Text, long)} and deletion of
    * latest cell.
+   * 
    * @param row
    * @param column
    * @param ts Timestamp to start search on.
    * @param versions How many versions to delete. Pass
-   * {@link HConstants.ALL_VERSIONS} to delete all.
+   * {@link HConstants#ALL_VERSIONS} to delete all.
    * @throws IOException
    */
-  void deleteMultiple(final Text row, final Text column, final long ts,
-    final int versions)
-  throws IOException {
-    lock.obtainReadLock();
-    try {
-      checkColumn(column);
-      HStoreKey origin = new HStoreKey(row, column, ts);
-      synchronized(row) {
-        List<HStoreKey> keys = getKeys(origin, versions);
-        if (keys.size() > 0) {
-          TreeMap<Text, byte []> edits = new TreeMap<Text, byte []>();
-          edits.put(column, HLogEdit.deleteBytes.get());
-          for (HStoreKey key: keys) {
-            update(row, key.getTimestamp(), edits);
-          }
-        }
+  private void deleteMultiple(final Text row, final Text column, final long ts,
+      final int versions) throws IOException {
+    
+    HStoreKey origin = new HStoreKey(row, column, ts);
+    List<HStoreKey> keys = getKeys(origin, versions);
+    if (keys != null && keys.size() > 0) {
+      TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>();
+      for (HStoreKey key: keys) {
+        edits.put(key, HLogEdit.deleteBytes.get());
       }
-    } finally {
-      lock.releaseReadLock();
+      update(edits);
     }
   }
-
+  
   /**
    * Private implementation.
    * 
@@ -1282,146 +1274,23 @@
    * (Or until the user's write-lock expires.)
    * 
    * @param lockid
-   * @param targetCol
+   * @param key row/column/timestamp cell the value is destined for
    * @param val Value to enter into cell
    * @throws IOException
    */
-  void localput(final long lockid, final Text targetCol,
-    final byte [] val)
-  throws IOException {
-    checkColumn(targetCol);
-
-    Text row = getRowFromLock(lockid);
-    if (row == null) {
-      throw new LockException("No write lock for lockid " + lockid);
-    }
-
-    // This sync block makes localput() thread-safe when multiple
-    // threads from the same client attempt an insert on the same 
-    // locked row (via lockid).
-    synchronized(row) {
-      // This check makes sure that another thread from the client
-      // hasn't aborted/committed the write-operation.
-      if (row != getRowFromLock(lockid)) {
-        throw new LockException("Locking error: put operation on lock " +
-            lockid + " unexpected aborted by another thread");
-      }
-      Long lid = Long.valueOf(lockid);
-      TreeMap<Text, byte []> targets = this.targetColumns.get(lid);
-      if (targets == null) {
-        targets = new TreeMap<Text, byte []>();
-        this.targetColumns.put(lid, targets);
-      }
-      targets.put(targetCol, val);
-    }
-  }
-
-  /**
-   * Abort a pending set of writes. This dumps from memory all in-progress
-   * writes associated with the given row-lock.  These values have not yet
-   * been placed in memcache or written to the log.
-   *
-   * @param lockid lock id obtained from startUpdate
-   * @throws IOException
-   */
-  public void abort(long lockid) throws IOException {
-    Text row = getRowFromLock(lockid);
-    if(row == null) {
-      throw new LockException("No write lock for lockid " + lockid);
-    }
-    
-    // This sync block makes abort() thread-safe when multiple
-    // threads from the same client attempt to operate on the same
-    // locked row (via lockid).
+  private void localput(final long lockid, final HStoreKey key, 
+      final byte [] val) throws IOException {
     
-    synchronized(row) {
-      
-      // This check makes sure another thread from the client
-      // hasn't aborted/committed the write-operation.
-      
-      if(row != getRowFromLock(lockid)) {
-        throw new LockException("Locking error: abort() operation on lock " 
-            + lockid + " unexpected aborted by another thread");
-      }
-      
-      this.targetColumns.remove(Long.valueOf(lockid));
-      releaseRowLock(row);
+    checkColumn(key.getColumn());
+    Long lid = Long.valueOf(lockid);
+    TreeMap<HStoreKey, byte []> targets = this.targetColumns.get(lid);
+    if (targets == null) {
+      targets = new TreeMap<HStoreKey, byte []>();
+      this.targetColumns.put(lid, targets);
     }
+    targets.put(key, val);
   }
 
-  /**
-   * Commit a pending set of writes to the memcache. This also results in
-   * writing to the change log.
-   *
-   * Once updates hit the change log, they are safe.  They will either be moved 
-   * into an HStore in the future, or they will be recovered from the log.
-   * @param lockid Lock for row we're to commit.
-   * @param timestamp the time to associate with this change.
-   * @throws IOException
-   */
-  public void commit(final long lockid, final long timestamp)
-  throws IOException {
-    // Remove the row from the pendingWrites list so 
-    // that repeated executions won't screw this up.
-    Text row = getRowFromLock(lockid);
-    if(row == null) {
-      throw new LockException("No write lock for lockid " + lockid);
-    }
-    
-    // This check makes sure that another thread from the client
-    // hasn't aborted/committed the write-operation
-    synchronized(row) {
-      Long lid = Long.valueOf(lockid);
-      update(row, timestamp, this.targetColumns.get(lid));
-      targetColumns.remove(lid);
-      releaseRowLock(row);
-    }
-  }
-  
-  /**
-   * This method for unit testing only.
-   * Does each operation individually so can do appropriate
-   * {@link HConstants#LATEST_TIMESTAMP} action.  Tries to mimic how
-   * {@link HRegionServer#batchUpdate(Text, long, org.apache.hadoop.hbase.io.BatchUpdate)}
-   * works when passed a timestamp of LATEST_TIMESTAMP.
-   * @param lockid Lock for row we're to commit.
-   * @throws IOException 
-   * @throws IOException
-   * @see {@link #commit(long, long)}
-   */
-  void commit(final long lockid) throws IOException {
-    // Remove the row from the pendingWrites list so 
-    // that repeated executions won't screw this up.
-    Text row = getRowFromLock(lockid);
-    if(row == null) {
-      throw new LockException("No write lock for lockid " + lockid);
-    }
-    
-    // This check makes sure that another thread from the client
-    // hasn't aborted/committed the write-operation
-    synchronized(row) {
-      Long lid = Long.valueOf(lockid);
-      TreeMap<Text, byte []> updatesByColumn = this.targetColumns.get(lid);
-      // Run updates one at a time so we can supply appropriate timestamp
-      long now = System.currentTimeMillis();
-      for (Map.Entry<Text, byte []>e: updatesByColumn.entrySet()) {
-        if (HLogEdit.isDeleted(e.getValue())) {
-          // Its a delete.  Delete latest.  deleteMultiple calls update for us.
-          // Actually regets the row lock but since we already have it, should
-          // be fine.
-          deleteMultiple(row, e.getKey(), LATEST_TIMESTAMP, 1);
-          continue;
-        }
-        // Must be a 'put'.
-        TreeMap<Text, byte []> putEdit = new TreeMap<Text, byte []>();
-        putEdit.put(e.getKey(), e.getValue());
-        update(row, now, putEdit);
-      }
-      this.targetColumns.remove(lid);
-      releaseRowLock(row);
-    }
-  }
-   
   /* 
    * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
@@ -1430,15 +1299,24 @@
    * @param updatesByColumn Cell updates by column
    * @throws IOException
    */
-  private void update(final Text row, final long timestamp,
-    final TreeMap<Text, byte []> updatesByColumn)
-  throws IOException {
+  private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
+    throws IOException {
+    
     if (updatesByColumn == null || updatesByColumn.size() <= 0) {
       return;
     }
-    this.log.append(regionInfo.getRegionName(),
-        regionInfo.getTableDesc().getName(), row, updatesByColumn, timestamp);
-    this.memcache.add(row, updatesByColumn, timestamp);
+    synchronized (updateLock) { // prevent a cache flush
+      this.log.append(regionInfo.getRegionName(),
+          regionInfo.getTableDesc().getName(), updatesByColumn);
+
+      for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
+        HStoreKey key = e.getKey();
+        byte[] val = e.getValue();
+        this.memcacheSize.addAndGet(key.getSize() +
+            (val == null ? 0 : val.length));
+        stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
+      }
+    }
   }
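
The two steps under updateLock are the durability invariant: edits reach
the HLog before any memcache. A stripped-down model with stand-in types
(not HBase API) of why the shared lock matters:

    import java.util.ArrayList;
    import java.util.List;

    /** Toy model: WAL-first ordering under a shared update lock. */
    class WalFirstSketch {
      private final Object updateLock = new Object();
      private final List<String> hlog = new ArrayList<String>();     // stands in for HLog
      private final List<String> memcache = new ArrayList<String>(); // stands in for the stores

      void update(List<String> edits) {
        synchronized (updateLock) { // a cache flush takes the same lock, so it
          hlog.addAll(edits);       // observes all of this batch or none of it;
          memcache.addAll(edits);   // memory fills only after the log append
        }
      }
    }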
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1446,7 +1324,7 @@
   //////////////////////////////////////////////////////////////////////////////
 
   /** Make sure this is a valid row for the HRegion */
-  void checkRow(Text row) throws IOException {
+  private void checkRow(Text row) throws IOException {
     if(((regionInfo.getStartKey().getLength() == 0)
         || (regionInfo.getStartKey().compareTo(row) <= 0))
         && ((regionInfo.getEndKey().getLength() == 0)
@@ -1466,7 +1344,7 @@
    * @param columnName
    * @throws IOException
    */
-  void checkColumn(Text columnName) throws IOException {
+  private void checkColumn(Text columnName) throws IOException {
     Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
     if(! regionInfo.getTableDesc().hasFamily(family)) {
       throw new IOException("Requested column family " + family 
@@ -1495,41 +1373,50 @@
    * which maybe we'll do in the future.
    * 
    * @param row Name of row to lock.
+   * @throws IOException
    * @return The id of the held lock.
    */
   long obtainRowLock(Text row) throws IOException {
     checkRow(row);
-    synchronized(rowsToLocks) {
-      while(rowsToLocks.get(row) != null) {
-        try {
-          rowsToLocks.wait();
-        } catch (InterruptedException ie) {
-          // Empty
+    lock.readLock().lock();
+    try {
+      if (this.closed.get()) {
+        throw new IOException("Region " + this.getRegionName().toString() +
+          " closed");
+      }
+      synchronized (rowsToLocks) {
+        while (rowsToLocks.get(row) != null) {
+          try {
+            rowsToLocks.wait();
+          } catch (InterruptedException ie) {
+            // Empty
+          }
         }
+        Long lid = Long.valueOf(Math.abs(rand.nextLong()));
+        rowsToLocks.put(row, lid);
+        locksToRows.put(lid, row);
+        rowsToLocks.notifyAll();
+        return lid.longValue();
       }
-      
-      Long lid = Long.valueOf(Math.abs(rand.nextLong()));
-      rowsToLocks.put(row, lid);
-      locksToRows.put(lid, row);
-      rowsToLocks.notifyAll();
-      return lid.longValue();
+    } finally {
+      lock.readLock().unlock();
     }
   }
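
Callers outside this class follow the same obtain/try/finally discipline;
note that release is keyed by row, not by lock id (compare addRegionToMETA
near the end of this file):

    region.obtainRowLock(row);
    try {
      // ... stage and apply edits for this row ...
    } finally {
      region.releaseRowLock(row);
    }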
   
   Text getRowFromLock(long lockid) {
     // Pattern is that all access to rowsToLocks and/or to
     // locksToRows is via a lock on rowsToLocks.
-    synchronized(rowsToLocks) {
+    synchronized (rowsToLocks) {
       return locksToRows.get(Long.valueOf(lockid));
     }
   }
   
   /** 
    * Release the row lock!
-   * @param lock Name of row whose lock we are to release
+   * @param row Name of row whose lock we are to release
    */
   void releaseRowLock(Text row) {
-    synchronized(rowsToLocks) {
+    synchronized (rowsToLocks) {
       long lockid = rowsToLocks.remove(row).longValue();
       locksToRows.remove(Long.valueOf(lockid));
       rowsToLocks.notifyAll();
@@ -1537,7 +1424,7 @@
   }
   
   private void waitOnRowLocks() {
-    synchronized (this.rowsToLocks) {
+    synchronized (rowsToLocks) {
       while (this.rowsToLocks.size() > 0) {
         try {
           this.rowsToLocks.wait();
@@ -1557,51 +1444,33 @@
   /**
    * HScanner is an iterator through a bunch of rows in an HRegion.
    */
-  private static class HScanner implements HInternalScannerInterface {
+  private class HScanner implements HInternalScannerInterface {
     private HInternalScannerInterface[] scanners;
-    private TreeMap<Text, byte []>[] resultSets;
-    private HStoreKey[] keys;
     private boolean wildcardMatch = false;
     private boolean multipleMatchers = false;
-    private RowFilterInterface dataFilter;
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
-    HScanner(Text[] cols, Text firstRow, long timestamp, HMemcache memcache,
-        HStore[] stores, RowFilterInterface filter) throws IOException {  
-      this.dataFilter = filter;
-      if (null != dataFilter) {
-        dataFilter.reset();
-      }
-      this.scanners = new HInternalScannerInterface[stores.length + 1];
-      this.resultSets = new TreeMap[scanners.length];
-      this.keys = new HStoreKey[scanners.length];
+    HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
+        RowFilterInterface filter) throws IOException {
+      this.scanners = new HInternalScannerInterface[stores.length];
 
-      // Advance to the first key in each store.
-      // All results will match the required column-set and scanTime.
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
       
-      // NOTE: the memcache scanner should be the first scanner
       try {
-        HInternalScannerInterface scanner =
-          memcache.getScanner(timestamp, cols, firstRow);
-        if (scanner.isWildcardScanner()) {
-          this.wildcardMatch = true;
-        }
-        if (scanner.isMultipleMatchScanner()) {
-          this.multipleMatchers = true;
-        }
-        scanners[0] = scanner;
-
         for (int i = 0; i < stores.length; i++) {
-          scanner = stores[i].getScanner(timestamp, cols, firstRow);
-          if (scanner.isWildcardScanner()) {
-            this.wildcardMatch = true;
-          }
-          if (scanner.isMultipleMatchScanner()) {
-            this.multipleMatchers = true;
+          HInternalScannerInterface scanner =
+            stores[i].getScanner(timestamp, cols, firstRow, filter);
+          scanners[i] = scanner;
+          if (scanner.isWildcardScanner()) {
+            this.wildcardMatch = true;
+          }
+          if (scanner.isMultipleMatchScanner()) {
+            this.multipleMatchers = true;
+          }
           }
-          scanners[i + 1] = scanner;
-        }
 
       } catch(IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
@@ -1611,13 +1480,9 @@
         }
         throw e;
       }
-      for (int i = 0; i < scanners.length; i++) {
-        keys[i] = new HStoreKey();
-        resultSets[i] = new TreeMap<Text, byte []>();
-        if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
-          closeScanner(i);
-        }
-      }
+      // As we have now successfully completed initialization, increment the
+      // activeScanner count.
+      activeScannerCount.incrementAndGet();
     }
 
     /** @return true if the scanner is a wild card scanner */
@@ -1633,152 +1498,17 @@
     /** {@inheritDoc} */
     public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
     throws IOException {
-      // Filtered flag is set by filters.  If a cell has been 'filtered out'
-      // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
-      boolean filtered = true;
-      boolean moreToFollow = true;
-      while (filtered && moreToFollow) {
-        // Find the lowest-possible key.
-        Text chosenRow = null;
-        long chosenTimestamp = -1;
-        for (int i = 0; i < this.keys.length; i++) {
-          if (scanners[i] != null &&
-              (chosenRow == null ||
-              (keys[i].getRow().compareTo(chosenRow) < 0) ||
-              ((keys[i].getRow().compareTo(chosenRow) == 0) &&
-              (keys[i].getTimestamp() > chosenTimestamp)))) {
-            chosenRow = new Text(keys[i].getRow());
-            chosenTimestamp = keys[i].getTimestamp();
-          }
-        }
-        
-        // Filter whole row by row key?
-        filtered = dataFilter != null? dataFilter.filter(chosenRow) : false;
-
-        // Store the key and results for each sub-scanner. Merge them as
-        // appropriate.
-        if (chosenTimestamp >= 0 && !filtered) {
-          // Here we are setting the passed in key with current row+timestamp
-          key.setRow(chosenRow);
-          key.setVersion(chosenTimestamp);
-          key.setColumn(HConstants.EMPTY_TEXT);
-          // Keep list of deleted cell keys within this row.  We need this
-          // because as we go through scanners, the delete record may be in an
-          // early scanner and then the same record with a non-delete, non-null
-          // value in a later. Without history of what we've seen, we'll return
-          // deleted values. This List should not ever grow too large since we
-          // are only keeping rows and columns that match those set on the
-          // scanner and which have delete values.  If memory usage becomes a
-          // problem, could redo as bloom filter.
-          List<HStoreKey> deletes = new ArrayList<HStoreKey>();
-          for (int i = 0; i < scanners.length && !filtered; i++) {
-            while ((scanners[i] != null
-                && !filtered
-                && moreToFollow)
-                && (keys[i].getRow().compareTo(chosenRow) == 0)) {
-              // If we are doing a wild card match or there are multiple
-              // matchers per column, we need to scan all the older versions of 
-              // this row to pick up the rest of the family members
-              if (!wildcardMatch
-                  && !multipleMatchers
-                  && (keys[i].getTimestamp() != chosenTimestamp)) {
-                break;
-              }
-
-              // Filter out null criteria columns that are not null
-              if (dataFilter != null) {
-                filtered = dataFilter.filterNotNull(resultSets[i]);
-              }
-
-              // NOTE: We used to do results.putAll(resultSets[i]);
-              // but this had the effect of overwriting newer
-              // values with older ones. So now we only insert
-              // a result if the map does not contain the key.
-              HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT,
-                key.getTimestamp());
-              for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
-                hsk.setColumn(e.getKey());
-                if (HLogEdit.isDeleted(e.getValue())) {
-                  if (!deletes.contains(hsk)) {
-                    // Key changes as we cycle the for loop so add a copy to
-                    // the set of deletes.
-                    deletes.add(new HStoreKey(hsk));
-                  }
-                } else if (!deletes.contains(hsk) &&
-                    !filtered &&
-                    moreToFollow &&
-                    !results.containsKey(e.getKey())) {
-                  if (dataFilter != null) {
-                    // Filter whole row by column data?
-                    filtered =
-                        dataFilter.filter(chosenRow, e.getKey(), e.getValue());
-                    if (filtered) {
-                      results.clear();
-                      break;
-                    }
-                  }
-                  results.put(e.getKey(), e.getValue());
-                }
-              }
-              resultSets[i].clear();
-              if (!scanners[i].next(keys[i], resultSets[i])) {
-                closeScanner(i);
-              }
-            }
-          }          
-        }
-        
-        for (int i = 0; i < scanners.length; i++) {
-          // If the current scanner is non-null AND has a lower-or-equal
-          // row label, then its timestamp is bad. We need to advance it.
-          while ((scanners[i] != null) &&
-              (keys[i].getRow().compareTo(chosenRow) <= 0)) {
-            resultSets[i].clear();
-            if (!scanners[i].next(keys[i], resultSets[i])) {
-              closeScanner(i);
-            }
-          }
-        }
-
-        moreToFollow = chosenTimestamp >= 0;
-        
-        if (dataFilter != null) {
-          if (moreToFollow) {
-            dataFilter.rowProcessed(filtered, chosenRow);
-          }
-          if (dataFilter.filterAllRemaining()) {
-            moreToFollow = false;
-            LOG.debug("page limit");
-          }
-        }
-        if (LOG.isDebugEnabled()) {
-          if (this.dataFilter != null) {
-            LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
-          }
-        }
-        
-        if (results.size() <= 0 && !filtered) {
-          // There were no results found for this row.  Marked it as 
-          // 'filtered'-out otherwise we will not move on to the next row.
-          filtered = true;
-        }
-      }
-      
-      // If we got no results, then there is no more to follow.
-      if (results == null || results.size() <= 0) {
-        moreToFollow = false;
-      }
-      
-      // Make sure scanners closed if no more results
-      if (!moreToFollow) {
-        for (int i = 0; i < scanners.length; i++) {
-          if (null != scanners[i]) {
+      boolean haveResults = false;
+      for (int i = 0; i < scanners.length; i++) {
+        if (scanners[i] != null) {
+          if (scanners[i].next(key, results)) {
+            haveResults = true;
+          } else {
             closeScanner(i);
           }
         }
       }
-      
-      return moreToFollow;
+      return haveResults;
     }
 
     
@@ -1792,8 +1522,6 @@
         }
       } finally {
         scanners[i] = null;
-        keys[i] = null;
-        resultSets[i] = null;
       }
     }
 
@@ -1801,13 +1529,29 @@
      * {@inheritDoc}
      */
     public void close() {
-      for(int i = 0; i < scanners.length; i++) {
-        if(scanners[i] != null) {
-          closeScanner(i);
+      try {
+        for(int i = 0; i < scanners.length; i++) {
+          if(scanners[i] != null) {
+            closeScanner(i);
+          }
+        }
+      } finally {
+        synchronized (activeScannerCount) {
+          int count = activeScannerCount.decrementAndGet();
+          if (count < 0) {
+            LOG.error("active scanner count less than zero: " + count +
+                " resetting to zero");
+            activeScannerCount.set(0);
+            count = 0;
+          }
+          if (count == 0) {
+            activeScannerCount.notifyAll();
+          }
         }
       }
     }
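
The notifyAll above pairs with a waiter, presumably on the flush/compaction
path, that blocks until readers drain. A sketch of that waiting side
(assumed; it is not part of this hunk):

    synchronized (activeScannerCount) {
      while (activeScannerCount.get() > 0) {
        try {
          activeScannerCount.wait();   // HScanner.close() notifies at zero
        } catch (InterruptedException e) {
          // re-check the count
        }
      }
    }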
 
+    /** {@inheritDoc} */
     public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
      throw new UnsupportedOperationException("Unimplemented serverside. " +
        "next(HStoreKey, SortedMap<...>) is more efficient");
@@ -1852,12 +1596,21 @@
    *
    * @throws IOException
    */
-  static void addRegionToMETA(HRegion meta, HRegion r)
-  throws IOException {  
+  static void addRegionToMETA(HRegion meta, HRegion r) throws IOException {
+    meta.checkResources();
     // The row key is the region name
-    long writeid = meta.startUpdate(r.getRegionName());
-    meta.put(writeid, COL_REGIONINFO, Writables.getBytes(r.getRegionInfo()));
-    meta.commit(writeid, System.currentTimeMillis());
+    Text row = r.getRegionName();
+    meta.obtainRowLock(row);
+    try {
+      HStoreKey key =
+        new HStoreKey(row, COL_REGIONINFO, System.currentTimeMillis());
+      TreeMap<HStoreKey, byte[]> edits = new TreeMap<HStoreKey, byte[]>();
+      edits.put(key, Writables.getBytes(r.getRegionInfo()));
+      meta.update(edits);
+    
+    } finally {
+      meta.releaseRowLock(row);
+    }
   }
   
   /**


