hbase-commits mailing list archives

From apurt...@apache.org
Subject svn commit: r782445 [8/17] - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/trans...
Date Sun, 07 Jun 2009 19:57:43 GMT
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Sun Jun  7 19:57:37 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -27,22 +27,19 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
+import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -82,7 +79,7 @@
 
   // TODO: Fix this guess by studying jprofiler
   private final static int ESTIMATED_KV_HEAP_TAX = 60;
-
+  
   /**
    * Default constructor. Used for tests.
    */
@@ -202,7 +199,86 @@
     }
     return size;
   }
+  
+  /** 
+   * Write a delete.
+   * @param delete the delete KeyValue to apply
+   * @return approximate size of the passed key and value.
+   */
+  long delete(final KeyValue delete) {
+    long size = 0;
+    this.lock.readLock().lock();
+    //Have to decide what we want to do here, to find the fastest way of
+    //removing things that are under a delete.
+    //Actions that will take place here are:
+    //1. Insert a delete and remove all the affected entries already in memcache
+    //2. In the case of a Delete, if the matching put is found then don't insert
+    //   the delete
+    //TODO It would be nice if we had an iterator for this, so we could remove
+    //things that need to be removed while iterating and wouldn't have to go
+    //back and do it afterwards
+    
+    try {
+      boolean notpresent = false;
+      List<KeyValue> deletes = new ArrayList<KeyValue>();
+      SortedSet<KeyValue> tailSet = this.memcache.tailSet(delete);
+
+      //Parse the delete, so that it is only done once
+      byte [] deleteBuffer = delete.getBuffer();
+      int deleteOffset = delete.getOffset();
+  
+      int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
+      deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
+  
+      short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
+      deleteOffset += Bytes.SIZEOF_SHORT;
+      int deleteRowOffset = deleteOffset;
+  
+      deleteOffset += deleteRowLen;
+  
+      byte deleteFamLen = deleteBuffer[deleteOffset];
+      deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
+  
+      int deleteQualifierOffset = deleteOffset;
+      int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
+        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - 
+        Bytes.SIZEOF_BYTE;
+      
+      deleteOffset += deleteQualifierLen;
+  
+      int deleteTimestampOffset = deleteOffset;
+      deleteOffset += Bytes.SIZEOF_LONG;
+      byte deleteType = deleteBuffer[deleteOffset];
+      
+      //Comparing with tail from memcache
+      for(KeyValue mem : tailSet) {
+        
+        DeleteCode res = DeleteCompare.deleteCompare(mem, deleteBuffer, 
+            deleteRowOffset, deleteRowLen, deleteQualifierOffset, 
+            deleteQualifierLen, deleteTimestampOffset, deleteType,
+            comparator.getRawComparator());
+        if(res == DeleteCode.DONE) {
+          break;
+        } else if (res == DeleteCode.DELETE) {
+          deletes.add(mem);
+        } // SKIP
+      }
 
+      //Delete all the entries affected by the last added delete
+      for(KeyValue del : deletes) {
+        notpresent = this.memcache.remove(del);
+        size -= heapSize(del, notpresent);
+      }
+      
+      //Adding the delete to memcache
+      notpresent = this.memcache.add(delete);
+      size += heapSize(delete, notpresent);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return size;
+  }
+  
   /*
    * Calculate how the memcache size has changed, approximately.  Be careful.
    * If class changes, be sure to change the size calculation.
@@ -219,43 +295,6 @@
   }
 
   /**
-   * Look back through all the backlog TreeMaps to find the target.
-   * @param kv
-   * @param numVersions
-   * @return Set of KeyValues. Empty size not null if no results.
-   */
-  List<KeyValue> get(final KeyValue kv, final int numVersions) {
-    List<KeyValue> results = new ArrayList<KeyValue>();
-    get(kv, numVersions, results,
-      new TreeSet<KeyValue>(this.comparatorIgnoreType),
-      System.currentTimeMillis());
-    return results;
-  }
-
-  /**
-   * Look back through all the backlog TreeMaps to find the target.
-   * @param key
-   * @param versions
-   * @param results
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param now
-   * @return True if enough versions.
-   */
-  boolean get(final KeyValue key, final int versions,
-      List<KeyValue> results, final NavigableSet<KeyValue> deletes,
-      final long now) {
-    this.lock.readLock().lock();
-    try {
-      if (get(this.memcache, key, versions, results, deletes, now)) {
-        return true;
-      }
-      return get(this.snapshot, key, versions , results, deletes, now);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
    * @param kv Find the row that comes after this one.  If null, we return the
    * first.
    * @return Next row or null if none found.
@@ -307,86 +346,6 @@
     return result;
   }
 
-  /**
-   * Return all the available columns for the given key.  The key indicates a 
-   * row and timestamp, but not a column name.
-   * @param origin Where to start searching.  Specifies a row and timestamp.
-   * Columns are specified in following arguments.
-   * @param columns Pass null for all columns else the wanted subset.
-   * @param columnPattern Column pattern to match.
-   * @param numVersions number of versions to retrieve
-   * @param versionsCount Map of KV to Count.  Uses a Comparator that doesn't
-   * look at timestamps so only Row/Column are compared.
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param results Where to stick row results found.
-   * @return True if we found enough results for passed <code>columns</code>
-   * and <code>numVersions</code>.
-   */
-  boolean getFull(final KeyValue key, NavigableSet<byte []> columns,
-      final Pattern columnPattern,
-      int numVersions, final Map<KeyValue, HRegion.Counter> versionsCount,
-      final NavigableSet<KeyValue> deletes,
-      final List<KeyValue> results, final long now) {
-    this.lock.readLock().lock();
-    try {
-      // Used to be synchronized but now with weak iteration, no longer needed.
-      if (getFull(this.memcache, key, columns, columnPattern, numVersions,
-        versionsCount, deletes, results, now)) {
-        // Has enough results.
-        return true;
-      }
-      return getFull(this.snapshot, key, columns, columnPattern, numVersions,
-        versionsCount, deletes, results, now);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /*
-   * @param set
-   * @param target Where to start searching.
-   * @param columns
-   * @param versions
-   * @param versionCounter
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param keyvalues
-   * @return True if enough results found.
-   */
-  private boolean getFull(final ConcurrentSkipListSet<KeyValue> set,
-      final KeyValue target, final Set<byte []> columns,
-      final Pattern columnPattern,
-      final int versions, final Map<KeyValue, HRegion.Counter> versionCounter,
-      final NavigableSet<KeyValue> deletes, List<KeyValue> keyvalues,
-      final long now) {
-    boolean hasEnough = false;
-    if (target == null) {
-      return hasEnough;
-    }
-    NavigableSet<KeyValue> tailset = set.tailSet(target);
-    if (tailset == null || tailset.isEmpty()) {
-      return hasEnough;
-    }
-    // TODO: This loop same as in HStore.getFullFromStoreFile.  Make sure they
-    // are the same.
-    for (KeyValue kv: tailset) {
-      // Make sure we have not passed out the row.  If target key has a
-      // column on it, then we are looking explicit key+column combination.  If
-      // we've passed it out, also break.
-      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
-          !this.comparator.matchingRowColumn(target, kv)) {
-        break;
-      }
-      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
-        continue;
-      }
-      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
-          this.ttl, keyvalues, tailset)) {
-        hasEnough = true;
-        break;
-      }
-    }
-    return hasEnough;
-  }
 
   /**
    * @param row Row to look for.
@@ -554,45 +513,6 @@
     }
   }
 
-  /*
-   * Examine a single map for the desired key.
-   *
-   * TODO - This is kinda slow.  We need a data structure that allows for 
-   * proximity-searches, not just precise-matches.
-   * 
-   * @param set
-   * @param key
-   * @param results
-   * @param versions
-   * @param keyvalues
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param now
-   * @return True if enough versions.
-   */
-  private boolean get(final ConcurrentSkipListSet<KeyValue> set,
-      final KeyValue key, final int versions,
-      final List<KeyValue> keyvalues,
-      final NavigableSet<KeyValue> deletes,
-      final long now) {
-    NavigableSet<KeyValue> tailset = set.tailSet(key);
-    if (tailset.isEmpty()) {
-      return false;
-    }
-    boolean enoughVersions = false;
-    for (KeyValue kv : tailset) {
-      if (this.comparator.matchingRowColumn(kv, key)) {
-        if (Store.doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues,
-            tailset)) {
-          enoughVersions = true;
-          break;
-        }
-      } else {
-        // By L.N. HBASE-684, map is sorted, so we can't find match any more.
-        break;
-      }
-    }
-    return enoughVersions;
-  }
 
   /*
    * @param set
@@ -621,93 +541,160 @@
   /**
    * @return a scanner over the keys in the Memcache
    */
-  InternalScanner getScanner(long timestamp,
-    final NavigableSet<byte []> targetCols, final byte [] firstRow)
-  throws IOException {
+  KeyValueScanner getScanner() {
     this.lock.readLock().lock();
     try {
-      return new MemcacheScanner(timestamp, targetCols, firstRow);
+      return new MemcacheScanner();
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
+  //
+  // HBASE-880/1249/1304
+  //
+  
+  /**
+   * Perform a single-row Get on the memcache and snapshot, placing results
+   * into the specified KV list.
+   * <p>
+   * This will return true if it is determined that the query is complete
+   * and it is not necessary to check any storefiles after this.
+   * <p>
+   * Otherwise, it will return false and you should continue on.
+   * @param matcher Column matcher
+   * @param result List to add results to
+   * @return true if done with store (early-out), false if not
+   * @throws IOException
+   */
+  public boolean get(QueryMatcher matcher, List<KeyValue> result)
+  throws IOException {
+    this.lock.readLock().lock();
+    try {
+      if(internalGet(this.memcache, matcher, result) || matcher.isDone()) {
+        return true;
+      }
+      matcher.update();
+      if(internalGet(this.snapshot, matcher, result) || matcher.isDone()) {
+        return true;
+      }
+      return false;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Examine a single set (memcache or snapshot) with the query matcher.
+   * @param set memcache or snapshot
+   * @param matcher query matcher
+   * @param result list to add results to
+   * @return true if done with store (early-out), false if not
+   * @throws IOException
+   */
+  private boolean internalGet(SortedSet<KeyValue> set, QueryMatcher matcher,
+      List<KeyValue> result) throws IOException {
+    if(set.isEmpty()) return false;
+    // Seek to startKey
+    SortedSet<KeyValue> tailSet = set.tailSet(matcher.getStartKey());
+    
+    for (KeyValue kv : tailSet) {
+      QueryMatcher.MatchCode res = matcher.match(kv);
+      switch(res) {
+        case INCLUDE:
+          result.add(kv);
+          break;
+        case SKIP:
+          break;
+        case NEXT:
+          return false;
+        case DONE:
+          return true;
+        default:
+          throw new RuntimeException("Unexpected " + res);
+      }
+    }
+    return false;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
-  // MemcacheScanner implements the InternalScanner.
+  // MemcacheScanner implements the KeyValueScanner.
   // It lets the caller scan the contents of the Memcache.
+  // This behaves as if it were a real scanner but does not maintain position
+  // in the Memcache tree.
   //////////////////////////////////////////////////////////////////////////////
 
-  private class MemcacheScanner extends HAbstractScanner {
-    private KeyValue current;
-    private final NavigableSet<byte []> columns;
-    private final NavigableSet<KeyValue> deletes;
-    private final Map<KeyValue, Counter> versionCounter;
-    private final long now = System.currentTimeMillis();
-
-    MemcacheScanner(final long timestamp, final NavigableSet<byte []> columns,
-      final byte [] firstRow)
-    throws IOException {
-      // Call to super will create ColumnMatchers and whether this is a regex
-      // scanner or not.  Will also save away timestamp.  Also sorts rows.
-      super(timestamp, columns);
-      this.deletes = new TreeSet<KeyValue>(comparatorIgnoreType);
-      this.versionCounter =
-        new TreeMap<KeyValue, Counter>(comparatorIgnoreTimestamp);
-      this.current = KeyValue.createFirstOnRow(firstRow, timestamp);
-      // If we're being asked to scan explicit columns rather than all in 
-      // a family or columns that match regexes, cache the sorted array of
-      // columns.
-      this.columns = isWildcardScanner()? null: columns;
-    }
-
-    @Override
-    public boolean next(final List<KeyValue> keyvalues)
-    throws IOException {
-      if (this.scannerClosed) {
+  protected class MemcacheScanner implements KeyValueScanner {
+    private KeyValue current = null;
+    private List<KeyValue> result = new ArrayList<KeyValue>();
+    private int idx = 0;
+    
+    MemcacheScanner() {}
+    
+    public boolean seek(KeyValue key) {
+      try {
+        if(key == null) {
+          close();
+          return false;
+        }
+        current = key;
+        return cacheNextRow();
+      } catch(Exception e) {
+        close();
         return false;
       }
-      while (keyvalues.isEmpty() && this.current != null) {
-        // Deletes are per row.
-        if (!deletes.isEmpty()) {
-          deletes.clear();
-        }
-        if (!versionCounter.isEmpty()) {
-          versionCounter.clear();
+    }
+    
+    public KeyValue peek() {
+      if(idx >= result.size()) {
+        if(!cacheNextRow()) {
+          return null;
         }
-        // The getFull will take care of expired and deletes inside memcache.
-        // The first getFull when row is the special empty bytes will return
-        // nothing so we go around again.  Alternative is calling a getNextRow
-        // if row is null but that looks like it would take same amount of work
-        // so leave it for now.
-        getFull(this.current, isWildcardScanner()? null: this.columns, null, 1,
-          versionCounter, deletes, keyvalues, this.now);
-        for (KeyValue bb: keyvalues) {
-          if (isWildcardScanner()) {
-            // Check the results match.  We only check columns, not timestamps.
-            // We presume that timestamps have been handled properly when we
-            // called getFull.
-            if (!columnMatch(bb)) {
-              keyvalues.remove(bb);
-            }
-          }
+        return peek();
+      }
+      return result.get(idx);
+    }
+    
+    public KeyValue next() {
+      if(idx >= result.size()) {
+        if(!cacheNextRow()) {
+          return null;
         }
-        // Add any deletes found so they are available to the StoreScanner#next.
-        if (!this.deletes.isEmpty()) {
-          keyvalues.addAll(deletes);
+        return next();
+      }
+      return result.get(idx++);
+    }
+    
+    boolean cacheNextRow() {
+      NavigableSet<KeyValue> keys;
+      try {
+        keys = memcache.tailSet(current);
+      } catch(Exception e) {
+        close();
+        return false;
+      }
+      if(keys == null || keys.isEmpty()) {
+        close();
+        return false;
+      }
+      current = null;
+      byte [] row = keys.first().getRow();
+      for(KeyValue key : keys) {
+        if(comparator.compareRows(key, row) != 0) {
+          current = key;
+          break;
         }
-        this.current = getNextRow(this.current);
-        // Change current to be column-less and to have the scanners' now.  We
-        // do this because first item on 'next row' may not have the scanners'
-        // now time which will cause trouble down in getFull; same reason no
-        // column.
-        if (this.current != null) this.current = this.current.cloneRow(this.now);
+        result.add(key);
       }
-      return !keyvalues.isEmpty();
+      return true;
     }
 
     public void close() {
-      if (!scannerClosed) {
-        scannerClosed = true;
+      current = null;
+      idx = 0;
+      if(!result.isEmpty()) {
+        result.clear();
       }
     }
   }
@@ -721,8 +708,7 @@
    * @throws InterruptedException
    * @throws IOException 
    */
-  public static void main(String [] args)
-  throws InterruptedException, IOException {
+  public static void main(String [] args) {
     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Sun Jun  7 19:57:37 2009
@@ -62,11 +62,11 @@
   protected final long globalMemcacheLimit;
   protected final long globalMemcacheLimitLowMark;
   
-  public static final float DEFAULT_UPPER = 0.4f;
-  public static final float DEFAULT_LOWER = 0.25f;
-  public static final String UPPER_KEY =
+  private static final float DEFAULT_UPPER = 0.4f;
+  private static final float DEFAULT_LOWER = 0.25f;
+  private static final String UPPER_KEY =
     "hbase.regionserver.globalMemcache.upperLimit";
-  public static final String LOWER_KEY =
+  private static final String LOWER_KEY =
     "hbase.regionserver.globalMemcache.lowerLimit";
   private long blockingStoreFilesNumber;
   private long blockingWaitTime;

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.List;
+import java.io.IOException;
+
+/**
+ * A scanner that does a minor compaction at the same time.  Doesn't need to
+ * implement ChangedReadersObserver, since it doesn't scan memcache, only store files
+ * and optionally the memcache-snapshot.
+ */
+public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
+  private QueryMatcher matcher;
+
+  private KeyValueHeap heap;
+
+
+  MinorCompactingStoreScanner(Store store,
+                              KeyValueScanner [] scanners) {
+    Scan scan = new Scan();
+
+    // No max version, no ttl matching, start at first row, all columns.
+    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
+        null, Long.MAX_VALUE, store.comparator.getRawComparator(),
+        store.versionsToReturn(Integer.MAX_VALUE));
+
+    for (KeyValueScanner scanner : scanners ) {
+      scanner.seek(matcher.getStartKey());
+    }
+
+    heap = new KeyValueHeap(scanners, store.comparator);
+  }
+
+  MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
+                              KeyValueScanner [] scanners) {
+    Scan scan = new Scan();
+    matcher = new ScanQueryMatcher(scan, Bytes.toBytes(cfName),
+        null, Long.MAX_VALUE, comparator.getRawComparator(),
+        Integer.MAX_VALUE);
+
+    for (KeyValueScanner scanner : scanners ) {
+      scanner.seek(matcher.getStartKey());
+    }
+
+    heap = new KeyValueHeap(scanners, comparator);
+  }
+
+  public KeyValue peek() {
+    return heap.peek();
+  }
+
+  public KeyValue next() {
+    return heap.next();
+  }
+
+  @Override
+  public boolean seek(KeyValue key) {
+    // can't seek.
+    throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
+  }
+
+  @Override
+  public boolean next(List<KeyValue> results) throws IOException {
+    KeyValue peeked = heap.peek();
+    if (peeked == null) {
+      close();
+      return false;
+    }
+    matcher.setRow(peeked.getRow());
+    KeyValue kv;
+    while ((kv = heap.peek()) != null) {
+      // if delete type, output no matter what:
+      if (kv.getType() != KeyValue.Type.Put.getCode())
+        results.add(kv);
+
+      switch (matcher.match(kv)) {
+        case INCLUDE:
+          results.add(heap.next());
+          continue;
+        case DONE:
+          if (results.isEmpty()) {
+            matcher.setRow(heap.peek().getRow());
+            continue;
+          }
+          return true;
+      }
+      heap.next();
+    }
+    close();
+    return false;
+  }
+
+  public void close() {
+    heap.close();
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.NavigableSet;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This is the primary class used to process KeyValues during a Get or Scan
+ * operation.
+ * <p>
+ * It encapsulates the handling of the column and version input parameters to 
+ * the query through a {@link ColumnTracker}.
+ * <p>
+ * Deletes are handled using the {@link DeleteTracker}.
+ * <p>
+ * All other query parameters are accessed from the client-specified Get.
+ * <p>
+ * The primary method used is {@link match} with the current KeyValue.  It will
+ * return a {@link MatchCode} telling the caller whether to include the
+ * KeyValue, skip it, or stop, after accounting for deletes and versions.
+ */
+public class QueryMatcher {
+  
+  /**
+   * {@link match} return codes.  These instruct the scanner moving through
+   * Memcaches and StoreFiles what to do with the current KeyValue.
+   * <p>
+   * Additionally, this contains "early-out" language to tell the scanner to
+   * move on to the next File (Memcache or Storefile), or to return immediately.
+   */
+  static enum MatchCode {
+    /**
+     * Include KeyValue in the returned result
+     */
+    INCLUDE,
+    
+    /**
+     * Do not include KeyValue in the returned result
+     */
+    SKIP,
+    
+    /**
+     * Do not include, jump to next StoreFile or Memcache (in time order)
+     */
+    NEXT,
+    
+    /**
+     * Do not include, return current result
+     */
+    DONE,
+
+    /**
+     * These codes are used by the ScanQueryMatcher
+     */
+
+    /**
+     * Done with the row, seek there.
+     */
+    SEEK_NEXT_ROW,
+    /**
+     * Done with column, seek to next.
+     */
+    SEEK_NEXT_COL,
+
+    /**
+     * Done with scan, thanks to the row filter.
+     */
+    DONE_SCAN,
+  }
+  
+  /** Keeps track of deletes */
+  protected DeleteTracker deletes;
+  
+  /** Keeps track of columns and versions */
+  protected ColumnTracker columns;
+  
+  /** Key to seek to in Memcache and StoreFiles */
+  protected KeyValue startKey;
+  
+  /** Row comparator for the region this query is for */
+  KeyComparator rowComparator;
+  
+  /** Row the query is on */
+  protected byte [] row;
+  
+  /** TimeRange the query is for */
+  protected TimeRange tr;
+  
+  /** Oldest allowed version stamp for TTL enforcement */
+  protected long oldestStamp;
+  
+  /**
+   * Constructs a QueryMatcher for a Get.
+   * @param get
+   * @param row
+   * @param family
+   * @param columns
+   * @param ttl
+   * @param rowComparator
+   */
+  public QueryMatcher(Get get, byte [] row, byte [] family, 
+      NavigableSet<byte[]> columns, long ttl, KeyComparator rowComparator,
+      int maxVersions) {
+    this.row = row;
+    this.tr = get.getTimeRange();
+    this.oldestStamp = System.currentTimeMillis() - ttl;
+    this.rowComparator = rowComparator;
+    this.deletes =  new GetDeleteTracker(rowComparator);
+    this.startKey = KeyValue.createFirstOnRow(row);
+    // Single branch to deal with two types of Gets (columns vs all in family)
+    if(columns == null || columns.size() == 0) {
+      this.columns = new WildcardColumnTracker(maxVersions);
+    } else {
+      this.columns = new ExplicitColumnTracker(columns, maxVersions);
+    }
+  }
+
+  // For the subclasses.
+  protected QueryMatcher() {
+  }
+
+  /**
+   * Constructs a copy of an existing QueryMatcher with a new row.
+   * @param matcher
+   * @param row
+   */
+  public QueryMatcher(QueryMatcher matcher, byte [] row) {
+    this.row = row;
+    this.tr = matcher.getTimeRange();
+    this.oldestStamp = matcher.getOldestStamp();
+    this.rowComparator = matcher.getRowComparator();
+    this.columns = matcher.getColumnTracker();
+    this.deletes = matcher.getDeleteTracker();
+    this.startKey = matcher.getStartKey();
+    reset();
+  }
+  
+  /**
+   * Main method for the QueryMatcher.
+   * <p>
+   * Determines whether the specified KeyValue should be included in the
+   * result or not.
+   * <p>
+   * Contains additional language to early-out of the current file or to
+   * return immediately.
+   * <p>
+   * Things to be checked:<ul>
+   * <li>Row
+   * <li>TTL
+   * <li>Type
+   * <li>TimeRange
+   * <li>Deletes
+   * <li>Column
+   * <li>Versions
+   * @param kv KeyValue to check
+   * @return MatchCode: include, skip, next, done
+   */
+  public MatchCode match(KeyValue kv) {
+    if(this.columns.done()) {
+      return MatchCode.DONE;  // done_row
+    }
+    
+    // Directly act on KV buffer
+    byte [] bytes = kv.getBuffer();
+    int offset = kv.getOffset();
+    
+    int keyLength = Bytes.toInt(bytes, offset);
+    offset += KeyValue.ROW_OFFSET;
+    
+    short rowLength = Bytes.toShort(bytes, offset);
+    offset += Bytes.SIZEOF_SHORT;
+
+    // scanners are relying on us to check the row first, and return
+    // "NEXT" when we are there.
+    /* Check ROW
+     * If past query's row, go to next StoreFile
+     * If not reached query's row, go to next KeyValue
+     */ 
+    int ret = this.rowComparator.compareRows(row, 0, row.length,
+        bytes, offset, rowLength);
+    if(ret <= -1) {
+      // Have reached the next row
+      return MatchCode.NEXT;  // got_to_next_row (end)
+    } else if(ret >= 1) {
+      // At a previous row
+      return MatchCode.SKIP;  // skip_to_cur_row
+    }
+    offset += rowLength;
+    
+    byte familyLength = bytes[offset];
+    offset += Bytes.SIZEOF_BYTE + familyLength;
+    
+    int columnLength = keyLength + KeyValue.ROW_OFFSET -
+      (offset - kv.getOffset()) - KeyValue.TIMESTAMP_TYPE_SIZE;
+    int columnOffset = offset;
+    offset += columnLength;
+    
+    /* Check TTL
+     * If expired, go to next KeyValue
+     */
+    long timestamp = Bytes.toLong(bytes, offset);
+    if(isExpired(timestamp)) {
+      // reached the expired part; for scans this indicates we're done.
+      return MatchCode.NEXT;  // done_row
+    }
+    offset += Bytes.SIZEOF_LONG;
+    
+    /* Check TYPE
+     * If a delete within (or after) time range, add to deletes
+     * Move to next KeyValue
+     */
+    byte type = bytes[offset];
+    // if delete type == delete family, return done_row
+    
+    if(isDelete(type)) {
+      if(tr.withinOrAfterTimeRange(timestamp)) {
+        this.deletes.add(bytes, columnOffset, columnLength, timestamp, type);
+      }
+      return MatchCode.SKIP;  // skip the delete cell.
+    }
+    
+    /* Check TimeRange
+     * If outside of range, move to next KeyValue
+     */
+    if(!tr.withinTimeRange(timestamp)) {
+      return MatchCode.SKIP;  // optimization chances here.
+    }
+    
+    /* Check Deletes
+     * If deleted, move to next KeyValue 
+     */
+    if(!deletes.isEmpty() && deletes.isDeleted(bytes, columnOffset,
+        columnLength, timestamp)) {
+      // 2 types of deletes:
+      // - delete affects 1 cell or 1 column, so just skip the keyvalues.
+      // - delete family, so just skip to the next row.
+      return MatchCode.SKIP;
+    }
+    
+    /* Check Column and Versions
+     * Returns a MatchCode directly, identical language
+     * If matched column without enough versions, include
+     * If enough versions of this column or does not match, skip
+     * If we have moved past the queried columns, or have enough versions of
+     * everything, the tracker returns the corresponding early-out code
+     */
+    return columns.checkColumn(bytes, columnOffset, columnLength);
+  }
+
+  // should be in KeyValue.
+  protected boolean isDelete(byte type) {
+    return (type != KeyValue.Type.Put.getCode());
+  }
+  
+  protected boolean isExpired(long timestamp) {
+    return (timestamp < oldestStamp);
+  }
+
+  /**
+   * If the matcher returns SEEK_NEXT_COL, call this to get a hint of the
+   * next column to seek to.  If it returns null, there is no hint.
+   *
+   * @return immediately after match returns SEEK_NEXT_COL - null if no hint,
+   *  else the next column we want
+   */
+  public ColumnCount getSeekColumn() {
+    return this.columns.getColumnHint();
+  }
+  
+  /**
+   * Called after reading each section (memcache, snapshot, storefiles).
+   * <p>
+   * This method will update the internal structures to be accurate for
+   * the next section. 
+   */
+  public void update() {
+    this.deletes.update();
+    this.columns.update();
+  }
+  
+  /**
+   * Resets the current columns and deletes
+   */
+  public void reset() {
+    this.deletes.reset();
+    this.columns.reset();
+  }
+
+  /**
+   * Set current row
+   * @param row
+   */
+  public void setRow(byte [] row) {
+    this.row = row;
+  }
+  
+
+  /**
+   * 
+   * @return the start key
+   */
+  public KeyValue getStartKey() {
+    return this.startKey;
+  }
+  
+  /**
+   * @return the TimeRange
+   */
+  public TimeRange getTimeRange() {
+    return this.tr;
+  }
+  
+  /**
+   * @return the oldest stamp
+   */
+  public long getOldestStamp() {
+    return this.oldestStamp;
+  }
+  
+  /**
+   * @return current KeyComparator
+   */
+  public KeyComparator getRowComparator() {
+    return this.rowComparator;
+  }
+  
+  /**
+   * @return ColumnTracker
+   */
+  public ColumnTracker getColumnTracker() {
+    return this.columns;
+  }
+  
+  /**
+   * @return DeleteTracker
+   */
+  public DeleteTracker getDeleteTracker() {
+    return this.deletes;
+  }
+  /**
+   * 
+   * @return true if the query is done (all columns matched)
+   */
+  public boolean isDone() {
+    return this.columns.done();
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * This class is responsible for the tracking and enforcement of Deletes
+ * during the course of a Scan operation.
+ *
+ * It only has to enforce Delete and DeleteColumn, since the
+ * DeleteFamily is handled at a higher level.
+ *
+ * <p>
+ * This class is utilized through three methods:
+ * <ul><li>{@link add} when encountering a Delete or DeleteColumn
+ * <li>{@link isDeleted} when checking if a Put KeyValue has been deleted
+ * <li>{@link update} when reaching the end of a StoreFile or row for scans
+ * <p>
+ * This class is NOT thread-safe as queries are never multi-threaded 
+ */
+public class ScanDeleteTracker implements DeleteTracker {
+
+  private long familyStamp = 0L;
+  private byte [] deleteBuffer = null;
+  private int deleteOffset = 0;
+  private int deleteLength = 0;
+  private byte deleteType = 0;
+  private long deleteTimestamp = 0L;
+
+  private KeyValue.KeyComparator comparator;
+  
+  /**
+   * Constructor for ScanDeleteTracker
+   * @param comparator
+   */
+  public ScanDeleteTracker(KeyValue.KeyComparator comparator) {
+    this.comparator = comparator;
+  }
+  
+  /**
+   * Add the specified KeyValue to the list of deletes to check against for
+   * this row operation.
+   * <p>
+   * This is called when a Delete is encountered in a StoreFile.
+   * @param buffer KeyValue buffer
+   * @param qualifierOffset column qualifier offset
+   * @param qualifierLength column qualifier length
+   * @param timestamp timestamp
+   * @param type delete type as byte
+   */
+  @Override
+  public void add(byte[] buffer, int qualifierOffset, int qualifierLength,
+      long timestamp, byte type) {
+    if(timestamp > familyStamp) {
+      if(type == KeyValue.Type.DeleteFamily.getCode()) {
+        familyStamp = timestamp;
+        return;
+      }
+
+      if(deleteBuffer != null && type < deleteType) {
+        // same column, so ignore less specific delete
+        if(comparator.compareRows(deleteBuffer, deleteOffset, deleteLength,
+            buffer, qualifierOffset, qualifierLength) == 0){
+          return;
+        }
+      }
+      // new column, or more general delete type
+      deleteBuffer = buffer;
+      deleteOffset = qualifierOffset;
+      deleteLength = qualifierLength;
+      deleteType = type;
+      deleteTimestamp = timestamp;
+    }
+    // else: the delete is older than a DeleteFamily already seen, ignore it.
+  }
+
+  /** 
+   * Check if the specified KeyValue buffer has been deleted by a previously
+   * seen delete.
+   *
+   * @param buffer KeyValue buffer
+   * @param qualifierOffset column qualifier offset
+   * @param qualifierLength column qualifier length
+   * @param timestamp timestamp
+   * @return true if the specified KeyValue is deleted, false if not
+   */
+  @Override
+  public boolean isDeleted(byte [] buffer, int qualifierOffset,
+      int qualifierLength, long timestamp) {
+    if(timestamp < familyStamp) {
+      return true;
+    }
+    
+    if(deleteBuffer != null) {
+      // TODO ryan use a specific comparator
+      int ret = comparator.compareRows(deleteBuffer, deleteOffset, deleteLength,
+          buffer, qualifierOffset, qualifierLength);
+
+      if(ret == 0) {
+        if(deleteType == KeyValue.Type.DeleteColumn.getCode()) {
+          return true;
+        }
+        // Delete (aka DeleteVersion)
+        // If the timestamp is the same, keep this one
+        if (timestamp == deleteTimestamp) {
+          return true;
+        }
+        // use assert or not?
+        assert timestamp < deleteTimestamp;
+
+        // different timestamp, let's clear the buffer.
+        deleteBuffer = null;
+      } else if(ret < 0){
+        // Next column case.
+        deleteBuffer = null;
+      } else {
+        // ret > 0 should never happen: qualifiers arrive in sorted order.
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return deleteBuffer == null && familyStamp == 0;
+  }
+
+  @Override
+  // called between every row.
+  public void reset() {
+    familyStamp = 0L;
+    deleteBuffer = null;
+  }
+
+  @Override
+  // should not be called at all even (!)  
+  public void update() {
+    this.reset();
+  }
+
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+
+/**
+ * A query matcher that is specifically designed for the scan case.
+ */
+public class ScanQueryMatcher extends QueryMatcher {
+
+  private Filter filter;
+  // have to support old style filter for now.
+  private RowFilterInterface oldFilter;
+  // Optimization so we can skip lots of compares when we decide to skip
+  // to the next row.
+  private boolean stickyNextRow;
+  private KeyValue stopKey = null;
+
+  /**
+   * Constructs a QueryMatcher for a Scan.
+   * @param scan
+   * @param family
+   * @param columns
+   * @param ttl
+   * @param rowComparator
+   */
+  public ScanQueryMatcher(Scan scan, byte [] family,
+      NavigableSet<byte[]> columns, long ttl, 
+      KeyValue.KeyComparator rowComparator, int maxVersions) {
+    // this.row is set per-row via setRow() before matching begins.
+    this.tr = scan.getTimeRange();
+    this.oldestStamp = System.currentTimeMillis() - ttl;
+    this.rowComparator = rowComparator;
+    this.deletes = new ScanDeleteTracker(rowComparator);
+    this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
+    this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+    this.filter = scan.getFilter();
+    this.oldFilter = scan.getOldFilter();
+    
+    // Single branch to deal with two types of reads (columns vs all in family)
+    if(columns == null || columns.size() == 0) {
+      // use a specialized scan for wildcard column tracker.
+      this.columns = new ScanWildcardColumnTracker(maxVersions);
+    } else {
+      // We can share the ExplicitColumnTracker, diff is we reset
+      // between rows, not between storefiles.
+      this.columns = new ExplicitColumnTracker(columns,maxVersions);
+    }
+  }
+
+  /**
+   * Determines if the caller should do one of several things:
+   * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW)
+   * - seek/skip to the next column (MatchCode.SEEK_NEXT_COL)
+   * - include the current KeyValue (MatchCode.INCLUDE)
+   * - ignore the current KeyValue (MatchCode.SKIP)
+   * - got to the next row (MatchCode.DONE)
+   * 
+   * @param kv KeyValue to check
+   * @return the MatchCode for the current KeyValue
+   * @throws IOException
+   */
+  public MatchCode match(KeyValue kv) {
+    if (filter != null && filter.filterAllRemaining()) {
+      return MatchCode.DONE_SCAN;
+    } else if (oldFilter != null && oldFilter.filterAllRemaining()) {
+      // the old filter runs only if the new filter didn't end the scan.
+      return MatchCode.DONE_SCAN;
+    }
+
+    byte [] bytes = kv.getBuffer();
+    int offset = kv.getOffset();
+    int initialOffset = offset; 
+    int kvLength = kv.getLength();
+
+    int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
+    offset += KeyValue.ROW_OFFSET;
+    
+    short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
+    offset += Bytes.SIZEOF_SHORT;
+    
+    int ret = this.rowComparator.compareRows(row, 0, row.length,
+        bytes, offset, rowLength);
+    if (ret <= -1) {
+      return MatchCode.DONE;
+    } else if (ret >= 1) {
+      // could optimize this, if necessary?
+      // Could also be called SEEK_TO_CURRENT_ROW, but this
+      // should be rare/never happens.
+      return MatchCode.SKIP;
+    }
+
+    // optimize case.
+    if (this.stickyNextRow)
+        return MatchCode.SEEK_NEXT_ROW;
+
+    // Give the row filter a chance to do its job.
+    if (filter != null && filter.filterRowKey(bytes, offset, rowLength)) {
+      stickyNextRow = true; // optimize to keep from calling the filter too much.
+      return MatchCode.SEEK_NEXT_ROW;
+    } else if (oldFilter != null && oldFilter.filterRowKey(bytes, offset, rowLength)) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+
+    if (this.columns.done()) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+    
+    //Passing rowLength
+    offset += rowLength;
+
+    //Skipping family
+    byte familyLength = bytes [offset];
+    offset += familyLength + 1;
+    
+    int qualLength = keyLength + KeyValue.ROW_OFFSET -
+      (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
+    
+    long timestamp = kv.getTimestamp();
+    if (isExpired(timestamp)) {
+      // done, the rest will be expired as well.
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    byte type = kv.getType();
+    if (isDelete(type)) {
+      if (tr.withinOrAfterTimeRange(timestamp)) {
+        this.deletes.add(bytes, offset, qualLength, timestamp, type);
+        // Can't early out now, because DelFam come before any other keys
+      }
+      // May be able to optimize the SKIP here, if we matched 
+      // due to a DelFam, we can skip to next row
+      // due to a DelCol, we can skip to next col
+      // But it requires more info out of isDelete().
+      // needful -> million column challenge.
+      return MatchCode.SKIP;
+    }
+
+    if (!tr.withinTimeRange(timestamp)) {
+      return MatchCode.SKIP;
+    }
+
+    if (deletes.isDeleted(bytes, offset,
+        qualLength, timestamp)) {
+      return MatchCode.SKIP;
+    }
+    
+    MatchCode colChecker =
+        columns.checkColumn(bytes, offset, qualLength);
+
+    // if SKIP -> SEEK_NEXT_COL
+    // if (NEXT,DONE) -> SEEK_NEXT_ROW
+    // if (INCLUDE) -> INCLUDE
+    if (colChecker == MatchCode.SKIP) {
+      return MatchCode.SEEK_NEXT_COL;
+    } else if (colChecker == MatchCode.NEXT || colChecker == MatchCode.DONE) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    // else INCLUDE
+    // if (colChecker == MatchCode.INCLUDE)
+    // give the filter a chance to run.
+    if (filter == null)
+      return MatchCode.INCLUDE;
+
+    ReturnCode filterResponse = filter.filterKeyValue(kv);
+    if (filterResponse == ReturnCode.INCLUDE)
+      return MatchCode.INCLUDE;
+
+    if (filterResponse == ReturnCode.SKIP)
+      return MatchCode.SKIP;
+
+    // else
+    //if (filterResponse == ReturnCode.NEXT_ROW)
+    stickyNextRow = true;
+    return MatchCode.SEEK_NEXT_ROW;
+  }
+
+  /**
+   * If the row was otherwise going to be included, call this to last-minute
+   * check.
+   * @return true if the entire row should be filtered out
+   */
+  public boolean filterEntireRow() {
+    if (filter == null)
+      return false;
+    return filter.filterRow();
+  }
+
+  /**
+   * Set current row
+   * @param row
+   */
+  @Override
+  public void setRow(byte [] row) {
+    this.row = row;
+    reset();
+  }
+  
+  @Override
+  public void reset() {
+    super.reset();
+
+    stickyNextRow = false;
+    if (filter != null)
+      filter.reset();
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Keeps track of the columns for a scan if they are not explicitly specified
+ */
+public class ScanWildcardColumnTracker implements ColumnTracker {
+  private byte [] columnBuffer = null;
+  private int columnOffset = 0;
+  private int columnLength = 0;
+  private int currentCount = 0;
+  private int maxVersions;
+
+  /**
+   * Return up to maxVersions of every column.
+   * @param maxVersion the number of versions to keep per column
+   */
+  public ScanWildcardColumnTracker(int maxVersion) {
+    this.maxVersions = maxVersion;
+  }
+
+  /**
+   * Can only return INCLUDE or SKIP, since returning "NEXT" or
+   * "DONE" would imply we have finished with this row, when
+   * this class can't figure that out.
+   *
+   * @param bytes
+   * @param offset
+   * @param length
+   * @return INCLUDE or SKIP for the given column
+   */
+  @Override
+  public MatchCode checkColumn(byte[] bytes, int offset, int length) {
+    if (columnBuffer == null) {
+      // first iteration.
+      columnBuffer = bytes;
+      columnOffset = offset;
+      columnLength = length;
+      currentCount = 0;
+
+      if (++currentCount > maxVersions)
+        return MatchCode.SKIP;
+      return MatchCode.INCLUDE;
+    }
+    int cmp = Bytes.compareTo(bytes, offset, length,
+        columnBuffer, columnOffset, columnLength);
+    if (cmp == 0) {
+      if (++currentCount > maxVersions)
+        return MatchCode.SKIP; // skip to next col
+      return MatchCode.INCLUDE;
+    }
+
+    // new col > old col
+    if (cmp > 0) {
+      // switched columns; start tracking the new column.
+      columnBuffer = bytes;
+      columnOffset = offset;
+      columnLength = length;
+      currentCount = 0;
+      
+      if (++currentCount > maxVersions)
+        return MatchCode.SKIP;
+      return MatchCode.INCLUDE;
+    }
+    // new col < oldcol
+    // if (cmp < 0) {
+    throw new RuntimeException("ScanWildcardColumnTracker.checkColumn ran " +
+        "into a column actually smaller than the previous column!");
+  }
+
+  @Override
+  public void update() {
+    // no-op, shouldn't even be called
+    throw new UnsupportedOperationException(
+        "ScanWildcardColumnTracker.update should never be called!");
+  }
+
+  @Override
+  public void reset() {
+    columnBuffer = null;
+  }
+
+  /**
+   * Used by matcher and scan/get to get a hint of the next column
+   * to seek to after checkColumn() returns SKIP.  Returns the next interesting
+   * column we want, or NULL if there is none (wildcard scanner).
+   * @return the column hint, always null for a wildcard tracker
+   */
+  public ColumnCount getColumnHint() {
+    return null;
+  }
+
+
+  /**
+   * We can never know a priori if we are done, so always return false.
+   * @return false
+   */
+  @Override
+  public boolean done() {
+    return false;
+  }
+}

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Jun  7 19:57:37 2009
@@ -24,21 +24,17 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,12 +47,13 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.SequenceFile;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.Progressable;
@@ -171,7 +168,13 @@
     this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
-    if (ttl != HConstants.FOREVER) {
+    if (ttl == HConstants.FOREVER) {
+      // Default is an unlimited ttl.
+      ttl = Long.MAX_VALUE;
+    } else if (ttl == -1) {
+      // -1 also means no expiry.
+      ttl = Long.MAX_VALUE;
+    } else {
+      // Seconds -> milliseconds adjustment for user data.
       this.ttl *= 1000;
     }
     this.memcache = new Memcache(this.ttl, this.comparator);
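
This hunk normalizes the family ttl into plain milliseconds, with Long.MAX_VALUE standing in for "never expires", which is what lets the expiry checks elsewhere in this commit collapse to a single timestamp comparison. A standalone sketch of the same rule (the helper is hypothetical, not part of this commit):

    // Hypothetical helper mirroring the normalization above.
    static long normalizeTtlMillis(long ttlSeconds) {
      if (ttlSeconds == HConstants.FOREVER || ttlSeconds == -1) {
        return Long.MAX_VALUE;   // sentinel: cells never expire
      }
      return ttlSeconds * 1000;  // seconds -> milliseconds
    }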
@@ -304,9 +307,8 @@
         }
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (/* Commented out for now -- St.Ack val.isTransactionEntry() ||*/
-            val.matchingColumnNoDelimiter(HLog.METACOLUMN,
-              HLog.METACOLUMN.length - 1) ||
+        if (/* commented out for now - stack via jgray key.isTransactionEntry() || */
+            val.matchingFamily(HLog.METAFAMILY) ||
           !Bytes.equals(key.getRegionName(), regioninfo.getRegionName()) ||
           !val.matchingFamily(family.getName())) {
           continue;
@@ -396,6 +398,21 @@
       lock.readLock().unlock();
     }
   }
+  
+  /**
+   * Adds a delete to the memcache.
+   *
+   * @param kv the KeyValue carrying the delete
+   * @return memcache size delta
+   */
+  protected long delete(final KeyValue kv) {
+    lock.readLock().lock();
+    try {
+      return this.memcache.delete(kv);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
 
   /**
    * @return All store files.
@@ -476,7 +493,7 @@
     if (cache.size() == 0) {
       return null;
     }
-    long now = System.currentTimeMillis();
+    long oldestTimestamp = System.currentTimeMillis() - ttl;
     // TODO:  We can fail in the below block before we complete adding this
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
@@ -486,7 +503,7 @@
       int entries = 0;
       try {
         for (KeyValue kv: cache) {
-          if (!isExpired(kv, ttl, now)) {
+          if (!isExpired(kv, oldestTimestamp)) {
             writer.append(kv);
             entries++;
             flushed += this.memcache.heapSize(kv, true);
@@ -526,7 +543,7 @@
    */
   private HFile.Writer getWriter(final Path basedir) throws IOException {
     return StoreFile.getWriter(this.fs, basedir, this.blocksize,
-        this.compression, this.comparator.getRawComparator(), this.bloomfilter);
+        this.compression, this.comparator.getRawComparator());
   }
 
   /*
@@ -575,8 +592,10 @@
    * @param o Observer no longer interested in changes in set of Readers.
    */
   void deleteChangedReaderObserver(ChangedReadersObserver o) {
-    if (!this.changedReaderObservers.remove(o)) {
-      LOG.warn("Not in set" + o);
+    if (this.changedReaderObservers.size() > 0) {
+      if (!this.changedReaderObservers.remove(o)) {
+        LOG.warn("Not in set " + o);
+      }
     }
   }
 
@@ -793,140 +812,49 @@
     return result;
   }
 
-  /*
-   * @param r StoreFile list to reverse
-   * @return A new array of content of <code>readers</code>, reversed.
-   */
-  private StoreFile [] reverse(final List<StoreFile> r) {
-    List<StoreFile> copy = new ArrayList<StoreFile>(r);
-    Collections.reverse(copy);
-    // MapFile.Reader is instance of StoreFileReader so this should be ok.
-    return copy.toArray(new StoreFile[0]);
-  }
-
-  /*
-   * @param rdrs List of StoreFiles
-   * @param keys Current keys
-   * @param done Which readers are done
-   * @return The lowest current key in passed <code>rdrs</code>
-   */
-  private int getLowestKey(final HFileScanner [] rdrs, final KeyValue [] keys,
-      final boolean [] done) {
-    int lowestKey = -1;
-    for (int i = 0; i < rdrs.length; i++) {
-      if (done[i]) {
-        continue;
-      }
-      if (lowestKey < 0) {
-        lowestKey = i;
-      } else {
-        if (this.comparator.compare(keys[i], keys[lowestKey]) < 0) {
-          lowestKey = i;
-        }
-      }
-    }
-    return lowestKey;
-  }
-
-  /*
-   * Compact a list of StoreFiles.
+  /**
+   * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
    * 
-   * We work by iterating through the readers in parallel looking at newest
-   * store file first. We always increment the lowest-ranked one. Updates to a
-   * single row/column will appear ranked by timestamp.
-   * @param compactedOut Where to write compaction.
-   * @param pReaders List of readers sorted oldest to newest.
-   * @param majorCompaction True to force a major compaction regardless of
-   * thresholds
+   * @param writer output writer
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
    * @throws IOException
    */
-  private void compact(final HFile.Writer compactedOut,
-      final List<StoreFile> pReaders, final boolean majorCompaction)
-  throws IOException {
-    // Reverse order so newest store file is first.
-    StoreFile[] files = reverse(pReaders);
-    HFileScanner [] rdrs = new HFileScanner[files.length];
-    KeyValue [] kvs = new KeyValue[rdrs.length];
-    boolean[] done = new boolean[rdrs.length];
-    // Now, advance through the readers in order. This will have the
-    // effect of a run-time sort of the entire dataset.
-    int numDone = 0;
-    for (int i = 0; i < rdrs.length; i++) {
-      rdrs[i] = files[i].getReader().getScanner();
-      done[i] = !rdrs[i].seekTo();
-      if (done[i]) {
-        numDone++;
-      } else {
-        kvs[i] = rdrs[i].getKeyValue();
-      }
+  private void compact(HFile.Writer writer,
+                       List<StoreFile> filesToCompact,
+                       boolean majorCompaction) throws IOException {
+    // for each file, obtain a scanner:
+    KeyValueScanner [] scanners = new KeyValueScanner[filesToCompact.size()];
+    // init:
+    for (int i = 0; i < filesToCompact.size(); ++i) {
+      // TODO open a new HFile.Reader w/o block cache.
+      scanners[i] = new StoreFileScanner(filesToCompact.get(i).getReader().getScanner());
+    }
+
+    InternalScanner scanner;
+    if (majorCompaction) {
+      Scan scan = new Scan();
+      scan.setMaxVersions(family.getMaxVersions());
+      // TODO pass in the scanners/store files.
+      scanner = new StoreScanner(this, scan, null);
+    } else {
+      scanner = new MinorCompactingStoreScanner(this, scanners);
     }
 
-    long now = System.currentTimeMillis();
-    int timesSeen = 0;
-    KeyValue lastSeen = KeyValue.LOWESTKEY;
-    KeyValue lastDelete = null;
-    int maxVersions = family.getMaxVersions();
-    while (numDone < done.length) {
-      // Get lowest key in all store files.
-      int lowestKey = getLowestKey(rdrs, kvs, done);
-      KeyValue kv = kvs[lowestKey];
-      // If its same row and column as last key, increment times seen.
-      if (this.comparator.matchingRowColumn(lastSeen, kv)) {
-        timesSeen++;
-        // Reset last delete if not exact timestamp -- lastDelete only stops
-        // exactly the same key making it out to the compacted store file.
-        if (lastDelete != null &&
-            lastDelete.getTimestamp() != kv.getTimestamp()) {
-          lastDelete = null;
-        }
-      } else {
-        timesSeen = 1;
-        lastDelete = null;
-      }
-
-      // Don't write empty rows or columns. Only remove cells on major
-      // compaction. Remove if expired or > VERSIONS
-      if (kv.nonNullRowAndColumn()) {
-        if (!majorCompaction) {
-          // Write out all values if not a major compaction.
-          compactedOut.append(kv);
-        } else {
-          boolean expired = false;
-          boolean deleted = false;
-          if (timesSeen <= maxVersions && !(expired = isExpired(kv, ttl, now))) {
-            // If this value key is same as a deleted key, skip
-            if (lastDelete != null &&
-                this.comparatorIgnoringType.compare(kv, lastDelete) == 0) {
-              deleted = true;
-            } else if (kv.isDeleteType()) {
-              // If a deleted value, skip
-              deleted = true;
-              lastDelete = kv;
-            } else {
-              compactedOut.append(kv);
-            }
-          }
-          if (expired || deleted) {
-            // HBASE-855 remove one from timesSeen because it did not make it
-            // past expired check -- don't count against max versions.
-            timesSeen--;
-          }
-        }
-      }
-
-      // Update last-seen items
-      lastSeen = kv;
-
-      // Advance the smallest key. If that reader's all finished, then
-      // mark it as done.
-      if (!rdrs[lowestKey].next()) {
-        done[lowestKey] = true;
-        rdrs[lowestKey] = null;
-        numDone++;
-      } else {
-        kvs[lowestKey] = rdrs[lowestKey].getKeyValue();
+    // scanner.next() can return false and still deliver a final batch of
+    // results, so prime 'more' and keep draining until the batch from the
+    // last next() call has been written (do/while semantics).
+    ArrayList<KeyValue> row = new ArrayList<KeyValue>();
+    boolean more = true;
+    while (more) {
+      more = scanner.next(row);
+      // output to writer:
+      for (KeyValue kv : row) {
+        writer.append(kv);
       }
+      row.clear();
     }
+
+    scanner.close();
   }
 
   /*
@@ -1007,321 +935,25 @@
   // Accessors.
   // (This is the only section that is directly useful!)
   //////////////////////////////////////////////////////////////////////////////
-  
   /**
-   * Return all the available columns for the given key.  The key indicates a 
-   * row and timestamp, but not a column name.
-   *
-   * The returned object should map column names to Cells.
-   * @param key -  Where to start searching.  Specifies a row.
-   * Columns are specified in following arguments.
-   * @param columns Can be null which means get all
-   * @param columnPattern Can be null.
-   * @param numVersions
-   * @param versionsCounter Can be null.
-   * @param keyvalues
-   * @param now -  Where to start searching.  Specifies a timestamp.
-   * @throws IOException
+   * @return the number of files in this store
    */
-  public void getFull(KeyValue key, final NavigableSet<byte []> columns,
-      final Pattern columnPattern,
-      final int numVersions, Map<KeyValue, HRegion.Counter> versionsCounter,
-      List<KeyValue> keyvalues, final long now)
-  throws IOException {
-    // if the key is null, we're not even looking for anything. return.
-    if (key == null) {
-      return;
-    }
-    int versions = versionsToReturn(numVersions);
-    NavigableSet<KeyValue> deletes =
-      new TreeSet<KeyValue>(this.comparatorIgnoringType);
-    // Create a Map that has results by column so we can keep count of versions.
-    // It duplicates columns but doing check of columns, we don't want to make
-    // column set each time.
-    this.lock.readLock().lock();
-    try {
-      // get from the memcache first.
-      if (this.memcache.getFull(key, columns, columnPattern, versions,
-          versionsCounter, deletes, keyvalues, now)) {
-        // May have gotten enough results, enough to return.
-        return;
-      }
-      Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
-          i.hasNext();) {
-        if (getFullFromStoreFile(i.next().getValue(), key, columns,
-            columnPattern, versions, versionsCounter, deletes, keyvalues)) {
-          return;
-        }
-      }
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /*
-   * @param f
-   * @param key Where to start searching.  Specifies a row and timestamp.
-   * Columns are specified in following arguments.
-   * @param columns
-   * @param versions
-   * @param versionCounter
-   * @param deletes
-   * @param keyvalues
-   * @return True if we found enough results to satisfy the <code>versions</code>
-   * and <code>columns</code> passed.
-   * @throws IOException
-   */
-  private boolean getFullFromStoreFile(StoreFile f, KeyValue target, 
-    Set<byte []> columns, final Pattern columnPattern, int versions, 
-    Map<KeyValue, HRegion.Counter> versionCounter,
-    NavigableSet<KeyValue> deletes,
-    List<KeyValue> keyvalues) 
-  throws IOException {
-    long now = System.currentTimeMillis();
-    HFileScanner scanner = f.getReader().getScanner();
-    if (!getClosest(scanner, target)) {
-      return false;
-    }
-    boolean hasEnough = false;
-    do {
-      KeyValue kv = scanner.getKeyValue();
-      // Make sure we have not passed out the row.  If target key has a
-      // column on it, then we are looking explicit key+column combination.  If
-      // we've passed it out, also break.
-      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
-          !this.comparator.matchingRowColumn(target, kv)) {
-        break;
-      }
-      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
-        continue;
-      }
-      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
-          this.ttl, keyvalues, null)) {
-        hasEnough = true;
-        break;
-      }
-    } while (scanner.next());
-    return hasEnough;
-  }
-
-  /**
-   * Code shared by {@link Memcache#getFull(KeyValue, NavigableSet, Pattern, int, Map, NavigableSet, List, long)}
-   * and {@link #getFullFromStoreFile(StoreFile, KeyValue, Set, Pattern, int, Map, NavigableSet, List)}
-   * @param c
-   * @param target
-   * @param candidate
-   * @param columns
-   * @param columnPattern
-   * @return True if <code>candidate</code> matches column and timestamp.
-   */
-  static boolean getFullCheck(final KeyValue.KVComparator c,
-      final KeyValue target, final KeyValue candidate,
-      final Set<byte []> columns, final Pattern columnPattern) {
-    // Does column match?
-    if (!Store.matchingColumns(candidate, columns)) {
-      return false;
-    }
-    // if the column pattern is not null, we use it for column matching.
-    // we will skip the keys whose column doesn't match the pattern.
-    if (columnPattern != null) {
-      if (!(columnPattern.matcher(candidate.getColumnString()).matches())) {
-        return false;
-      }
-    }
-    if (c.compareTimestamps(target, candidate) > 0)  {
-      return false;
-    }
-    return true; 
+  public int getNumberOfstorefiles() {
+    return this.storefiles.size();
   }
+  
 
   /*
    * @param wantedVersions How many versions were asked for.
    * @return wantedVersions or this families' VERSIONS.
    */
-  private int versionsToReturn(final int wantedVersions) {
+  int versionsToReturn(final int wantedVersions) {
     if (wantedVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
     // Make sure we do not return more than maximum versions for this store.
     int maxVersions = this.family.getMaxVersions();
-    return wantedVersions > maxVersions &&
-      wantedVersions != HConstants.ALL_VERSIONS? maxVersions: wantedVersions;
-  }
-  
-  /**
-   * Get the value for the indicated HStoreKey.  Grab the target value and the 
-   * previous <code>numVersions - 1</code> values, as well.
-   *
-   * Use {@link HConstants.ALL_VERSIONS} to retrieve all versions.
-   * @param key
-   * @param numVersions Number of versions to fetch.  Must be > 0.
-   * @return values for the specified versions
-   * @throws IOException
-   */
-  List<KeyValue> get(final KeyValue key, final int numVersions)
-  throws IOException {
-    // This code below is very close to the body of the getKeys method.  Any 
-    // changes in the flow below should also probably be done in getKeys.
-    // TODO: Refactor so same code used.
-    long now = System.currentTimeMillis();
-    int versions = versionsToReturn(numVersions);
-    // Keep a list of deleted cell keys.  We need this because as we go through
-    // the memcache and store files, the cell with the delete marker may be
-    // in one store and the old non-delete cell value in a later store.
-    // If we don't keep around the fact that the cell was deleted in a newer
-    // record, we end up returning the old value if user is asking for more
-    // than one version. This List of deletes should not be large since we
-    // are only keeping rows and columns that match those set on the get and
-    // which have delete values.  If memory usage becomes an issue, could
-    // redo as bloom filter.  Use sorted set because test for membership should
-    // be faster than calculating a hash.  Use a comparator that ignores ts.
-    NavigableSet<KeyValue> deletes =
-      new TreeSet<KeyValue>(this.comparatorIgnoringType);
-    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-    this.lock.readLock().lock();
-    try {
-      // Check the memcache
-      if (this.memcache.get(key, versions, keyvalues, deletes, now)) {
-        return keyvalues;
-      }
-      Map<Long, StoreFile> m = this.storefiles.descendingMap();
-      boolean hasEnough = false;
-      for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
-        StoreFile f = e.getValue();
-        HFileScanner scanner = f.getReader().getScanner();
-        if (!getClosest(scanner, key)) {
-          // Move to next file.
-          continue;
-        }
-        do {
-          KeyValue kv = scanner.getKeyValue();
-          // Make sure below matches what happens up in Memcache#get.
-          if (this.comparator.matchingRowColumn(kv, key)) {
-            if (doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues, null)) {
-              hasEnough = true;
-              break;
-            }
-          } else {
-            // Row and column don't match. Must have gone past. Move to next file.
-            break;
-          }
-        } while (scanner.next());
-        if (hasEnough) {
-          break; // Break out of files loop.
-        }
-      }
-      return keyvalues.isEmpty()? null: keyvalues;
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /*
-   * Small method to check if we are over the max number of versions
-   * or we acheived this family max versions. 
-   * The later happens when we have the situation described in HBASE-621.
-   * @param versions
-   * @param c
-   * @return 
-   */
-  static boolean hasEnoughVersions(final int versions, final List<KeyValue> c) {
-    return versions > 0 && !c.isEmpty() && c.size() >= versions;
-  }
-
-  /*
-   * Used when doing getFulls.
-   * @param kv
-   * @param versions
-   * @param versionCounter
-   * @param columns
-   * @param deletes
-   * @param now
-   * @param ttl
-   * @param keyvalues
-   * @param set
-   * @return True if enough versions.
-   */
-  static boolean doKeyValue(final KeyValue kv,
-      final int versions,
-      final Map<KeyValue, Counter> versionCounter,
-      final Set<byte []> columns,
-      final NavigableSet<KeyValue> deletes,
-      final long now, 
-      final long ttl,
-      final List<KeyValue> keyvalues,
-      final SortedSet<KeyValue> set) {
-    boolean hasEnough = false;
-    if (kv.isDeleteType()) {
-      if (!deletes.contains(kv)) {
-        deletes.add(kv);
-      }
-    } else if (!deletes.contains(kv)) {
-      // Skip expired cells
-      if (!isExpired(kv, ttl, now)) {
-        if (HRegion.okToAddResult(kv, versions, versionCounter)) {
-          HRegion.addResult(kv, versionCounter, keyvalues);
-          if (HRegion.hasEnoughVersions(versions, versionCounter, columns)) {
-            hasEnough = true;
-          }
-        }
-      } else {
-        // Remove the expired.
-        Store.expiredOrDeleted(set, kv);
-      }
-    }
-    return hasEnough;
-  }
-
-  /*
-   * Used when doing get.
-   * @param kv
-   * @param versions
-   * @param deletes
-   * @param now
-   * @param ttl
-   * @param keyvalues
-   * @param set
-   * @return True if enough versions.
-   */
-  static boolean doKeyValue(final KeyValue kv, final int versions,
-      final NavigableSet<KeyValue> deletes,
-      final long now,  final long ttl,
-      final List<KeyValue> keyvalues, final SortedSet<KeyValue> set) {
-    boolean hasEnough = false;
-    if (!kv.isDeleteType()) {
-      // Filter out expired results
-      if (notExpiredAndNotInDeletes(ttl, kv, now, deletes)) {
-        if (!keyvalues.contains(kv)) {
-          keyvalues.add(kv);
-          if (hasEnoughVersions(versions, keyvalues)) {
-            hasEnough = true;
-          }
-        }
-      } else {
-        if (set != null) {
-          expiredOrDeleted(set, kv);
-        }
-      }
-    } else {
-      // Cell holds a delete value.
-      deletes.add(kv);
-    }
-    return hasEnough;
-  }
-
-  /*
-   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
-   * has an empty column, then it just tests row equivalence. Otherwise, it uses
-   * HStoreKey.matchesRowCol().
-   * @param c Comparator to use.
-   * @param origin Key we're testing against
-   * @param target Key we're testing
-   */
-  static boolean matchingRowColumn(final KeyValue.KVComparator c,
-      final KeyValue origin, final KeyValue target) {
-    return origin.isEmptyColumn()? c.matchingRows(target, origin):
-      c.matchingRowColumn(target, origin);
+    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
   }
 
   static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
@@ -1411,13 +1043,12 @@
    */
   static boolean notExpiredAndNotInDeletes(final long ttl,
       final KeyValue key, final long now, final Set<KeyValue> deletes) {
-    return !isExpired(key, ttl, now) && (deletes == null || deletes.isEmpty() ||
+    return !isExpired(key, now - ttl) && (deletes == null || deletes.isEmpty() ||
         !deletes.contains(key));
   }
 
-  static boolean isExpired(final KeyValue key, final long ttl,
-      final long now) {
-    return ttl != HConstants.FOREVER && now > key.getTimestamp() + ttl;
+  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
+    return key.getTimestamp() < oldestTimestamp;
   }
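
Callers now hoist the subtraction out of the per-cell loop (see the flush hunk above, where oldestTimestamp = System.currentTimeMillis() - ttl) and the check itself is one comparison. A worked example with invented numbers:

    // Invented numbers, for illustration only.
    long now = 1000000L;               // "current" time, ms
    long ttl = 600000L;                // 10 minutes, already in ms
    long oldestTimestamp = now - ttl;  // 400000
    // cell stamped 350000: 350000 < 400000 -> expired
    // cell stamped 450000: 450000 < 400000 is false -> still live

When ttl was normalized to Long.MAX_VALUE (see the Store constructor hunk), now - ttl is a large negative number, so no real timestamp ever tests as expired.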
 
   /* Find a candidate for row that is at or before passed key, searchkey, in hfile.
@@ -1693,13 +1324,12 @@
   /**
    * Return a scanner for both the memcache and the HStore files
    */
-  protected InternalScanner getScanner(long timestamp,
-      final NavigableSet<byte []> targetCols,
-      byte [] firstRow, RowFilterInterface filter)
+  protected KeyValueScanner getScanner(Scan scan,
+      final NavigableSet<byte []> targetCols)
   throws IOException {
     lock.readLock().lock();
     try {
-      return new StoreScanner(this, targetCols, firstRow, timestamp, filter);
+      return new StoreScanner(this, scan, targetCols);
     } finally {
       lock.readLock().unlock();
     }
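
getScanner now takes a client Scan in place of the old (timestamp, firstRow, filter) triple. A hypothetical caller, assuming a Store instance named store is in scope:

    // Hypothetical usage of the Scan-based API.
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    cols.add(Bytes.toBytes("qual"));
    KeyValueScanner scanner = store.getScanner(scan, cols);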
@@ -1722,7 +1352,7 @@
    * @throws IOException if there was a problem getting file sizes from the
    * filesystem
    */
-  long getStorefilesIndexSize() throws IOException {
+  long getStorefilesIndexSize() {
     long size = 0;
     for (StoreFile s: storefiles.values())
       size += s.getReader().indexSize();
@@ -1805,4 +1435,114 @@
     }
     return false;
   }
+  
+  //
+  // HBASE-880/1249/1304
+  //
+  
+  /**
+   * Retrieve results from this store given the specified Get parameters.
+   * @param get Get operation
+   * @param columns List of columns to match, can be empty (not null)
+   * @param result List to add results to 
+   * @throws IOException
+   */
+  public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result) 
+  throws IOException {
+    KeyComparator keyComparator = this.comparator.getRawComparator();
+
+    // Column matching and version enforcement
+    QueryMatcher matcher = new QueryMatcher(get, get.getRow(), 
+        this.family.getName(), columns, this.ttl, keyComparator,
+        versionsToReturn(get.getMaxVersions()));
+    
+    // Read from Memcache
+    if (this.memcache.get(matcher, result)) {
+      // Received early-out from memcache
+      return;
+    }
+    
+    // Check if we even have storefiles
+    if (this.storefiles.isEmpty()) {
+      return;
+    }
+    
+    // Get storefiles for this store
+    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
+    for (StoreFile sf : this.storefiles.descendingMap().values()) {
+      storefileScanners.add(sf.getReader().getScanner());
+    }
+    
+    // StoreFileGetScan will handle reading this store's storefiles
+    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
+    
+    // Run a GET scan and put results into the specified list 
+    scanner.get(result);
+  }
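
A hypothetical caller of the new get path; the Store instance store is assumed, everything else uses names from this diff:

    Get get = new Get(Bytes.toBytes("row1"));
    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    columns.add(Bytes.toBytes("qual"));
    List<KeyValue> result = new ArrayList<KeyValue>();
    store.get(get, columns, result);
    // result now holds up to maxVersions KeyValues for row1, family:qual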
+  
+  /**
+   * Increments the value for the given row/family/qualifier.
+   * @param row row key
+   * @param family column family name
+   * @param qualifier column qualifier
+   * @param amount amount by which to increment
+   * @return the new value after the increment has been applied
+   * @throws IOException
+   */
+  public long incrementColumnValue(byte [] row, byte [] family,
+      byte [] qualifier, long amount) throws IOException {
+    long value = 0;
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    KeyComparator keyComparator = this.comparator.getRawComparator();
+
+    // Setting up the QueryMatcher
+    Get get = new Get(row);
+    NavigableSet<byte[]> qualifiers = 
+      new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    qualifiers.add(qualifier);
+    QueryMatcher matcher = new QueryMatcher(get, row, family, qualifiers,
+        this.ttl, keyComparator, 1);
+    
+    // Read from Memcache
+    if (this.memcache.get(matcher, result)) {
+      // Received early-out from memcache
+      KeyValue kv = result.get(0);
+      byte [] buffer = kv.getBuffer();
+      int valueOffset = kv.getValueOffset();
+      value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
+      Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, 
+          Bytes.SIZEOF_LONG);
+      return value;
+    }
+    
+    // Check if we even have storefiles
+    if (this.storefiles.isEmpty()) {
+      return addNewKeyValue(row, family, qualifier, value, amount);
+    }
+    
+    // Get storefiles for this store
+    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
+    for (StoreFile sf : this.storefiles.descendingMap().values()) {
+      storefileScanners.add(sf.getReader().getScanner());
+    }
+    
+    // StoreFileGetScan will handle reading this store's storefiles
+    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
+    
+    // Run a GET scan and put results into the specified list 
+    scanner.get(result);
+    if (result.size() > 0) {
+      value = Bytes.toLong(result.get(0).getValue());
+    }
+    return addNewKeyValue(row, family, qualifier, value, amount);
+  }
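
The increment path is read-modify-write: a memcache hit is patched in place via Bytes.putBytes, otherwise the current value is read through the store files and a fresh KeyValue is added. A hypothetical call sequence, assuming a Store instance named store:

    long v1 = store.incrementColumnValue(Bytes.toBytes("row1"),
        Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);  // no prior cell -> 1
    long v2 = store.incrementColumnValue(Bytes.toBytes("row1"),
        Bytes.toBytes("cf"), Bytes.toBytes("counter"), 1L);  // -> 2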
+  
+  private long addNewKeyValue(byte [] row, byte [] family, byte [] qualifier, 
+      long value, long amount) {
+    long newValue = value + amount;
+    KeyValue newKv = new KeyValue(row, family, qualifier, Bytes.toBytes(newValue));
+    add(newKv);
+    return newValue;
+  }
+  
 }


