hadoop-common-commits mailing list archives

From: j...@apache.org
Subject: svn commit: r596835 [2/3] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/test/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/mapred/
Date: Tue, 20 Nov 2007 21:53:32 GMT
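
In this part of the commit, HRegionServer delegates compaction, splits, and batch updates to HRegion, and HStore gains an inner Memcache class whose locking moves from HLocking to java.util.concurrent's ReentrantReadWriteLock. One idiom the new Memcache.getScanner leans on is downgrading a write lock to a read lock. A minimal standalone sketch of that idiom follows; the class and field names are illustrative, not from the commit:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the write-to-read downgrade: mutate under the write lock,
    // acquire the read lock before releasing the write lock, then keep
    // reading with no window in which another writer can slip in.
    public class DowngradeSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private Object snapshot;

      Object snapshotAndRead() {
        lock.writeLock().lock();        // exclusive: stand-in for snapshot()
        try {
          snapshot = new Object();
          lock.readLock().lock();       // downgrade while still holding write
        } finally {
          lock.writeLock().unlock();    // read lock remains held
        }
        try {
          return snapshot;              // stand-in for building the scanner
        } finally {
          lock.readLock().unlock();
        }
      }
    }

Holding the read lock before the write lock is released is what keeps a concurrent cache flush from sliding in between the memcache snapshot and the scanner construction.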
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Tue Nov 20 13:53:30 2007
@@ -47,7 +47,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -194,15 +193,12 @@
       List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
       for(HRegion cur: nonClosedRegionsToCheck) {
         try {
-          if (cur.needsCompaction()) {
-            cur.compactStores();
-          }
-          // After compaction, it probably needs splitting.  May also need
-          // splitting just because one of the memcache flushes was big.
-          Text midKey = new Text();
-          if (cur.needsSplit(midKey)) {
-            split(cur, midKey);
+          if (cur.compactIfNeeded()) {
+            // After compaction, it probably needs splitting.  May also need
+            // splitting just because one of the memcache flushes was big.
+            split(cur);
           }
+
         } catch(IOException e) {
           //TODO: What happens if this fails? Are we toast?
           LOG.error("Split or compaction failed", e);
@@ -213,10 +209,13 @@
       }
     }
     
-    private void split(final HRegion region, final Text midKey)
-    throws IOException {
+    private void split(final HRegion region) throws IOException {
       final HRegionInfo oldRegionInfo = region.getRegionInfo();
-      final HRegion[] newRegions = region.closeAndSplit(midKey, this);
+      final HRegion[] newRegions = region.splitRegion(this);
+      
+      if (newRegions == null) {
+        return;                                 // Didn't need to be split
+      }
       
       // When a region is split, the META table needs to updated if we're
       // splitting a 'normal' region, and the ROOT table needs to be
@@ -302,7 +301,7 @@
       // Flush them, if necessary
       for(HRegion cur: nonClosedRegionsToFlush) {
         try {
-          cur.optionallyFlush();
+          cur.flushcache();
         } catch (DroppedSnapshotException e) {
           // Cache flush can fail in a few places.  If it fails in a critical
           // section, we get a DroppedSnapshotException and a replay of hlog
@@ -1046,7 +1045,7 @@
     try {
       HRegion region = getRegion(regionName);
       MapWritable result = new MapWritable();
-      TreeMap<Text, byte[]> map = region.getFull(row);
+      Map<Text, byte[]> map = region.getFull(row);
       for (Map.Entry<Text, byte []> es: map.entrySet()) {
         result.put(new HStoreKey(row, es.getKey()),
             new ImmutableBytesWritable(es.getValue()));
@@ -1100,46 +1099,13 @@
 
   /** {@inheritDoc} */
   public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
-  throws IOException {
+    throws IOException {
+    
     checkOpen();
     this.requestCount.incrementAndGet();
-    // If timestamp == LATEST_TIMESTAMP and we have deletes, then they need
-    // special treatment.  For these we need to first find the latest cell so
-    // when we write the delete, we write it with the latest cells' timestamp
-    // so the delete record overshadows.  This means deletes and puts do not
-    // happen within the same row lock.
-    List<Text> deletes = null;
+    HRegion region = getRegion(regionName);
     try {
-      long lockid = startUpdate(regionName, b.getRow());
-      for (BatchOperation op: b) {
-        switch(op.getOp()) {
-        case PUT:
-          put(regionName, lockid, op.getColumn(), op.getValue());
-          break;
-
-        case DELETE:
-          if (timestamp == LATEST_TIMESTAMP) {
-            // Save off these deletes.
-            if (deletes == null) {
-              deletes = new ArrayList<Text>();
-            }
-            deletes.add(op.getColumn());
-          } else {
-            delete(regionName, lockid, op.getColumn());
-          }
-          break;
-        }
-      }
-      commit(regionName, lockid,
-        (timestamp == LATEST_TIMESTAMP)? System.currentTimeMillis(): timestamp);
-      
-      if (deletes != null && deletes.size() > 0) {
-        // We have some LATEST_TIMESTAMP deletes to run.
-        HRegion r = getRegion(regionName);
-        for (Text column: deletes) {
-          r.deleteMultiple(b.getRow(), column, LATEST_TIMESTAMP, 1);
-        }
-      }
+      region.batchUpdate(timestamp, b);
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -1234,24 +1200,6 @@
   // Methods that do the actual work for the remote API
   //
   
-  protected long startUpdate(Text regionName, Text row) throws IOException {
-    HRegion region = getRegion(regionName);
-    return region.startUpdate(row);
-  }
-
-  protected void put(final Text regionName, final long lockid,
-      final Text column, final byte [] val)
-  throws IOException {
-    HRegion region = getRegion(regionName, true);
-    region.put(lockid, column, val);
-  }
-
-  protected void delete(Text regionName, long lockid, Text column) 
-  throws IOException {
-    HRegion region = getRegion(regionName);
-    region.delete(lockid, column);
-  }
-  
   /** {@inheritDoc} */
   public void deleteAll(final Text regionName, final Text row,
       final Text column, final long timestamp) 
@@ -1260,13 +1208,6 @@
     region.deleteAll(row, column, timestamp);
   }
 
-  protected void commit(Text regionName, final long lockid,
-      final long timestamp) throws IOException {
-
-    HRegion region = getRegion(regionName, true);
-    region.commit(lockid, timestamp);
-  }
-
   /**
    * @return Info on this server.
    */
@@ -1379,6 +1320,7 @@
    */
   protected List<HRegion> getRegionsToCheck() {
     ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
+    //TODO: is this locking necessary? 
     lock.readLock().lock();
     try {
       regionsToCheck.addAll(this.onlineRegions.values());

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Tue Nov 20 13:53:30 2007
@@ -34,6 +34,8 @@
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
@@ -64,11 +67,384 @@
 class HStore implements HConstants {
   static final Log LOG = LogFactory.getLog(HStore.class);
 
+  /**
+   * The Memcache holds in-memory modifications to the HRegion.  This is really a
+   * wrapper around a TreeMap that helps us when staging the Memcache out to disk.
+   */
+  static class Memcache {
+
+    // Note that since these structures are always accessed with a lock held,
+    // no additional synchronization is required.
+
+    @SuppressWarnings("hiding")
+    private final SortedMap<HStoreKey, byte[]> memcache =
+      Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+      
+    volatile SortedMap<HStoreKey, byte[]> snapshot;
+      
+    @SuppressWarnings("hiding")
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Constructor
+     */
+    Memcache() {
+      snapshot = 
+        Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+    }
+
+    /**
+     * Creates a snapshot of the current Memcache
+     */
+    void snapshot() {
+      this.lock.writeLock().lock();
+      try {
+        if (memcache.size() != 0) {
+          snapshot.putAll(memcache);
+          memcache.clear();
+        }
+      } finally {
+        this.lock.writeLock().unlock();
+      }
+    }
+    
+    /**
+     * @return memcache snapshot
+     */
+    SortedMap<HStoreKey, byte[]> getSnapshot() {
+      this.lock.writeLock().lock();
+      try {
+        SortedMap<HStoreKey, byte[]> currentSnapshot = snapshot;
+        snapshot = 
+          Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
+        
+        return currentSnapshot;
+
+      } finally {
+        this.lock.writeLock().unlock();
+      }
+    }
+    
+    /**
+     * Store a value.  
+     * @param key
+     * @param value
+     */
+    void add(final HStoreKey key, final byte[] value) {
+      this.lock.readLock().lock();
+      try {
+        memcache.put(key, value);
+        
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Look back through all the backlog TreeMaps to find the target.
+     * @param key
+     * @param numVersions
+     * @return An array of byte arrays ordered by timestamp.
+     */
+    List<byte[]> get(final HStoreKey key, final int numVersions) {
+      this.lock.readLock().lock();
+      try {
+        ArrayList<byte []> results = internalGet(memcache, key, numVersions);
+        results.addAll(results.size(),
+              internalGet(snapshot, key, numVersions - results.size()));
+        return results;
+        
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+
+    /**
+     * Return all the available columns for the given key.  The key indicates a 
+     * row and timestamp, but not a column name.
+     *
+     * The returned object should map column names to byte arrays (byte[]).
+     * @param key
+     * @param results
+     */
+    void getFull(HStoreKey key, SortedMap<Text, byte[]> results) {
+      this.lock.readLock().lock();
+      try {
+        internalGetFull(memcache, key, results);
+        internalGetFull(snapshot, key, results);
+
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+
+    private void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key, 
+        SortedMap<Text, byte []> results) {
+
+      SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
+      for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+        HStoreKey itKey = es.getKey();
+        Text itCol = itKey.getColumn();
+        if (results.get(itCol) == null && key.matchesWithoutColumn(itKey)) {
+          byte [] val = tailMap.get(itKey);
+          results.put(itCol, val);
+
+        } else if (key.getRow().compareTo(itKey.getRow()) < 0) {
+          break;
+        }
+      }
+    }
+
+    /**
+     * Examine a single map for the desired key.
+     *
+     * TODO - This is kinda slow.  We need a data structure that allows for 
+     * proximity-searches, not just precise-matches.
+     * 
+     * @param map
+     * @param key
+     * @param numVersions
+     * @return Ordered list of items found in passed <code>map</code>.  If no
+     * matching values, returns an empty list (does not return null).
+     */
+    private ArrayList<byte []> internalGet(
+        final SortedMap<HStoreKey, byte []> map, final HStoreKey key,
+        final int numVersions) {
+
+      ArrayList<byte []> result = new ArrayList<byte []>();
+      // TODO: If get is of a particular version -- numVersions == 1 -- we
+      // should be able to avoid all of the tailmap creations and iterations
+      // below.
+      HStoreKey curKey = new HStoreKey(key);
+      SortedMap<HStoreKey, byte []> tailMap = map.tailMap(curKey);
+      for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+        HStoreKey itKey = es.getKey();
+        if (itKey.matchesRowCol(curKey)) {
+          if (!HLogEdit.isDeleted(es.getValue())) {
+            result.add(tailMap.get(itKey));
+            curKey.setVersion(itKey.getTimestamp() - 1);
+          }
+        }
+        if (numVersions > 0 && result.size() >= numVersions) {
+          break;
+        }
+      }
+      return result;
+    }
+
+    /**
+     * Get <code>versions</code> keys matching the origin key's
+     * row/column/timestamp and those of an older vintage
+     * Default access so can be accessed out of {@link HRegionServer}.
+     * @param origin Where to start searching.
+     * @param versions How many versions to return. Pass
+     * {@link HConstants.ALL_VERSIONS} to retrieve all.
+     * @return Ordered list of <code>versions</code> keys going from newest back.
+     * @throws IOException
+     */
+    List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
+      this.lock.readLock().lock();
+      try {
+        List<HStoreKey> results =
+          internalGetKeys(this.memcache, origin, versions);
+        results.addAll(results.size(), internalGetKeys(snapshot, origin,
+            versions == HConstants.ALL_VERSIONS ? versions :
+              (versions - results.size())));
+        return results;
+
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+
+    /*
+     * @param origin Where to start searching.
+     * @param versions How many versions to return. Pass
+     * {@link HConstants.ALL_VERSIONS} to retrieve all.
+     * @return List of all keys that are of the same row and column and of
+     * equal or older timestamp.  If no keys, returns an empty List. Does not
+     * return null.
+     */
+    private List<HStoreKey> internalGetKeys(final SortedMap<HStoreKey, byte []> map,
+        final HStoreKey origin, final int versions) {
+
+      List<HStoreKey> result = new ArrayList<HStoreKey>();
+      SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
+      for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+        HStoreKey key = es.getKey();
+        if (!key.matchesRowCol(origin)) {
+          break;
+        }
+        if (!HLogEdit.isDeleted(es.getValue())) {
+          result.add(key);
+          if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
+            // We have enough results.  Return.
+            break;
+          }
+        }
+      }
+      return result;
+    }
+
+
+    /**
+     * @param key
+     * @return True if an entry and its content is {@link HGlobals.deleteBytes}.
+     * Use checking values in store. On occasion the memcache has the fact that
+     * the cell has been deleted.
+     */
+    boolean isDeleted(final HStoreKey key) {
+      return HLogEdit.isDeleted(this.memcache.get(key));
+    }
+
+    /**
+     * @return a scanner over the keys in the Memcache
+     */
+    HInternalScannerInterface getScanner(long timestamp,
+        Text targetCols[], Text firstRow) throws IOException {
+
+      // Here we rely on ReentrantReadWriteLock's ability to acquire multiple
+      // locks by the same thread and to be able to downgrade a write lock to
+      // a read lock. We need to hold a lock throughout this method, but only
+      // need the write lock while creating the memcache snapshot
+      
+      this.lock.writeLock().lock(); // hold write lock during memcache snapshot
+      snapshot();                       // snapshot memcache
+      this.lock.readLock().lock();      // acquire read lock
+      this.lock.writeLock().unlock();   // downgrade to read lock
+      try {
+        // Prevent a cache flush while we are constructing the scanner
+
+        return new MemcacheScanner(timestamp, targetCols, firstRow);
+      
+      } finally {
+        this.lock.readLock().unlock();
+      }
+    }
+
+    //////////////////////////////////////////////////////////////////////////////
+    // MemcacheScanner implements the HScannerInterface.
+    // It lets the caller scan the contents of the Memcache.
+    //////////////////////////////////////////////////////////////////////////////
+
+    class MemcacheScanner extends HAbstractScanner {
+      SortedMap<HStoreKey, byte []> backingMap;
+      Iterator<HStoreKey> keyIterator;
+
+      @SuppressWarnings("unchecked")
+      MemcacheScanner(final long timestamp, final Text targetCols[],
+          final Text firstRow) throws IOException {
+
+        super(timestamp, targetCols);
+        try {
+          this.backingMap = new TreeMap<HStoreKey, byte[]>();
+          this.backingMap.putAll(snapshot);
+          this.keys = new HStoreKey[1];
+          this.vals = new byte[1][];
+
+          // Generate list of iterators
+
+          HStoreKey firstKey = new HStoreKey(firstRow);
+            if (firstRow != null && firstRow.getLength() != 0) {
+              keyIterator =
+                backingMap.tailMap(firstKey).keySet().iterator();
+
+            } else {
+              keyIterator = backingMap.keySet().iterator();
+            }
+
+            while (getNext(0)) {
+              if (!findFirstRow(0, firstRow)) {
+                continue;
+              }
+              if (columnMatch(0)) {
+                break;
+              }
+            }
+        } catch (RuntimeException ex) {
+          LOG.error("error initializing Memcache scanner: ", ex);
+          close();
+          IOException e = new IOException("error initializing Memcache scanner");
+          e.initCause(ex);
+          throw e;
+
+        } catch(IOException ex) {
+          LOG.error("error initializing Memcache scanner: ", ex);
+          close();
+          throw ex;
+        }
+      }
+
+      /**
+       * The user didn't want to start scanning at the first row. This method
+       * seeks to the requested row.
+       *
+       * @param i which iterator to advance
+       * @param firstRow seek to this row
+       * @return true if this is the first row
+       */
+      @Override
+      boolean findFirstRow(int i, Text firstRow) {
+        return firstRow.getLength() == 0 ||
+        keys[i].getRow().compareTo(firstRow) >= 0;
+      }
+
+      /**
+       * Get the next value from the specified iterator.
+       * 
+       * @param i Which iterator to fetch next value from
+       * @return true if there is more data available
+       */
+      @Override
+      boolean getNext(int i) {
+        boolean result = false;
+        while (true) {
+          if (!keyIterator.hasNext()) {
+            closeSubScanner(i);
+            break;
+          }
+          // Check key is < than passed timestamp for this scanner.
+          HStoreKey hsk = keyIterator.next();
+          if (hsk == null) {
+            throw new NullPointerException("Unexpected null key");
+          }
+          if (hsk.getTimestamp() <= this.timestamp) {
+            this.keys[i] = hsk;
+            this.vals[i] = backingMap.get(keys[i]);
+            result = true;
+            break;
+          }
+        }
+        return result;
+      }
+
+      /** Shut down an individual map iterator. */
+      @Override
+      void closeSubScanner(int i) {
+        keyIterator = null;
+        keys[i] = null;
+        vals[i] = null;
+        backingMap = null;
+      }
+
+      /** Shut down map iterators */
+      public void close() {
+        if (!scannerClosed) {
+          if(keyIterator != null) {
+            closeSubScanner(0);
+          }
+          scannerClosed = true;
+        }
+      }
+    }
+  }
+  
   static final String COMPACTION_TO_REPLACE = "toreplace";    
   static final String COMPACTION_DONE = "done";
   
   private static final String BLOOMFILTER_FILE_NAME = "filter";
 
+  final Memcache memcache = new Memcache();
   Path dir;
   Text regionName;
   String encodedRegionName;
@@ -87,12 +463,14 @@
   Integer compactLock = new Integer(0);
   Integer flushLock = new Integer(0);
 
-  final HLocking lock = new HLocking();
+  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  final AtomicInteger activeScanners = new AtomicInteger(0);
 
   /* Sorted Map of readers keyed by sequence id (Most recent should be last in
    * in list).
    */
-  TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
+  SortedMap<Long, HStoreFile> storefiles =
+    Collections.synchronizedSortedMap(new TreeMap<Long, HStoreFile>());
   
   /* Sorted Map of readers keyed by sequence id (Most recent should be last in
    * in list).
@@ -101,9 +479,12 @@
 
   Random rand = new Random();
   
-  private long maxSeqId;
+  private volatile long maxSeqId;
   
-  private int compactionThreshold;
+  private final int compactionThreshold;
+  
+  private final ReentrantReadWriteLock newScannerLock =
+    new ReentrantReadWriteLock();
 
   /**
    * An HStore is a set of zero or more MapFiles, which stretch backwards over 
@@ -140,8 +521,8 @@
    */
   HStore(Path dir, Text regionName, String encodedName,
       HColumnDescriptor family, FileSystem fs, Path reconstructionLog,
-      HBaseConfiguration conf)
-  throws IOException {  
+      HBaseConfiguration conf) throws IOException {  
+    
     this.dir = dir;
     this.compactionDir = new Path(dir, "compaction.dir");
     this.regionName = regionName;
@@ -255,8 +636,8 @@
    * reflected in the MapFiles.)
    */
   private void doReconstructionLog(final Path reconstructionLog,
-      final long maxSeqID)
-  throws UnsupportedEncodingException, IOException {
+      final long maxSeqID) throws UnsupportedEncodingException, IOException {
+    
     if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
       // Nothing to do.
       return;
@@ -264,8 +645,10 @@
     long maxSeqIdInLog = -1;
     TreeMap<HStoreKey, byte []> reconstructedCache =
       new TreeMap<HStoreKey, byte []>();
-    SequenceFile.Reader login =
-      new SequenceFile.Reader(this.fs, reconstructionLog, this.conf);
+      
+    SequenceFile.Reader login = new SequenceFile.Reader(this.fs,
+        reconstructionLog, this.conf);
+    
     try {
       HLogKey key = new HLogKey();
       HLogEdit val = new HLogEdit();
@@ -310,7 +693,7 @@
       if (LOG.isDebugEnabled()) {
         LOG.debug("flushing reconstructionCache");
       }
-      flushCacheHelper(reconstructedCache, maxSeqIdInLog + 1, true);
+      internalFlushCache(reconstructedCache, maxSeqIdInLog + 1);
     }
   }
   
@@ -403,23 +786,43 @@
   //////////////////////////////////////////////////////////////////////////////
 
   /**
+   * Adds a value to the memcache
+   * 
+   * @param key
+   * @param value
+   */
+  void add(HStoreKey key, byte[] value) {
+    lock.readLock().lock();
+    try {
+      this.memcache.add(key, value);
+      
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+  
+  /**
    * Close all the MapFile readers
+   * 
+   * We don't need to worry about subsequent requests because the HRegion holds
+   * a write lock that will prevent any more reads or writes.
+   * 
    * @throws IOException
    */
-  Vector<HStoreFile> close() throws IOException {
-    Vector<HStoreFile> result = null;
-    this.lock.obtainWriteLock();
+  List<HStoreFile> close() throws IOException {
+    ArrayList<HStoreFile> result = null;
+    this.lock.writeLock().lock();
     try {
       for (MapFile.Reader reader: this.readers.values()) {
         reader.close();
       }
       this.readers.clear();
-      result = new Vector<HStoreFile>(storefiles.values());
+      result = new ArrayList<HStoreFile>(storefiles.values());
       this.storefiles.clear();
       LOG.debug("closed " + this.storeName);
       return result;
     } finally {
-      this.lock.releaseWriteLock();
+      this.lock.writeLock().unlock();
     }
   }
 
@@ -429,6 +832,14 @@
   //////////////////////////////////////////////////////////////////////////////
 
   /**
+   * Prior to doing a cache flush, we need to snapshot the memcache. Locking is
+   * handled by the memcache.
+   */
+  void snapshotMemcache() {
+    this.memcache.snapshot();
+  }
+  
+  /**
    * Write out a brand-new set of items to the disk.
    *
    * We should only store key/vals that are appropriate for the data-columns 
@@ -438,21 +849,18 @@
    *
    * Return the entire list of HStoreFiles currently used by the HStore.
    *
-   * @param inputCache memcache to flush
    * @param logCacheFlushId flush sequence number
    * @throws IOException
    */
-  void flushCache(final SortedMap<HStoreKey, byte []> inputCache,
-    final long logCacheFlushId)
-  throws IOException {
-    flushCacheHelper(inputCache, logCacheFlushId, true);
+  void flushCache(final long logCacheFlushId) throws IOException {
+      internalFlushCache(memcache.getSnapshot(), logCacheFlushId);
   }
   
-  void flushCacheHelper(SortedMap<HStoreKey, byte []> inputCache,
-      long logCacheFlushId, boolean addToAvailableMaps)
-  throws IOException {
+  private void internalFlushCache(SortedMap<HStoreKey, byte []> cache,
+      long logCacheFlushId) throws IOException {
+    
     synchronized(flushLock) {
-      // A. Write the TreeMap out to the disk
+      // A. Write the Maps out to the disk
       HStoreFile flushedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
         encodedRegionName, familyName, fs);
       String name = flushedFile.toString();
@@ -471,10 +879,11 @@
       // Related, looks like 'merging compactions' in BigTable paper interlaces
       // a memcache flush.  We don't.
       try {
-        for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
+        for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
           HStoreKey curkey = es.getKey();
-          if (this.familyName.
-              equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+          if (this.familyName.equals(HStoreKey.extractFamily(
+              curkey.getColumn()))) {
+              
             out.append(curkey, new ImmutableBytesWritable(es.getValue()));
           }
         }
@@ -487,44 +896,30 @@
       flushedFile.writeInfo(fs, logCacheFlushId);
       
       // C. Flush the bloom filter if any
-      if(bloomFilter != null) {
+      if (bloomFilter != null) {
         flushBloomFilter();
       }
 
       // D. Finally, make the new MapFile available.
-      if(addToAvailableMaps) {
-        this.lock.obtainWriteLock();
-        try {
-          Long flushid = Long.valueOf(logCacheFlushId);
-          // Open the map file reader.
-          this.readers.put(flushid,
+      this.lock.writeLock().lock();
+      try {
+        Long flushid = Long.valueOf(logCacheFlushId);
+        // Open the map file reader.
+        this.readers.put(flushid,
             flushedFile.getReader(this.fs, this.bloomFilter));
-          this.storefiles.put(flushid, flushedFile);
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Added " + name +
-                " with sequence id " + logCacheFlushId + " and size " +
+        this.storefiles.put(flushid, flushedFile);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Added " + name +
+              " with sequence id " + logCacheFlushId + " and size " +
               StringUtils.humanReadableInt(flushedFile.length()));
-          }
-        } finally {
-          this.lock.releaseWriteLock();
         }
+      } finally {
+        this.lock.writeLock().unlock();
       }
       return;
     }
   }
 
-  /**
-   * @return - vector of all the HStore files in use
-   */
-  Vector<HStoreFile> getAllStoreFiles() {
-    this.lock.obtainReadLock();
-    try {
-      return new Vector<HStoreFile>(storefiles.values());
-    } finally {
-      this.lock.releaseReadLock();
-    }
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // Compaction
   //////////////////////////////////////////////////////////////////////////////
@@ -532,9 +927,16 @@
   /**
    * @return True if this store needs compaction.
    */
-  public boolean needsCompaction() {
-    return this.storefiles != null &&
-      this.storefiles.size() >= this.compactionThreshold;
+  boolean needsCompaction() {
+    boolean compactionNeeded = false;
+    if (this.storefiles != null) {
+      compactionNeeded = this.storefiles.size() >= this.compactionThreshold;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("compaction for HStore " + regionName + "/" + familyName +
+            (compactionNeeded ? " " : " not ") + "needed.");
+      }
+    }
+    return compactionNeeded;
   }
 
   /**
@@ -542,44 +944,26 @@
    * thread must be able to block for long periods.
    * 
    * <p>During this time, the HStore can work as usual, getting values from
-   * MapFiles and writing new MapFiles from given memcaches.
+   * MapFiles and writing new MapFiles from the Memcache.
    * 
    * Existing MapFiles are not destroyed until the new compacted TreeMap is 
    * completely written-out to disk.
    *
-   * The compactLock block prevents multiple simultaneous compactions.
+   * The compactLock prevents multiple simultaneous compactions.
    * The structureLock prevents us from interfering with other write operations.
    * 
    * We don't want to hold the structureLock for the whole time, as a compact() 
    * can be lengthy and we want to allow cache-flushes during this period.
    * @throws IOException
+   * 
+   * @return true if compaction completed successfully
    */
-  void compact() throws IOException {
-    compactHelper(false);
-  }
-  
-  void compactHelper(final boolean deleteSequenceInfo)
-  throws IOException {
-    compactHelper(deleteSequenceInfo, -1);
-  }
-
-  /* 
-   * @param deleteSequenceInfo True if we are to set the sequence number to -1
-   * on compacted file.
-   * @param maxSeenSeqID We may have already calculated the maxSeenSeqID.  If
-   * so, pass it here.  Otherwise, pass -1 and it will be calculated inside in
-   * this method.
-   * @param deleteSequenceInfo
-   * @param maxSeenSeqID
-   * @throws IOException
-   */
-  void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
-  throws IOException {
-    long maxId = maxSeenSeqID;
-    synchronized(compactLock) {
+  boolean compact() throws IOException {
+    long maxId = -1;
+    synchronized (compactLock) {
       Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
           encodedRegionName, familyName);
-      if(LOG.isDebugEnabled()) {
+      if (LOG.isDebugEnabled()) {
         LOG.debug("started compaction of " + storefiles.size() + " files in " +
           curCompactStore.toString());
       }
@@ -591,19 +975,22 @@
         }
       }
       try {
-        List<HStoreFile> toCompactFiles = getFilesToCompact();
+        // Storefiles are keyed by sequence id.  The oldest file comes first.
+        // We need to return out of here a List that has the newest file as
+        // first.
+        List<HStoreFile> filesToCompact =
+          new ArrayList<HStoreFile>(this.storefiles.values());
+        Collections.reverse(filesToCompact);
+
         HStoreFile compactedOutputFile = new HStoreFile(conf,
             this.compactionDir, encodedRegionName, familyName, -1);
-        if (toCompactFiles.size() < 1 ||
-            (toCompactFiles.size() == 1 &&
-              !toCompactFiles.get(0).isReference())) {
+        if (filesToCompact.size() < 1 ||
+            (filesToCompact.size() == 1 &&
+              !filesToCompact.get(0).isReference())) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("nothing to compact for " + this.storeName);
           }
-          if (deleteSequenceInfo && toCompactFiles.size() == 1) {
-            toCompactFiles.get(0).writeInfo(fs, -1);
-          }
-          return;
+          return false;
         }
         
         if (!fs.mkdirs(curCompactStore)) {
@@ -613,10 +1000,10 @@
         // Compute the max-sequenceID seen in any of the to-be-compacted
         // TreeMaps if it hasn't been passed in to us.
         if (maxId == -1) {
-          for (HStoreFile hsf: toCompactFiles) {
+          for (HStoreFile hsf: filesToCompact) {
             long seqid = hsf.loadInfo(fs);
-            if(seqid > 0) {
-              if(seqid > maxId) {
+            if (seqid > 0) {
+              if (seqid > maxId) {
                 maxId = seqid;
               }
             }
@@ -624,17 +1011,16 @@
         }
 
         // Step through them, writing to the brand-new TreeMap
-        MapFile.Writer compactedOut =
-          compactedOutputFile.getWriter(this.fs, this.compression,
-            this.bloomFilter);
+        MapFile.Writer compactedOut = compactedOutputFile.getWriter(this.fs,
+            this.compression, this.bloomFilter);
         try {
-          compact(compactedOut, toCompactFiles);
+          compactHStoreFiles(compactedOut, filesToCompact);
         } finally {
           compactedOut.close();
         }
 
         // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if((! deleteSequenceInfo) && maxId >= 0) {
+        if (maxId >= 0) {
           compactedOutputFile.writeInfo(fs, maxId);
         } else {
           compactedOutputFile.writeInfo(fs, -1);
@@ -644,8 +1030,8 @@
         Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
         DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
         try {
-          out.writeInt(toCompactFiles.size());
-          for(HStoreFile hsf: toCompactFiles) {
+          out.writeInt(filesToCompact.size());
+          for (HStoreFile hsf: filesToCompact) {
             hsf.write(out);
           }
         } finally {
@@ -657,7 +1043,8 @@
         (new DataOutputStream(fs.create(doneFile))).close();
 
         // Move the compaction into place.
-        processReadyCompaction();
+        completeCompaction();
+        return true;
       } finally {
         // Clean up the parent -- the region dir in the compactions directory.
         if (this.fs.exists(curCompactStore.getParent())) {
@@ -671,24 +1058,6 @@
   }
   
   /*
-   * @return list of files to compact sorted so most recent comes first.
-   */
-  private List<HStoreFile> getFilesToCompact() {
-    List<HStoreFile> filesToCompact = null;
-    this.lock.obtainWriteLock();
-    try {
-      // Storefiles are keyed by sequence id.  The oldest file comes first.
-      // We need to return out of here a List that has the newest file as
-      // first.
-      filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
-      Collections.reverse(filesToCompact);
-    } finally {
-      this.lock.releaseWriteLock();
-    }
-    return filesToCompact;
-  }
-  
-  /*
    * Compact passed <code>toCompactFiles</code> into <code>compactedOut</code>. 
    * We create a new set of MapFile.Reader objects so we don't screw up 
    * the caching associated with the currently-loaded ones. Our
@@ -704,9 +1073,9 @@
    * @param toCompactFiles
    * @throws IOException
    */
-  void compact(final MapFile.Writer compactedOut,
-      final List<HStoreFile> toCompactFiles)
-  throws IOException {
+  private void compactHStoreFiles(final MapFile.Writer compactedOut,
+      final List<HStoreFile> toCompactFiles) throws IOException {
+    
     int size = toCompactFiles.size();
     CompactionReader[] rdrs = new CompactionReader[size];
     int index = 0;
@@ -724,7 +1093,89 @@
       }
     }
     try {
-      compact(compactedOut, rdrs);
+      HStoreKey[] keys = new HStoreKey[rdrs.length];
+      ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
+      boolean[] done = new boolean[rdrs.length];
+      for(int i = 0; i < rdrs.length; i++) {
+        keys[i] = new HStoreKey();
+        vals[i] = new ImmutableBytesWritable();
+        done[i] = false;
+      }
+
+      // Now, advance through the readers in order.  This will have the
+      // effect of a run-time sort of the entire dataset.
+      int numDone = 0;
+      for(int i = 0; i < rdrs.length; i++) {
+        rdrs[i].reset();
+        done[i] = ! rdrs[i].next(keys[i], vals[i]);
+        if(done[i]) {
+          numDone++;
+        }
+      }
+
+      int timesSeen = 0;
+      Text lastRow = new Text();
+      Text lastColumn = new Text();
+      // Map of a row deletes keyed by column with a list of timestamps for value
+      Map<Text, List<Long>> deletes = null;
+      while (numDone < done.length) {
+        // Find the reader with the smallest key.  If two files have same key
+        // but different values -- i.e. one is delete and other is non-delete
+        // value -- we will find the first, the one that was written later and
+        // therefore the one whose value should make it out to the compacted
+        // store file.
+        int smallestKey = -1;
+        for(int i = 0; i < rdrs.length; i++) {
+          if(done[i]) {
+            continue;
+          }
+          if(smallestKey < 0) {
+            smallestKey = i;
+          } else {
+            if(keys[i].compareTo(keys[smallestKey]) < 0) {
+              smallestKey = i;
+            }
+          }
+        }
+
+        // Reflect the current key/val in the output
+        HStoreKey sk = keys[smallestKey];
+        if(lastRow.equals(sk.getRow())
+            && lastColumn.equals(sk.getColumn())) {
+          timesSeen++;
+        } else {
+          timesSeen = 1;
+          // We are on to a new row.  Create a new deletes list.
+          deletes = new HashMap<Text, List<Long>>();
+        }
+
+        byte [] value = (vals[smallestKey] == null)?
+          null: vals[smallestKey].get();
+        if (!isDeleted(sk, value, false, deletes) &&
+            timesSeen <= family.getMaxVersions()) {
+          // Keep old versions until we have maxVersions worth.
+          // Then just skip them.
+          if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
+            // Only write out objects which have a non-zero length key and
+            // value
+            compactedOut.append(sk, vals[smallestKey]);
+          }
+        }
+
+        // Update last-seen items
+        lastRow.set(sk.getRow());
+        lastColumn.set(sk.getColumn());
+
+        // Advance the smallest key.  If that reader's all finished, then 
+        // mark it as done.
+        if(!rdrs[smallestKey].next(keys[smallestKey],
+            vals[smallestKey])) {
+          done[smallestKey] = true;
+          rdrs[smallestKey].close();
+          rdrs[smallestKey] = null;
+          numDone++;
+        }
+      }
     } finally {
       for (int i = 0; i < rdrs.length; i++) {
         if (rdrs[i] != null) {
@@ -789,131 +1240,6 @@
       this.reader.reset();
     }
   }
-  
-  void compact(final MapFile.Writer compactedOut,
-      final Iterator<Entry<HStoreKey, byte []>> iterator,
-      final MapFile.Reader reader)
-  throws IOException {
-    // Make an instance of a CompactionReader that wraps the iterator.
-    CompactionReader cr = new CompactionReader() {
-      public boolean next(WritableComparable key, Writable val)
-          throws IOException {
-        boolean result = false;
-        while (iterator.hasNext()) {
-          Entry<HStoreKey, byte []> e = iterator.next();
-          HStoreKey hsk = e.getKey();
-          if (familyName.equals(HStoreKey.extractFamily(hsk.getColumn()))) {
-            ((HStoreKey)key).set(hsk);
-            ((ImmutableBytesWritable)val).set(e.getValue());
-            result = true;
-            break;
-          }
-        }
-        return result;
-      }
-
-      @SuppressWarnings("unused")
-      public void reset() throws IOException {
-        // noop.
-      }
-      
-      @SuppressWarnings("unused")
-      public void close() throws IOException {
-        // noop.
-      }
-    };
-    
-    compact(compactedOut,
-      new CompactionReader [] {cr, new MapFileCompactionReader(reader)});
-  }
-  
-  void compact(final MapFile.Writer compactedOut,
-      final CompactionReader [] rdrs)
-  throws IOException {
-    HStoreKey[] keys = new HStoreKey[rdrs.length];
-    ImmutableBytesWritable[] vals = new ImmutableBytesWritable[rdrs.length];
-    boolean[] done = new boolean[rdrs.length];
-    for(int i = 0; i < rdrs.length; i++) {
-      keys[i] = new HStoreKey();
-      vals[i] = new ImmutableBytesWritable();
-      done[i] = false;
-    }
-
-    // Now, advance through the readers in order.  This will have the
-    // effect of a run-time sort of the entire dataset.
-    int numDone = 0;
-    for(int i = 0; i < rdrs.length; i++) {
-      rdrs[i].reset();
-      done[i] = ! rdrs[i].next(keys[i], vals[i]);
-      if(done[i]) {
-        numDone++;
-      }
-    }
-
-    int timesSeen = 0;
-    Text lastRow = new Text();
-    Text lastColumn = new Text();
-    // Map of a row deletes keyed by column with a list of timestamps for value
-    Map<Text, List<Long>> deletes = null;
-    while (numDone < done.length) {
-      // Find the reader with the smallest key.  If two files have same key
-      // but different values -- i.e. one is delete and other is non-delete
-      // value -- we will find the first, the one that was written later and
-      // therefore the one whose value should make it out to the compacted
-      // store file.
-      int smallestKey = -1;
-      for(int i = 0; i < rdrs.length; i++) {
-        if(done[i]) {
-          continue;
-        }
-        if(smallestKey < 0) {
-          smallestKey = i;
-        } else {
-          if(keys[i].compareTo(keys[smallestKey]) < 0) {
-            smallestKey = i;
-          }
-        }
-      }
-
-      // Reflect the current key/val in the output
-      HStoreKey sk = keys[smallestKey];
-      if(lastRow.equals(sk.getRow())
-          && lastColumn.equals(sk.getColumn())) {
-        timesSeen++;
-      } else {
-        timesSeen = 1;
-        // We are on to a new row.  Create a new deletes list.
-        deletes = new HashMap<Text, List<Long>>();
-      }
-
-      byte [] value = (vals[smallestKey] == null)?
-        null: vals[smallestKey].get();
-      if (!isDeleted(sk, value, null, deletes) &&
-          timesSeen <= family.getMaxVersions()) {
-        // Keep old versions until we have maxVersions worth.
-        // Then just skip them.
-        if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
-          // Only write out objects which have a non-zero length key and
-          // value
-          compactedOut.append(sk, vals[smallestKey]);
-        }
-      }
-
-      // Update last-seen items
-      lastRow.set(sk.getRow());
-      lastColumn.set(sk.getColumn());
-
-      // Advance the smallest key.  If that reader's all finished, then 
-      // mark it as done.
-      if(!rdrs[smallestKey].next(keys[smallestKey],
-          vals[smallestKey])) {
-        done[smallestKey] = true;
-        rdrs[smallestKey].close();
-        rdrs[smallestKey] = null;
-        numDone++;
-      }
-    }
-  }
 
   /*
    * Check if this is cell is deleted.
@@ -923,7 +1249,7 @@
    * deletes map.
    * @param hsk
    * @param value
-   * @param memcache Can be null.
+   * @param checkMemcache true if the memcache should be consulted
    * @param deletes Map keyed by column with a value of timestamp. Can be null.
    * If non-null and passed value is HGlobals.deleteBytes, then we add to this
    * map.
@@ -931,14 +1257,14 @@
    * passed value is HGlobals.deleteBytes.
   */
   private boolean isDeleted(final HStoreKey hsk, final byte [] value,
-      final HMemcache memcache, final Map<Text, List<Long>> deletes) {
-    if (memcache != null && memcache.isDeleted(hsk)) {
+      final boolean checkMemcache, final Map<Text, List<Long>> deletes) {
+    if (checkMemcache && memcache.isDeleted(hsk)) {
       return true;
     }
-    List<Long> timestamps = (deletes == null)?
-      null: deletes.get(hsk.getColumn());
+    List<Long> timestamps =
+      (deletes == null) ? null: deletes.get(hsk.getColumn());
     if (timestamps != null &&
-      timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
+        timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
       return true;
     }
     if (value == null) {
@@ -972,97 +1298,117 @@
    * 
    * <p>Moving the compacted TreeMap into place means:
    * <pre>
-   * 1) Acquiring the write-lock
-   * 2) Figuring out what MapFiles are going to be replaced
-   * 3) Moving the new compacted MapFile into place
-   * 4) Unloading all the replaced MapFiles.
-   * 5) Deleting all the old MapFile files.
-   * 6) Loading the new TreeMap.
-   * 7) Releasing the write-lock
+   * 1) Wait for active scanners to exit
+   * 2) Acquiring the write-lock
+   * 3) Figuring out what MapFiles are going to be replaced
+   * 4) Moving the new compacted MapFile into place
+   * 5) Unloading all the replaced MapFiles.
+   * 6) Deleting all the old MapFile files.
+   * 7) Loading the new TreeMap.
+   * 8) Releasing the write-lock
+   * 9) Allow new scanners to proceed.
    * </pre>
    */
-  void processReadyCompaction() throws IOException {
-    // 1. Acquiring the write-lock
+  private void completeCompaction() throws IOException {
     Path curCompactStore = HStoreFile.getHStoreDir(this.compactionDir,
         encodedRegionName, familyName);
-    this.lock.obtainWriteLock();
+    
+    // 1. Wait for active scanners to exit
+    newScannerLock.writeLock().lock();                  // prevent new scanners
     try {
-      Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
-      if (!fs.exists(doneFile)) {
-        // The last execution didn't finish the compaction, so there's nothing 
-        // we can do.  We'll just have to redo it. Abandon it and return.
-        LOG.warn("Redo failed compaction (missing 'done' file)");
-        return;
-      }
+      synchronized (activeScanners) {
+        while (activeScanners.get() != 0) {
+          try {
+            activeScanners.wait();
+          } catch (InterruptedException e) {
+            // continue
+          }
+        }
 
-      // 2. Load in the files to be deleted.
-      Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
-      Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
-      DataInputStream in = new DataInputStream(fs.open(filesToReplace));
+        // 2. Acquiring the HStore write-lock
+        this.lock.writeLock().lock();
+      }
       try {
-        int numfiles = in.readInt();
-        for(int i = 0; i < numfiles; i++) {
-          HStoreFile hsf = new HStoreFile(conf);
-          hsf.readFields(in);
-          toCompactFiles.add(hsf);
+        Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
+        if (!fs.exists(doneFile)) {
+          // The last execution didn't finish the compaction, so there's nothing 
+          // we can do.  We'll just have to redo it. Abandon it and return.
+          LOG.warn("Redo failed compaction (missing 'done' file)");
+          return;
         }
-        
-      } finally {
-        in.close();
-      }
 
-      // 3. Moving the new MapFile into place.
-      HStoreFile compactedFile = new HStoreFile(conf, this.compactionDir,
-          encodedRegionName, familyName, -1);
-      // obtainNewHStoreFile does its best to generate a filename that does not
-      // currently exist.
-      HStoreFile finalCompactedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
-          encodedRegionName, familyName, fs);
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("moving " + compactedFile.toString() + " in " +
-            this.compactionDir.toString() +
-          " to " + finalCompactedFile.toString() + " in " + dir.toString());
-      }
-      if (!compactedFile.rename(this.fs, finalCompactedFile)) {
-        LOG.error("Failed move of compacted file " +
-          finalCompactedFile.toString());
-        return;
-      }
-
-      // 4. and 5. Unload all the replaced MapFiles, close and delete.
-      Vector<Long> toDelete = new Vector<Long>(toCompactFiles.size());
-      for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
-        if (!toCompactFiles.contains(e.getValue())) {
-          continue;
+        // 3. Load in the files to be deleted.
+        Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
+        Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
+        DataInputStream in = new DataInputStream(fs.open(filesToReplace));
+        try {
+          int numfiles = in.readInt();
+          for(int i = 0; i < numfiles; i++) {
+            HStoreFile hsf = new HStoreFile(conf);
+            hsf.readFields(in);
+            toCompactFiles.add(hsf);
+          }
+
+        } finally {
+          in.close();
         }
-        Long key = e.getKey();
-        MapFile.Reader reader = this.readers.remove(key);
-        if (reader != null) {
-          reader.close();
+
+        // 4. Moving the new MapFile into place.
+        HStoreFile compactedFile = new HStoreFile(conf, this.compactionDir,
+            encodedRegionName, familyName, -1);
+        // obtainNewHStoreFile does its best to generate a filename that does not
+        // currently exist.
+        HStoreFile finalCompactedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
+            encodedRegionName, familyName, fs);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("moving " + compactedFile.toString() + " in " +
+              this.compactionDir.toString() +
+              " to " + finalCompactedFile.toString() + " in " + dir.toString());
+        }
+        if (!compactedFile.rename(this.fs, finalCompactedFile)) {
+          LOG.error("Failed move of compacted file " +
+              finalCompactedFile.toString());
+          return;
         }
-        toDelete.add(key);
-      }
-      
-      try {
-        for (Long key: toDelete) {
-          HStoreFile hsf = this.storefiles.remove(key);
-          hsf.delete();
+
+        // 5. and 6. Unload all the replaced MapFiles, close and delete.
+        Vector<Long> toDelete = new Vector<Long>(toCompactFiles.size());
+        for (Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
+          if (!toCompactFiles.contains(e.getValue())) {
+            continue;
+          }
+          Long key = e.getKey();
+          MapFile.Reader reader = this.readers.remove(key);
+          if (reader != null) {
+            reader.close();
+          }
+          toDelete.add(key);
         }
 
-        // 6. Loading the new TreeMap.
-        Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
-        this.readers.put(orderVal,
-            finalCompactedFile.getReader(this.fs, this.bloomFilter));
-        this.storefiles.put(orderVal, finalCompactedFile);
-      } catch (IOException e) {
-        LOG.error("Failed replacing compacted files. Compacted file is " +
-          finalCompactedFile.toString() + ".  Files replaced are " +
-          toCompactFiles.toString() +
-          " some of which may have been already removed", e);
+        try {
+          for (Long key: toDelete) {
+            HStoreFile hsf = this.storefiles.remove(key);
+            hsf.delete();
+          }
+
+          // 7. Loading the new TreeMap.
+          Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
+          this.readers.put(orderVal,
+              finalCompactedFile.getReader(this.fs, this.bloomFilter));
+          this.storefiles.put(orderVal, finalCompactedFile);
+        } catch (IOException e) {
+          LOG.error("Failed replacing compacted files. Compacted file is " +
+              finalCompactedFile.toString() + ".  Files replaced are " +
+              toCompactFiles.toString() +
+              " some of which may have been already removed", e);
+        }
+      } finally {
+        // 8. Releasing the write-lock
+        this.lock.writeLock().unlock();
       }
     } finally {
-      // 7. Releasing the write-lock
-      this.lock.releaseWriteLock();
+      // 9. Allow new scanners to proceed.
+      newScannerLock.writeLock().unlock();
     }
   }
 
@@ -1078,8 +1424,10 @@
    * The returned object should map column names to byte arrays (byte[]).
    */
   void getFull(HStoreKey key, TreeMap<Text, byte []> results)
-  throws IOException {
-    this.lock.obtainReadLock();
+    throws IOException {
+    
+    this.lock.readLock().lock();
+    memcache.getFull(key, results);
     try {
       MapFile.Reader[] maparray = getReaders();
       for (int i = maparray.length - 1; i >= 0; i--) {
@@ -1109,7 +1457,7 @@
       }
       
     } finally {
-      this.lock.releaseReadLock();
+      this.lock.readLock().unlock();
     }
   }
   
@@ -1125,29 +1473,33 @@
    * If 'numVersions' is negative, the method returns all available versions.
    * @param key
    * @param numVersions Number of versions to fetch.  Must be > 0.
-   * @param memcache Checked for deletions
    * @return values for the specified versions
    * @throws IOException
    */
-  byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
-  throws IOException {
+  byte [][] get(HStoreKey key, int numVersions) throws IOException {
     if (numVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
     
-    List<byte []> results = new ArrayList<byte []>();
-    // Keep a list of deleted cell keys.  We need this because as we go through
-    // the store files, the cell with the delete marker may be in one file and
-    // the old non-delete cell value in a later store file. If we don't keep
-    // around the fact that the cell was deleted in a newer record, we end up
-    // returning the old value if user is asking for more than one version.
-    // This List of deletes should not large since we are only keeping rows
-    // and columns that match those set on the scanner and which have delete
-    // values.  If memory usage becomes an issue, could redo as bloom filter.
-    Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
-    // This code below is very close to the body of the getKeys method.
-    this.lock.obtainReadLock();
+    this.lock.readLock().lock();
     try {
+      // Check the memcache
+      List<byte[]> results = this.memcache.get(key, numVersions);
+      // If we got sufficient versions from memcache, return.
+      if (results.size() == numVersions) {
+        return ImmutableBytesWritable.toArray(results);
+      }
+
+      // Keep a list of deleted cell keys.  We need this because as we go through
+      // the store files, the cell with the delete marker may be in one file and
+      // the old non-delete cell value in a later store file. If we don't keep
+      // around the fact that the cell was deleted in a newer record, we end up
+      // returning the old value if user is asking for more than one version.
+      // This List of deletes should not large since we are only keeping rows
+      // and columns that match those set on the scanner and which have delete
+      // values.  If memory usage becomes an issue, could redo as bloom filter.
+      Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
+      // This code below is very close to the body of the getKeys method.
       MapFile.Reader[] maparray = getReaders();
       for(int i = maparray.length - 1; i >= 0; i--) {
         MapFile.Reader map = maparray[i];
@@ -1165,7 +1517,7 @@
           if (!readkey.matchesRowCol(key)) {
             continue;
           }
-          if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
+          if (!isDeleted(readkey, readval.get(), true, deletes)) {
             results.add(readval.get());
             // Perhaps only one version is wanted.  I could let this
             // test happen later in the for loop test but it would cost
@@ -1179,7 +1531,7 @@
               readkey.matchesRowCol(key) &&
               !hasEnoughVersions(numVersions, results);
               readval = new ImmutableBytesWritable()) {
-            if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
+            if (!isDeleted(readkey, readval.get(), true, deletes)) {
               results.add(readval.get());
             }
           }
@@ -1191,7 +1543,7 @@
       return results.size() == 0 ?
         null : ImmutableBytesWritable.toArray(results);
     } finally {
-      this.lock.releaseReadLock();
+      this.lock.readLock().unlock();
     }
   }
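
The reworked get() above now consults the memcache first and only falls back to the store files (newest to oldest) when the memcache cannot supply enough versions, carrying a record of delete markers forward so an older file cannot resurrect a cell that a newer file has deleted. The self-contained sketch below illustrates that delete-tracking idea with plain Java collections; the names (VersionedGetSketch, DELETE_MARKER) are illustrative and are not the HBase types used in this patch.

    import java.util.*;

    class VersionedGetSketch {
      static final byte[] DELETE_MARKER = new byte[0];   // sentinel standing in for a delete cell

      /** sources[0] is the newest source (e.g. memcache); later entries are older store files. */
      static List<byte[]> get(List<NavigableMap<Long, byte[]>> sources, int numVersions) {
        List<byte[]> results = new ArrayList<byte[]>();
        Set<Long> deletedTimestamps = new HashSet<Long>();   // plays the role of the 'deletes' map
        for (NavigableMap<Long, byte[]> source : sources) {
          for (Map.Entry<Long, byte[]> e : source.descendingMap().entrySet()) {
            if (e.getValue() == DELETE_MARKER) {
              deletedTimestamps.add(e.getKey());             // remember the delete for older sources
            } else if (!deletedTimestamps.contains(e.getKey())) {
              results.add(e.getValue());
              if (results.size() >= numVersions) {
                return results;                              // enough versions collected, stop early
              }
            }
          }
        }
        return results;
      }
    }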
   
@@ -1215,15 +1567,16 @@
    * matching keys found in store files appended.
    * @throws IOException
    */
-  List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
-      final int versions) throws IOException {
+  List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
+    throws IOException {
     
-    List<HStoreKey> keys = allKeys;
-    if (keys == null) {
-      keys = new ArrayList<HStoreKey>();
+    List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
+    if (versions != ALL_VERSIONS && keys.size() >= versions) {
+      return keys;
     }
+    
     // This code below is very close to the body of the get method.
-    this.lock.obtainReadLock();
+    this.lock.readLock().lock();
     try {
       MapFile.Reader[] maparray = getReaders();
       for(int i = maparray.length - 1; i >= 0; i--) {
@@ -1242,7 +1595,7 @@
           if (!readkey.matchesRowCol(origin)) {
             continue;
           }
-          if (!isDeleted(readkey, readval.get(), null, null) &&
+          if (!isDeleted(readkey, readval.get(), false, null) &&
               !keys.contains(readkey)) {
             keys.add(new HStoreKey(readkey));
           }
@@ -1250,7 +1603,7 @@
               map.next(readkey, readval) &&
               readkey.matchesRowCol(origin);
               readval = new ImmutableBytesWritable()) {
-            if (!isDeleted(readkey, readval.get(), null, null) &&
+            if (!isDeleted(readkey, readval.get(), false, null) &&
                 !keys.contains(readkey)) {
               keys.add(new HStoreKey(readkey));
               if (versions != ALL_VERSIONS && keys.size() >= versions) {
@@ -1262,7 +1615,7 @@
       }
       return keys;
     } finally {
-      this.lock.releaseReadLock();
+      this.lock.readLock().unlock();
     }
   }
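
A change that recurs throughout this file is the replacement of the home-grown obtainReadLock()/releaseReadLock() pair with the readLock() and writeLock() views of a java.util.concurrent lock. A minimal sketch of the lock/try/finally idiom the patch adopts, assuming the field is a ReentrantReadWriteLock (which the new calls suggest but this hunk does not show):

    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockIdiomSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      int readSomething(Map<String, Integer> storefiles) {
        lock.readLock().lock();              // many readers may hold the shared lock at once
        try {
          return storefiles.size();          // read-only work under the shared lock
        } finally {
          lock.readLock().unlock();          // always release in finally
        }
      }

      void writeSomething(Map<String, Integer> storefiles, String key) {
        lock.writeLock().lock();             // exclusive: blocks readers and other writers
        try {
          storefiles.put(key, Integer.valueOf(0));
        } finally {
          lock.writeLock().unlock();
        }
      }
    }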
   
@@ -1315,7 +1668,7 @@
       return new HStoreSize(0, 0, splitable);
     }
     
-    this.lock.obtainReadLock();
+    this.lock.readLock().lock();
     try {
       Long mapIndex = Long.valueOf(0L);
       // Iterate through all the MapFiles
@@ -1340,84 +1693,52 @@
     } catch(IOException e) {
       LOG.warn("Failed getting store size", e);
     } finally {
-      this.lock.releaseReadLock();
+      this.lock.readLock().unlock();
     }
     return new HStoreSize(aggregateSize, maxSize, splitable);
   }
   
-  /**
-   * @return    Returns the number of map files currently in use
-   */
-  int countOfStoreFiles() {
-    this.lock.obtainReadLock();
-    try {
-      return storefiles.size();
-      
-    } finally {
-      this.lock.releaseReadLock();
-    }
-  }
-  
-  boolean hasReferences() {
-    boolean result = false;
-    this.lock.obtainReadLock();
-    try {
-        for (HStoreFile hsf: this.storefiles.values()) {
-          if (hsf.isReference()) {
-            break;
-          }
-        }
-      
-    } finally {
-      this.lock.releaseReadLock();
-    }
-    return result;
-  }
-  
   //////////////////////////////////////////////////////////////////////////////
   // File administration
   //////////////////////////////////////////////////////////////////////////////
 
-  /** Generate a random unique filename suffix */
-  String obtainFileLabel(Path prefix) throws IOException {
-    String testsuffix = String.valueOf(rand.nextInt(Integer.MAX_VALUE));
-    Path testpath = new Path(prefix.toString() + testsuffix);
-    while(fs.exists(testpath)) {
-      testsuffix = String.valueOf(rand.nextInt(Integer.MAX_VALUE));
-      testpath = new Path(prefix.toString() + testsuffix);
-    }
-    return testsuffix;
-  }
-
   /**
-   * Return a set of MapFile.Readers, one for each HStore file.
-   * These should be closed after the user is done with them.
+   * Return a scanner for both the memcache and the HStore files
    */
   HInternalScannerInterface getScanner(long timestamp, Text targetCols[],
-      Text firstRow) throws IOException {
-    
-    return new HStoreScanner(timestamp, targetCols, firstRow);
+      Text firstRow, RowFilterInterface filter) throws IOException {
+
+    newScannerLock.readLock().lock();           // ability to create a new
+                                                // scanner during a compaction
+    try {
+      lock.readLock().lock();                   // lock HStore
+      try {
+        return new HStoreScanner(targetCols, firstRow, timestamp, filter);
+
+      } finally {
+        lock.readLock().unlock();
+      }
+    } finally {
+      newScannerLock.readLock().unlock();
+    }
   }
-  
+
   /** {@inheritDoc} */
   @Override
   public String toString() {
     return this.storeName;
   }
 
-  //////////////////////////////////////////////////////////////////////////////
-  // This class implements the HScannerInterface.
-  // It lets the caller scan the contents of this HStore.
-  //////////////////////////////////////////////////////////////////////////////
-  
-  class HStoreScanner extends HAbstractScanner {
+  /**
+   * A scanner that iterates through the HStore files
+   */
+  private class StoreFileScanner extends HAbstractScanner {
     @SuppressWarnings("hiding")
     private MapFile.Reader[] readers;
     
-    HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
+    StoreFileScanner(long timestamp, Text[] targetCols, Text firstRow)
     throws IOException {
       super(timestamp, targetCols);
-      lock.obtainReadLock();
       try {
         this.readers = new MapFile.Reader[storefiles.size()];
         
@@ -1430,7 +1751,7 @@
         
         this.keys = new HStoreKey[readers.length];
         this.vals = new byte[readers.length][];
-
+        
         // Advance the readers to the first pos.
         for(i = 0; i < readers.length; i++) {
           keys[i] = new HStoreKey();
@@ -1539,10 +1860,281 @@
           }
           
         } finally {
-          lock.releaseReadLock();
           scannerClosed = true;
         }
       }
     }
   }
+  
+  /**
+   * Scanner scans both the memcache and the HStore
+   */
+  private class HStoreScanner implements HInternalScannerInterface {
+    private HInternalScannerInterface[] scanners;
+    private TreeMap<Text, byte []>[] resultSets;
+    private HStoreKey[] keys;
+    private boolean wildcardMatch = false;
+    private boolean multipleMatchers = false;
+    private RowFilterInterface dataFilter;
+
+    /** Create a Scanner with a handle on the memcache and HStore files. */
+    @SuppressWarnings("unchecked")
+    HStoreScanner(Text[] targetCols, Text firstRow, long timestamp,
+        RowFilterInterface filter) throws IOException {
+      
+      this.dataFilter = filter;
+      if (null != dataFilter) {
+        dataFilter.reset();
+      }
+      this.scanners = new HInternalScannerInterface[2];
+      this.resultSets = new TreeMap[scanners.length];
+      this.keys = new HStoreKey[scanners.length];
+
+      try {
+        scanners[0] = memcache.getScanner(timestamp, targetCols, firstRow);
+        scanners[1] = new StoreFileScanner(timestamp, targetCols, firstRow);
+        
+        for (int i = 0; i < scanners.length; i++) {
+          if (scanners[i].isWildcardScanner()) {
+            this.wildcardMatch = true;
+          }
+          if (scanners[i].isMultipleMatchScanner()) {
+            this.multipleMatchers = true;
+          }
+        }
+
+      } catch(IOException e) {
+        for (int i = 0; i < this.scanners.length; i++) {
+          if(scanners[i] != null) {
+            closeScanner(i);
+          }
+        }
+        throw e;
+      }
+      
+      // Advance to the first key in each scanner.
+      // All results will match the required column-set and scanTime.
+      
+      for (int i = 0; i < scanners.length; i++) {
+        keys[i] = new HStoreKey();
+        resultSets[i] = new TreeMap<Text, byte []>();
+        if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
+          closeScanner(i);
+        }
+      }
+      // As we have now successfully completed initialization, increment the
+      // activeScanner count.
+      activeScanners.incrementAndGet();
+    }
+
+    /** @return true if the scanner is a wild card scanner */
+    public boolean isWildcardScanner() {
+      return wildcardMatch;
+    }
+
+    /** @return true if the scanner is a multiple match scanner */
+    public boolean isMultipleMatchScanner() {
+      return multipleMatchers;
+    }
+
+    /** {@inheritDoc} */
+    public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
+      throws IOException {
+
+      // Filtered flag is set by filters.  If a cell has been 'filtered out'
+      // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
+      boolean filtered = true;
+      boolean moreToFollow = true;
+      while (filtered && moreToFollow) {
+        // Find the lowest-possible key.
+        Text chosenRow = null;
+        long chosenTimestamp = -1;
+        for (int i = 0; i < this.keys.length; i++) {
+          if (scanners[i] != null &&
+              (chosenRow == null ||
+              (keys[i].getRow().compareTo(chosenRow) < 0) ||
+              ((keys[i].getRow().compareTo(chosenRow) == 0) &&
+              (keys[i].getTimestamp() > chosenTimestamp)))) {
+            chosenRow = new Text(keys[i].getRow());
+            chosenTimestamp = keys[i].getTimestamp();
+          }
+        }
+        
+        // Filter whole row by row key?
+        filtered = dataFilter != null? dataFilter.filter(chosenRow) : false;
+
+        // Store the key and results for each sub-scanner. Merge them as
+        // appropriate.
+        if (chosenTimestamp >= 0 && !filtered) {
+          // Here we are setting the passed in key with current row+timestamp
+          key.setRow(chosenRow);
+          key.setVersion(chosenTimestamp);
+          key.setColumn(HConstants.EMPTY_TEXT);
+          // Keep list of deleted cell keys within this row.  We need this
+          // because as we go through scanners, the delete record may be in an
+          // early scanner and then the same record with a non-delete, non-null
+          // value in a later one. Without history of what we've seen, we'll return
+          // deleted values. This List should not ever grow too large since we
+          // are only keeping rows and columns that match those set on the
+          // scanner and which have delete values.  If memory usage becomes a
+          // problem, could redo as bloom filter.
+          List<HStoreKey> deletes = new ArrayList<HStoreKey>();
+          for (int i = 0; i < scanners.length && !filtered; i++) {
+            while ((scanners[i] != null
+                && !filtered
+                && moreToFollow)
+                && (keys[i].getRow().compareTo(chosenRow) == 0)) {
+              // If we are doing a wild card match or there are multiple
+              // matchers per column, we need to scan all the older versions of 
+              // this row to pick up the rest of the family members
+              if (!wildcardMatch
+                  && !multipleMatchers
+                  && (keys[i].getTimestamp() != chosenTimestamp)) {
+                break;
+              }
+
+              // Filter out null criteria columns that are not null
+              if (dataFilter != null) {
+                filtered = dataFilter.filterNotNull(resultSets[i]);
+              }
+
+              // NOTE: We used to do results.putAll(resultSets[i]);
+              // but this had the effect of overwriting newer
+              // values with older ones. So now we only insert
+              // a result if the map does not contain the key.
+              HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT,
+                key.getTimestamp());
+              for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
+                hsk.setColumn(e.getKey());
+                if (HLogEdit.isDeleted(e.getValue())) {
+                  if (!deletes.contains(hsk)) {
+                    // Key changes as we cycle the for loop so add a copy to
+                    // the set of deletes.
+                    deletes.add(new HStoreKey(hsk));
+                  }
+                } else if (!deletes.contains(hsk) &&
+                    !filtered &&
+                    moreToFollow &&
+                    !results.containsKey(e.getKey())) {
+                  if (dataFilter != null) {
+                    // Filter whole row by column data?
+                    filtered =
+                        dataFilter.filter(chosenRow, e.getKey(), e.getValue());
+                    if (filtered) {
+                      results.clear();
+                      break;
+                    }
+                  }
+                  results.put(e.getKey(), e.getValue());
+                }
+              }
+              resultSets[i].clear();
+              if (!scanners[i].next(keys[i], resultSets[i])) {
+                closeScanner(i);
+              }
+            }
+          }          
+        }
+        
+        for (int i = 0; i < scanners.length; i++) {
+          // If the current scanner is non-null AND has a lower-or-equal
+          // row label, then its timestamp is bad. We need to advance it.
+          while ((scanners[i] != null) &&
+              (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+            resultSets[i].clear();
+            if (!scanners[i].next(keys[i], resultSets[i])) {
+              closeScanner(i);
+            }
+          }
+        }
+
+        moreToFollow = chosenTimestamp >= 0;
+        
+        if (dataFilter != null) {
+          if (moreToFollow) {
+            dataFilter.rowProcessed(filtered, chosenRow);
+          }
+          if (dataFilter.filterAllRemaining()) {
+            moreToFollow = false;
+            LOG.debug("page limit");
+          }
+        }
+        if (LOG.isDebugEnabled()) {
+          if (this.dataFilter != null) {
+            LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
+          }
+        }
+        
+        if (results.size() <= 0 && !filtered) {
+          // There were no results found for this row.  Mark it as
+          // 'filtered'-out, otherwise we will not move on to the next row.
+          filtered = true;
+        }
+      }
+      
+      // If we got no results, then there is no more to follow.
+      if (results == null || results.size() <= 0) {
+        moreToFollow = false;
+      }
+      
+      // Make sure scanners closed if no more results
+      if (!moreToFollow) {
+        for (int i = 0; i < scanners.length; i++) {
+          if (null != scanners[i]) {
+            closeScanner(i);
+          }
+        }
+      }
+      
+      return moreToFollow;
+    }
+
+    
+    /** Shut down a single scanner */
+    void closeScanner(int i) {
+      try {
+        try {
+          scanners[i].close();
+        } catch (IOException e) {
+          LOG.warn("Failed closeing scanner " + i, e);
+        }
+      } finally {
+        scanners[i] = null;
+        keys[i] = null;
+        resultSets[i] = null;
+      }
+    }
+
+    /** {@inheritDoc} */
+    public void close() {
+      try {
+        for (int i = 0; i < scanners.length; i++) {
+          if (scanners[i] != null) {
+            closeScanner(i);
+          }
+        }
+      } finally {
+        synchronized (activeScanners) {
+          int numberOfScanners = activeScanners.decrementAndGet();
+          if (numberOfScanners < 0) {
+            LOG.error("number of active scanners less than zero: " +
+                numberOfScanners + " resetting to zero");
+            activeScanners.set(0);
+            numberOfScanners = 0;
+          }
+          if (numberOfScanners == 0) {
+            activeScanners.notifyAll();
+          }
+        }
+
+      }
+    }
+
+    /** {@inheritDoc} */
+    public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
+      throw new UnsupportedOperationException("Unimplemented server-side. " +
+        "next(HStoreKey, SortedMap(...)) is more efficient");
+    }
+  }
+  
 }
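
The largest addition in HStore.java is the composite HStoreScanner, which drives the memcache scanner and the StoreFileScanner together: each call to next() picks the lowest pending row key across the sub-scanners (preferring the newest timestamp on ties), merges their column maps so that values seen first, from the newer source, win, and then advances every sub-scanner still positioned on that row. The sketch below reduces that selection-and-merge step to plain Java; the Cursor type and String row keys are stand-ins, not the HBase scanner interfaces.

    import java.util.*;

    class ScannerMergeSketch {
      /** A sub-scanner positioned at some row; rowKey == null means the scanner is exhausted. */
      static final class Cursor {
        String rowKey;
        SortedMap<String, byte[]> columns = new TreeMap<String, byte[]>();
      }

      /** Merge the columns of the lowest pending row into 'out'; returns that row key or null. */
      static String nextRow(Cursor[] cursors, SortedMap<String, byte[]> out) {
        String chosenRow = null;
        for (Cursor c : cursors) {                       // find the lowest pending row key
          if (c.rowKey != null && (chosenRow == null || c.rowKey.compareTo(chosenRow) < 0)) {
            chosenRow = c.rowKey;
          }
        }
        if (chosenRow == null) {
          return null;                                   // every sub-scanner is exhausted
        }
        for (Cursor c : cursors) {
          if (chosenRow.equals(c.rowKey)) {
            for (Map.Entry<String, byte[]> e : c.columns.entrySet()) {
              if (!out.containsKey(e.getKey())) {        // earlier (newer) cursors win on conflict
                out.put(e.getKey(), e.getValue());
              }
            }
            // A real scanner would now advance this cursor to its next row; omitted in the sketch.
          }
        }
        return chosenRow;
      }
    }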

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Tue Nov 20 13:53:30 2007
@@ -937,6 +937,7 @@
       this.closed = true;
     }
 
+    /** {@inheritDoc} */
     public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
       return new Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>>() {
         HStoreKey key = null;

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java Tue Nov 20 13:53:30 2007
@@ -34,7 +34,11 @@
    * Operation types.
    * @see org.apache.hadoop.io.SequenceFile.Writer
    */
-  public static enum Operation {PUT, DELETE}
+  public static enum Operation {
+    /** update a field */
+    PUT,
+    /** delete a field */
+    DELETE
+  }
 
   private Operation op;
   private Text column;
@@ -65,7 +69,8 @@
   
   /**
    * Creates a put operation
-   * 
+   *
+   * @param operation the operation (put or delete)
    * @param column column name
    * @param value column value
    */
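
For context, a BatchOperation records a single PUT or DELETE against one column, and a batch of them is applied in order. The following sketch shows the replay semantics with a hypothetical Op class; it mirrors the pattern only and is not the HBase writable defined in this file.

    import java.util.*;

    class BatchReplaySketch {
      enum Operation { PUT, DELETE }

      /** One column-level edit: a PUT carries a value, a DELETE does not. */
      static final class Op {
        final Operation op; final String column; final byte[] value;
        Op(Operation op, String column, byte[] value) {
          this.op = op; this.column = column; this.value = value;
        }
      }

      /** Apply the edits in order; later edits to the same column win. */
      static Map<String, byte[]> replay(List<Op> ops) {
        Map<String, byte[]> row = new HashMap<String, byte[]>();
        for (Op o : ops) {
          if (o.op == Operation.PUT) {
            row.put(o.column, o.value);
          } else {
            row.remove(o.column);
          }
        }
        return row;
      }
    }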

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java Tue Nov 20 13:53:30 2007
@@ -102,9 +102,7 @@
       meta.getLog().closeAndDelete();
       
     } catch (Exception e) {
-      if(dfsCluster != null) {
-        dfsCluster.shutdown();
-      }
+      StaticTestEnvironment.shutdownDfs(dfsCluster);
       throw e;
     }
   }
@@ -115,16 +113,7 @@
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-    }
-    if (this.fs != null) {
-      try {
-        this.fs.close();
-      } catch (IOException e) {
-        LOG.info("During tear down got a " + e.getMessage());
-      }
-    }
+    StaticTestEnvironment.shutdownDfs(dfsCluster);
   }
 
   private HRegion createAregion(Text startKey, Text endKey, int firstRow,
@@ -134,20 +123,21 @@
     
     System.out.println("created region " + region.getRegionName());
 
+    HRegionIncommon r = new HRegionIncommon(region);
     for(int i = firstRow; i < firstRow + nrows; i++) {
-      long lockid = region.startUpdate(new Text("row_"
+      long lockid = r.startUpdate(new Text("row_"
           + String.format("%1$05d", i)));
 
-      region.put(lockid, COLUMN_NAME, value.get());
-      region.commit(lockid, System.currentTimeMillis());
+      r.put(lockid, COLUMN_NAME, value.get());
+      r.commit(lockid, System.currentTimeMillis());
       if(i % 10000 == 0) {
         System.out.println("Flushing write #" + i);
-        region.flushcache(false);
+        r.flushcache();
       }
     }
     System.out.println("Rolling log...");
     region.log.rollWriter();
-    region.compactStores();
+    region.compactIfNeeded();
     region.close();
     region.getLog().closeAndDelete();
     region.getRegionInfo().setOffline(true);
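
The test loop above now routes its writes through an HRegionIncommon wrapper: start an update, put a value, commit, and flush every 10,000 rows to keep the memcache bounded during the bulk load. A generic version of that loop, written against a hypothetical Updater interface rather than the real Incommon types:

    import java.io.IOException;

    /** Hypothetical updater interface mirroring the calls used in the loop above. */
    interface Updater {
      long startUpdate(String row) throws IOException;
      void put(long lockid, String column, byte[] value) throws IOException;
      void commit(long lockid, long timestamp) throws IOException;
      void flushcache() throws IOException;
    }

    class BulkLoadSketch {
      /** Write nrows rows through the updater, flushing every 10,000 rows to bound memory use. */
      static void loadRows(Updater updater, int firstRow, int nrows, byte[] value) throws IOException {
        for (int i = firstRow; i < firstRow + nrows; i++) {
          long lockid = updater.startUpdate("row_" + String.format("%1$05d", i));
          updater.put(lockid, "contents:", value);     // single column, as in the test
          updater.commit(lockid, System.currentTimeMillis());
          if (i % 10000 == 0) {
            updater.flushcache();                      // periodic flush keeps the memcache small
          }
        }
      }
    }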

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Tue Nov 20 13:53:30 2007
@@ -21,6 +21,8 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Random;
 
 import junit.framework.TestCase;
 
@@ -28,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
+import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -219,19 +222,36 @@
               && endKey.compareTo(t) <= 0) {
             break EXIT;
           }
-          long lockid = updater.startBatchUpdate(t);
           try {
-            updater.put(lockid, new Text(column), bytes);
-            if (ts == -1) {
-              updater.commit(lockid);
-            } else {
-              updater.commit(lockid, ts);
-            }
-            lockid = -1;
-          } finally {
-            if (lockid != -1) {
-              updater.abort(lockid);
+            long lockid = updater.startBatchUpdate(t);
+            try {
+              updater.put(lockid, new Text(column), bytes);
+              if (ts == -1) {
+                updater.commit(lockid);
+              } else {
+                updater.commit(lockid, ts);
+              }
+              lockid = -1;
+            } catch (RuntimeException ex) {
+              ex.printStackTrace();
+              throw ex;
+              
+            } catch (IOException ex) {
+              ex.printStackTrace();
+              throw ex;
+              
+            } finally {
+              if (lockid != -1) {
+                updater.abort(lockid);
+              }
             }
+          } catch (RuntimeException ex) {
+            ex.printStackTrace();
+            throw ex;
+            
+          } catch (IOException ex) {
+            ex.printStackTrace();
+            throw ex;
           }
         }
         // Set start character back to FIRST_CHAR after we've done first loop.
@@ -275,32 +295,56 @@
   /**
    * A class that makes a {@link Incommon} out of a {@link HRegion}
    */
-  public static class HRegionIncommon implements Incommon {
+  public static class HRegionIncommon implements Incommon, FlushCache {
     final HRegion region;
+    private final Random rand = new Random();
+    private BatchUpdate batch;
+    
+    private void checkBatch() {
+      if (batch == null) {
+        throw new IllegalStateException("No update in progress");
+      }
+    }
+    
     public HRegionIncommon(final HRegion HRegion) {
-      super();
       this.region = HRegion;
+      this.batch = null;
     }
     public void abort(long lockid) throws IOException {
-      this.region.abort(lockid);
+      this.batch = null;
     }
     public void commit(long lockid) throws IOException {
-      this.region.commit(lockid);
+      commit(lockid, HConstants.LATEST_TIMESTAMP);
     }
     public void commit(long lockid, final long ts) throws IOException {
-      this.region.commit(lockid, ts);
+      checkBatch();
+      try {
+        this.region.batchUpdate(ts, batch);
+      } finally {
+        this.batch = null;
+      }
     }
     public void put(long lockid, Text column, byte[] val) throws IOException {
-      this.region.put(lockid, column, val);
+      checkBatch();
+      this.batch.put(lockid, column, val);
     }
     public void delete(long lockid, Text column) throws IOException {
-      this.region.delete(lockid, column);
+      checkBatch();
+      this.batch.delete(lockid, column);
     }
     public void deleteAll(Text row, Text column, long ts) throws IOException {
       this.region.deleteAll(row, column, ts);
     }
     public long startBatchUpdate(Text row) throws IOException {
-      return this.region.startUpdate(row);
+      return startUpdate(row);
+    }
+    public long startUpdate(Text row) throws IOException {
+      if (this.batch != null) {
+        throw new IllegalStateException("Update already in progress");
+      }
+      long lockid = Math.abs(rand.nextLong());
+      this.batch = new BatchUpdate(lockid);
+      return batch.startUpdate(row);
     }
     public HScannerInterface getScanner(Text [] columns, Text firstRow,
         long ts)
@@ -316,6 +360,12 @@
     public byte[][] get(Text row, Text column, long ts, int versions)
         throws IOException {
       return this.region.get(row, column, ts, versions);
+    }
+    public Map<Text, byte []> getFull(Text row) throws IOException {
+      return region.getFull(row);
+    }
+    public void flushcache() throws IOException {
+      this.region.internalFlushcache(this.region.snapshotMemcaches());
     }
   }
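
HRegionIncommon now buffers each update locally and only hands it to the region at commit time: startUpdate() refuses to begin a second batch, put()/delete() require an open batch, and commit()/abort() clear it. The sketch below captures that buffering contract with plain collections; the field and method bodies are illustrative, while the actual class delegates to BatchUpdate and HRegion.batchUpdate() as shown above.

    import java.io.IOException;
    import java.util.*;

    /** Sketch of a wrapper that buffers puts and deletes until commit, as HRegionIncommon now does. */
    class BatchBufferSketch {
      private Map<String, byte[]> puts;      // null means no update is in progress
      private Set<String> deletes;
      private final Random rand = new Random();

      long startUpdate(String row) {
        if (puts != null) {
          throw new IllegalStateException("Update already in progress");
        }
        puts = new HashMap<String, byte[]>();
        deletes = new HashSet<String>();
        return Math.abs(rand.nextLong());    // the lock id is only an opaque handle here
      }

      void put(long lockid, String column, byte[] value) {
        checkBatch();
        puts.put(column, value);
      }

      void delete(long lockid, String column) {
        checkBatch();
        deletes.add(column);
      }

      void commit(long lockid, long timestamp) throws IOException {
        checkBatch();
        try {
          // A real implementation hands the buffered edits to the region here.
        } finally {
          puts = null;                       // clear the batch even if the apply step fails
          deletes = null;
        }
      }

      void abort(long lockid) {
        puts = null;
        deletes = null;
      }

      private void checkBatch() {
        if (puts == null) {
          throw new IllegalStateException("No update in progress");
        }
      }
    }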
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Tue Nov 20 13:53:30 2007
@@ -117,9 +117,14 @@
     this.deleteOnExit = deleteOnExit;
     this.shutdownDFS = false;
     if (miniHdfsFilesystem) {
-      this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
-      this.fs = cluster.getFileSystem();
-      this.shutdownDFS = true;
+      try {
+        this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
+        this.fs = cluster.getFileSystem();
+        this.shutdownDFS = true;
+      } catch (IOException e) {
+        StaticTestEnvironment.shutdownDfs(cluster);
+        throw e;
+      }
     } else {
       this.cluster = null;
       this.fs = FileSystem.get(conf);
@@ -224,26 +229,13 @@
    */
   public void shutdown() {
     this.hbaseCluster.shutdown();
-    try {
-      if (shutdownDFS && cluster != null) {
-        FileSystem fs = cluster.getFileSystem();
-        if (fs != null) {
-          LOG.info("Shutting down FileSystem");
-          fs.close();
-        }
-        if (this.cluster != null) {
-          LOG.info("Shutting down Mini DFS ");
-          cluster.shutdown();
-        }
-      }
-    } catch (IOException e) {
-      LOG.error("shutdown", e);
-    } finally {
-      // Delete all DFS files
-      if(deleteOnExit) {
-        deleteFile(new File(System.getProperty(
-            StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
-      }
+    if (shutdownDFS) {
+      StaticTestEnvironment.shutdownDfs(cluster);
+    }
+    // Delete all DFS files
+    if(deleteOnExit) {
+      deleteFile(new File(System.getProperty(
+          StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
     }
   }
 
@@ -265,7 +257,7 @@
     for (LocalHBaseCluster.RegionServerThread t:
         this.hbaseCluster.getRegionServers()) {
       for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
-        r.flushcache(false);
+        r.internalFlushcache(r.snapshotMemcaches());
       }
     }
   }
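
The constructor change above follows a simple rule: if startup fails after the mini-DFS cluster has been created, shut it down before rethrowing rather than leaking it. A generic sketch of that acquire-or-clean-up shape (hypothetical Service/ServiceFactory types, not the Hadoop classes):

    import java.io.IOException;

    /** Sketch: if construction fails after a resource was started, release it before rethrowing. */
    class GuardedStartupSketch {
      interface Service { void shutdown(); }
      interface ServiceFactory { Service start() throws IOException; }

      private final Service service;

      GuardedStartupSketch(ServiceFactory factory) throws IOException {
        Service started = null;
        try {
          started = factory.start();
          // ... further setup that may also throw goes here ...
        } catch (IOException e) {
          if (started != null) {
            started.shutdown();              // do not leak the half-started service
          }
          throw e;
        }
        this.service = started;
      }

      Service service() {
        return service;
      }
    }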

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Tue Nov 20 13:53:30 2007
@@ -53,11 +53,9 @@
   @SuppressWarnings("null")
   public static void makeMultiRegionTable(HBaseConfiguration conf,
       MiniHBaseCluster cluster, FileSystem localFs, String tableName,
-      String columnName)
-  throws IOException {  
+      String columnName) throws IOException {  
     final int retries = 10; 
-    final long waitTime =
-      conf.getLong("hbase.master.meta.thread.rescanfrequency", 10L * 1000L);
+    final long waitTime = 20L * 1000L;
     
     // This size should make it so we always split using the addContent
     // below.  After adding all data, the first region is 1.3M. Should
@@ -106,7 +104,7 @@
     }
 
     // Flush will provoke a split next time the split-checker thread runs.
-    r.flushcache(false);
+    r.internalFlushcache(r.snapshotMemcaches());
     
     // Now, wait until split makes it into the meta table.
     int oldCount = count;
@@ -156,15 +154,19 @@
     // Wait till the parent only has reference to remaining split, one that
     // still has references.
     
-    while (getSplitParentInfo(meta, parent).size() == 3) {
-      try {
-        Thread.sleep(waitTime);
-      } catch (InterruptedException e) {
-        // continue
+    while (true) {
+      data = getSplitParentInfo(meta, parent);
+      if (data == null || data.size() == 3) {
+        try {
+          Thread.sleep(waitTime);
+        } catch (InterruptedException e) {
+          // continue
+        }
+        continue;
       }
+      break;
     }
-    LOG.info("Parent split returned " +
-        getSplitParentInfo(meta, parent).keySet().toString());
+    LOG.info("Parent split returned " + data.keySet().toString());
     
     // Call second split.
     

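The rewritten wait loop re-reads the split parent's info on every pass and treats a null result the same as "not ready yet", sleeping and retrying until real data arrives. The same shape as a small reusable helper, sketched in plain Java with assumed names:

    import java.util.concurrent.Callable;

    class PollSketch {
      /** Poll 'probe' until it returns non-null, sleeping between attempts; null means it timed out. */
      static <T> T waitFor(Callable<T> probe, long sleepMillis, int maxAttempts) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
          T value = probe.call();                 // re-evaluate the condition on every pass
          if (value != null) {
            return value;
          }
          try {
            Thread.sleep(sleepMillis);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore interrupt status and stop waiting
            break;
          }
        }
        return null;
      }
    }
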
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java?rev=596835&r1=596834&r2=596835&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java Tue Nov 20 13:53:30 2007
@@ -20,8 +20,11 @@
 package org.apache.hadoop.hbase;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Enumeration;
 
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Appender;
 import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Layout;
@@ -33,6 +36,9 @@
  * Initializes test environment
  */
 public class StaticTestEnvironment {
+  private static final Logger LOG =
+    Logger.getLogger(StaticTestEnvironment.class.getPackage().getName());
+
   private StaticTestEnvironment() {}                    // Not instantiable
 
   /** configuration parameter name for test directory */
@@ -105,7 +111,28 @@
         consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
       }
     }
-    Logger.getLogger(
-        HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
+    LOG.setLevel(logLevel);
+  }
+  
+  /**
+   * Common method to close down a MiniDFSCluster and the associated file system
+   * 
+   * @param cluster
+   */
+  public static void shutdownDfs(MiniDFSCluster cluster) {
+    if (cluster != null) {
+      try {
+        FileSystem fs = cluster.getFileSystem();
+        if (fs != null) {
+          LOG.info("Shutting down FileSystem");
+          fs.close();
+        }
+      } catch (IOException e) {
+        LOG.error("error closing file system", e);
+      }
+
+      LOG.info("Shutting down Mini DFS ");
+      cluster.shutdown();
+    }
   }
 }
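
shutdownDfs() centralizes the close-the-FileSystem-then-stop-the-cluster sequence that the tests above previously duplicated, and it checks for null, so callers can invoke it unconditionally from tearDown(). An illustrative caller follows; the test class, configuration field, and MiniDFSCluster arguments are assumptions, and only the helper itself comes from this patch.

    package org.apache.hadoop.hbase;

    import junit.framework.TestCase;
    import org.apache.hadoop.dfs.MiniDFSCluster;

    /** Illustrative only: a test that always tears the mini-DFS down through the shared helper. */
    public class ExampleDfsTest extends TestCase {
      private final HBaseConfiguration conf = new HBaseConfiguration();
      private MiniDFSCluster dfsCluster;

      @Override
      protected void setUp() throws Exception {
        super.setUp();
        dfsCluster = new MiniDFSCluster(conf, 2, true, (String[]) null);
      }

      @Override
      protected void tearDown() throws Exception {
        super.tearDown();
        // shutdownDfs() is null-safe, so this works even if setUp failed before the cluster came up.
        StaticTestEnvironment.shutdownDfs(dfsCluster);
      }
    }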


