hbase-commits mailing list archives

From: raw...@apache.org
Subject: svn commit: r782178 [8/16] - in /hadoop/hbase/trunk: bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/o...
Date: Sat, 06 Jun 2009 01:26:27 GMT
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Jun  6 01:26:21 2009
@@ -24,21 +24,17 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,12 +47,13 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.SequenceFile;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.Progressable;
@@ -171,7 +168,13 @@
     this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
-    if (ttl != HConstants.FOREVER) {
+    if (ttl == HConstants.FOREVER) {
+      // default is unlimited ttl.
+      ttl = Long.MAX_VALUE;
+    } else if (ttl == -1) {
+      ttl = Long.MAX_VALUE;
+    } else {
+      // Convert seconds to milliseconds for user data.
       this.ttl *= 1000;
     }
     this.memcache = new Memcache(this.ttl, this.comparator);
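
For illustration, the TTL normalization in this hunk boils down to the
following self-contained sketch (class and constant names here are ours, and
FOREVER merely stands in for HConstants.FOREVER): both the FOREVER constant
and a raw -1 mean "unlimited", and everything else is converted from seconds
to milliseconds.

    class TtlExample {
      // Assumption: stands in for HConstants.FOREVER.
      static final long FOREVER = Integer.MAX_VALUE;

      // Returns the TTL in milliseconds; both spellings of "unlimited"
      // become Long.MAX_VALUE so nothing ever expires.
      static long normalizeTtl(long ttlSeconds) {
        if (ttlSeconds == FOREVER || ttlSeconds == -1) {
          return Long.MAX_VALUE;
        }
        return ttlSeconds * 1000;  // seconds -> milliseconds
      }

      public static void main(String[] args) {
        System.out.println(normalizeTtl(-1));  // 9223372036854775807
        System.out.println(normalizeTtl(60));  // 60000
      }
    }
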
@@ -304,9 +307,8 @@
         }
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (/* Commented out for now -- St.Ack val.isTransactionEntry() ||*/
-            val.matchingColumnNoDelimiter(HLog.METACOLUMN,
-              HLog.METACOLUMN.length - 1) ||
+        if (/* commented out for now - stack via jgray key.isTransactionEntry() || */
+            val.matchingFamily(HLog.METAFAMILY) ||
           !Bytes.equals(key.getRegionName(), regioninfo.getRegionName()) ||
           !val.matchingFamily(family.getName())) {
           continue;
@@ -396,6 +398,21 @@
       lock.readLock().unlock();
     }
   }
+  
+  /**
+   * Applies the given delete to the memcache
+   * 
+   * @param kv the delete to apply
+   * @return memcache size delta
+   */
+  protected long delete(final KeyValue kv) {
+    lock.readLock().lock();
+    try {
+      return this.memcache.delete(kv);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
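
A note on the locking: add() and delete() mutate the memcache while holding
the *read* lock. That is safe because the memcache is internally concurrent;
the write lock is reserved for structural changes such as swapping in the
snapshot at flush time. A minimal sketch of the pattern, assuming an
internally thread-safe container (ConcurrentStore is illustrative, not an
HBase class):

    import java.util.concurrent.ConcurrentSkipListSet;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class ConcurrentStore {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile ConcurrentSkipListSet<Long> cache =
          new ConcurrentSkipListSet<Long>();

      void add(long v) {  // many mutators may run concurrently
        lock.readLock().lock();
        try {
          cache.add(v);
        } finally {
          lock.readLock().unlock();
        }
      }

      ConcurrentSkipListSet<Long> snapshot() {  // structural swap is exclusive
        lock.writeLock().lock();
        try {
          ConcurrentSkipListSet<Long> old = cache;
          cache = new ConcurrentSkipListSet<Long>();
          return old;
        } finally {
          lock.writeLock().unlock();
        }
      }
    }
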
 
   /**
    * @return All store files.
@@ -476,7 +493,7 @@
     if (cache.size() == 0) {
       return null;
     }
-    long now = System.currentTimeMillis();
+    long oldestTimestamp = System.currentTimeMillis() - ttl;
     // TODO:  We can fail in the below block before we complete adding this
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
@@ -486,7 +503,7 @@
       int entries = 0;
       try {
         for (KeyValue kv: cache) {
-          if (!isExpired(kv, ttl, now)) {
+          if (!isExpired(kv, oldestTimestamp)) {
             writer.append(kv);
             entries++;
             flushed += this.memcache.heapSize(kv, true);
@@ -575,8 +592,10 @@
    * @param o Observer no longer interested in changes in set of Readers.
    */
   void deleteChangedReaderObserver(ChangedReadersObserver o) {
-    if (!this.changedReaderObservers.remove(o)) {
-      LOG.warn("Not in set" + o);
+    if(this.changedReaderObservers.size() > 0) {
+      if (!this.changedReaderObservers.remove(o)) {
+        LOG.warn("Not in set " + o);
+      }
     }
   }
 
@@ -793,140 +812,49 @@
     return result;
   }
 
-  /*
-   * @param r StoreFile list to reverse
-   * @return A new array of content of <code>readers</code>, reversed.
-   */
-  private StoreFile [] reverse(final List<StoreFile> r) {
-    List<StoreFile> copy = new ArrayList<StoreFile>(r);
-    Collections.reverse(copy);
-    // MapFile.Reader is instance of StoreFileReader so this should be ok.
-    return copy.toArray(new StoreFile[0]);
-  }
-
-  /*
-   * @param rdrs List of StoreFiles
-   * @param keys Current keys
-   * @param done Which readers are done
-   * @return The lowest current key in passed <code>rdrs</code>
-   */
-  private int getLowestKey(final HFileScanner [] rdrs, final KeyValue [] keys,
-      final boolean [] done) {
-    int lowestKey = -1;
-    for (int i = 0; i < rdrs.length; i++) {
-      if (done[i]) {
-        continue;
-      }
-      if (lowestKey < 0) {
-        lowestKey = i;
-      } else {
-        if (this.comparator.compare(keys[i], keys[lowestKey]) < 0) {
-          lowestKey = i;
-        }
-      }
-    }
-    return lowestKey;
-  }
-
-  /*
-   * Compact a list of StoreFiles.
+  /**
+   * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
    * 
-   * We work by iterating through the readers in parallel looking at newest
-   * store file first. We always increment the lowest-ranked one. Updates to a
-   * single row/column will appear ranked by timestamp.
-   * @param compactedOut Where to write compaction.
-   * @param pReaders List of readers sorted oldest to newest.
-   * @param majorCompaction True to force a major compaction regardless of
-   * thresholds
+   * @param writer output writer
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
    * @throws IOException
    */
-  private void compact(final HFile.Writer compactedOut,
-      final List<StoreFile> pReaders, final boolean majorCompaction)
-  throws IOException {
-    // Reverse order so newest store file is first.
-    StoreFile[] files = reverse(pReaders);
-    HFileScanner [] rdrs = new HFileScanner[files.length];
-    KeyValue [] kvs = new KeyValue[rdrs.length];
-    boolean[] done = new boolean[rdrs.length];
-    // Now, advance through the readers in order. This will have the
-    // effect of a run-time sort of the entire dataset.
-    int numDone = 0;
-    for (int i = 0; i < rdrs.length; i++) {
-      rdrs[i] = files[i].getReader().getScanner();
-      done[i] = !rdrs[i].seekTo();
-      if (done[i]) {
-        numDone++;
-      } else {
-        kvs[i] = rdrs[i].getKeyValue();
-      }
+  private void compact(HFile.Writer writer,
+                       List<StoreFile> filesToCompact,
+                       boolean majorCompaction) throws IOException {
+    // for each file, obtain a scanner:
+    KeyValueScanner [] scanners = new KeyValueScanner[filesToCompact.size()];
+    // init:
+    for(int i = 0; i < filesToCompact.size(); ++i) {
+      // TODO open a new HFile.Reader w/o block cache.
+      scanners[i] = new StoreFileScanner(filesToCompact.get(i).getReader().getScanner());
+    }
+
+    InternalScanner scanner;
+    if (majorCompaction) {
+      Scan scan = new Scan();
+      scan.setMaxVersions(family.getMaxVersions());
+      // TODO pass in the scanners/store files.
+      scanner = new StoreScanner(this, scan, null);
+    } else {
+      scanner = new MinorCompactingStoreScanner(this, scanners);
     }
 
-    long now = System.currentTimeMillis();
-    int timesSeen = 0;
-    KeyValue lastSeen = KeyValue.LOWESTKEY;
-    KeyValue lastDelete = null;
-    int maxVersions = family.getMaxVersions();
-    while (numDone < done.length) {
-      // Get lowest key in all store files.
-      int lowestKey = getLowestKey(rdrs, kvs, done);
-      KeyValue kv = kvs[lowestKey];
-      // If its same row and column as last key, increment times seen.
-      if (this.comparator.matchingRowColumn(lastSeen, kv)) {
-        timesSeen++;
-        // Reset last delete if not exact timestamp -- lastDelete only stops
-        // exactly the same key making it out to the compacted store file.
-        if (lastDelete != null &&
-            lastDelete.getTimestamp() != kv.getTimestamp()) {
-          lastDelete = null;
-        }
-      } else {
-        timesSeen = 1;
-        lastDelete = null;
-      }
-
-      // Don't write empty rows or columns. Only remove cells on major
-      // compaction. Remove if expired or > VERSIONS
-      if (kv.nonNullRowAndColumn()) {
-        if (!majorCompaction) {
-          // Write out all values if not a major compaction.
-          compactedOut.append(kv);
-        } else {
-          boolean expired = false;
-          boolean deleted = false;
-          if (timesSeen <= maxVersions && !(expired = isExpired(kv, ttl, now))) {
-            // If this value key is same as a deleted key, skip
-            if (lastDelete != null &&
-                this.comparatorIgnoringType.compare(kv, lastDelete) == 0) {
-              deleted = true;
-            } else if (kv.isDeleteType()) {
-              // If a deleted value, skip
-              deleted = true;
-              lastDelete = kv;
-            } else {
-              compactedOut.append(kv);
-            }
-          }
-          if (expired || deleted) {
-            // HBASE-855 remove one from timesSeen because it did not make it
-            // past expired check -- don't count against max versions.
-            timesSeen--;
-          }
-        }
-      }
-
-      // Update last-seen items
-      lastSeen = kv;
-
-      // Advance the smallest key. If that reader's all finished, then
-      // mark it as done.
-      if (!rdrs[lowestKey].next()) {
-        done[lowestKey] = true;
-        rdrs[lowestKey] = null;
-        numDone++;
-      } else {
-        kvs[lowestKey] = rdrs[lowestKey].getKeyValue();
+    // scanner.next() can return 'false' while still delivering its final
+    // batch of data, so we loop on a flag rather than on next() itself.
+    ArrayList<KeyValue> row = new ArrayList<KeyValue>();
+    boolean more = true;
+    while (more) {
+      more = scanner.next(row);
+      // output to writer:
+      for (KeyValue kv : row) {
+        writer.append(kv);
       }
+      row.clear();
     }
+
+    scanner.close();
   }
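
The loop shape above matters: InternalScanner.next(List) can return false
while still having filled the list with the final row, so the batch must be
written even on the last iteration. A self-contained sketch of the same
drain pattern (BatchScanner is an illustrative stand-in, not an HBase
interface):

    import java.util.ArrayList;
    import java.util.List;

    interface BatchScanner {
      // Fills batch; returns false when this batch is the last one.
      boolean next(List<String> batch);
    }

    class DrainLoop {
      static void drain(BatchScanner scanner, List<String> out) {
        List<String> batch = new ArrayList<String>();
        boolean more = true;
        while (more) {
          more = scanner.next(batch);
          out.addAll(batch);  // consume even the final, 'false' batch
          batch.clear();
        }
      }
    }
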
 
   /*
@@ -1007,321 +935,25 @@
   // Accessors.
   // (This is the only section that is directly useful!)
   //////////////////////////////////////////////////////////////////////////////
-  
   /**
-   * Return all the available columns for the given key.  The key indicates a 
-   * row and timestamp, but not a column name.
-   *
-   * The returned object should map column names to Cells.
-   * @param key -  Where to start searching.  Specifies a row.
-   * Columns are specified in following arguments.
-   * @param columns Can be null which means get all
-   * @param columnPattern Can be null.
-   * @param numVersions
-   * @param versionsCounter Can be null.
-   * @param keyvalues
-   * @param now -  Where to start searching.  Specifies a timestamp.
-   * @throws IOException
+   * @return the number of files in this store
    */
-  public void getFull(KeyValue key, final NavigableSet<byte []> columns,
-      final Pattern columnPattern,
-      final int numVersions, Map<KeyValue, HRegion.Counter> versionsCounter,
-      List<KeyValue> keyvalues, final long now)
-  throws IOException {
-    // if the key is null, we're not even looking for anything. return.
-    if (key == null) {
-      return;
-    }
-    int versions = versionsToReturn(numVersions);
-    NavigableSet<KeyValue> deletes =
-      new TreeSet<KeyValue>(this.comparatorIgnoringType);
-    // Create a Map that has results by column so we can keep count of versions.
-    // It duplicates columns but doing check of columns, we don't want to make
-    // column set each time.
-    this.lock.readLock().lock();
-    try {
-      // get from the memcache first.
-      if (this.memcache.getFull(key, columns, columnPattern, versions,
-          versionsCounter, deletes, keyvalues, now)) {
-        // May have gotten enough results, enough to return.
-        return;
-      }
-      Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
-          i.hasNext();) {
-        if (getFullFromStoreFile(i.next().getValue(), key, columns,
-            columnPattern, versions, versionsCounter, deletes, keyvalues)) {
-          return;
-        }
-      }
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /*
-   * @param f
-   * @param key Where to start searching.  Specifies a row and timestamp.
-   * Columns are specified in following arguments.
-   * @param columns
-   * @param versions
-   * @param versionCounter
-   * @param deletes
-   * @param keyvalues
-   * @return True if we found enough results to satisfy the <code>versions</code>
-   * and <code>columns</code> passed.
-   * @throws IOException
-   */
-  private boolean getFullFromStoreFile(StoreFile f, KeyValue target, 
-    Set<byte []> columns, final Pattern columnPattern, int versions, 
-    Map<KeyValue, HRegion.Counter> versionCounter,
-    NavigableSet<KeyValue> deletes,
-    List<KeyValue> keyvalues) 
-  throws IOException {
-    long now = System.currentTimeMillis();
-    HFileScanner scanner = f.getReader().getScanner();
-    if (!getClosest(scanner, target)) {
-      return false;
-    }
-    boolean hasEnough = false;
-    do {
-      KeyValue kv = scanner.getKeyValue();
-      // Make sure we have not passed out the row.  If target key has a
-      // column on it, then we are looking explicit key+column combination.  If
-      // we've passed it out, also break.
-      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
-          !this.comparator.matchingRowColumn(target, kv)) {
-        break;
-      }
-      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
-        continue;
-      }
-      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
-          this.ttl, keyvalues, null)) {
-        hasEnough = true;
-        break;
-      }
-    } while (scanner.next());
-    return hasEnough;
-  }
-
-  /**
-   * Code shared by {@link Memcache#getFull(KeyValue, NavigableSet, Pattern, int, Map, NavigableSet, List, long)}
-   * and {@link #getFullFromStoreFile(StoreFile, KeyValue, Set, Pattern, int, Map, NavigableSet, List)}
-   * @param c
-   * @param target
-   * @param candidate
-   * @param columns
-   * @param columnPattern
-   * @return True if <code>candidate</code> matches column and timestamp.
-   */
-  static boolean getFullCheck(final KeyValue.KVComparator c,
-      final KeyValue target, final KeyValue candidate,
-      final Set<byte []> columns, final Pattern columnPattern) {
-    // Does column match?
-    if (!Store.matchingColumns(candidate, columns)) {
-      return false;
-    }
-    // if the column pattern is not null, we use it for column matching.
-    // we will skip the keys whose column doesn't match the pattern.
-    if (columnPattern != null) {
-      if (!(columnPattern.matcher(candidate.getColumnString()).matches())) {
-        return false;
-      }
-    }
-    if (c.compareTimestamps(target, candidate) > 0)  {
-      return false;
-    }
-    return true; 
+  public int getNumberOfstorefiles() {
+    return this.storefiles.size();
   }
+  
 
   /*
    * @param wantedVersions How many versions were asked for.
   * @return wantedVersions or this family's VERSIONS.
    */
-  private int versionsToReturn(final int wantedVersions) {
+  int versionsToReturn(final int wantedVersions) {
     if (wantedVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
     // Make sure we do not return more than maximum versions for this store.
     int maxVersions = this.family.getMaxVersions();
-    return wantedVersions > maxVersions &&
-      wantedVersions != HConstants.ALL_VERSIONS? maxVersions: wantedVersions;
-  }
-  
-  /**
-   * Get the value for the indicated HStoreKey.  Grab the target value and the 
-   * previous <code>numVersions - 1</code> values, as well.
-   *
-   * Use {@link HConstants.ALL_VERSIONS} to retrieve all versions.
-   * @param key
-   * @param numVersions Number of versions to fetch.  Must be > 0.
-   * @return values for the specified versions
-   * @throws IOException
-   */
-  List<KeyValue> get(final KeyValue key, final int numVersions)
-  throws IOException {
-    // This code below is very close to the body of the getKeys method.  Any 
-    // changes in the flow below should also probably be done in getKeys.
-    // TODO: Refactor so same code used.
-    long now = System.currentTimeMillis();
-    int versions = versionsToReturn(numVersions);
-    // Keep a list of deleted cell keys.  We need this because as we go through
-    // the memcache and store files, the cell with the delete marker may be
-    // in one store and the old non-delete cell value in a later store.
-    // If we don't keep around the fact that the cell was deleted in a newer
-    // record, we end up returning the old value if user is asking for more
-    // than one version. This List of deletes should not be large since we
-    // are only keeping rows and columns that match those set on the get and
-    // which have delete values.  If memory usage becomes an issue, could
-    // redo as bloom filter.  Use sorted set because test for membership should
-    // be faster than calculating a hash.  Use a comparator that ignores ts.
-    NavigableSet<KeyValue> deletes =
-      new TreeSet<KeyValue>(this.comparatorIgnoringType);
-    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-    this.lock.readLock().lock();
-    try {
-      // Check the memcache
-      if (this.memcache.get(key, versions, keyvalues, deletes, now)) {
-        return keyvalues;
-      }
-      Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      boolean hasEnough = false;
-      for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
-        StoreFile f = e.getValue();
-        HFileScanner scanner = f.getReader().getScanner();
-        if (!getClosest(scanner, key)) {
-          // Move to next file.
-          continue;
-        }
-        do {
-          KeyValue kv = scanner.getKeyValue();
-          // Make sure below matches what happens up in Memcache#get.
-          if (this.comparator.matchingRowColumn(kv, key)) {
-            if (doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues, null)) {
-              hasEnough = true;
-              break;
-            }
-          } else {
-            // Row and column don't match. Must have gone past. Move to next file.
-            break;
-          }
-        } while (scanner.next());
-        if (hasEnough) {
-          break; // Break out of files loop.
-        }
-      }
-      return keyvalues.isEmpty()? null: keyvalues;
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /*
-   * Small method to check if we are over the max number of versions
-   * or we acheived this family max versions. 
-   * The later happens when we have the situation described in HBASE-621.
-   * @param versions
-   * @param c
-   * @return 
-   */
-  static boolean hasEnoughVersions(final int versions, final List<KeyValue> c) {
-    return versions > 0 && !c.isEmpty() && c.size() >= versions;
-  }
-
-  /*
-   * Used when doing getFulls.
-   * @param kv
-   * @param versions
-   * @param versionCounter
-   * @param columns
-   * @param deletes
-   * @param now
-   * @param ttl
-   * @param keyvalues
-   * @param set
-   * @return True if enough versions.
-   */
-  static boolean doKeyValue(final KeyValue kv,
-      final int versions,
-      final Map<KeyValue, Counter> versionCounter,
-      final Set<byte []> columns,
-      final NavigableSet<KeyValue> deletes,
-      final long now, 
-      final long ttl,
-      final List<KeyValue> keyvalues,
-      final SortedSet<KeyValue> set) {
-    boolean hasEnough = false;
-    if (kv.isDeleteType()) {
-      if (!deletes.contains(kv)) {
-        deletes.add(kv);
-      }
-    } else if (!deletes.contains(kv)) {
-      // Skip expired cells
-      if (!isExpired(kv, ttl, now)) {
-        if (HRegion.okToAddResult(kv, versions, versionCounter)) {
-          HRegion.addResult(kv, versionCounter, keyvalues);
-          if (HRegion.hasEnoughVersions(versions, versionCounter, columns)) {
-            hasEnough = true;
-          }
-        }
-      } else {
-        // Remove the expired.
-        Store.expiredOrDeleted(set, kv);
-      }
-    }
-    return hasEnough;
-  }
-
-  /*
-   * Used when doing get.
-   * @param kv
-   * @param versions
-   * @param deletes
-   * @param now
-   * @param ttl
-   * @param keyvalues
-   * @param set
-   * @return True if enough versions.
-   */
-  static boolean doKeyValue(final KeyValue kv, final int versions,
-      final NavigableSet<KeyValue> deletes,
-      final long now,  final long ttl,
-      final List<KeyValue> keyvalues, final SortedSet<KeyValue> set) {
-    boolean hasEnough = false;
-    if (!kv.isDeleteType()) {
-      // Filter out expired results
-      if (notExpiredAndNotInDeletes(ttl, kv, now, deletes)) {
-        if (!keyvalues.contains(kv)) {
-          keyvalues.add(kv);
-          if (hasEnoughVersions(versions, keyvalues)) {
-            hasEnough = true;
-          }
-        }
-      } else {
-        if (set != null) {
-          expiredOrDeleted(set, kv);
-        }
-      }
-    } else {
-      // Cell holds a delete value.
-      deletes.add(kv);
-    }
-    return hasEnough;
-  }
-
-  /*
-   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
-   * has an empty column, then it just tests row equivalence. Otherwise, it uses
-   * HStoreKey.matchesRowCol().
-   * @param c Comparator to use.
-   * @param origin Key we're testing against
-   * @param target Key we're testing
-   */
-  static boolean matchingRowColumn(final KeyValue.KVComparator c,
-      final KeyValue origin, final KeyValue target) {
-    return origin.isEmptyColumn()? c.matchingRows(target, origin):
-      c.matchingRowColumn(target, origin);
+    return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
   static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
@@ -1411,13 +1043,12 @@
    */
   static boolean notExpiredAndNotInDeletes(final long ttl,
       final KeyValue key, final long now, final Set<KeyValue> deletes) {
-    return !isExpired(key, ttl, now) && (deletes == null || deletes.isEmpty() ||
+    return !isExpired(key, now-ttl) && (deletes == null || deletes.isEmpty() ||
         !deletes.contains(key));
   }
 
-  static boolean isExpired(final KeyValue key, final long ttl,
-      final long now) {
-    return ttl != HConstants.FOREVER && now > key.getTimestamp() + ttl;
+  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
+    return key.getTimestamp() < oldestTimestamp;
   }
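
The refactored check hoists the arithmetic out of the per-cell path: the
cutoff now - ttl is computed once per operation, and a cell is expired
exactly when its timestamp falls before that cutoff. A minimal sketch with
plain longs standing in for KeyValue timestamps:

    class ExpiryExample {
      // Mirrors isExpired(KeyValue, long) above.
      static boolean isExpired(long cellTimestamp, long oldestTimestamp) {
        return cellTimestamp < oldestTimestamp;
      }

      public static void main(String[] args) {
        long ttlMs = 60000L;
        long oldestTimestamp = System.currentTimeMillis() - ttlMs;  // once per scan
        long fresh = System.currentTimeMillis();
        long stale = fresh - 120000L;
        System.out.println(isExpired(fresh, oldestTimestamp));  // false
        System.out.println(isExpired(stale, oldestTimestamp));  // true
      }
    }
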
 
   /* Find a candidate for row that is at or before passed key, searchkey, in hfile.
@@ -1693,13 +1324,12 @@
   /**
    * Return a scanner for both the memcache and the HStore files
    */
-  protected InternalScanner getScanner(long timestamp,
-      final NavigableSet<byte []> targetCols,
-      byte [] firstRow, RowFilterInterface filter)
+  protected KeyValueScanner getScanner(Scan scan,
+      final NavigableSet<byte []> targetCols)
   throws IOException {
     lock.readLock().lock();
     try {
-      return new StoreScanner(this, targetCols, firstRow, timestamp, filter);
+      return new StoreScanner(this, scan, targetCols);
     } finally {
       lock.readLock().unlock();
     }
@@ -1722,7 +1352,7 @@
    * @throws IOException if there was a problem getting file sizes from the
    * filesystem
    */
-  long getStorefilesIndexSize() throws IOException {
+  long getStorefilesIndexSize() {
     long size = 0;
     for (StoreFile s: storefiles.values())
       size += s.getReader().indexSize();
@@ -1805,4 +1435,114 @@
     }
     return false;
   }
+  
+  //
+  // HBASE-880/1249/1304
+  //
+  
+  /**
+   * Retrieve results from this store given the specified Get parameters.
+   * @param get Get operation
+   * @param columns List of columns to match, can be empty (not null)
+   * @param result List to add results to 
+   * @throws IOException
+   */
+  public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result) 
+  throws IOException {
+    KeyComparator keyComparator = this.comparator.getRawComparator();
+
+    // Column matching and version enforcement
+    QueryMatcher matcher = new QueryMatcher(get, get.getRow(), 
+        this.family.getName(), columns, this.ttl, keyComparator,
+        versionsToReturn(get.getMaxVersions()));
+    
+    // Read from Memcache
+    if(this.memcache.get(matcher, result)) {
+      // Received early-out from memcache
+      return;
+    }
+    
+    // Check if we even have storefiles
+    if(this.storefiles.isEmpty()) {
+      return;
+    }
+    
+    // Get storefiles for this store
+    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
+    for(StoreFile sf : this.storefiles.descendingMap().values()) {
+      storefileScanners.add(sf.getReader().getScanner());
+    }
+    
+    // StoreFileGetScan will handle reading this store's storefiles
+    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
+    
+    // Run a GET scan and put results into the specified list 
+    scanner.get(result);
+  }
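
The get() above is a layered read: the memcache is consulted first, then each
storefile newest-first, stopping as soon as the matcher reports the query
satisfied. A minimal sketch of that early-out layering, with Source standing
in for Memcache/StoreFileGetScan (illustrative names, not HBase classes):

    import java.util.ArrayList;
    import java.util.List;

    interface Source {
      // Adds matches to result; returns true once the query is fully
      // satisfied so the caller may skip all older sources.
      boolean get(String key, List<String> result);
    }

    class LayeredGet {
      static List<String> get(String key, List<Source> newestFirst) {
        List<String> result = new ArrayList<String>();
        for (Source s : newestFirst) {
          if (s.get(key, result)) {
            break;  // early-out: newer data shadows older files
          }
        }
        return result;
      }
    }
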
+  
+  /**
+   * Increments the value for the given row/family/qualifier.
+   * @param row row key
+   * @param family column family name
+   * @param qualifier column qualifier
+   * @param amount amount to increment by
+   * @return the new value after the increment
+   * @throws IOException
+   */
+  public long incrementColumnValue(byte [] row, byte [] family,
+      byte [] qualifier, long amount) throws IOException{
+    long value = 0;
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    KeyComparator keyComparator = this.comparator.getRawComparator();
+
+    // Setting up the QueryMatcher
+    Get get = new Get(row);
+    NavigableSet<byte[]> qualifiers = 
+      new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    qualifiers.add(qualifier);
+    QueryMatcher matcher = new QueryMatcher(get, row, family, qualifiers,
+        this.ttl, keyComparator, 1);
+    
+    // Read from Memcache
+    if(this.memcache.get(matcher, result)) {
+      // Received early-out from memcache
+      KeyValue kv = result.get(0);
+      byte [] buffer = kv.getBuffer();
+      int valueOffset = kv.getValueOffset();
+      value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
+      Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, 
+          Bytes.SIZEOF_LONG);
+      return value;
+    }
+    
+    // Check if we even have storefiles
+    if(this.storefiles.isEmpty()) {
+      return addNewKeyValue(row, family, qualifier, value, amount);
+    }
+    
+    // Get storefiles for this store
+    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
+    for(StoreFile sf : this.storefiles.descendingMap().values()) {
+      storefileScanners.add(sf.getReader().getScanner());
+    }
+    
+    // StoreFileGetScan will handle reading this store's storefiles
+    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
+    
+    // Run a GET scan and put results into the specified list 
+    scanner.get(result);
+    if(result.size() > 0) {
+      value = Bytes.toLong(result.get(0).getValue());
+    }
+    return addNewKeyValue(row, family, qualifier, value, amount);
+  }
+  
+  private long addNewKeyValue(byte [] row, byte [] family, byte [] qualifier, 
+      long value, long amount) {
+    long newValue = value + amount;
+    KeyValue newKv = new KeyValue(row, family, qualifier, Bytes.toBytes(newValue));
+    add(newKv);
+    return newValue;
+  }
+  
 }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sat Jun  6 01:26:21 2009
@@ -40,10 +40,7 @@
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
-import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Hash;
-import org.apache.hadoop.io.RawComparator;
 
 /**
  * A Store data file.  Stores usually have one or more of these files.  They
@@ -58,7 +55,7 @@
 public class StoreFile implements HConstants {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
-  public static final String HFILE_CACHE_SIZE_KEY = "hfile.block.cache.size";
+  private static final String HFILE_CACHE_SIZE_KEY = "hfile.block.cache.size";
 
   private static BlockCache hfileBlockCache = null;
   
@@ -100,14 +97,15 @@
   private final HBaseConfiguration conf;
 
   /**
-   * Constructor, loads a reader and it's indices, etc. May allocate a substantial
-   * amount of ram depending on the underlying files (10-20MB?).
+   * Constructor, loads a reader and its indices, etc. May allocate a 
+   * substantial amount of ram depending on the underlying files (10-20MB?).
    * @param fs
    * @param p
    * @param conf
    * @throws IOException
    */
-  StoreFile(final FileSystem fs, final Path p, final HBaseConfiguration conf) throws IOException {
+  StoreFile(final FileSystem fs, final Path p, final HBaseConfiguration conf) 
+  throws IOException {
     this.conf = conf;
     this.fs = fs;
     this.path = p;
@@ -208,6 +206,11 @@
     return this.sequenceid;
   }
 
+  /**
+   * Returns the shared block cache, creating it on first use.
+   * @param conf configuration used to size the cache
+   * @return the block cache
+   */
   public static synchronized BlockCache getBlockCache(HBaseConfiguration conf) {
     if (hfileBlockCache != null)
       return hfileBlockCache;
@@ -221,6 +224,9 @@
     return hfileBlockCache;
   }
 
+  /**
+   * @return the blockcache
+   */
   public BlockCache getBlockCache() {
     return getBlockCache(conf);
   }
@@ -237,8 +243,8 @@
       throw new IllegalAccessError("Already open");
     }
     if (isReference()) {
-      this.reader = new HalfHFileReader(this.fs, this.referencePath, getBlockCache(),
-        this.reference);
+      this.reader = new HalfHFileReader(this.fs, this.referencePath, 
+          getBlockCache(), this.reference);
     } else {
       this.reader = new StoreFileReader(this.fs, this.path, getBlockCache());
     }
@@ -276,6 +282,13 @@
    * Override to add some customization on HFile.Reader
    */
   static class StoreFileReader extends HFile.Reader {
+    /**
+     * Opens a reader on the given store file.
+     * @param fs filesystem holding the file
+     * @param path path to the store file
+     * @param cache block cache to use
+     * @throws IOException
+     */
     public StoreFileReader(FileSystem fs, Path path, BlockCache cache)
         throws IOException {
       super(fs, path, cache);
@@ -296,6 +309,14 @@
    * Override to add some customization on HalfHFileReader.
    */
   static class HalfStoreFileReader extends HalfHFileReader {
+    /**
+     * Opens a reader over one half of a split store file.
+     * @param fs filesystem holding the file
+     * @param p path to the store file
+     * @param c block cache to use
+     * @param r reference specifying which half of the file to read
+     * @throws IOException
+     */
     public HalfStoreFileReader(FileSystem fs, Path p, BlockCache c, Reference r)
         throws IOException {
       super(fs, p, c, r);
@@ -445,7 +466,6 @@
    * @param dir
    * @param suffix
    * @return Path to a file that doesn't exist at time of this invocation.
-   * @return
    * @throws IOException
    */
   static Path getRandomFilename(final FileSystem fs, final Path dir,
@@ -465,8 +485,8 @@
    * Write file metadata.
    * Call before you call close on the passed <code>w</code> since its written
    * as metadata to that file.
-   *
-   * @param w
+   * 
+   * @param w hfile writer
    * @param maxSequenceId Maximum sequence id.
    * @throws IOException
    */

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileGetScan.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
+/**
+ * Used to execute a get by scanning all the store files in order.
+ */
+public class StoreFileGetScan {
+
+  private List<HFileScanner> scanners;
+  private QueryMatcher matcher;
+
+  private KeyValue startKey;
+  
+  /**
+   * Constructor
+   * @param scanners
+   * @param matcher
+   */
+  public StoreFileGetScan(List<HFileScanner> scanners, QueryMatcher matcher) {
+    this.scanners = scanners;
+    this.matcher = matcher;
+    this.startKey = matcher.getStartKey();
+  }
+
+  /**
+   * Performs a GET operation across multiple StoreFiles.
+   * <p>
+   * This style of StoreFile scanning goes through each
+   * StoreFile in its entirety, most recent first, before
+   * proceeding to the next StoreFile.
+   * <p>
+   * This strategy allows for optimal, stateless (no persisted Scanners)
+   * early-out scenarios.    
+   * @param result List to add results to
+   * @throws IOException
+   */
+  public void get(List<KeyValue> result) throws IOException {
+    for(HFileScanner scanner : this.scanners) {
+      this.matcher.update();
+      if(getStoreFile(scanner, result) || matcher.isDone()) {
+        return;
+      }
+    }
+  }
+  
+  /**
+   * Performs a GET operation on a single StoreFile.
+   * @param scanner
+   * @param result
+   * @return true if done with this store, false if must continue to next
+   * @throws IOException 
+   */
+  public boolean getStoreFile(HFileScanner scanner, List<KeyValue> result) 
+  throws IOException {
+    if(scanner.seekTo(startKey.getBuffer(), startKey.getKeyOffset(),
+        startKey.getKeyLength()) == -1) {
+      // No keys in StoreFile at or before the specified startKey.
+      // The first row may still equal our row, so we have to check it anyway.
+      byte [] firstKey = scanner.getReader().getFirstKey();
+      short rowLen = Bytes.toShort(firstKey, 0, Bytes.SIZEOF_SHORT);
+      int rowOffset = Bytes.SIZEOF_SHORT;
+      if (this.matcher.rowComparator.compareRows(firstKey, rowOffset, rowLen,
+          startKey.getBuffer(), startKey.getRowOffset(), startKey.getRowLength())
+          != 0)
+        return false;
+      scanner.seekTo();
+    }
+    do {
+      KeyValue kv = scanner.getKeyValue();
+      switch(matcher.match(kv)) {
+        case INCLUDE:
+          result.add(kv);
+          break;
+        case SKIP:
+          break;
+        case NEXT:
+          return false;
+        case DONE:
+          return true;
+      }
+    } while(scanner.next());
+    return false;
+  }
+  
+}
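
The switch in getStoreFile() is the heart of this class: the matcher
classifies every cell as INCLUDE, SKIP, NEXT (past this query in this file),
or DONE (query fully answered). A self-contained sketch of the same loop over
plain strings (MatchCode mirrors the four outcomes; Matcher and the cell type
are illustrative stand-ins):

    import java.util.Iterator;
    import java.util.List;

    class MatchLoopExample {
      enum MatchCode { INCLUDE, SKIP, NEXT, DONE }

      interface Matcher { MatchCode match(String cell); }

      // Returns true when the whole query is satisfied (DONE); false means
      // the caller should move on to the next, older file.
      static boolean scanOneFile(Iterator<String> cells, Matcher m,
          List<String> result) {
        while (cells.hasNext()) {
          String cell = cells.next();
          switch (m.match(cell)) {
            case INCLUDE: result.add(cell); break;  // keep and continue
            case SKIP:    break;                    // ignore and continue
            case NEXT:    return false;             // past the query here
            case DONE:    return true;              // query fully answered
          }
        }
        return false;
      }
    }
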

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sat Jun  6 01:26:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,306 +21,81 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 
 /**
- * A scanner that iterates through HStore files
+ * A KeyValue scanner that iterates over a single HFile
  */
-class StoreFileScanner extends HAbstractScanner
-implements ChangedReadersObserver {
-    // Keys retrieved from the sources
-  private volatile KeyValue keys[];
+class StoreFileScanner implements KeyValueScanner {
   
-  // Readers we go against.
-  private volatile HFileScanner [] scanners;
+  private HFileScanner hfs;
+  private KeyValue cur = null;
   
-  // Store this scanner came out of.
-  private final Store store;
-  
-  // Used around replacement of Readers if they change while we're scanning.
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-  private final long now = System.currentTimeMillis();
-
   /**
-   * @param store
-   * @param timestamp
-   * @param columns
-   * @param firstRow
-   * @param deletes Set of running deletes
-   * @throws IOException
+   * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
+   * @param hfs HFile scanner
    */
-  public StoreFileScanner(final Store store, final long timestamp,
-    final NavigableSet<byte []> columns, final byte [] firstRow)
-  throws IOException {
-    super(timestamp, columns);
-    this.store = store;
-    this.store.addChangedReaderObserver(this);
-    try {
-      openScanner(firstRow);
-    } catch (Exception ex) {
-      close();
-      IOException e = new IOException("HStoreScanner failed construction");
-      e.initCause(ex);
-      throw e;
-    }
+  public StoreFileScanner(HFileScanner hfs) {
+    this.hfs = hfs;
   }
-
-  /*
-   * Go open new scanners and cue them at <code>firstRow</code>.
-   * Closes existing Readers if any.
-   * @param firstRow
-   * @throws IOException
-   */
-  private void openScanner(final byte [] firstRow) throws IOException {
-    List<HFileScanner> s =
-      new ArrayList<HFileScanner>(this.store.getStorefiles().size());
-    Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
-    for (StoreFile f: map.values()) {
-       s.add(f.getReader().getScanner());
-    }
-    this.scanners = s.toArray(new HFileScanner [] {});
-    this.keys = new KeyValue[this.scanners.length];
-    // Advance the readers to the first pos.
-    KeyValue firstKey = (firstRow != null && firstRow.length > 0)?
-      new KeyValue(firstRow, HConstants.LATEST_TIMESTAMP): null;
-    for (int i = 0; i < this.scanners.length; i++) {
-      if (firstKey != null) {
-        if (seekTo(i, firstKey)) {
-          continue;
-        }
-      }
-      while (getNext(i)) {
-        if (columnMatch(i)) {
-          break;
-        }
-      }
-    }
-  }
-
-  /**
-   * For a particular column i, find all the matchers defined for the column.
-   * Compare the column family and column key using the matchers. The first one
-   * that matches returns true. If no matchers are successful, return false.
-   * 
-   * @param i index into the keys array
-   * @return true if any of the matchers for the column match the column family
-   * and the column key.
-   * @throws IOException
-   */
-  boolean columnMatch(int i) throws IOException {
-    return columnMatch(keys[i]);
+  
+  public KeyValue peek() {
+    return cur;
   }
-
-  /**
-   * Get the next set of values for this scanner.
-   * 
-   * @param key The key that matched
-   * @param results All the results for <code>key</code>
-   * @return true if a match was found
-   * @throws IOException
-   * 
-   * @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
-   */
-  @Override
-  public boolean next(List<KeyValue> results)
-  throws IOException {
-    if (this.scannerClosed) {
-      return false;
-    }
-    this.lock.readLock().lock();
+  
+  public KeyValue next() {
+    KeyValue retKey = cur;
+    cur = hfs.getKeyValue();
     try {
-      // Find the next viable row label (and timestamp).
-      KeyValue viable = getNextViableRow();
-      if (viable == null) {
-        return false;
-      }
-
-      // Grab all the values that match this row/timestamp
-      boolean addedItem = false;
-      for (int i = 0; i < keys.length; i++) {
-        // Fetch the data
-        while ((keys[i] != null) &&
-            (this.store.comparator.compareRows(this.keys[i], viable) == 0)) {
-          // If we are doing a wild card match or there are multiple matchers
-          // per column, we need to scan all the older versions of this row
-          // to pick up the rest of the family members
-          if(!isWildcardScanner()
-              && !isMultipleMatchScanner()
-              && (keys[i].getTimestamp() != viable.getTimestamp())) {
-            break;
-          }
-          if (columnMatch(i)) {
-            // We only want the first result for any specific family member
-            // TODO: Do we have to keep a running list of column entries in
-            // the results across all of the StoreScanner?  Like we do
-            // doing getFull?
-            if (!results.contains(keys[i])) {
-              results.add(keys[i]);
-              addedItem = true;
-            }
-          }
-
-          if (!getNext(i)) {
-            closeSubScanner(i);
-          }
-        }
-        // Advance the current scanner beyond the chosen row, to
-        // a valid timestamp, so we're ready next time.
-        while ((keys[i] != null) &&
-            ((this.store.comparator.compareRows(this.keys[i], viable) <= 0) ||
-                (keys[i].getTimestamp() > this.timestamp) ||
-                !columnMatch(i))) {
-          getNext(i);
-        }
-      }
-      return addedItem;
-    } finally {
-      this.lock.readLock().unlock();
+      hfs.next();
+    } catch(IOException e) {
+      // Only occurs if the scanner is not seeked, this is never the case
+      // as we seek immediately after construction in StoreScanner
     }
+    return retKey;
   }
-
-  /*
-   * @return An instance of <code>ViableRow</code>
-   * @throws IOException
-   */
-  private KeyValue getNextViableRow() throws IOException {
-    // Find the next viable row label (and timestamp).
-    KeyValue viable = null;
-    long viableTimestamp = -1;
-    long ttl = store.ttl;
-    for (int i = 0; i < keys.length; i++) {
-      // The first key that we find that matches may have a timestamp greater
-      // than the one we're looking for. We have to advance to see if there
-      // is an older version present, since timestamps are sorted descending
-      while (keys[i] != null &&
-          keys[i].getTimestamp() > this.timestamp &&
-          columnMatch(i) &&
-          getNext(i)) {
-        if (columnMatch(i)) {
-          break;
-        }
-      }
-      if((keys[i] != null)
-          // If we get here and keys[i] is not null, we already know that the
-          // column matches and the timestamp of the row is less than or equal
-          // to this.timestamp, so we do not need to test that here
-          && ((viable == null) ||
-            (this.store.comparator.compareRows(this.keys[i], viable) < 0) ||
-            ((this.store.comparator.compareRows(this.keys[i], viable) == 0) &&
-              (keys[i].getTimestamp() > viableTimestamp)))) {
-        if (ttl == HConstants.FOREVER || now < keys[i].getTimestamp() + ttl) {
-          viable = keys[i];
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("getNextViableRow :" + keys[i] + ": expired, skipped");
-          }
-        }
+  
+  public boolean seek(KeyValue key) {
+    try {
+      if(!seekAtOrAfter(hfs, key)) {
+        close();
+        return false;
       }
+      cur = hfs.getKeyValue();
+      hfs.next();
+      return true;
+    } catch(IOException ioe) {
+      close();
+      return false;
     }
-    return viable;
   }
-
-  /*
-   * The user didn't want to start scanning at the first row. This method
-   * seeks to the requested row.
-   *
-   * @param i which iterator to advance
-   * @param firstRow seek to this row
-   * @return true if we found the first row and so the scanner is properly
-   * primed or true if the row was not found and this scanner is exhausted.
-   */
-  private boolean seekTo(int i, final KeyValue firstKey)
-  throws IOException {
-    if (firstKey == null) {
-      if (!this.scanners[i].seekTo()) {
-        closeSubScanner(i);
-        return true;
-      }
-    } else {
-      // TODO: sort columns and pass in column as part of key so we get closer.
-      if (!Store.getClosest(this.scanners[i], firstKey)) {
-        closeSubScanner(i);
-        return true;
-      }
-    }
-    this.keys[i] = this.scanners[i].getKeyValue();
-    return isGoodKey(this.keys[i]);
+  
+  public void close() {
+    // Nothing to close on HFileScanner?
+    cur = null;
   }
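
The scanner keeps cur one element ahead: peek() exposes it without consuming
it, and next() hands it out while pre-fetching its successor. A generic
sketch of the same one-ahead cursor (PeekingIterator is an illustrative
stand-in for the KeyValueScanner contract):

    import java.util.Arrays;
    import java.util.Iterator;

    class PeekingIterator<T> {
      private final Iterator<T> it;
      private T cur;  // the element peek() exposes; null when exhausted

      PeekingIterator(Iterator<T> it) {
        this.it = it;
        this.cur = it.hasNext() ? it.next() : null;  // prime, like seek()
      }

      T peek() { return cur; }

      T next() {
        T ret = cur;
        cur = it.hasNext() ? it.next() : null;  // pre-fetch the successor
        return ret;
      }

      public static void main(String[] args) {
        PeekingIterator<Integer> p =
            new PeekingIterator<Integer>(Arrays.asList(1, 2).iterator());
        System.out.println(p.peek());  // 1
        System.out.println(p.next());  // 1
        System.out.println(p.peek());  // 2
      }
    }
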
-
+  
   /**
-   * Get the next value from the specified reader.
    * 
-   * @param i which reader to fetch next value from
-   * @return true if there is more data available
-   */
-  private boolean getNext(int i) throws IOException {
-    boolean result = false;
-    while (true) {
-      if ((this.scanners[i].isSeeked() && !this.scanners[i].next()) ||
-          (!this.scanners[i].isSeeked() && !this.scanners[i].seekTo())) {
-        closeSubScanner(i);
-        break;
-      }
-      this.keys[i] = this.scanners[i].getKeyValue();
-      if (isGoodKey(this.keys[i])) {
-          result = true;
-          break;
-      }
-    }
-    return result;
-  }
-
-  /*
-   * @param kv
-   * @return True if good key candidate.
+   * Seeks the scanner at or after the specified KeyValue.
+   * @param s scanner to seek
+   * @param k seek point
+   * @return false if the scanner is exhausted, true otherwise
+   * @throws IOException
    */
-  private boolean isGoodKey(final KeyValue kv) {
-    return !Store.isExpired(kv, this.store.ttl, this.now);
-  }
-
-  /** Close down the indicated reader. */
-  private void closeSubScanner(int i) {
-    this.scanners[i] = null;
-    this.keys[i] = null;
-  }
-
-  /** Shut it down! */
-  public void close() {
-    if (!this.scannerClosed) {
-      this.store.deleteChangedReaderObserver(this);
-      try {
-        for(int i = 0; i < this.scanners.length; i++) {
-          closeSubScanner(i);
-        }
-      } finally {
-        this.scannerClosed = true;
-      }
-    }
-  }
-
-  // Implementation of ChangedReadersObserver
-  
-  public void updateReaders() throws IOException {
-    this.lock.writeLock().lock();
-    try {
-      // The keys are currently lined up at the next row to fetch.  Pass in
-      // the current row as 'first' row and readers will be opened and cue'd
-      // up so future call to next will start here.
-      KeyValue viable = getNextViableRow();
-      openScanner(viable.getRow());
-      LOG.debug("Replaced Scanner Readers at row " +
-        viable.getRow().toString());
-    } finally {
-      this.lock.writeLock().unlock();
+  public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
+  throws IOException {
+    int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
+    if(result < 0) {
+      // Passed KV is smaller than first KV in file, work from start of file
+      return s.seekTo();
+    } else if(result > 0) {
+      // Passed KV is larger than current KV in file, if there is a next
+      // it is the "after", if not then this scanner is done.
+      return s.next();
     }
+    // Seeked to the exact key
+    return true;
   }
 }
\ No newline at end of file
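
seekAtOrAfter() folds the three-way result of HFileScanner.seekTo() into an
at-or-after contract: before-first-key means start from the top, an inexact
hit means advance by one, and an exact hit stays put. A sketch of the same
contract over a sorted list (plain JDK, illustrative only):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class SeekExample {
      // Returns the index of the first element >= key, or -1 if none exists.
      static int seekAtOrAfter(List<Integer> sorted, int key) {
        int i = Collections.binarySearch(sorted, key);
        if (i >= 0) {
          return i;  // exact hit, like seekTo() == 0
        }
        int insertion = -i - 1;  // first element greater than key
        return insertion < sorted.size() ? insertion : -1;  // -1 ~ exhausted
      }

      public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(10, 20, 30);
        System.out.println(seekAtOrAfter(keys, 5));   // 0 (before first)
        System.out.println(seekAtOrAfter(keys, 20));  // 1 (exact)
        System.out.println(seekAtOrAfter(keys, 25));  // 2 (next after 20)
        System.out.println(seekAtOrAfter(keys, 35));  // -1 (exhausted)
      }
    }
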

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Jun  6 01:26:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -25,288 +25,238 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 
 /**
- * Scanner scans both the memcache and the HStore
+ * Scanner scans both the memcache and the HStore. Coalesces the KeyValue
+ * stream into a List<KeyValue> for a single row.
  */
-class StoreScanner implements InternalScanner,  ChangedReadersObserver {
+class StoreScanner implements KeyValueScanner, InternalScanner,
+ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
 
-  private InternalScanner [] scanners;
-  private List<KeyValue> [] resultSets;
-  private boolean wildcardMatch = false;
-  private boolean multipleMatchers = false;
-  private RowFilterInterface dataFilter;
   private Store store;
-  private final long timestamp;
-  private final NavigableSet<byte []> columns;
-  
-  // Indices for memcache scanner and hstorefile scanner.
-  private static final int MEMS_INDEX = 0;
-  private static final int HSFS_INDEX = MEMS_INDEX + 1;
-  
+
+  private ScanQueryMatcher matcher;
+
+  private KeyValueHeap heap;
+
   // Used around transition from no storefile to the first.
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
   private final AtomicBoolean closing = new AtomicBoolean(false);
 
-  /** Create an Scanner with a handle on the memcache and HStore files. */
-  @SuppressWarnings("unchecked")
-  StoreScanner(Store store, final NavigableSet<byte []> targetCols,
-    byte [] firstRow, long timestamp, RowFilterInterface filter) 
-  throws IOException {
+  /**
+   * Opens a scanner across memcache, snapshot, and all StoreFiles.
+   */
+  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
     this.store = store;
-    this.dataFilter = filter;
-    if (null != dataFilter) {
-      dataFilter.reset();
-    }
-    this.scanners = new InternalScanner[2];
-    this.resultSets = new List[scanners.length];
-    // Save these args in case we need them later handling change in readers
-    // See updateReaders below.
-    this.timestamp = timestamp;
-    this.columns = targetCols;
-    try {
-      scanners[MEMS_INDEX] =
-        store.memcache.getScanner(timestamp, targetCols, firstRow);
-      scanners[HSFS_INDEX] =
-        new StoreFileScanner(store, timestamp, targetCols, firstRow);
-      for (int i = MEMS_INDEX; i < scanners.length; i++) {
-        checkScannerFlags(i);
-      }
-    } catch (IOException e) {
-      doClose();
-      throw e;
-    }
-    
-    // Advance to the first key in each scanner.
-    // All results will match the required column-set and scanTime.
-    for (int i = MEMS_INDEX; i < scanners.length; i++) {
-      setupScanner(i);
+    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
+        columns, store.ttl, store.comparator.getRawComparator(),
+        store.versionsToReturn(scan.getMaxVersions()));
+
+    List<KeyValueScanner> scanners = getStoreFileScanners();
+    scanners.add(store.memcache.getScanner());
+
+    // Seek all scanners to the initial key
+    for(KeyValueScanner scanner : scanners) {
+      scanner.seek(matcher.getStartKey());
     }
+
+    // Combine all seeked scanners with a heap
+    heap = new KeyValueHeap(
+        scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);
+
     this.store.addChangedReaderObserver(this);
   }
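
The constructor seeds every scanner at the matcher's start key and then
merges them through a heap, which yields the globally smallest KeyValue at
each step. A self-contained sketch of that k-way merge using a JDK
PriorityQueue (illustrative only, not the KeyValueHeap implementation):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    class HeapMergeExample {
      static List<Integer> merge(List<Iterator<Integer>> sortedSources) {
        // Each entry is {value, sourceIndex}, ordered by value.
        PriorityQueue<int[]> heap =
            new PriorityQueue<int[]>(Comparator.comparingInt((int[] e) -> e[0]));
        for (int i = 0; i < sortedSources.size(); i++) {
          if (sortedSources.get(i).hasNext()) {
            heap.add(new int[] { sortedSources.get(i).next(), i });  // "seek"
          }
        }
        List<Integer> out = new ArrayList<Integer>();
        while (!heap.isEmpty()) {
          int[] top = heap.poll();
          out.add(top[0]);
          Iterator<Integer> src = sortedSources.get(top[1]);
          if (src.hasNext()) {
            heap.add(new int[] { src.next(), top[1] });  // refill same source
          }
        }
        return out;
      }

      public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
            Arrays.asList(1, 4, 7).iterator(),
            Arrays.asList(2, 5).iterator())));  // [1, 2, 4, 5, 7]
      }
    }
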
-  
-  /*
-   * @param i Index.
-   */
-  private void checkScannerFlags(final int i) {
-    if (this.scanners[i].isWildcardScanner()) {
-      this.wildcardMatch = true;
-    }
-    if (this.scanners[i].isMultipleMatchScanner()) {
-      this.multipleMatchers = true;
+
+  // Constructor for testing.
+  StoreScanner(Scan scan, byte [] colFamily,
+      long ttl, KeyValue.KVComparator comparator,
+      final NavigableSet<byte[]> columns,
+      KeyValueScanner [] scanners) {
+    this.store = null;
+    this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, 
+        comparator.getRawComparator(), scan.getMaxVersions());
+
+    // Seek all scanners to the initial key
+    for(KeyValueScanner scanner : scanners) {
+      scanner.seek(matcher.getStartKey());
     }
+
+    heap = new KeyValueHeap(
+        scanners, comparator);
   }
-  
-  /*
-   * Do scanner setup.
-   * @param i
-   * @throws IOException
-   */
-  private void setupScanner(final int i) throws IOException {
-    this.resultSets[i] = new ArrayList<KeyValue>();
-    if (this.scanners[i] != null && !this.scanners[i].next(this.resultSets[i])) {
-      closeScanner(i);
-    }
+
+  public KeyValue peek() {
+    return this.heap.peek();
   }
 
-  /** @return true if the scanner is a wild card scanner */
-  public boolean isWildcardScanner() {
-    return this.wildcardMatch;
+  public KeyValue next() {
+    // Never called; StoreScanner is consumed via next(List<KeyValue>).
+    throw new RuntimeException("Never call StoreScanner.next()");
   }
 
-  /** @return true if the scanner is a multiple match scanner */
-  public boolean isMultipleMatchScanner() {
-    return this.multipleMatchers;
+  public void close() {
+    this.closing.set(true);
+    // under test, we don't have a this.store
+    if (this.store != null)
+      this.store.deleteChangedReaderObserver(this);
+    this.heap.close();
   }
 
-  public boolean next(List<KeyValue> results)
-  throws IOException {
-    this.lock.readLock().lock();
-    try {
-    // Filtered flag is set by filters.  If a cell has been 'filtered out'
-    // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
-    boolean filtered = true;
-    boolean moreToFollow = true;
-    while (filtered && moreToFollow) {
-      // Find the lowest-possible key.
-      KeyValue chosen = null;
-      long chosenTimestamp = -1;
-      for (int i = 0; i < this.scanners.length; i++) {
-        KeyValue kv = this.resultSets[i] == null || this.resultSets[i].isEmpty()?
-          null: this.resultSets[i].get(0);
-        if (kv == null) {
+  public boolean seek(KeyValue key) {
+    return this.heap.seek(key);
+  }
+
+  /**
+   * Get the next row of values from this Store.
+   * @param result output list that receives the KeyValues for the next row
+   * @return true if there are more rows, false if scanner is done
+   */
+  public boolean next(List<KeyValue> result) throws IOException {
+    // This won't get us the next row if the previous round hasn't iterated
+    // past all the columns from the previous row. Potential bug!
+    KeyValue peeked = this.heap.peek();
+    if (peeked == null) {
+      close();
+      return false;
+    }
+    matcher.setRow(peeked.getRow());
+    KeyValue kv;
+    while((kv = this.heap.peek()) != null) {
+      QueryMatcher.MatchCode mc = matcher.match(kv);
+      switch(mc) {
+        case INCLUDE:
+          KeyValue next = this.heap.next();
+          result.add(next);
           continue;
-        }
-        if (scanners[i] != null &&
-            (chosen == null ||
-              (this.store.comparator.compareRows(kv, chosen) < 0) ||
-              ((this.store.comparator.compareRows(kv, chosen) == 0) &&
-              (kv.getTimestamp() > chosenTimestamp)))) {
-          chosen = kv;
-          chosenTimestamp = chosen.getTimestamp();
-        }
-      }
+        case DONE:
+          // If this row produced no results, move on to the next row.
+          if (result.isEmpty()) {
+            // try the next one.
+            matcher.setRow(this.heap.peek().getRow());
+            continue;
+          }
+          if (matcher.filterEntireRow()) {
+            // The entire row was filtered out; clear partial results and continue.
+            result.clear();
+            matcher.setRow(heap.peek().getRow());
+            continue;
+          }
+
+          return true;
 
-      // Filter whole row by row key?
-      filtered = dataFilter == null || chosen == null? false:
-        dataFilter.filterRowKey(chosen.getBuffer(), chosen.getRowOffset(),
-          chosen.getRowLength());
-
-      // Store results for each sub-scanner.
-      if (chosenTimestamp >= 0 && !filtered) {
-        NavigableSet<KeyValue> deletes =
-          new TreeSet<KeyValue>(this.store.comparatorIgnoringType);
-        for (int i = 0; i < scanners.length && !filtered; i++) {
-          if ((scanners[i] != null && !filtered && moreToFollow &&
-              this.resultSets[i] != null && !this.resultSets[i].isEmpty())) {
-            // Test this resultset is for the 'chosen' row.
-            KeyValue firstkv = resultSets[i].get(0);
-            if (!this.store.comparator.matchingRows(firstkv, chosen)) {
-              continue;
-            }
-            // Its for the 'chosen' row, work it.
-            for (KeyValue kv: resultSets[i]) {
-              if (kv.isDeleteType()) {
-                deletes.add(kv);
-              } else if ((deletes.isEmpty() || !deletes.contains(kv)) &&
-                  !filtered && moreToFollow && !results.contains(kv)) {
-                if (this.dataFilter != null) {
-                  // Filter whole row by column data?
-                  int rowlength = kv.getRowLength();
-                  int columnoffset = kv.getColumnOffset(rowlength);
-                  filtered = dataFilter.filterColumn(kv.getBuffer(),
-                      kv.getRowOffset(), rowlength,
-                    kv.getBuffer(), columnoffset, kv.getColumnLength(columnoffset),
-                    kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
-                  if (filtered) {
-                    results.clear();
-                    break;
-                  }
-                }
-                results.add(kv);
-                /* REMOVING BECAUSE COULD BE BUNCH OF DELETES IN RESULTS
-                   AND WE WANT TO INCLUDE THEM -- below short-circuit is
-                   probably not wanted.
-                // If we are doing a wild card match or there are multiple
-                // matchers per column, we need to scan all the older versions of 
-                // this row to pick up the rest of the family members
-                if (!wildcardMatch && !multipleMatchers &&
-                    (kv.getTimestamp() != chosenTimestamp)) {
-                  break;
-                }
-                */
-              }
-            }
-            // Move on to next row.
-            resultSets[i].clear();
-            if (!scanners[i].next(resultSets[i])) {
-              closeScanner(i);
-            }
+        case DONE_SCAN:
+          close();
+          return false;
+
+        case SEEK_NEXT_ROW:
+          // TODO see comments in SEEK_NEXT_COL
+          /*
+          KeyValue rowToSeek =
+              new KeyValue(kv.getRow(),
+                  0,
+                  KeyValue.Type.Minimum);
+          heap.seek(rowToSeek);
+           */
+          heap.next();
+          break;
+
+        case SEEK_NEXT_COL:
+          // TODO hfile needs 'hinted' seeking to prevent it from
+          // reseeking from the start of the block on every seek.
+          // We need that API and to expose it through the scanner chain.
+          /*
+          ColumnCount hint = matcher.getSeekColumn();
+          KeyValue colToSeek;
+          if (hint == null) {
+            // seek to the 'last' key on this column; it is defined as
+            // the key with the same row, family, and qualifier, the
+            // smallest timestamp, and the largest type.
+            colToSeek =
+                new KeyValue(kv.getRow(),
+                    kv.getFamily(),
+                    kv.getColumn(),
+                    Long.MIN_VALUE,
+                    KeyValue.Type.Minimum);
+          } else {
+            // This is ugly; move it into a KeyValue convenience method.
+            // First key on a column is:
+            // same row, cf, qualifier, max_timestamp, max_type, no value.
+            colToSeek =
+                new KeyValue(kv.getRow(),
+                    0,
+                    kv.getRow().length,
+
+                    kv.getFamily(),
+                    0,
+                    kv.getFamily().length,
+
+                    hint.getBuffer(),
+                    hint.getOffset(),
+                    hint.getLength(),
+
+                    Long.MAX_VALUE,
+                    KeyValue.Type.Maximum,
+                    null,
+                    0,
+                    0);
           }
-        }
-      }
+          heap.seek(colToSeek);
+           */
 
-      moreToFollow = chosenTimestamp >= 0;
-      if (dataFilter != null) {
-        if (dataFilter.filterAllRemaining()) {
-          moreToFollow = false;
-        }
-      }
+          heap.next();
+          break;
 
-      if (results.isEmpty() && !filtered) {
-        // There were no results found for this row.  Marked it as 
-        // 'filtered'-out otherwise we will not move on to the next row.
-        filtered = true;
-      }
-    }
-    
-    // If we got no results, then there is no more to follow.
-    if (results == null || results.isEmpty()) {
-      moreToFollow = false;
-    }
-    
-    // Make sure scanners closed if no more results
-    if (!moreToFollow) {
-      for (int i = 0; i < scanners.length; i++) {
-        if (null != scanners[i]) {
-          closeScanner(i);
-        }
+        case SKIP:
+          this.heap.next();
+          break;
       }
     }
-    
-    return moreToFollow;
-    } finally {
-      this.lock.readLock().unlock();
+    if(result.size() > 0) {
+      return true;
     }
+    // No more keys
+    close();
+    return false;
   }
 
-  /** Shut down a single scanner */
-  void closeScanner(int i) {
-    try {
-      try {
-        scanners[i].close();
-      } catch (IOException e) {
-        LOG.warn(Bytes.toString(store.storeName) + " failed closing scanner " +
-          i, e);
-      }
-    } finally {
-      scanners[i] = null;
-      resultSets[i] = null;
+  private List<KeyValueScanner> getStoreFileScanners() {
+    List<HFileScanner> s =
+      new ArrayList<HFileScanner>(this.store.getStorefilesCount());
+    Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
+    for(StoreFile sf : map.values()) {
+      s.add(sf.getReader().getScanner());
+    }
+    List<KeyValueScanner> scanners =
+      new ArrayList<KeyValueScanner>(s.size()+1);
+    for(HFileScanner hfs : s) {
+      scanners.add(new StoreFileScanner(hfs));
     }
+    return scanners;
   }
 
-  public void close() {
-    this.closing.set(true);
-    this.store.deleteChangedReaderObserver(this);
-    doClose();
-  }
-  
-  private void doClose() {
-    for (int i = MEMS_INDEX; i < scanners.length; i++) {
-      if (scanners[i] != null) {
-        closeScanner(i);
-      }
-    }
-  }
-  
   // Implementation of ChangedReadersObserver
-  
   public void updateReaders() throws IOException {
     if (this.closing.get()) {
       return;
     }
     this.lock.writeLock().lock();
     try {
-      Map<Long, StoreFile> map = this.store.getStorefiles();
-      if (this.scanners[HSFS_INDEX] == null && map != null && map.size() > 0) {
-        // Presume that we went from no readers to at least one -- need to put
-        // a HStoreScanner in place.
-        try {
-          // I think its safe getting key from mem at this stage -- it shouldn't have
-          // been flushed yet
-          // TODO: MAKE SURE WE UPDATE FROM TRUNNK.
-          this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
-              this.timestamp, this. columns, this.resultSets[MEMS_INDEX].get(0).getRow());
-          checkScannerFlags(HSFS_INDEX);
-          setupScanner(HSFS_INDEX);
-          LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
-        } catch (IOException e) {
-          doClose();
-          throw e;
-        }
-      }
+      // Could do this cleanly with KeyValueHeap, but the previous
+      // implementation of this method only updated when there were no
+      // existing storefiles. Let's discuss.
+      return;
     } finally {
       this.lock.writeLock().unlock();
     }
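
The rewritten StoreScanner above replaces the per-scanner result lists with a
single merged view: every scanner is seeked to the matcher's start key, a
KeyValueHeap always exposes the smallest key across all scanners, and the
ScanQueryMatcher classifies each candidate (INCLUDE, SKIP, SEEK_NEXT_*, DONE).
A minimal, self-contained model of that control flow, using a PriorityQueue of
plain strings as a stand-in for the heap and a toy matcher; all names and keys
below are invented for illustration, not the real HBase classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class ScanLoopSketch {
  enum MatchCode { INCLUDE, SKIP, DONE }

  // Toy matcher: keep keys for the current row unless marked stale.
  static MatchCode match(String kv, String row) {
    if (!kv.startsWith(row)) return MatchCode.DONE;   // row boundary reached
    return kv.endsWith("#old") ? MatchCode.SKIP : MatchCode.INCLUDE;
  }

  public static void main(String[] args) {
    // Stands in for KeyValueHeap: once every scanner is seeked, the heap
    // always yields the globally smallest key.
    PriorityQueue<String> heap = new PriorityQueue<String>(
        Arrays.asList("row1/a", "row1/b", "row1/b#old", "row2/a"));
    List<String> result = new ArrayList<String>();
    String kv;
    while ((kv = heap.peek()) != null) {
      switch (match(kv, "row1")) {      // the matcher decides, the loop advances
        case INCLUDE: result.add(heap.poll()); break;
        case SKIP:    heap.poll(); break;
        case DONE:    System.out.println(result);  // prints [row1/a, row1/b]
                      return;
      }
    }
  }
}

The real next(List) additionally resets the matcher on each new row and falls
back to heap.next() for the seek-hint cases that are still commented out above.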

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/WildcardColumnTracker.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,314 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is used for the tracking and enforcement of columns and the
+ * number of versions during the course of a Get or Scan operation, when all
+ * available column qualifiers have been asked for in the query.
+ * <p>
+ * This class is used by {@link QueryMatcher} through two methods:
+ * <ul><li>{@link #checkColumn} is called when a Put satisfies all other
+ * conditions of the query.  This method returns a {@link MatchCode} to define
+ * what action should be taken.
+ * <li>{@link #update} is called at the end of every StoreFile or Memcache.
+ * </ul>
+ * <p>
+ * This class is NOT thread-safe; queries are never multi-threaded.
+ */
+public class WildcardColumnTracker implements ColumnTracker {
+  
+  private int maxVersions;
+  
+  protected List<ColumnCount> columns;
+  private int index;
+  private ColumnCount column;
+  
+  private List<ColumnCount> newColumns; 
+  private int newIndex;
+  private ColumnCount newColumn;
+  
+  /**
+   * Default constructor.
+   * @param maxVersions maximum versions to return per column
+   */
+  public WildcardColumnTracker(int maxVersions) {
+    this.maxVersions = maxVersions;
+    reset();
+  }
+  
+  public void reset() {
+    this.index = 0;
+    this.column = null;
+    this.columns = null;
+    this.newColumns = new ArrayList<ColumnCount>();
+    this.newIndex = 0;
+    this.newColumn = null;
+  }
+  
+  /**
+   * Can never early-out from reading more storefiles in the wildcard case.
+   */
+  public boolean done() {
+    return false;
+  }
+
+  // wildcard scanners never have column hints.
+  public ColumnCount getColumnHint() {
+    return null;
+  }
+
+  /**
+   * Checks against the parameters of the query and the columns which have
+   * already been processed by this query.
+   * @param bytes KeyValue buffer
+   * @param offset offset to the start of the qualifier
+   * @param length length of the qualifier
+   * @return MatchCode telling QueryMatcher what action to take
+   */
+  public MatchCode checkColumn(byte [] bytes, int offset, int length) {
+
+    // Nothing to match against, add to new and include
+    if(this.column == null && this.newColumn == null) {
+      newColumns.add(new ColumnCount(bytes, offset, length, 1));
+      this.newColumn = newColumns.get(newIndex);
+      return MatchCode.INCLUDE;
+    }
+    
+    // Nothing old, compare against new
+    if(this.column == null && this.newColumn != null) {
+      int ret = Bytes.compareTo(newColumn.getBuffer(), newColumn.getOffset(), 
+          newColumn.getLength(), bytes, offset, length);
+      
+      // Same column
+      if(ret == 0) {
+        if(newColumn.increment() > this.maxVersions) {
+          return MatchCode.SKIP;
+        }
+        return MatchCode.INCLUDE;
+      }
+      
+      // Specified column is bigger than current column
+      // Move down current column and check again
+      if(ret <= -1) {
+        if(++newIndex == newColumns.size()) {
+          // No more, add to end and include
+          newColumns.add(new ColumnCount(bytes, offset, length, 1));
+          this.newColumn = newColumns.get(newIndex);
+          return MatchCode.INCLUDE;
+        }
+        this.newColumn = newColumns.get(newIndex);
+        return checkColumn(bytes, offset, length);
+      }
+      
+      // ret >= 1
+      // Specified column is smaller than current column
+      // Does not match any tracked column; add to new and include
+      newColumns.add(new ColumnCount(bytes, offset, length, 1));
+      this.newColumn = newColumns.get(++newIndex);
+      return MatchCode.INCLUDE;
+    }
+    
+    // Nothing new, compare against old
+    if(this.newColumn == null && this.column != null) {
+      int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), 
+          column.getLength(), bytes, offset, length);
+      
+      // Same column
+      if(ret == 0) {
+        if(column.increment() > this.maxVersions) {
+          return MatchCode.SKIP;
+        }
+        return MatchCode.INCLUDE;
+      }
+      
+      // Specified column is bigger than current column
+      // Move down current column and check again
+      if(ret <= -1) {
+        if(++index == columns.size()) {
+          // No more, add to new and include (new was empty prior to this)
+          newColumns.add(new ColumnCount(bytes, offset, length, 1));
+          this.newColumn = newColumns.get(newIndex);
+          this.column = null;
+          return MatchCode.INCLUDE;
+        }
+        this.column = columns.get(index);
+        return checkColumn(bytes, offset, length);
+      }
+      
+      // ret >= 1
+      // Specified column is smaller than current column
+      // Does not match any tracked column; add to new and include
+      newColumns.add(new ColumnCount(bytes, offset, length, 1));
+      this.newColumn = newColumns.get(newIndex);
+      return MatchCode.INCLUDE;
+    }
+    
+    
+    // There are new and old, figure which to check first
+    int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), 
+        column.getLength(), newColumn.getBuffer(), newColumn.getOffset(), 
+        newColumn.getLength());
+        
+    // Old is smaller than new, compare against old
+    if(ret <= -1) {
+      ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), 
+          column.getLength(), bytes, offset, length);
+      
+      // Same column
+      if(ret == 0) {
+        if(column.increment() > this.maxVersions) {
+          return MatchCode.SKIP;
+        }
+        return MatchCode.INCLUDE;
+      }
+      
+      // Specified column is bigger than current column
+      // Move down current column and check again
+      if(ret <= -1) {
+        if(++index == columns.size()) {
+          this.column = null;
+        } else {
+          this.column = columns.get(index);
+        }
+        return checkColumn(bytes, offset, length);
+      }
+      
+      // ret >= 1
+      // Specified column is smaller than current column
+      // Does not match any tracked column; add to new and include
+      newColumns.add(new ColumnCount(bytes, offset, length, 1));
+      return MatchCode.INCLUDE;
+    }
+    
+    // Cannot be equal, so ret >= 1
+    // New is smaller than old, compare against new
+    
+    ret = Bytes.compareTo(newColumn.getBuffer(), newColumn.getOffset(), 
+        newColumn.getLength(), bytes, offset, length);
+    
+    // Same column
+    if(ret == 0) {
+      if(newColumn.increment() > this.maxVersions) {
+        return MatchCode.SKIP;
+      }
+      return MatchCode.INCLUDE;
+    }
+    
+    // Specified column is bigger than current column
+    // Move down current column and check again
+    if(ret <= -1) {
+      if(++newIndex == newColumns.size()) {
+        this.newColumn = null;
+      } else {
+        this.newColumn = newColumns.get(newIndex);
+      }
+      return checkColumn(bytes, offset, length);
+    }
+    
+    // ret >= 1
+    // Specified column is smaller than current column
+    // Does not match any tracked column; add to new and include
+    newColumns.add(new ColumnCount(bytes, offset, length, 1));
+    return MatchCode.INCLUDE;
+  }
+  
+  /**
+   * Called at the end of every StoreFile or Memcache.
+   */
+  public void update() {
+    // If no previous columns, use new columns and return
+    if(this.columns == null || this.columns.size() == 0) {
+      if(this.newColumns.size() > 0){
+        finalize(newColumns);
+      }
+      return;
+    }
+    
+    // If no new columns, retain previous columns and return
+    if(this.newColumns.size() == 0) {
+      this.index = 0;
+      this.column = this.columns.get(index);
+      return;
+    }
+    
+    // Merge previous columns with new columns
+    // There will be no overlap between the two lists
+    List<ColumnCount> mergeColumns = new ArrayList<ColumnCount>(
+        columns.size() + newColumns.size());
+    index = 0;
+    newIndex = 0;
+    column = columns.get(0);
+    newColumn = newColumns.get(0);
+    while(true) {
+      int ret = Bytes.compareTo(
+          column.getBuffer(), column.getOffset(),column.getLength(), 
+          newColumn.getBuffer(), newColumn.getOffset(), newColumn.getLength());
+      
+      // Existing is smaller than new, add existing and iterate it
+      if(ret <= -1) {
+        mergeColumns.add(column);
+        if(++index == columns.size()) {
+          // No more existing left, merge down rest of new and return 
+          mergeDown(mergeColumns, newColumns, newIndex);
+          finalize(mergeColumns);
+          return;
+        }
+        column = columns.get(index);
+        continue;
+      }
+      
+      // New is smaller than existing, add new and iterate it
+      mergeColumns.add(newColumn);
+      if(++newIndex == newColumns.size()) {
+        // No more new left, merge down rest of existing and return
+        mergeDown(mergeColumns, columns, index);
+        finalize(mergeColumns);
+        return;
+      }
+      newColumn = newColumns.get(newIndex);
+      continue;
+    }
+  }
+  
+  private void mergeDown(List<ColumnCount> mergeColumns, 
+      List<ColumnCount> srcColumns, int srcIndex) {
+    int index = srcIndex;
+    while(index < srcColumns.size()) {
+      mergeColumns.add(srcColumns.get(index++));
+    }
+  }
+  
+  private void finalize(List<ColumnCount> mergeColumns) {
+    this.columns = mergeColumns;
+    this.index = 0;
+    this.column = this.columns.size() > 0? columns.get(index) : null;
+    
+    this.newColumns = new ArrayList<ColumnCount>();
+    this.newIndex = 0;
+    this.newColumn = null;
+  }
+  
+}
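
For readers of the tracker above, a throwaway driver showing the intended call
pattern: checkColumn for each qualifier, in sorted order within a store, then
update() at the store boundary. It compiles only against this revision's tree;
the qualifier and the printed outcomes are illustrative and assume
ColumnCount.increment() returns the updated count, as its use in checkColumn
suggests.

import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
import org.apache.hadoop.hbase.regionserver.WildcardColumnTracker;
import org.apache.hadoop.hbase.util.Bytes;

public class WildcardTrackerDriver {
  public static void main(String[] args) {
    // Track at most two versions of any column.
    WildcardColumnTracker tracker = new WildcardColumnTracker(2);
    byte[] qual = Bytes.toBytes("a");
    for (int version = 1; version <= 3; version++) {
      MatchCode mc = tracker.checkColumn(qual, 0, qual.length);
      System.out.println("a/v" + version + " -> " + mc);  // INCLUDE, INCLUDE, SKIP
    }
    tracker.update();  // end of this StoreFile/Memcache; merges into 'columns'
  }
}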

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java Sat Jun  6 01:26:21 2009
@@ -1,57 +0,0 @@
-/**
- * Copyright 2008 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.transactional;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-
-/**
- * Cleans up committed transactions when they are no longer needed to verify
- * pending transactions.
- */
-class CleanOldTransactionsChore extends Chore {
-
-  private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
-  private static final int DEFAULT_SLEEP = 60 * 1000;
-
-  private final TransactionalRegionServer regionServer;
-
-  /**
-   * @param regionServer
-   * @param stopRequest
-   */
-  public CleanOldTransactionsChore(
-      final TransactionalRegionServer regionServer,
-      final AtomicBoolean stopRequest) {
-    super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP),
-        stopRequest);
-    this.regionServer = regionServer;
-  }
-
-  @Override
-  protected void chore() {
-    for (HRegion region : regionServer.getOnlineRegions()) {
-      ((TransactionalRegion) region).removeUnNeededCommitedTransactions();
-    }
-  }
-
-}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowController.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowController.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowController.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowController.java Sat Jun  6 01:26:21 2009
@@ -24,7 +24,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.rest.descriptors.RowUpdateDescriptor;
@@ -82,7 +84,6 @@
       throws HBaseRestException {
     RowModel innerModel = getModel();
 
-    BatchUpdate b;
     RowUpdateDescriptor rud = parser
         .getRowUpdateDescriptor(input, pathSegments);
 
@@ -92,14 +93,15 @@
       return;
     }
 
-    b = new BatchUpdate(rud.getRowName());
+    Put put = new Put(Bytes.toBytes(rud.getRowName()));
 
     for (byte[] key : rud.getColVals().keySet()) {
-      b.put(key, rud.getColVals().get(key));
+      byte [][] famAndQf = KeyValue.parseColumn(key);
+      put.add(famAndQf[0], famAndQf[1], rud.getColVals().get(key));
     }
 
     try {
-      innerModel.post(rud.getTableName().getBytes(), b);
+      innerModel.post(rud.getTableName().getBytes(), put);
       s.setOK();
     } catch (HBaseRestException e) {
       s.setUnsupportedMediaType(e.getMessage());
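
The RowController change above is the standard BatchUpdate-to-Put migration:
the old flat "family:qualifier" column key is split with KeyValue.parseColumn
and handed to Put.add. A minimal sketch of that pattern, using only calls that
appear in this diff; the table, row, and column names are placeholders and a
running cluster is assumed:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutMigrationSketch {
  public static void main(String[] args) throws Exception {
    byte[] oldStyleColumn = Bytes.toBytes("info:name");       // BatchUpdate-era key
    byte[][] famAndQf = KeyValue.parseColumn(oldStyleColumn); // {family, qualifier}
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(famAndQf[0], famAndQf[1], Bytes.toBytes("value"));
    HTable table = new HTable(Bytes.toBytes("mytable"));
    table.put(put);  // replaces table.commit(batchUpdate)
  }
}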

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowModel.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowModel.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowModel.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/rest/RowModel.java Sat Jun  6 01:26:21 2009
@@ -25,8 +25,13 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.rest.descriptors.TimestampsDescriptor;
@@ -41,44 +46,51 @@
     super.initialize(conf, admin);
   }
 
+  @Deprecated
   public RowResult get(byte[] tableName, byte[] rowName)
       throws HBaseRestException {
+    return get(tableName, new Get(rowName)).getRowResult();
+  }
+
+  public Result get(byte[] tableName, Get get)
+  throws HBaseRestException {
     try {
       HTable table = new HTable(tableName);
-      return table.getRow(rowName);
+      return table.get(get);
     } catch (IOException e) {
       throw new HBaseRestException(e);
     }
   }
-
+  
+  @Deprecated
   public RowResult get(byte[] tableName, byte[] rowName, byte[][] columns)
       throws HBaseRestException {
-    try {
-      HTable table = new HTable(tableName);
-      return table.getRow(rowName, columns);
-    } catch (IOException e) {
-      throw new HBaseRestException(e);
+    Get get = new Get(rowName);
+    for(byte [] column : columns) {
+      byte [][] famAndQf = KeyValue.parseColumn(column);
+      get.addColumn(famAndQf[0], famAndQf[1]);
     }
+    return get(tableName, get).getRowResult();
   }
 
+  @Deprecated
   public RowResult get(byte[] tableName, byte[] rowName, byte[][] columns,
       long timestamp) throws HBaseRestException {
-    try {
-      HTable table = new HTable(tableName);
-      return table.getRow(rowName, columns, timestamp);
-    } catch (IOException e) {
-      throw new HBaseRestException(e);
+    Get get = new Get(rowName);
+    for(byte [] column : columns) {
+      byte [][] famAndQf = KeyValue.parseColumn(column);
+      get.addColumn(famAndQf[0], famAndQf[1]);
     }
+    get.setTimeStamp(timestamp);
+    return get(tableName, get).getRowResult();
   }
-
+  
+  @Deprecated
   public RowResult get(byte[] tableName, byte[] rowName, long timestamp)
       throws HBaseRestException {
-    try {
-      HTable table = new HTable(tableName);
-      return table.getRow(rowName, timestamp);
-    } catch (IOException e) {
-      throw new HBaseRestException(e);
-    }
+    Get get = new Get(rowName);
+    get.setTimeStamp(timestamp);
+    return get(tableName, get).getRowResult();
   }
 
   public TimestampsDescriptor getTimestamps(
@@ -98,41 +110,48 @@
 
   }
 
-  public void post(byte[] tableName, BatchUpdate b) throws HBaseRestException {
+  public void post(byte[] tableName, Put put) throws HBaseRestException {
     try {
       HTable table = new HTable(tableName);
-      table.commit(b);
+      table.put(put);
     } catch (IOException e) {
       throw new HBaseRestException(e);
     }
   }
 
-  public void post(byte[] tableName, List<BatchUpdate> b)
+  public void post(byte[] tableName, List<Put> puts)
       throws HBaseRestException {
     try {
       HTable table = new HTable(tableName);
-      table.commit(b);
+      table.put(puts);
     } catch (IOException e) {
       throw new HBaseRestException(e);
     }
   }
-
+  
+  @Deprecated
   public void delete(byte[] tableName, byte[] rowName)
       throws HBaseRestException {
-    try {
-      HTable table = new HTable(tableName);
-      table.deleteAll(rowName);
-    } catch (IOException e) {
-      throw new HBaseRestException(e);
-    }
+    Delete delete = new Delete(rowName);
+    delete(tableName, delete);
   }
 
-  public void delete(byte[] tableName, byte[] rowName, byte[][] columns) throws HBaseRestException {
+  @Deprecated
+  public void delete(byte[] tableName, byte[] rowName, byte[][] columns)
+  throws HBaseRestException {
+    Delete delete = new Delete(rowName);
+    for(byte [] column : columns) {
+      byte [][] famAndQf = KeyValue.parseColumn(column);
+      delete.deleteColumn(famAndQf[0], famAndQf[1]);
+    }
+    delete(tableName, delete);
+  }
+  
+  public void delete(byte[] tableName, Delete delete)
+  throws HBaseRestException {
     try {
       HTable table = new HTable(tableName);
-      for (byte[] column : columns) {
-        table.deleteAll(rowName, column);
-      }
+      table.delete(delete);
     } catch (IOException e) {
       throw new HBaseRestException(e);
     }


