hbase-commits mailing list archives

From: raw...@apache.org
Subject: svn commit: r944529 [1/2] - in /hadoop/hbase/trunk/core/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/filter/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/had...
Date: Sat, 15 May 2010 00:18:37 GMT
Author: rawson
Date: Sat May 15 00:18:37 2010
New Revision: 944529

URL: http://svn.apache.org/viewvc?rev=944529&view=rev
Log:
HBASE-2248  Provide new non-copy mechanism to assure atomic reads in get and scan

Added:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
Modified:
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java

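The heart of this change is the new ReadWriteConsistencyControl (RWCC) class added below. Writers reserve a monotonically increasing write number, tag every KeyValue they insert into the memstore with it via setMemstoreTS(), and then complete ("commit") the insert; readers pin a thread-local read point and ignore any memstore entry whose memstoreTS is newer. A minimal sketch of that protocol, using only methods this patch adds (the wrapper class and main() are illustrative, not part of the patch):

    package org.apache.hadoop.hbase.regionserver;

    public class RwccSketch {
      public static void main(String[] args) {
        ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();

        // Reader side: pin a read point for this thread. Memstore entries
        // with a memstoreTS greater than this value are invisible to us.
        long readPoint = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);

        // Writer side: reserve a write number, tag edits with it (see the
        // kv.setMemstoreTS(w.getWriteNumber()) calls in HRegion below),
        // then complete the insert so the read point can advance.
        ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
        long writeNumber = w.getWriteNumber();  // > readPoint until completed
        rwcc.completeMemstoreInsert(w);

        System.out.println("read point " + readPoint + " -> "
            + rwcc.memstoreReadPoint() + " after committing " + writeNumber);
      }
    }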
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java Sat May 15 00:18:37 2010
@@ -201,6 +201,23 @@ public class KeyValue implements Writabl
   private int offset = 0;
   private int length = 0;
 
+  /** Here be dragons **/
+
+  // used to achieve atomic operations in the memstore.
+  public long getMemstoreTS() {
+    return memstoreTS;
+  }
+
+  public void setMemstoreTS(long memstoreTS) {
+    this.memstoreTS = memstoreTS;
+  }
+
+  // default value is 0, aka DNC
+  private long memstoreTS = 0;
+
+  /** Dragon time over, return to normal business */
+
+  
   /** Writable Constructor -- DO NOT USE */
   public KeyValue() {}
 
@@ -1468,6 +1485,21 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Creates a KeyValue that is last on the specified row id. That is,
+   * every other possible KeyValue for the given row would sort before
+   * the result of this call when compared via compareTo().
+   * @param row row key
+   * @return Last possible KeyValue on passed <code>row</code>
+   */
+  public static KeyValue createLastOnRow(final byte[] row) {
+    return new KeyValue(row, null, null, HConstants.LATEST_TIMESTAMP, Type.Minimum);
+  }
+
+  /**
+   * Create a KeyValue that is smaller than all other possible KeyValues
+   * for the given row. That is, any (valid) KeyValue on 'row' would sort
+   * _after_ the result.
+   * 
    * @param row - row key (arbitrary byte array)
    * @return First possible KeyValue on passed <code>row</code>
    */
@@ -1476,6 +1508,8 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Creates a KeyValue that is smaller than all other KeyValues that
+   * are older than the passed timestamp.
    * @param row - row key (arbitrary byte array)
    * @param ts - timestamp
    * @return First possible key on passed <code>row</code> and timestamp.
@@ -1487,8 +1521,11 @@ public class KeyValue implements Writabl
 
   /**
    * @param row - row key (arbitrary byte array)
+   * @param c column - {@link #parseColumn(byte[])} is called to split
+   * the column.
    * @param ts - timestamp
    * @return First possible key on passed <code>row</code>, column and timestamp
+   * @deprecated
    */
   public static KeyValue createFirstOnRow(final byte [] row, final byte [] c,
       final long ts) {
@@ -1497,14 +1534,17 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Create a KeyValue for the specified row, family and qualifier that would be
+   * smaller than all other possible KeyValues that have the same row, family, and qualifier.
+   * Used for seeking.
    * @param row - row key (arbitrary byte array)
-   * @param f - family name
-   * @param q - column qualifier
+   * @param family - family name
+   * @param qualifier - column qualifier
    * @return First possible key on passed <code>row</code>, and column.
    */
-  public static KeyValue createFirstOnRow(final byte [] row, final byte [] f,
-      final byte [] q) {
-    return new KeyValue(row, f, q, HConstants.LATEST_TIMESTAMP, Type.Maximum);
+  public static KeyValue createFirstOnRow(final byte [] row, final byte [] family,
+      final byte [] qualifier) {
+    return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
   }
 
   /**
@@ -1689,9 +1729,6 @@ public class KeyValue implements Writabl
         return compare;
       }
 
-      // if row matches, and no column in the 'left' AND put type is 'minimum',
-      // then return that left is larger than right.
-
       // Compare column family.  Start compare past row and family length.
       int lcolumnoffset = Bytes.SIZEOF_SHORT + lrowlength + 1 + loffset;
       int rcolumnoffset = Bytes.SIZEOF_SHORT + rrowlength + 1 + roffset;
@@ -1700,17 +1737,25 @@ public class KeyValue implements Writabl
       int rcolumnlength = rlength - TIMESTAMP_TYPE_SIZE -
         (rcolumnoffset - roffset);
 
+      // if row matches, and no column in the 'left' AND put type is 'minimum',
+      // then return that left is larger than right.
+      
       // This supports 'last key on a row' - the magic is if there is no column in the
       // left operand, and the left operand has a type of '0' - magical value,
       // then we say the left is bigger.  This will let us seek to the last key in
       // a row.
 
       byte ltype = left[loffset + (llength - 1)];
+      byte rtype = right[roffset + (rlength - 1)];
 
       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
         return 1; // left is bigger.
       }
+      if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
+        return -1;
+      }
 
+      // TODO the family and qualifier should be compared separately
       compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right,
           rcolumnoffset, rcolumnlength);
       if (compare != 0) {
@@ -1732,9 +1777,6 @@ public class KeyValue implements Writabl
       if (!this.ignoreType) {
         // Compare types. Let the delete types sort ahead of puts; i.e. types
         // of higher numbers sort before those of lesser numbers
-
-        // ltype is defined above
-        byte rtype = right[roffset + (rlength - 1)];
         return (0xff & rtype) - (0xff & ltype);
       }
       return 0;
@@ -1772,9 +1814,10 @@ public class KeyValue implements Writabl
 
   // HeapSize
   public long heapSize() {
-    return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE +
-        ClassSize.align(ClassSize.ARRAY + length) +
-        (2 * Bytes.SIZEOF_INT));
+    return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE + 
+        ClassSize.align(ClassSize.ARRAY + length) + 
+        (2 * Bytes.SIZEOF_INT) +
+        Bytes.SIZEOF_LONG);
   }
 
   // this overload assumes that the length bytes have already been read,

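The new createLastOnRow() above, together with the better-documented createFirstOnRow() variants, synthesizes seek keys: artificial KeyValues that sort before or after every real entry on a row. A small illustration (the row value is made up; Bytes is the existing HBase utility class):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SeekKeySketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");

        // Sorts before every real KeyValue on 'row1' (empty column,
        // Type.Maximum, latest timestamp): a lower seek bound.
        KeyValue first = KeyValue.createFirstOnRow(row);

        // Sorts after every real KeyValue on 'row1' (empty column,
        // Type.Minimum): the comparator special-cases this, per the
        // "last key on a row" logic in the comparator change above.
        KeyValue last = KeyValue.createLastOnRow(row);

        System.out.println(first + " ... " + last);
      }
    }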
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat May 15 00:18:37 2010
@@ -150,6 +150,24 @@ public class Scan implements Writable {
   }
 
   /**
+   * Builds a scan object with the same specs as the given get.
+   * @param get get to model scan after
+   */
+  public Scan(Get get) {
+    this.startRow = get.getRow();
+    this.stopRow = get.getRow();
+    this.filter = get.getFilter();
+    this.maxVersions = get.getMaxVersions();
+    this.tr = get.getTimeRange();
+    this.familyMap = get.getFamilyMap();
+  }
+
+  public boolean isGetScan() {
+    return this.startRow != null && this.startRow.length > 0 &&
+      Bytes.equals(this.startRow, this.stopRow);
+  }
+
+  /**
    * Get all columns from the specified family.
    * <p>
    * Overrides previous calls to addColumn for this family.

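The new Scan(Get) constructor and isGetScan() let the server run a Get as a single-row Scan; that is exactly what the rewritten HRegion.get() further down does. A client-side illustration (row and family names are made up):

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GetAsScanSketch {
      public static void main(String[] args) {
        Get get = new Get(Bytes.toBytes("row1"));
        get.addFamily(Bytes.toBytes("cf"));

        // The scan copies the get's row (as both start and stop row),
        // filter, max versions, time range and family map.
        Scan scan = new Scan(get);
        System.out.println("single-row scan? " + scan.isGetScan());  // true
      }
    }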
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/filter/SingleColumnValueFilter.java Sat May 15 00:18:37 2010
@@ -177,9 +177,6 @@ public class SingleColumnValueFilter imp
     // byte array copy?
     int compareResult =
       this.comparator.compareTo(Arrays.copyOfRange(data, offset, offset + length));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("compareResult=" + compareResult + " " + Bytes.toString(data, offset, length));
-    }
     switch (this.compareOp) {
     case LESS:
       return compareResult <= 0;

Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java?rev=944529&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/DebugPrint.java Sat May 15 00:18:37 2010
@@ -0,0 +1,50 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DebugPrint {
+
+  private static final AtomicBoolean enabled = new AtomicBoolean(false);
+  private static final Object sync = new Object();
+  public static StringBuilder out = new StringBuilder();
+
+  static public void enable() {
+    enabled.set(true);
+  }
+  static public void disable() {
+    enabled.set(false);
+  }
+
+  static public void reset() {
+    synchronized (sync) {
+      enable(); // someone wants us enabled basically.
+
+      out = new StringBuilder();
+    }
+  }
+  static public void dumpToFile(String file) throws IOException {
+    FileWriter f = new FileWriter(file);
+    synchronized (sync) {
+      f.write(out.toString());
+    }
+    f.close();
+  }
+
+  public static void println(String m) {
+    if (!enabled.get()) {
+      System.out.println(m);
+      return;
+    }
+
+    synchronized (sync) {
+      String threadName = Thread.currentThread().getName();
+      out.append("<");
+      out.append(threadName);
+      out.append("> ");
+      out.append(m);
+      out.append("\n");
+    }
+  }
+}
\ No newline at end of file

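DebugPrint is a crude tracing aid: while enabled, println() appends messages (tagged with the calling thread's name) to a shared StringBuilder for later dumping; while disabled, println() falls through to System.out. A usage sketch (the file path is arbitrary):

    import org.apache.hadoop.hbase.regionserver.DebugPrint;

    public class DebugPrintSketch {
      public static void main(String[] args) throws java.io.IOException {
        DebugPrint.reset();                  // clears the buffer, enables capture
        DebugPrint.println("seek to row1");  // buffered as "<main> seek to row1"
        DebugPrint.dumpToFile("/tmp/rwcc-trace.txt");
        DebugPrint.disable();                // later println()s go to stdout
      }
    }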
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat May 15 00:18:37 2010
@@ -54,22 +54,27 @@ import org.apache.hadoop.hbase.util.Writ
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.NavigableSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * HRegion stores data for a certain region of a table.  It stores all columns
@@ -175,6 +180,8 @@ public class HRegion implements HConstan
 
     /**
      * Set flags that make this region read-only.
+     *
+     * @param onOff flip value for region r/o setting
      */
     synchronized void setReadOnly(final boolean onOff) {
       this.writesEnabled = !onOff;
@@ -190,7 +197,7 @@ public class HRegion implements HConstan
     }
   }
 
-  private volatile WriteState writestate = new WriteState();
+  private final WriteState writestate = new WriteState();
 
   final long memstoreFlushSize;
   private volatile long lastFlushTime;
@@ -210,6 +217,9 @@ public class HRegion implements HConstan
   private long minSequenceId;
   private boolean splitRequest;
 
+  private final ReadWriteConsistencyControl rwcc =
+      new ReadWriteConsistencyControl();
+
   /**
    * Name of the region info file that resides just under the region directory.
    */
@@ -296,10 +306,10 @@ public class HRegion implements HConstan
   /**
    * Initialize this region and get it ready to roll.
    * Called after construction.
-   *
-   * @param initialFiles
-   * @param reporter
-   * @throws IOException
+   * 
+   * @param initialFiles path
+   * @param reporter progressable
+   * @throws IOException e
    */
   public void initialize(Path initialFiles, final Progressable reporter)
   throws IOException {
@@ -437,6 +447,10 @@ public class HRegion implements HConstan
     return this.closing.get();
   }
 
+   public ReadWriteConsistencyControl getRWCC() {
+     return rwcc;
+   }
+   
   /**
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
    * service any more calls.
@@ -447,8 +461,8 @@ public class HRegion implements HConstan
    * @return Vector of all the storage files that the HRegion's component
    * HStores make use of.  It's a list of all HStoreFile objects. Returns empty
    * vector if already closed and null if judged that it should not close.
-   *
-   * @throws IOException
+   * 
+   * @throws IOException e
    */
   public List<StoreFile> close() throws IOException {
     return close(false);
@@ -465,8 +479,8 @@ public class HRegion implements HConstan
    * @return Vector of all the storage files that the HRegion's component
    * HStores make use of.  It's a list of HStoreFile objects.  Can be null if
    * we are not to close at this time or we are already closed.
-   *
-   * @throws IOException
+   * 
+   * @throws IOException e
    */
   public List<StoreFile> close(final boolean abort) throws IOException {
     if (isClosed()) {
@@ -598,6 +612,7 @@ public class HRegion implements HConstan
   }
 
   /** @return the last time the region was flushed */
+  @SuppressWarnings({"UnusedDeclaration"})
   public long getLastFlushTime() {
     return this.lastFlushTime;
   }
@@ -699,8 +714,7 @@ public class HRegion implements HConstan
         HRegion.newHRegion(basedir, log, fs, conf, regionBInfo, null);
       moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());
 
-      HRegion regions[] = new HRegion [] {regionA, regionB};
-      return regions;
+      return new HRegion [] {regionA, regionB};
     }
   }
 
@@ -774,7 +788,7 @@ public class HRegion implements HConstan
    * server does them sequentially and not in parallel.
    *
    * @return mid key if split is needed
-   * @throws IOException
+   * @throws IOException e
    */
   public byte [] compactStores() throws IOException {
     boolean majorCompaction = this.forceMajorCompaction;
@@ -795,7 +809,7 @@ public class HRegion implements HConstan
    *
    * @param majorCompaction True to force a major compaction regardless of thresholds
    * @return split row if split is needed
-   * @throws IOException
+   * @throws IOException e
    */
   byte [] compactStores(final boolean majorCompaction)
   throws IOException {
@@ -863,8 +877,8 @@ public class HRegion implements HConstan
    * time-sensitive thread.
    *
    * @return true if cache was flushed
-   *
-   * @throws IOException
+   * 
+   * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
@@ -929,8 +943,8 @@ public class HRegion implements HConstan
    * <p> This method may block for some time.
    *
    * @return true if the region needs compacting
-   *
-   * @throws IOException
+   * 
+   * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
@@ -958,10 +972,13 @@ public class HRegion implements HConstan
     // during the flush
     long sequenceId = -1L;
     long completeSequenceId = -1L;
+
+    // we have to take a write lock during snapshot, or else a write could
+    // end up in both snapshot and memstore (makes it difficult to do atomic
+    // rows then)
     this.updatesLock.writeLock().lock();
-    // Get current size of memstores.
     final long currentMemStoreSize = this.memstoreSize.get();
-    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>();
+    List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
     try {
       sequenceId = log.startCacheFlush();
       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
@@ -970,6 +987,13 @@ public class HRegion implements HConstan
         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
       }
 
+      // This thread is going to cause a whole bunch of scanners to reseek.
+      // They depend on a thread-local to know where to read from.
+      // We set the read point this high up in the call path so that each
+      // HRegionScanner only has a single read point for all its
+      // sub-StoreScanners.
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
       // prepare flush (take a snapshot)
       for (StoreFlusher flusher : storeFlushers) {
         flusher.prepare();
@@ -978,6 +1002,8 @@ public class HRegion implements HConstan
       this.updatesLock.writeLock().unlock();
     }
 
+    LOG.debug("Finished snapshotting, commencing flushing stores");
+
     // Any failure from here on out will be catastrophic requiring server
     // restart so hlog content can be replayed and put back into the memstore.
     // Otherwise, the snapshot content while backed up in the hlog, it will not
@@ -992,15 +1018,37 @@ public class HRegion implements HConstan
         flusher.flushCache();
       }
 
-      internalPreFlushcacheCommit();
+      Callable<Void> atomicWork = internalPreFlushcacheCommit();
+
+      LOG.debug("Caches flushed, doing commit now (which includes update scanners)");
 
-      /*
-       * Switch between memstore and the new store file(s).
+      /**
+       * Switch between memstore(snapshot) and the new store file
        */
-      for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit();
-        if (needsCompaction) {
-          compactionRequested = true;
+      if (atomicWork != null) {
+        LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock");
+        newScannerLock.writeLock().lock();
+      }
+
+      try {
+        // update this again to make sure we are 'fresh'
+        ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+        if (atomicWork != null) {
+          atomicWork.call();
+        }
+
+        // Switch snapshot (in memstore) -> new hfile (thus causing
+        // all the store scanners to reset/reseek).
+        for (StoreFlusher flusher : storeFlushers) {
+          boolean needsCompaction = flusher.commit();
+          if (needsCompaction) {
+            compactionRequested = true;
+          }
+        }
+      } finally {
+        if (atomicWork != null) {
+          newScannerLock.writeLock().unlock();
         }
       }
 
@@ -1049,16 +1097,22 @@ public class HRegion implements HConstan
     return compactionRequested;
   }
 
-  /**
-   * A hook for sub-classes wishing to perform operations prior to the
-   * cache flush commit stage.
-   *
-   * @throws IOException allow children to throw exception
-   */
-  protected void internalPreFlushcacheCommit() throws IOException {
-  }
-  
-  /**
+
+   /**
+    * A hook for sub-classes wishing to perform operations prior to the cache
+    * flush commit stage.
+    *
+    * If a subclass wishes its own work and the flush commit stage to happen
+    * atomically, it should return a Callable. The new scanner lock will be
+    * acquired and released around it.
+    *
+    * @throws java.io.IOException allow children to throw exception
+    */
+   protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
+     return null;
+   }
+
+   /**
    * Get the sequence number to be associated with this cache flush. Used by
    * TransactionalRegion to not complete pending transactions.
    *
@@ -1093,9 +1147,9 @@ public class HRegion implements HConstan
    * <i>ts</i>.
    *
    * @param row row key
-   * @param family
+   * @param family column family to find on
    * @return map of values
-   * @throws IOException
+   * @throws IOException read exceptions
    */
   public Result getClosestRowBefore(final byte [] row, final byte [] family)
   throws IOException {
@@ -1112,11 +1166,9 @@ public class HRegion implements HConstan
       if (key == null) {
         return null;
       }
-      // This will get all results for this store.  TODO: Do we need to do this?
       Get get = new Get(key.getRow());
-      List<KeyValue> results = new ArrayList<KeyValue>();
-      store.get(get, null, results);
-      return new Result(results);
+      get.addFamily(family);
+      return get(get, null);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
@@ -1130,7 +1182,7 @@ public class HRegion implements HConstan
    *
    * @param scan configured {@link Scan}
    * @return InternalScanner
-   * @throws IOException
+   * @throws IOException read exceptions
    */
   public InternalScanner getScanner(Scan scan)
   throws IOException {
@@ -1160,8 +1212,7 @@ public class HRegion implements HConstan
     }
   }
 
-  protected InternalScanner instantiateInternalScanner(Scan scan,
-                                                       List<KeyValueScanner> additionalScanners) throws IOException {
+  protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
     return new RegionScanner(scan, additionalScanners);
   }
 
@@ -1169,15 +1220,15 @@ public class HRegion implements HConstan
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
   /**
-   * @param delete
-   * @param lockid
-   * @param writeToWAL
-   * @throws IOException
+   * @param delete delete object
+   * @param lockid existing lock id, or null to grab a new lock
+   * @param writeToWAL whether to append to the write-ahead log
+   * @throws IOException read exceptions
    */
   public void delete(Delete delete, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
     checkResources();
+    Integer lid = null;
     splitsAndClosesLock.readLock().lock();
-    Integer lid = null;
     try {
@@ -1185,7 +1237,7 @@ public class HRegion implements HConstan
       // If we did not pass an existing row lock, obtain a new one
       lid = getLock(lockid, row);
 
-      //Check to see if this is a deleteRow insert
+      // Check to see if this is a deleteRow insert
       if(delete.getFamilyMap().isEmpty()){
         for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
           // Don't eat the timestamp
@@ -1220,7 +1272,9 @@ public class HRegion implements HConstan
     long now = System.currentTimeMillis();
     byte [] byteNow = Bytes.toBytes(now);
     boolean flush = false;
-    this.updatesLock.readLock().lock();
+
+    updatesLock.readLock().lock();
+    ReadWriteConsistencyControl.WriteEntry w = null;
 
     try {
 
@@ -1237,21 +1291,21 @@ public class HRegion implements HConstan
           if (kv.isLatestTimestamp() && kv.isDeleteType()) {
             byte[] qual = kv.getQualifier();
             if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
+
             Integer count = kvCount.get(qual);
             if (count == null) {
-              kvCount.put(qual, new Integer(1));
+              kvCount.put(qual, 1);
             } else {
-              kvCount.put(qual, new Integer(count+1));
+              kvCount.put(qual, count + 1);
             }
             count = kvCount.get(qual);
 
-            List<KeyValue> result = new ArrayList<KeyValue>(1);
-            Get g = new Get(kv.getRow());
-            g.setMaxVersions(count);
-            NavigableSet<byte []> qualifiers =
-              new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-            qualifiers.add(qual);
-            get(store, g, qualifiers, result);
+            Get get = new Get(kv.getRow());
+            get.setMaxVersions(count);
+            get.addColumn(family, qual);
+
+            List<KeyValue> result = get(get);
+
             if (result.size() < count) {
               // Nothing to delete
               kv.updateLatestStamp(byteNow);
@@ -1294,11 +1348,11 @@ public class HRegion implements HConstan
         }
       }
 
+      // Now make changes to the memstore.
+
       long size = 0;
+      w = rwcc.beginMemstoreInsert();
 
-      //
-      // Now make changes to the memstore.
-      //
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
 
         byte[] family = e.getKey();
@@ -1306,13 +1360,17 @@ public class HRegion implements HConstan
 
         Store store = getStore(family);
         for (KeyValue kv: kvs) {
+          kv.setMemstoreTS(w.getWriteNumber());
           size = this.memstoreSize.addAndGet(store.delete(kv));
         }
       }
       flush = isFlushSize(size);
     } finally {
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
       this.updatesLock.readLock().unlock();
     }
+
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -1361,6 +1419,7 @@ public class HRegion implements HConstan
     // will be extremely rare; we'll deal with it when it happens.
     checkResources();
     splitsAndClosesLock.readLock().lock();
+
     try {
       // We obtain a per-row lock, so other clients will block while one client
       // performs an update. The read lock is released by the client calling
@@ -1370,6 +1429,7 @@ public class HRegion implements HConstan
       byte [] row = put.getRow();
       // If we did not pass an existing row lock, obtain a new one
       Integer lid = getLock(lockid, row);
+
       byte [] now = Bytes.toBytes(System.currentTimeMillis());
       try {
         // All edits for the given row (across all column families) must happen atomically.
@@ -1418,15 +1478,12 @@ public class HRegion implements HConstan
       Integer lid = getLock(lockId, get.getRow());
       List<KeyValue> result = new ArrayList<KeyValue>();
       try {
-        //Getting data
-        for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
-          get.getFamilyMap().entrySet()) {
-          get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
-        }
+        result = get(get);
+
         boolean matches = false;
         if (result.size() == 0 && expectedValue.length == 0) {
           matches = true;
-        } else if(result.size() == 1) {
+        } else if (result.size() == 1) {
           //Compare the expected value with the actual value
           byte [] actualValue = result.get(0).getValue();
           matches = Bytes.equals(expectedValue, actualValue);
@@ -1542,6 +1599,7 @@ public class HRegion implements HConstan
   /**
    * Add updates first to the hlog and then add values to memstore.
    * Warning: Assumption is caller has lock on passed in row.
+   * @param family
    * @param edits Cell updates by column
    * @praram now
    * @throws IOException
@@ -1561,12 +1619,12 @@ public class HRegion implements HConstan
    * @throws IOException
    */
   private void put(final Map<byte [], List<KeyValue>> familyMap,
-      boolean writeToWAL)
-  throws IOException {
+      boolean writeToWAL) throws IOException {
     long now = System.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
     this.updatesLock.readLock().lock();
+    ReadWriteConsistencyControl.WriteEntry w = null;
     try {
       WALEdit walEdit = new WALEdit();
 
@@ -1605,8 +1663,11 @@ public class HRegion implements HConstan
         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
            walEdit, now);
       }
-
+      
       long size = 0;
+
+      w = rwcc.beginMemstoreInsert();
+
       // now make changes to the memstore
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
@@ -1614,11 +1675,14 @@ public class HRegion implements HConstan
 
         Store store = getStore(family);
         for (KeyValue kv: edits) {
+          kv.setMemstoreTS(w.getWriteNumber());
           size = this.memstoreSize.addAndGet(store.add(kv));
         }
       }
       flush = isFlushSize(size);
     } finally {
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
       this.updatesLock.readLock().unlock();
     }
     if (flush) {
@@ -1862,11 +1926,15 @@ public class HRegion implements HConstan
     private final byte [] stopRow;
     private Filter filter;
     private List<KeyValue> results = new ArrayList<KeyValue>();
+    private int isScan;
     private int batch;
     // Doesn't need to be volatile, always accessed under a sync'ed method
     private boolean filterClosed = false;
 
     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+      //DebugPrint.println("HRegionScanner.<init>, threadpoint = " + ReadWriteConsistencyControl.getThreadReadPoint());
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
 
@@ -1875,7 +1943,8 @@ public class HRegion implements HConstan
       } else {
         this.stopRow = scan.getStopRow();
       }
-
+      this.isScan = scan.isGetScan() ? -1 : 0;
+      
       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
@@ -1897,6 +1966,9 @@ public class HRegion implements HConstan
       if (filter != null) {
         filter.reset();
       }
+
+      // Start the next row read and reset the thread point
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     }
 
     public synchronized boolean next(List<KeyValue> outResults, int limit)
@@ -1911,6 +1983,9 @@ public class HRegion implements HConstan
         throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
           " is closing=" + closing.get() + " or closed=" + closed.get());
       }
+
+      // This could be a new thread from the last time we called next().
+      ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
       results.clear();
       boolean returnResult = nextInternal(limit);
       if (!returnResult && filter != null && filter.filterRow()) {
@@ -2494,10 +2569,10 @@ public class HRegion implements HConstan
   // HBASE-880
   //
   /**
-   * @param get
-   * @param lockid
+   * @param get get object
+   * @param lockid existing lock id, or null for no previous lock
    * @return result
-   * @throws IOException
+   * @throws IOException read exceptions
    */
   public Result get(final Get get, final Integer lockid) throws IOException {
     // Verify families are all valid
@@ -2510,24 +2585,28 @@ public class HRegion implements HConstan
         get.addFamily(family);
       }
     }
-    // Lock row
-    Integer lid = getLock(lockid, get.getRow());
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    try {
-      for (Map.Entry<byte[],NavigableSet<byte[]>> entry:
-          get.getFamilyMap().entrySet()) {
-        get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
+    List<KeyValue> result = get(get);
+
     return new Result(result);
   }
 
-  private void get(final Store store, final Get get,
-    final NavigableSet<byte []> qualifiers, List<KeyValue> result)
-  throws IOException {
-    store.get(get, qualifiers, result);
+  /*
+   * Do a get based on the get parameter.
+   */
+  private List<KeyValue> get(final Get get) throws IOException {
+    Scan scan = new Scan(get);
+
+    List<KeyValue> results = new ArrayList<KeyValue>();
+
+    InternalScanner scanner = null;
+    try {
+      scanner = getScanner(scan);
+      scanner.next(results);
+    } finally {
+      if (scanner != null)
+        scanner.close();
+    }
+    return results;
   }
 
   /**
@@ -2536,6 +2615,7 @@ public class HRegion implements HConstan
    * @param family
    * @param qualifier
    * @param amount
+   * @param writeToWAL
    * @return The new value.
    * @throws IOException
    */
@@ -2550,6 +2630,7 @@ public class HRegion implements HConstan
     try {
       Store store = stores.get(family);
 
+      // TODO call the proper GET API
       // Get the old value:
       Get get = new Get(row);
       get.addColumn(family, qualifier);
@@ -2614,8 +2695,8 @@ public class HRegion implements HConstan
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (5 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
-      (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
-
+      (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+  
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
       ClassSize.ATOMIC_LONG + ClassSize.ATOMIC_INTEGER +

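One subtlety in the flush rework above: internalPreFlushcacheCommit() now returns a Callable, and when it does, HRegion takes the new-scanner lock, re-pins the thread read point, runs the callable and commits all the store flushers in one atomic step. A sketch of a hypothetical subclass hook (the class and its body are invented for illustration):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    // Hypothetical subclass; HRegion runs the returned Callable under
    // newScannerLock, atomically with the flusher.commit() calls.
    public class AtomicFlushRegionSketch /* extends HRegion */ {
      protected Callable<Void> internalPreFlushcacheCommit() throws IOException {
        return new Callable<Void>() {
          public Void call() throws Exception {
            // Work that must happen atomically with the memstore -> hfile
            // switch, e.g. swapping in an index snapshot.
            return null;
          }
        };
      }
    }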
Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueSkipListSet.java Sat May 15 00:18:37 2010
@@ -167,6 +167,7 @@ class KeyValueSkipListSet implements Nav
   }
 
   public boolean contains(Object o) {
+    //noinspection SuspiciousMethodCalls
     return this.delegatee.containsKey(o);
   }
 

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sat May 15 00:18:37 2010
@@ -20,15 +20,6 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
@@ -43,6 +34,15 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
  * are {@link KeyValue}s.  When asked to flush, current memstore is moved
@@ -80,10 +80,6 @@ public class MemStore implements HeapSiz
   // Used to track own heapSize
   final AtomicLong size;
 
-  // All access must be synchronized.
-  final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
-    new CopyOnWriteArraySet<ChangedMemStoreObserver>();
-
   /**
    * Default constructor. Used for tests.
    */
@@ -131,7 +127,6 @@ public class MemStore implements HeapSiz
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
-          tellChangedMemStoreObservers();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
         }
@@ -141,15 +136,6 @@ public class MemStore implements HeapSiz
     }
   }
 
-  /*
-   * Tell outstanding scanners that memstore has changed.
-   */
-  private void tellChangedMemStoreObservers() {
-    for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
-      o.changedMemStore();
-    }
-  }
-
   /**
    * Return the current snapshot.
    * Called by flusher to get current snapshot made by a previous
@@ -180,7 +166,6 @@ public class MemStore implements HeapSiz
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
         this.snapshot = new KeyValueSkipListSet(this.comparator);
-        tellChangedMemStoreObservers();
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -212,69 +197,8 @@ public class MemStore implements HeapSiz
   long delete(final KeyValue delete) {
     long s = 0;
     this.lock.readLock().lock();
-    //Have to find out what we want to do here, to find the fastest way of
-    //removing things that are under a delete.
-    //Actions that will take place here are:
-    //1. Insert a delete and remove all the affected entries already in memstore
-    //2. In the case of a Delete and the matching put is found then don't insert
-    //   the delete
-    //TODO Would be nice with if we had an iterator for this, so we could remove
-    //things that needs to be removed while iterating and don't have to go
-    //back and do it afterwards
 
     try {
-      boolean notpresent = false;
-      List<KeyValue> deletes = new ArrayList<KeyValue>();
-      SortedSet<KeyValue> tail = this.kvset.tailSet(delete);
-
-      //Parse the delete, so that it is only done once
-      byte [] deleteBuffer = delete.getBuffer();
-      int deleteOffset = delete.getOffset();
-
-      int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
-      deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-
-      short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
-      deleteOffset += Bytes.SIZEOF_SHORT;
-      int deleteRowOffset = deleteOffset;
-
-      deleteOffset += deleteRowLen;
-
-      byte deleteFamLen = deleteBuffer[deleteOffset];
-      deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-
-      int deleteQualifierOffset = deleteOffset;
-      int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
-        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG -
-        Bytes.SIZEOF_BYTE;
-
-      deleteOffset += deleteQualifierLen;
-
-      int deleteTimestampOffset = deleteOffset;
-      deleteOffset += Bytes.SIZEOF_LONG;
-      byte deleteType = deleteBuffer[deleteOffset];
-
-      //Comparing with tail from memstore
-      for (KeyValue kv : tail) {
-        DeleteCode res = DeleteCompare.deleteCompare(kv, deleteBuffer,
-            deleteRowOffset, deleteRowLen, deleteQualifierOffset,
-            deleteQualifierLen, deleteTimestampOffset, deleteType,
-            comparator.getRawComparator());
-        if (res == DeleteCode.DONE) {
-          break;
-        } else if (res == DeleteCode.DELETE) {
-          deletes.add(kv);
-        } // SKIP
-      }
-
-      //Delete all the entries effected by the last added delete
-      for (KeyValue kv : deletes) {
-        notpresent = this.kvset.remove(kv);
-        s -= heapSizeChange(kv, notpresent);
-      }
-
-      // Adding the delete to memstore. Add any value, as long as
-      // same instance each time.
       s += heapSizeChange(delete, this.kvset.add(delete));
     } finally {
       this.lock.readLock().unlock();
@@ -335,7 +259,7 @@ public class MemStore implements HeapSiz
   }
 
   /**
-   * @param state
+   * @param state column/delete tracking state
    */
   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
     this.lock.readLock().lock();
@@ -459,7 +383,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       KeyValueScanner [] scanners = new KeyValueScanner[1];
-      scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
+      scanners[0] = new MemStoreScanner();
       return scanners;
     } finally {
       this.lock.readLock().unlock();
@@ -481,10 +405,8 @@ public class MemStore implements HeapSiz
    * @param matcher Column matcher
    * @param result List to add results to
    * @return true if done with store (early-out), false if not
-   * @throws IOException
    */
-  public boolean get(QueryMatcher matcher, List<KeyValue> result)
-  throws IOException {
+  public boolean get(QueryMatcher matcher, List<KeyValue> result) {
     this.lock.readLock().lock();
     try {
       if(internalGet(this.kvset, matcher, result) || matcher.isDone()) {
@@ -501,11 +423,11 @@ public class MemStore implements HeapSiz
    * Gets from either the memstore or the snapshop, and returns a code
    * to let you know which is which.
    *
-   * @param matcher
-   * @param result
+   * @param matcher query matcher
+   * @param result puts results here
    * @return 1 == memstore, 2 == snapshot, 0 == none
    */
-  int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
+  int getWithCode(QueryMatcher matcher, List<KeyValue> result) {
     this.lock.readLock().lock();
     try {
       boolean fromMemstore = internalGet(this.kvset, matcher, result);
@@ -540,11 +462,9 @@ public class MemStore implements HeapSiz
    * @param matcher query matcher
    * @param result list to add results to
    * @return true if done with store (early-out), false if not
-   * @throws IOException
    */
   boolean internalGet(final NavigableSet<KeyValue> set,
-      final QueryMatcher matcher, final List<KeyValue> result)
-  throws IOException {
+      final QueryMatcher matcher, final List<KeyValue> result) {
     if(set.isEmpty()) return false;
     // Seek to startKey
     SortedSet<KeyValue> tail = set.tailSet(matcher.getStartKey());
@@ -574,163 +494,144 @@ public class MemStore implements HeapSiz
    * map and snapshot.
    * This behaves as if it were a real scanner but does not maintain position.
    */
-  protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
-    private List<KeyValue> result = new ArrayList<KeyValue>();
-    private int idx = 0;
-    // Make access atomic.
-    private FirstOnRow firstOnNextRow = new FirstOnRow();
-    // Keep reference to Set so can remove myself when closed.
-    private final Set<ChangedMemStoreObserver> observers;
+  protected class MemStoreScanner implements KeyValueScanner {
+    // Next row information for either kvset or snapshot
+    private KeyValue kvsetNextRow = null;
+    private KeyValue snapshotNextRow = null;
+
+    // iterator based scanning.
+    Iterator<KeyValue> kvsetIt;
+    Iterator<KeyValue> snapshotIt;
 
-    MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
+    /*
+     * Some notes...
+     *
+     * A MemStoreScanner is fixed at creation time: it holds iterators into
+     * the kvset and snapshot that exist when it is created. During snapshot
+     * creation, the kvset is swapped for a new, empty set and the old one
+     * becomes the snapshot; since the new kvset is empty, reseeking both is
+     * pointless. During the snapshot->hfile transition, the memstore scanner
+     * is re-created by StoreScanner#updateReaders(). StoreScanner could
+     * potentially do something smarter by adjusting the existing memstore
+     * scanner.
+     *
+     * But there is a greater problem here: once a scanner has progressed
+     * during a snapshot scenario, we currently iterate past the kvset and
+     * then 'finish' up. If a scan lasts a while, new entries may become
+     * available in the kvset, but we will never see them. This needs to be
+     * handled at the StoreScanner level, in coordination with MemStoreScanner.
+     */
+    MemStoreScanner() {
       super();
-      this.observers = observers;
-      this.observers.add(this);
-    }
 
-    public boolean seek(KeyValue key) {
-      try {
-        if (key == null) {
-          close();
-          return false;
-        }
-        this.firstOnNextRow.set(key);
-        return cacheNextRow();
-      } catch(Exception e) {
-        close();
-        return false;
-      }
+      //DebugPrint.println(" MS new@" + hashCode());
     }
 
-    public KeyValue peek() {
-      if (idx >= this.result.size()) {
-        if (!cacheNextRow()) {
-          return null;
+    protected KeyValue getNext(Iterator<KeyValue> it) {
+      KeyValue ret = null;
+      long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
+      
+      while (ret == null && it.hasNext()) {
+        KeyValue v = it.next();
+        if (v.getMemstoreTS() <= readPoint) {
+          // keep it.
+          ret = v;
         }
-        return peek();
       }
-      return result.get(idx);
+      return ret;
     }
 
-    public KeyValue next() {
-      if (idx >= result.size()) {
-        if (!cacheNextRow()) {
-          return null;
-        }
-        return next();
+    public synchronized boolean seek(KeyValue key) {
+      if (key == null) {
+        close();
+        return false;
       }
-      return this.result.get(idx++);
-    }
 
-    /**
-     * @return True if successfully cached a next row.
-     */
-    boolean cacheNextRow() {
-      // Prevent snapshot being cleared while caching a row.
-      lock.readLock().lock();
-      try {
-        this.result.clear();
-        this.idx = 0;
-        // Look at each set, kvset and snapshot.
-        // Both look for matching entries for this.current row returning what
-        // they
-        // have as next row after this.current (or null if nothing in set or if
-        // nothing follows.
-        KeyValue kvsetNextRow = cacheNextRow(kvset);
-        KeyValue snapshotNextRow = cacheNextRow(snapshot);
-        if (kvsetNextRow == null && snapshotNextRow == null) {
-          // Nothing more in memstore but we might have gotten current row
-          // results
-          // Indicate at end of store by setting next row to null.
-          this.firstOnNextRow.set(null);
-          return !this.result.isEmpty();
-        } else if (kvsetNextRow != null && snapshotNextRow != null) {
-          // Set current at the lowest of the two values.
-          int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
-          this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
-        } else {
-          this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
-        }
-        return true;
-      } finally {
-        lock.readLock().unlock();
-      }
-    }
+      // kvset and snapshot will never be null.
+      // If tailSet can't find anything, the returned SortedSet is empty (not null).
+      SortedSet<KeyValue> kvTail = kvset.tailSet(key);
+      SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
 
-    /*
-     * See if set has entries for the <code>this.current</code> row.  If so,
-     * add them to <code>this.result</code>.
-     * @param set Set to examine
-     * @return Next row in passed <code>set</code> or null if nothing in this
-     * passed <code>set</code>
-     */
-    private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
-      if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
-      SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
-      if (tail == null || tail.isEmpty()) return null;
-      KeyValue first = tail.first();
-      KeyValue nextRow = null;
-      for (KeyValue kv: tail) {
-        if (comparator.compareRows(first, kv) != 0) {
-          nextRow = kv;
-          break;
-        }
-        this.result.add(kv);
-      }
-      return nextRow;
-    }
+      kvsetIt = kvTail.iterator();
+      snapshotIt = snapshotTail.iterator();
 
-    public void close() {
-      this.firstOnNextRow.set(null);
-      idx = 0;
-      if (!result.isEmpty()) {
-        result.clear();
-      }
-      this.observers.remove(this);
+      kvsetNextRow = getNext(kvsetIt);
+      snapshotNextRow = getNext(snapshotIt);
+
+
+      //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
+      //    kvset.size() + " threadread = " + readPoint);
+      //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
+      //    snapshot.size() + " threadread = " + readPoint);
+
+      
+      KeyValue lowest = getLowest();
+
+      // has data := (lowest != null)
+      return lowest != null;
     }
 
-    public void changedMemStore() {
-      this.firstOnNextRow.reset();
+    public synchronized KeyValue peek() {
+      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+      return getLowest();
     }
-  }
 
-  /*
-   * Private class that holds firstOnRow and utility.
-   * Usually firstOnRow is the first KeyValue we find on next row rather than
-   * the absolute minimal first key (empty column, Type.Maximum, maximum ts).
-   * Usually its ok being sloppy with firstOnRow letting it be the first thing
-   * found on next row -- this works -- but if the memstore changes on us, reset
-   * firstOnRow to be the ultimate firstOnRow.  We play sloppy with firstOnRow
-   * usually so we don't have to  allocate a new KeyValue each time firstOnRow
-   * is updated.
-   */
-  private static class FirstOnRow {
-    private KeyValue firstOnRow = null;
 
-    FirstOnRow() {
-      super();
+    public synchronized KeyValue next() {
+      KeyValue theNext = getLowest();
+
+      if (theNext == null) {
+        return null;
+      }
+
+      // Advance one of the iterators
+      if (theNext == kvsetNextRow) {
+        kvsetNextRow = getNext(kvsetIt);
+      } else {
+        snapshotNextRow = getNext(snapshotIt);
+      }
+
+      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
+      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
+      //    getLowest() + " threadpoint=" + readpoint);
+      return theNext;
     }
 
-    synchronized void set(final KeyValue kv) {
-      this.firstOnRow = kv;
+    protected KeyValue getLowest() {
+      return getLower(kvsetNextRow,
+          snapshotNextRow);
     }
 
-    /* Reset firstOnRow to a 'clean', absolute firstOnRow.
+    /*
+     * Returns the lower of the two key values, or null if they are both null.
+     * This uses comparator.compare() to compare the KeyValue using the memstore
+     * comparator.
      */
-    synchronized void reset() {
-      if (this.firstOnRow == null) return;
-      this.firstOnRow =
-         new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
+    protected KeyValue getLower(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare <= 0 ? first : second);
+      }
+      return (first != null ? first : second);
     }
 
-    synchronized KeyValue get() {
-      return this.firstOnRow;
+    public synchronized void close() {
+      this.kvsetNextRow = null;
+      this.snapshotNextRow = null;
+
+      this.kvsetIt = null;
+      this.snapshotIt = null;
     }
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
-
+      ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
+  
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
@@ -770,7 +671,7 @@ public class MemStore implements HeapSiz
    * enough.  See hbase-900.  Fills memstores then waits so user can heap
    * dump and bring up resultant hprof in something like jprofiler which
    * allows you get 'deep size' on objects.
-   * @param args
+   * @param args main args
    */
   public static void main(String [] args) {
     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
@@ -807,16 +708,4 @@ public class MemStore implements HeapSiz
     }
     LOG.info("Exiting.");
   }
-
-  /**
-   * Observers want to know about MemStore changes.
-   * Called when snapshot is cleared and when we make one.
-   */
-  interface ChangedMemStoreObserver {
-    /**
-     * Notify observers.
-     * @throws IOException
-     */
-    void changedMemStore();
-  }
 }

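The rewritten MemStoreScanner above is a two-way merge: it holds one "next visible KeyValue" per source (kvset and snapshot), peek() returns the lower of the two heads, and next() advances whichever iterator supplied the value it returns. The core comparison, generalized into a standalone sketch (not HBase code):

    import java.util.Comparator;

    public class TwoWayMergeSketch {
      // The lower of two heads; null means that source is exhausted,
      // mirroring MemStoreScanner.getLower() above.
      static <T> T lower(Comparator<T> cmp, T a, T b) {
        if (a == null) return b;
        if (b == null) return a;
        return cmp.compare(a, b) <= 0 ? a : b;  // ties prefer the first source
      }

      public static void main(String[] args) {
        Comparator<Integer> natural = new Comparator<Integer>() {
          public int compare(Integer x, Integer y) { return x.compareTo(y); }
        };
        System.out.println(lower(natural, 3, 5));     // 3
        System.out.println(lower(natural, null, 5));  // 5
      }
    }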
Added: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java?rev=944529&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java Sat May 15 00:18:37 2010
@@ -0,0 +1,106 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages the read/write consistency within memstore. This provides
+ * an interface for readers to determine what entries to ignore, and
+ * a mechanism for writers to obtain new write numbers, then "commit"
+ * the new writes for readers to read (thus forming atomic transactions).
+ */
+public class ReadWriteConsistencyControl {
+  private final AtomicLong memstoreRead = new AtomicLong();
+  private final AtomicLong memstoreWrite = new AtomicLong();
+  // This is the pending queue of writes.
+  private final LinkedList<WriteEntry> writeQueue =
+      new LinkedList<WriteEntry>();
+
+  private static final ThreadLocal<Long> perThreadReadPoint =
+      new ThreadLocal<Long>();
+
+  /**
+   * Get this thread's read point. The thread must first have pinned one
+   * via {@link #resetThreadReadPoint(ReadWriteConsistencyControl)};
+   * otherwise the thread-local is unset and unboxing here will NPE.
+   */
+  public static long getThreadReadPoint() {
+    return perThreadReadPoint.get();
+  }
+
+  public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
+    perThreadReadPoint.set(rwcc.memstoreReadPoint());
+    return getThreadReadPoint();
+  }
+
+  public WriteEntry beginMemstoreInsert() {
+    synchronized (writeQueue) {
+      long nextWriteNumber = memstoreWrite.incrementAndGet();
+      WriteEntry e = new WriteEntry(nextWriteNumber);
+      writeQueue.add(e);
+      return e;
+    }
+  }
+
+  public void completeMemstoreInsert(WriteEntry e) {
+    synchronized (writeQueue) {
+      e.markCompleted();
+
+      long nextReadValue = -1;
+      boolean ranOnce = false;
+      while (!writeQueue.isEmpty()) {
+        ranOnce = true;
+        WriteEntry queueFirst = writeQueue.getFirst();
+
+        if (nextReadValue > 0) {
+          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
+            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
+                + nextReadValue + " next: " + queueFirst.getWriteNumber());
+          }
+        }
+
+        if (queueFirst.isCompleted()) {
+          nextReadValue = queueFirst.getWriteNumber();
+          writeQueue.removeFirst();
+        } else {
+          break;
+        }
+      }
+
+      if (!ranOnce) {
+        throw new RuntimeException("never was a first");
+      }
+
+      if (nextReadValue > 0) {
+        memstoreRead.set(nextReadValue);
+      }
+    }
+
+    // Spin until any other concurrent puts have finished. This makes sure that
+    // if we move on to construct a scanner, we'll get read-your-own-writes
+    // consistency. We anticipate that since puts to the memstore are very fast,
+    // this will be on the order of microseconds - so spinning should be faster
+    // than a condition variable.
+    int spun = 0;
+    while (memstoreRead.get() < e.getWriteNumber()) {
+      spun++;
+    }
+    // Could potentially expose spun as a metric
+  }
+
+  public long memstoreReadPoint() {
+    return memstoreRead.get();
+  }
+
+  public static class WriteEntry {
+    private final long writeNumber;
+    private boolean completed = false;
+
+    WriteEntry(long writeNumber) {
+      this.writeNumber = writeNumber;
+    }
+    void markCompleted() {
+      this.completed = true;
+    }
+    boolean isCompleted() {
+      return this.completed;
+    }
+    long getWriteNumber() {
+      return this.writeNumber;
+    }
+  }
+}
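
For readers new to the RWCC scheme, a minimal usage sketch of the intended
writer/reader protocol follows. It is not part of the commit: the class and
method names come from the file above, and the memstore plumbing that tags
each KeyValue with its write number is elided to comments.

    import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
    import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl.WriteEntry;

    public class RwccSketch {
      public static void main(String[] args) {
        ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();

        // Writer: take a write number, tag the new KeyValues with it
        // (kv.setMemstoreTS(...)), add them to the memstore, then commit
        // so the whole batch becomes readable atomically.
        WriteEntry w = rwcc.beginMemstoreInsert();
        // ... tag and insert KeyValues here ...
        rwcc.completeMemstoreInsert(w);

        // Reader: pin a read point for this thread. Scanners then ignore
        // any KeyValue whose memstoreTS is greater than this point.
        long readPoint = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
        System.out.println("read point = " + readPoint); // prints 1 here
      }
    }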

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sat May 15 00:18:37 2010
@@ -34,6 +34,7 @@ public class ScanQueryMatcher extends Qu
   // Optimization so we can skip lots of compares when we decide to skip
   // to the next row.
   private boolean stickyNextRow;
+  private KeyValue stopKey = null;
 
   /**
    * Constructs a QueryMatcher for a Scan.
@@ -51,6 +52,11 @@ public class ScanQueryMatcher extends Qu
     this.rowComparator = rowComparator;
     this.deletes =  new ScanDeleteTracker();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
+    if (scan.isGetScan()) {
+      this.stopKey = KeyValue.createLastOnRow(scan.getStopRow());
+    } else {
+      this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+    }
     this.filter = scan.getFilter();
 
     // Single branch to deal with two types of reads (columns vs all in family)
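
The get-vs-scan stop key above matters because a Get is executed as a scan
whose start row equals its stop row. A small illustration, not part of the
commit, using the same KeyValue helpers the new TestKeyValue test below
exercises (the family/qualifier values here are made up):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class StopKeySketch {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("row1");
        KeyValue real = new KeyValue(row, Bytes.toBytes("family"),
            Bytes.toBytes("qfA"), 1L, KeyValue.Type.Put);
        // For a Get (start row == stop row), the matcher must run through
        // the last key of the row, so the stop key sorts after every real
        // KeyValue in it:
        KeyValue stopForGet = KeyValue.createLastOnRow(row);
        // For a plain scan the stop row is exclusive, so the stop key
        // sorts before every real KeyValue in it:
        KeyValue stopForScan = KeyValue.createFirstOnRow(row);
        System.out.println(KeyValue.COMPARATOR.compare(real, stopForGet) < 0);  // true
        System.out.println(KeyValue.COMPARATOR.compare(stopForScan, real) < 0); // true
      }
    }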

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat May 15 00:18:37 2010
@@ -609,9 +609,12 @@ public class Store implements HConstants
     this.lock.writeLock().lock();
     try {
       this.storefiles.put(Long.valueOf(logCacheFlushId), sf);
+
+      this.memstore.clearSnapshot(set);
+
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
-      this.memstore.clearSnapshot(set);
+
       return this.storefiles.size() >= this.compactionThreshold;
     } finally {
       this.lock.writeLock().unlock();
@@ -639,10 +642,8 @@ public class Store implements HConstants
    * @param o Observer no longer interested in changes in set of Readers.
    */
   void deleteChangedReaderObserver(ChangedReadersObserver o) {
-    if(this.changedReaderObservers.size() > 0) {
-      if (!this.changedReaderObservers.remove(o)) {
-        LOG.warn("Not in set" + o);
-      }
+    if (!this.changedReaderObservers.remove(o)) {
+      LOG.warn("Not in set" + o);
     }
   }
 
@@ -993,6 +994,10 @@ public class Store implements HConstants
           Long orderVal = Long.valueOf(result.getMaxSequenceId());
           this.storefiles.put(orderVal, result);
         }
+
+        // WARNING: ugly hack here, but sadly necessary.
+        ReadWriteConsistencyControl.resetThreadReadPoint(region.getRWCC());
+
         // Tell observers that list of StoreFiles has changed.
         notifyChangedReadersObservers();
         // Finally, delete old store files.
@@ -1449,7 +1454,12 @@ public class Store implements HConstants
   }
 
   /**
-   * Increments the value for the given row/family/qualifier
+   * Increments the value for the given row/family/qualifier.
+   *
+   * This operation is always seen as atomic by other readers because it
+   * puts only a single KeyValue into the memstore; thus no read/write
+   * consistency control is necessary.
+   *
    * @param row
    * @param f
    * @param qualifier

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat May 15 00:18:37 2010
@@ -32,7 +32,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
 * Scanner scans both the memstore and the HStore. Coalesces KeyValue stream
@@ -46,10 +45,15 @@ class StoreScanner implements KeyValueSc
   private boolean cacheBlocks;
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
-  private final AtomicBoolean closing = new AtomicBoolean(false);
+  private boolean closing = false;
+  private final boolean isGet;
 
   /**
    * Opens a scanner across memstore, snapshot, and all StoreFiles.
+   *
+   * @param store which store to scan
+   * @param scan the spec
+   * @param columns which columns we are scanning
    */
   StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
     this.store = store;
@@ -58,9 +62,11 @@ class StoreScanner implements KeyValueSc
         columns, store.ttl, store.comparator.getRawComparator(),
         store.versionsToReturn(scan.getMaxVersions()));
 
+    this.isGet = scan.isGetScan();
     List<KeyValueScanner> scanners = getScanners();
 
     // Seek all scanners to the initial key
+    // TODO if scan.isGetScan, use bloomfilters to skip seeking
     for(KeyValueScanner scanner : scanners) {
       scanner.seek(matcher.getStartKey());
     }
@@ -76,10 +82,14 @@ class StoreScanner implements KeyValueSc
    * Used for major compactions.<p>
    *
    * Opens a scanner across specified StoreFiles.
+   * @param store which store to scan
+   * @param scan the spec
+   * @param scanners ancillary scanners
    */
   StoreScanner(Store store, Scan scan, KeyValueScanner [] scanners) {
     this.store = store;
     this.cacheBlocks = false;
+    this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         null, store.ttl, store.comparator.getRawComparator(),
         store.versionsToReturn(scan.getMaxVersions()));
@@ -99,6 +109,7 @@ class StoreScanner implements KeyValueSc
       final NavigableSet<byte[]> columns,
       final KeyValueScanner [] scanners) {
     this.store = null;
+    this.isGet = false;
     this.cacheBlocks = scan.getCacheBlocks();
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
         comparator.getRawComparator(), scan.getMaxVersions());
@@ -132,8 +143,8 @@ class StoreScanner implements KeyValueSc
   }
 
   public synchronized void close() {
-    boolean state = this.closing.getAndSet(true);
-    if (state) return;
+    if (this.closing) return;
+    this.closing = true;
    // under test, we don't have a this.store
     if (this.store != null)
       this.store.deleteChangedReaderObserver(this);
@@ -146,11 +157,12 @@ class StoreScanner implements KeyValueSc
 
   /**
    * Get the next row of values from this Store.
-   * @param result
+   * @param outResult
    * @param limit
    * @return true if there are more rows, false if scanner is done
    */
   public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
+    //DebugPrint.println("SS.next");
     KeyValue peeked = this.heap.peek();
     if (peeked == null) {
       close();
@@ -161,6 +173,7 @@ class StoreScanner implements KeyValueSc
     List<KeyValue> results = new ArrayList<KeyValue>();
     LOOP: while((kv = this.heap.peek()) != null) {
       QueryMatcher.MatchCode qcode = matcher.match(kv);
+      //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
       switch(qcode) {
         case INCLUDE:
           KeyValue next = this.heap.next();
@@ -228,8 +241,8 @@ class StoreScanner implements KeyValueSc
         LOG.warn("StoreFile " + sf + " has null Reader");
         continue;
       }
-      // Get a scanner that does not use pread.
-      s.add(r.getScanner(this.cacheBlocks, false));
+      // If isGet, use pread; otherwise don't use pread.
+      s.add(r.getScanner(this.cacheBlocks, isGet));
     }
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(s.size()+1);
@@ -241,12 +254,16 @@ class StoreScanner implements KeyValueSc
 
   // Implementation of ChangedReadersObserver
   public synchronized void updateReaders() throws IOException {
-    if (this.closing.get()) return;
+    if (this.closing) return;
     KeyValue topKey = this.peek();
     if (topKey == null) return;
+
     List<KeyValueScanner> scanners = getScanners();
 
-    // Seek all scanners to the initial key
+    // close the previous scanners:
+    this.heap.close(); // bubble through and close all scanners.
+    this.heap = null;  // the re-seeks could be slow (access HDFS); free up memory ASAP
+
     for(KeyValueScanner scanner : scanners) {
       scanner.seek(topKey);
     }

Modified: hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hadoop/hbase/trunk/core/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Sat May 15 00:18:37 2010
@@ -365,6 +365,7 @@ public class FSUtils {
     return true;
   }
 
+  // TODO move this method OUT of FSUtils. No dependencies on HMaster.
   /**
    * Returns the total overall fragmentation percentage. Includes .META. and
    * -ROOT- as well.

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/TestKeyValue.java Sat May 15 00:18:37 2010
@@ -277,4 +277,49 @@ public class TestKeyValue extends TestCa
     // TODO actually write this test!
 
   }
+
+  private final byte[] rowA = Bytes.toBytes("rowA");
+  private final byte[] rowB = Bytes.toBytes("rowB");
+
+  private final byte[] family = Bytes.toBytes("family");
+  private final byte[] qualA = Bytes.toBytes("qfA");
+
+  private void assertKVLess(KeyValue.KVComparator c,
+                            KeyValue less,
+                            KeyValue greater) {
+    int cmp = c.compare(less, greater);
+    assertTrue(cmp < 0);
+    cmp = c.compare(greater, less);
+    assertTrue(cmp > 0);
+  }
+
+  public void testFirstLastOnRow() {
+    final KVComparator c = KeyValue.COMPARATOR;
+    long ts = 1;
+
+    // These are listed in sort order (i.e. every one should be less
+    // than the one on the next line).
+    final KeyValue firstOnRowA = KeyValue.createFirstOnRow(rowA);
+    final KeyValue kvA_1 = new KeyValue(rowA, null, null, ts, Type.Put);
+    final KeyValue kvA_2 = new KeyValue(rowA, family, qualA, ts, Type.Put);
+        
+    final KeyValue lastOnRowA = KeyValue.createLastOnRow(rowA);
+    final KeyValue firstOnRowB = KeyValue.createFirstOnRow(rowB);
+    final KeyValue kvB = new KeyValue(rowB, family, qualA, ts, Type.Put);
+
+    assertKVLess(c, firstOnRowA, firstOnRowB);
+    assertKVLess(c, firstOnRowA, kvA_1);
+    assertKVLess(c, firstOnRowA, kvA_2);
+    assertKVLess(c, kvA_1, kvA_2);
+    assertKVLess(c, kvA_2, firstOnRowB);
+    assertKVLess(c, kvA_1, firstOnRowB);
+
+    assertKVLess(c, lastOnRowA, firstOnRowB);
+    assertKVLess(c, firstOnRowB, kvB);
+    assertKVLess(c, lastOnRowA, kvB);
+
+    assertKVLess(c, kvA_2, lastOnRowA);
+    assertKVLess(c, kvA_1, lastOnRowA);
+    assertKVLess(c, firstOnRowA, lastOnRowA);
+  }
 }

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Sat May 15 00:18:37 2010
@@ -308,6 +308,32 @@ public class TestFromClientSide {
       CompareFilter.CompareOp.GREATER_OR_EQUAL));
     assertEquals(rowCount - endKeyCount, countGreater);
   }
+  
+  /*
+   * Load table with rows from 'aaa' to 'yyy'.
+   * @param t
+   * @return Count of rows loaded.
+   * @throws IOException
+   */
+  private int loadTable(final HTable t) throws IOException {
+    // Add data to table.
+    byte[] k = new byte[3];
+    int rowCount = 0;
+    for (byte b1 = 'a'; b1 < 'z'; b1++) {
+      for (byte b2 = 'a'; b2 < 'z'; b2++) {
+        for (byte b3 = 'a'; b3 < 'z'; b3++) {
+          k[0] = b1;
+          k[1] = b2;
+          k[2] = b3;
+          Put put = new Put(k);
+          put.add(FAMILY, new byte[0], k);
+          t.put(put);
+          rowCount++;
+        }
+      }
+    }
+    return rowCount;
+  }
 
   /*
    * @param key
@@ -1452,7 +1478,7 @@ public class TestFromClientSide {
     ht.put(put);
 
     delete = new Delete(ROW);
-    delete.deleteColumn(FAMILIES[0], QUALIFIER);
+    delete.deleteColumn(FAMILIES[0], QUALIFIER); // ts[4]
     ht.delete(delete);
 
     get = new Get(ROW);
@@ -1487,23 +1513,24 @@ public class TestFromClientSide {
     // But alas, this is not to be.  We can't put them back in either case.
 
     put = new Put(ROW);
-    put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]);
-    put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]);
+    put.add(FAMILIES[0], QUALIFIER, ts[0], VALUES[0]); // 1000
+    put.add(FAMILIES[0], QUALIFIER, ts[4], VALUES[4]); // 5000
     ht.put(put);
 
-    // The Get returns the latest value but then does not return the
-    // oldest, which was never deleted, ts[1].
 
+    // Due to the internal implementation of Get, the Get() call used to
+    // return ts[4], UNLIKE the Scan below. With the switch to using Scan
+    // for Get, this is no longer the case.
     get = new Get(ROW);
     get.addFamily(FAMILIES[0]);
     get.setMaxVersions(Integer.MAX_VALUE);
     result = ht.get(get);
-    assertNResult(result, ROW, FAMILIES[0], QUALIFIER,
-        new long [] {ts[2], ts[3], ts[4]},
-        new byte[][] {VALUES[2], VALUES[3], VALUES[4]},
+    assertNResult(result, ROW, FAMILIES[0], QUALIFIER, 
+        new long [] {ts[1], ts[2], ts[3]},
+        new byte[][] {VALUES[1], VALUES[2], VALUES[3]},
         0, 2);
 
-    // The Scanner returns the previous values, the expected-unexpected behavior
+    // The Scanner returns the previous values, the expected-but-naively-unexpected behavior
 
     scan = new Scan(ROW);
     scan.addFamily(FAMILIES[0]);
@@ -1537,6 +1564,15 @@ public class TestFromClientSide {
     put.add(FAMILIES[2], QUALIFIER, ts[3], VALUES[3]);
     ht.put(put);
 
+    // Assert that above went in.
+    get = new Get(ROWS[2]);
+    get.addFamily(FAMILIES[1]);
+    get.addFamily(FAMILIES[2]);
+    get.setMaxVersions(Integer.MAX_VALUE);
+    result = ht.get(get);
+    assertTrue("Expected 4 key but received " + result.size() + ": " + result,
+        result.size() == 4);
+
     delete = new Delete(ROWS[0]);
     delete.deleteFamily(FAMILIES[2]);
     ht.delete(delete);
@@ -1596,8 +1632,7 @@ public class TestFromClientSide {
     get.addFamily(FAMILIES[2]);
     get.setMaxVersions(Integer.MAX_VALUE);
     result = ht.get(get);
-    assertTrue("Expected 1 key but received " + result.size(),
-        result.size() == 1);
+    assertEquals(1, result.size());
     assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
         new long [] {ts[2]},
         new byte[][] {VALUES[2]},
@@ -1608,8 +1643,7 @@ public class TestFromClientSide {
     scan.addFamily(FAMILIES[2]);
     scan.setMaxVersions(Integer.MAX_VALUE);
     result = getSingleScanResult(ht, scan);
-    assertTrue("Expected 1 key but received " + result.size(),
-        result.size() == 1);
+    assertEquals(1, result.size());
     assertNResult(result, ROWS[2], FAMILIES[2], QUALIFIER,
         new long [] {ts[2]},
         new byte[][] {VALUES[2]},
@@ -1696,7 +1730,7 @@ public class TestFromClientSide {
     }
   }
 
-  /**
+  /*
    * Baseline "scalability" test.
    *
    * Tests one hundred families, one million columns, one million versions
@@ -2661,12 +2695,11 @@ public class TestFromClientSide {
         "Got row [" + Bytes.toString(result.getRow()) +"]",
         equals(row, result.getRow()));
     int expectedResults = end - start + 1;
-    assertTrue("Expected " + expectedResults + " keys but result contains "
-        + result.size(), result.size() == expectedResults);
+    assertEquals(expectedResults, result.size());
 
     KeyValue [] keys = result.sorted();
 
-    for(int i=0;i<keys.length;i++) {
+    for (int i=0; i<keys.length; i++) {
       byte [] value = values[end-i];
       long ts = stamps[end-i];
       KeyValue key = keys[i];
@@ -2824,9 +2857,9 @@ public class TestFromClientSide {
   }
 
   private boolean equals(byte [] left, byte [] right) {
-    if(left == null && right == null) return true;
-    if(left == null && right.length == 0) return true;
-    if(right == null && left.length == 0) return true;
+    if (left == null && right == null) return true;
+    if (left == null && right.length == 0) return true;
+    if (right == null && left.length == 0) return true;
     return Bytes.equals(left, right);
   }
 

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestGetDeleteTracker.java Sat May 15 00:18:37 2010
@@ -255,8 +255,8 @@ public class TestGetDeleteTracker extend
     }
     //update()
     dt.update();
-    assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts3));
-    assertEquals(false, dt.isDeleted(col2, 0, col2Len, ts1));
+    assertFalse(dt.isDeleted(col2, 0, col2Len, ts3));
+    assertFalse(dt.isDeleted(col2, 0, col2Len, ts1));
   }
   public void testIsDeleted_Delete(){
     //Building lists

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat May 15 00:18:37 2010
@@ -93,7 +93,7 @@ public class TestHRegion extends HBaseTe
   //////////////////////////////////////////////////////////////////////////////
 
 
-  /**
+  /*
    * An involved filter test.  Has multiple column families and deletes in mix.
    */
   public void testWeirdCacheBehaviour() throws Exception {
@@ -361,6 +361,34 @@ public class TestHRegion extends HBaseTe
   //////////////////////////////////////////////////////////////////////////////
   // Delete tests
   //////////////////////////////////////////////////////////////////////////////
+  public void testDelete_multiDeleteColumn() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] qual = Bytes.toBytes("qualifier");
+    byte [] value = Bytes.toBytes("value");
+
+    Put put = new Put(row1);
+    put.add(fam1, qual, 1, value);
+    put.add(fam1, qual, 2, value);
+
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+
+    region.put(put);
+
+    // We do support deleting more than 1 'latest' version
+    Delete delete = new Delete(row1);
+    delete.deleteColumn(fam1, qual);
+    delete.deleteColumn(fam1, qual);
+    region.delete(delete, null, false);
+
+    Get get = new Get(row1);
+    get.addFamily(fam1);
+    Result r = region.get(get, null);
+    assertEquals(0, r.size());
+  }
+
   public void testDelete_CheckFamily() throws IOException {
     byte [] tableName = Bytes.toBytes("testtable");
     byte [] row1 = Bytes.toBytes("row1");
@@ -369,11 +397,9 @@ public class TestHRegion extends HBaseTe
     byte [] fam3 = Bytes.toBytes("fam3");
     byte [] fam4 = Bytes.toBytes("fam4");
 
-    byte[][] families  = {fam1, fam2, fam3};
-
     //Setting up region
     String method = this.getName();
-    initHRegion(tableName, method, families);
+    initHRegion(tableName, method, fam1, fam2, fam3);
 
     List<KeyValue> kvs  = new ArrayList<KeyValue>();
     kvs.add(new KeyValue(row1, fam4, null, null));
@@ -1455,6 +1481,41 @@ public class TestHRegion extends HBaseTe
     assertICV(row, fam1, qual1, value+amount);
   }
 
+  public void testIncrementColumnValue_BumpSnapshot() throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 42L;
+    long incr = 44L;
+
+    // first put something in kvset, then snapshot it.
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    // get the store in question:
+    Store s = region.getStore(fam1);
+    s.snapshot(); //bam
+
+    // now increment:
+    long newVal = region.incrementColumnValue(row, fam1, qual1,
+        incr, false);
+
+    assertEquals(value+incr, newVal);
+
+    // get both versions:
+    Get get = new Get(row);
+    get.setMaxVersions();
+    get.addColumn(fam1,qual1);
+
+    Result r = region.get(get, null);
+    assertEquals(2, r.size());
+    KeyValue first = r.raw()[0];
+    KeyValue second = r.raw()[1];
+
+    assertTrue("ICV failed to upgrade timestamp",
+        first.getTimestamp() != second.getTimestamp());
+  }
+  
   public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
     initHRegion(tableName, getName(), fam1);
 
@@ -1952,9 +2013,9 @@ public class TestHRegion extends HBaseTe
     FlushThread flushThread = new FlushThread();
     flushThread.start();
 
-    Scan scan = new Scan();
-    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
-      new BinaryComparator(Bytes.toBytes("row0"))));
+    Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
+//    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,
+//      new BinaryComparator(Bytes.toBytes("row0"))));
 
     int expectedCount = numFamilies * numQualifiers;
     List<KeyValue> res = new ArrayList<KeyValue>();
@@ -1967,7 +2028,7 @@ public class TestHRegion extends HBaseTe
       }
 
       if (i != 0 && i % flushInterval == 0) {
-        //System.out.println("scan iteration = " + i);
+        //System.out.println("flush scan iteration = " + i);
         flushThread.flush();
       }
 
@@ -1976,14 +2037,18 @@ public class TestHRegion extends HBaseTe
       InternalScanner scanner = region.getScanner(scan);
       while (scanner.next(res)) ;
       if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
-        Assert.assertEquals("i=" + i, expectedCount, res.size());
+        assertEquals("i=" + i, expectedCount, res.size());
         long timestamp = res.get(0).getTimestamp();
-        Assert.assertTrue(timestamp >= prevTimestamp);
+        assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
+            timestamp >= prevTimestamp);
         prevTimestamp = timestamp;
       }
     }
 
     putThread.done();
+
+    region.flushcache();
+
     putThread.join();
     putThread.checkNoError();
 
@@ -2028,15 +2093,16 @@ public class TestHRegion extends HBaseTe
           for (int r = 0; r < numRows; r++) {
             byte[] row = Bytes.toBytes("row" + r);
             Put put = new Put(row);
-            for (int f = 0; f < families.length; f++) {
-              for (int q = 0; q < qualifiers.length; q++) {
-                put.add(families[f], qualifiers[q], (long) val,
-                  Bytes.toBytes(val));
+            for (byte[] family : families) {
+              for (byte[] qualifier : qualifiers) {
+                put.add(family, qualifier, (long) val,
+                    Bytes.toBytes(val));
               }
             }
+//            System.out.println("Putting of kvsetsize=" + put.size());
             region.put(put);
-            if (val > 0 && val % 47 == 0){
-              //System.out.println("put iteration = " + val);
+            if (val > 0 && val % 47 == 0) {
+              System.out.println("put iteration = " + val);
               Delete delete = new Delete(row, (long)val-30, null);
               region.delete(delete, null, true);
             }
@@ -2123,6 +2189,9 @@ public class TestHRegion extends HBaseTe
     }
 
     putThread.done();
+    
+    region.flushcache();
+
     putThread.join();
     putThread.checkNoError();
 


