hbase-commits mailing list archives

From apurt...@apache.org
Subject svn commit: r784618 [6/11] - in /hadoop/hbase/trunk_on_hadoop-0.18.3/src: java/ java/org/apache/hadoop/hbase/ java/org/apache/hadoop/hbase/client/ java/org/apache/hadoop/hbase/client/tableindexed/ java/org/apache/hadoop/hbase/filter/ java/org/apache/ha...
Date Sun, 14 Jun 2009 21:34:19 GMT
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Sun Jun 14 21:34:13 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2008 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -27,19 +27,22 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
+import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -79,7 +82,7 @@
 
   // TODO: Fix this guess by studying jprofiler
   private final static int ESTIMATED_KV_HEAP_TAX = 60;
-  
+
   /**
    * Default constructor. Used for tests.
    */
@@ -199,86 +202,7 @@
     }
     return size;
   }
-  
-  /** 
-   * Write a delete
-   * @param delete
-   * @return approximate size of the passed key and value.
-   */
-  long delete(final KeyValue delete) {
-    long size = 0;
-    this.lock.readLock().lock();
-    //Have to find out what we want to do here, to find the fastest way of
-    //removing things that are under a delete.
-    //Actions that will take place here are:
-    //1. Insert a delete and remove all the affected entries already in memcache
-    //2. In the case of a Delete and the matching put is found then don't insert
-    //   the delete
-    //TODO Would be nice with if we had an iterator for this, so we could remove
-    //things that needs to be removed while iterating and don't have to go
-    //back and do it afterwards
-    
-    try {
-      boolean notpresent = false;
-      List<KeyValue> deletes = new ArrayList<KeyValue>();
-      SortedSet<KeyValue> tailSet = this.memcache.tailSet(delete);
-
-      //Parse the delete, so that it is only done once
-      byte [] deleteBuffer = delete.getBuffer();
-      int deleteOffset = delete.getOffset();
-  
-      int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
-      deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
-  
-      short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
-      deleteOffset += Bytes.SIZEOF_SHORT;
-      int deleteRowOffset = deleteOffset;
-  
-      deleteOffset += deleteRowLen;
-  
-      byte deleteFamLen = deleteBuffer[deleteOffset];
-      deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
-  
-      int deleteQualifierOffset = deleteOffset;
-      int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
-        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - 
-        Bytes.SIZEOF_BYTE;
-      
-      deleteOffset += deleteQualifierLen;
-  
-      int deleteTimestampOffset = deleteOffset;
-      deleteOffset += Bytes.SIZEOF_LONG;
-      byte deleteType = deleteBuffer[deleteOffset];
-      
-      //Comparing with tail from memcache
-      for(KeyValue mem : tailSet) {
-        
-        DeleteCode res = DeleteCompare.deleteCompare(mem, deleteBuffer, 
-            deleteRowOffset, deleteRowLen, deleteQualifierOffset, 
-            deleteQualifierLen, deleteTimestampOffset, deleteType,
-            comparator.getRawComparator());
-        if(res == DeleteCode.DONE) {
-          break;
-        } else if (res == DeleteCode.DELETE) {
-          deletes.add(mem);
-        } // SKIP
-      }
 
-      //Delete all the entries effected by the last added delete
-      for(KeyValue del : deletes) {
-        notpresent = this.memcache.remove(del);
-        size -= heapSize(del, notpresent);
-      }
-      
-      //Adding the delete to memcache
-      notpresent = this.memcache.add(delete);
-      size += heapSize(delete, notpresent);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-    return size;
-  }
-  
   /*
    * Calculate how the memcache size has changed, approximately.  Be careful.
    * If class changes, be sure to change the size calculation.
@@ -295,6 +219,43 @@
   }
 
   /**
+   * Look back through all the backlog TreeMaps to find the target.
+   * @param kv
+   * @param numVersions
+   * @return Set of KeyValues. Empty set, not null, if no results.
+   */
+  List<KeyValue> get(final KeyValue kv, final int numVersions) {
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    get(kv, numVersions, results,
+      new TreeSet<KeyValue>(this.comparatorIgnoreType),
+      System.currentTimeMillis());
+    return results;
+  }
+
+  /**
+   * Look back through all the backlog TreeMaps to find the target.
+   * @param key
+   * @param versions
+   * @param results
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param now
+   * @return True if enough versions.
+   */
+  boolean get(final KeyValue key, final int versions,
+      List<KeyValue> results, final NavigableSet<KeyValue> deletes,
+      final long now) {
+    this.lock.readLock().lock();
+    try {
+      if (get(this.memcache, key, versions, results, deletes, now)) {
+        return true;
+      }
+      return get(this.snapshot, key, versions, results, deletes, now);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
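
The pair of get methods above implement a two-level read: check the live memcache first, then fall back to the snapshot taken at flush time, sharing one deletes set so markers seen in the newer level mask cells in the older one. A hedged sketch of how a caller might drive the public overload (the row bytes are illustrative, and the single-argument KeyValue.createFirstOnRow factory is assumed available in this era):

    // Hypothetical caller: up to three versions at or below this search key.
    KeyValue search = KeyValue.createFirstOnRow(Bytes.toBytes("row1"));
    List<KeyValue> results = memcache.get(search, 3);
    for (KeyValue kv : results) {
      LOG.debug("found " + kv);
    }
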
+
+  /**
    * @param kv Find the row that comes after this one.  If null, we return the
    * first.
    * @return Next row or null if none found.
@@ -346,6 +307,86 @@
     return result;
   }
 
+  /**
+   * Return all the available columns for the given key.  The key indicates a 
+   * row and timestamp, but not a column name.
+   * @param key Where to start searching.  Specifies a row and timestamp.
+   * Columns are specified in following arguments.
+   * @param columns Pass null for all columns else the wanted subset.
+   * @param columnPattern Column pattern to match.
+   * @param numVersions number of versions to retrieve
+   * @param versionsCount Map of KV to Count.  Uses a Comparator that doesn't
+   * look at timestamps so only Row/Column are compared.
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param results Where to stick row results found.
+   * @return True if we found enough results for passed <code>columns</code>
+   * and <code>numVersions</code>.
+   */
+  boolean getFull(final KeyValue key, NavigableSet<byte []> columns,
+      final Pattern columnPattern,
+      int numVersions, final Map<KeyValue, HRegion.Counter> versionsCount,
+      final NavigableSet<KeyValue> deletes,
+      final List<KeyValue> results, final long now) {
+    this.lock.readLock().lock();
+    try {
+      // Used to be synchronized but now with weak iteration, no longer needed.
+      if (getFull(this.memcache, key, columns, columnPattern, numVersions,
+        versionsCount, deletes, results, now)) {
+        // Has enough results.
+        return true;
+      }
+      return getFull(this.snapshot, key, columns, columnPattern, numVersions,
+        versionsCount, deletes, results, now);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * @param set
+   * @param target Where to start searching.
+   * @param columns
+   * @param versions
+   * @param versionCounter
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param keyvalues
+   * @return True if enough results found.
+   */
+  private boolean getFull(final ConcurrentSkipListSet<KeyValue> set,
+      final KeyValue target, final Set<byte []> columns,
+      final Pattern columnPattern,
+      final int versions, final Map<KeyValue, HRegion.Counter> versionCounter,
+      final NavigableSet<KeyValue> deletes, List<KeyValue> keyvalues,
+      final long now) {
+    boolean hasEnough = false;
+    if (target == null) {
+      return hasEnough;
+    }
+    NavigableSet<KeyValue> tailset = set.tailSet(target);
+    if (tailset == null || tailset.isEmpty()) {
+      return hasEnough;
+    }
+    // TODO: This loop same as in HStore.getFullFromStoreFile.  Make sure they
+    // are the same.
+    for (KeyValue kv: tailset) {
+      // Make sure we have not passed beyond the row.  If the target key has a
+      // column on it, then we are looking for an explicit key+column
+      // combination.  If we've passed it, also break.
+      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
+          !this.comparator.matchingRowColumn(target, kv)) {
+        break;
+      }
+      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
+        continue;
+      }
+      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
+          this.ttl, keyvalues, tailset)) {
+        hasEnough = true;
+        break;
+      }
+    }
+    return hasEnough;
+  }
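
The loop above is the tail-set scan idiom used throughout this patch: seek to the first entry greater than or equal to the target, then walk forward until the row (or row+column) no longer matches. A self-contained sketch of the same pattern over plain strings, with sample data assumed for illustration:

    import java.util.NavigableSet;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class TailSetScan {
      public static void main(String[] args) {
        NavigableSet<String> set = new ConcurrentSkipListSet<String>();
        set.add("row1/a"); set.add("row1/b"); set.add("row2/a");
        // Seek to the first entry >= "row1", mirroring set.tailSet(target).
        for (String e : set.tailSet("row1", true)) {
          if (!e.startsWith("row1")) break; // passed beyond the row; stop
          System.out.println(e);            // prints row1/a then row1/b
        }
      }
    }
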
 
   /**
    * @param row Row to look for.
@@ -513,6 +554,45 @@
     }
   }
 
+  /*
+   * Examine a single map for the desired key.
+   *
+   * TODO - This is kinda slow.  We need a data structure that allows for 
+   * proximity-searches, not just precise-matches.
+   * 
+   * @param set
+   * @param key
+   * @param results
+   * @param versions
+   * @param keyvalues
+   * @param deletes Pass a Set that has a Comparator that ignores key type.
+   * @param now
+   * @return True if enough versions.
+   */
+  private boolean get(final ConcurrentSkipListSet<KeyValue> set,
+      final KeyValue key, final int versions,
+      final List<KeyValue> keyvalues,
+      final NavigableSet<KeyValue> deletes,
+      final long now) {
+    NavigableSet<KeyValue> tailset = set.tailSet(key);
+    if (tailset.isEmpty()) {
+      return false;
+    }
+    boolean enoughVersions = false;
+    for (KeyValue kv : tailset) {
+      if (this.comparator.matchingRowColumn(kv, key)) {
+        if (Store.doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues,
+            tailset)) {
+          enoughVersions = true;
+          break;
+        }
+      } else {
+        // By L.N. HBASE-684, the map is sorted, so we can't find a match any more.
+        break;
+      }
+    }
+    return enoughVersions;
+  }
 
   /*
    * @param set
@@ -541,160 +621,93 @@
   /**
    * @return a scanner over the keys in the Memcache
    */
-  KeyValueScanner getScanner() {
-    this.lock.readLock().lock();
-    try {
-      return new MemcacheScanner();
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  //
-  // HBASE-880/1249/1304
-  //
-  
-  /**
-   * Perform a single-row Get on the memcache and snapshot, placing results
-   * into the specified KV list.
-   * <p>
-   * This will return true if it is determined that the query is complete
-   * and it is not necessary to check any storefiles after this.
-   * <p>
-   * Otherwise, it will return false and you should continue on.
-   * @param startKey Starting KeyValue
-   * @param matcher Column matcher
-   * @param result List to add results to
-   * @return true if done with store (early-out), false if not
-   * @throws IOException
-   */
-  public boolean get(QueryMatcher matcher, List<KeyValue> result)
+  InternalScanner getScanner(long timestamp,
+    final NavigableSet<byte []> targetCols, final byte [] firstRow)
   throws IOException {
     this.lock.readLock().lock();
     try {
-      if(internalGet(this.memcache, matcher, result) || matcher.isDone()) {
-        return true;
-      }
-      matcher.update();
-      if(internalGet(this.snapshot, matcher, result) || matcher.isDone()) {
-        return true;
-      }
-      return false;
+      return new MemcacheScanner(timestamp, targetCols, firstRow);
     } finally {
       this.lock.readLock().unlock();
     }
   }
-  
-  /**
-   *
-   * @param set memcache or snapshot
-   * @param matcher query matcher
-   * @param result list to add results to
-   * @return true if done with store (early-out), false if not
-   * @throws IOException
-   */
-  private boolean internalGet(SortedSet<KeyValue> set, QueryMatcher matcher,
-      List<KeyValue> result) throws IOException {
-    if(set.isEmpty()) return false;
-    // Seek to startKey
-    SortedSet<KeyValue> tailSet = set.tailSet(matcher.getStartKey());
-    
-    for (KeyValue kv : tailSet) {
-      QueryMatcher.MatchCode res = matcher.match(kv);
-      switch(res) {
-        case INCLUDE:
-          result.add(kv);
-          break;
-        case SKIP:
-          break;
-        case NEXT:
-          return false;
-        case DONE:
-          return true;
-        default:
-          throw new RuntimeException("Unexpected " + res);
-      }
-    }
-    return false;
-  }
-  
+
   //////////////////////////////////////////////////////////////////////////////
-  // MemcacheScanner implements the KeyValueScanner.
+  // MemcacheScanner implements the InternalScanner.
   // It lets the caller scan the contents of the Memcache.
-  // This behaves as if it were a real scanner but does not maintain position
-  // in the Memcache tree.
   //////////////////////////////////////////////////////////////////////////////
 
-  protected class MemcacheScanner implements KeyValueScanner {
-    private KeyValue current = null;
-    private List<KeyValue> result = new ArrayList<KeyValue>();
-    private int idx = 0;
-    
-    MemcacheScanner() {}
-    
-    public boolean seek(KeyValue key) {
-      try {
-        if(key == null) {
-          close();
-          return false;
-        }
-        current = key;
-        return cacheNextRow();
-      } catch(Exception e) {
-        close();
+  private class MemcacheScanner extends HAbstractScanner {
+    private KeyValue current;
+    private final NavigableSet<byte []> columns;
+    private final NavigableSet<KeyValue> deletes;
+    private final Map<KeyValue, Counter> versionCounter;
+    private final long now = System.currentTimeMillis();
+
+    MemcacheScanner(final long timestamp, final NavigableSet<byte []> columns,
+      final byte [] firstRow)
+    throws IOException {
+      // Call to super will create ColumnMatchers and determine whether this is
+      // a regex scanner or not.  Will also save away timestamp.  Also sorts rows.
+      super(timestamp, columns);
+      this.deletes = new TreeSet<KeyValue>(comparatorIgnoreType);
+      this.versionCounter =
+        new TreeMap<KeyValue, Counter>(comparatorIgnoreTimestamp);
+      this.current = KeyValue.createFirstOnRow(firstRow, timestamp);
+      // If we're being asked to scan explicit columns rather than all in 
+      // a family or columns that match regexes, cache the sorted array of
+      // columns.
+      this.columns = isWildcardScanner()? null: columns;
+    }
+
+    @Override
+    public boolean next(final List<KeyValue> keyvalues)
+    throws IOException {
+      if (this.scannerClosed) {
         return false;
       }
-    }
-    
-    public KeyValue peek() {
-      if(idx >= result.size()) {
-        if(!cacheNextRow()) {
-          return null;
+      while (keyvalues.isEmpty() && this.current != null) {
+        // Deletes are per row.
+        if (!deletes.isEmpty()) {
+          deletes.clear();
         }
-        return peek();
-      }
-      return result.get(idx);
-    }
-    
-    public KeyValue next() {
-      if(idx >= result.size()) {
-        if(!cacheNextRow()) {
-          return null;
+        if (!versionCounter.isEmpty()) {
+          versionCounter.clear();
         }
-        return next();
-      }
-      return result.get(idx++);
-    }
-    
-    boolean cacheNextRow() {
-      NavigableSet<KeyValue> keys;
-      try {
-        keys = memcache.tailSet(current);
-      } catch(Exception e) {
-        close();
-        return false;
-      }
-      if(keys == null || keys.isEmpty()) {
-        close();
-        return false;
-      }
-      current = null;
-      byte [] row = keys.first().getRow();
-      for(KeyValue key : keys) {
-        if(comparator.compareRows(key, row) != 0) {
-          current = key;
-          break;
+        // The getFull will take care of expired cells and deletes inside the
+        // memcache.  The first getFull, when row is the special empty bytes,
+        // will return nothing, so we go around again.  An alternative is
+        // calling getNextRow if row is null, but that looks like it would take
+        // the same amount of work, so leave it for now.
+        getFull(this.current, isWildcardScanner()? null: this.columns, null, 1,
+          versionCounter, deletes, keyvalues, this.now);
+        for (KeyValue bb: keyvalues) {
+          if (isWildcardScanner()) {
+            // Check the results match.  We only check columns, not timestamps.
+            // We presume that timestamps have been handled properly when we
+            // called getFull.
+            if (!columnMatch(bb)) {
+              keyvalues.remove(bb);
+            }
+          }
+        }
+        // Add any deletes found so they are available to the StoreScanner#next.
+        if (!this.deletes.isEmpty()) {
+          keyvalues.addAll(deletes);
         }
-        result.add(key);
+        this.current = getNextRow(this.current);
+        // Change current to be column-less and to have the scanner's now.  We
+        // do this because the first item on the 'next row' may not have the
+        // scanner's now time, which will cause trouble down in getFull; same
+        // reason for no column.
+        if (this.current != null) this.current = this.current.cloneRow(this.now);
       }
-      return true;
+      return !keyvalues.isEmpty();
     }
 
     public void close() {
-      current = null;
-      idx = 0;
-      if(!result.isEmpty()) {
-        result.clear();
+      if (!scannerClosed) {
+        scannerClosed = true;
       }
     }
   }
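
For context, a sketch of the caller-side loop this next(List) contract implies: the scanner fills the list with one row's KeyValues per call and returns false once exhausted. The column name and handling are assumptions for illustration; the empty firstRow follows the "special empty bytes" note in the code above:

    NavigableSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    columns.add(Bytes.toBytes("info:name"));
    InternalScanner s = memcache.getScanner(HConstants.LATEST_TIMESTAMP,
        columns, HConstants.EMPTY_BYTE_ARRAY);
    List<KeyValue> row = new ArrayList<KeyValue>();
    try {
      while (s.next(row)) {   // one row's KeyValues per call
        // handle this row's cells here, including any delete markers
        // passed up for StoreScanner#next as noted above
        row.clear();          // required: next() appends into this list
      }
    } finally {
      s.close();
    }
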
@@ -708,7 +721,8 @@
    * @throws InterruptedException
    * @throws IOException 
    */
-  public static void main(String [] args) {
+  public static void main(String [] args)
+  throws InterruptedException, IOException {
     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Sun Jun 14 21:34:13 2009
@@ -62,11 +62,11 @@
   protected final long globalMemcacheLimit;
   protected final long globalMemcacheLimitLowMark;
   
-  private static final float DEFAULT_UPPER = 0.4f;
-  private static final float DEFAULT_LOWER = 0.25f;
-  private static final String UPPER_KEY =
+  public static final float DEFAULT_UPPER = 0.4f;
+  public static final float DEFAULT_LOWER = 0.25f;
+  public static final String UPPER_KEY =
     "hbase.regionserver.globalMemcache.upperLimit";
-  private static final String LOWER_KEY =
+  public static final String LOWER_KEY =
     "hbase.regionserver.globalMemcache.lowerLimit";
   private long blockingStoreFilesNumber;
   private long blockingWaitTime;
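
With the upper and lower watermark keys now public, other components can read the same settings. A hedged sketch of pulling them from configuration with the exposed defaults (Configuration#getFloat is stock Hadoop); flushing is forced when global memcache usage crosses the upper mark, roughly 0.4 of heap by default, and proceeds until usage drops below the lower mark:

    HBaseConfiguration conf = new HBaseConfiguration();
    float upper = conf.getFloat(MemcacheFlusher.UPPER_KEY,
        MemcacheFlusher.DEFAULT_UPPER);   // 0.4f by default
    float lower = conf.getFloat(MemcacheFlusher.LOWER_KEY,
        MemcacheFlusher.DEFAULT_LOWER);   // 0.25f by default
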

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Jun 14 21:34:13 2009
@@ -24,17 +24,21 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,13 +51,12 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.KeyValue.KeyComparator;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.SequenceFile;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.Progressable;
@@ -130,7 +133,6 @@
   private final Object compactLock = new Object();
   private final int compactionThreshold;
   private final int blocksize;
-  private final boolean blockcache;
   private final boolean bloomfilter;
   private final Compression.Algorithm compression;
   
@@ -163,20 +165,13 @@
     this.fs = fs;
     this.conf = conf;
     this.bloomfilter = family.isBloomfilter();
-    this.blockcache = family.isBlockCacheEnabled();
     this.blocksize = family.getBlocksize();
     this.compression = family.getCompression();
     this.comparator = info.getComparator();
     this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType();
     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
     this.ttl = family.getTimeToLive();
-    if (ttl == HConstants.FOREVER) {
-      // default is unlimited ttl.
-      ttl = Long.MAX_VALUE;
-    } else if (ttl == -1) {
-      ttl = Long.MAX_VALUE;
-    } else {
-      // second -> ms adjust for user data
+    if (ttl != HConstants.FOREVER) {
       this.ttl *= 1000;
     }
     this.memcache = new Memcache(this.ttl, this.comparator);
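
The simplified TTL handling above leans on HConstants.FOREVER as the single no-expiry sentinel: any other value is user-supplied seconds, scaled once to milliseconds here so the isExpired math further down works in millis throughout. A minimal sketch of the conversion, with the sentinel value left abstract:

    // Assumed: 'forever' is a sentinel constant meaning "never expire".
    static long ttlToMillis(final long ttlSeconds, final long forever) {
      return ttlSeconds == forever ? ttlSeconds : ttlSeconds * 1000;
    }
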
@@ -309,8 +304,9 @@
         }
         // Check this edit is for me. Also, guard against writing the special
         // METACOLUMN info such as HBASE::CACHEFLUSH entries
-        if (/* commented out for now - stack via jgray key.isTransactionEntry() || */
-            val.matchingFamily(HLog.METAFAMILY) ||
+        if (/* Commented out for now -- St.Ack val.isTransactionEntry() ||*/
+            val.matchingColumnNoDelimiter(HLog.METACOLUMN,
+              HLog.METACOLUMN.length - 1) ||
           !Bytes.equals(key.getRegionName(), regioninfo.getRegionName()) ||
           !val.matchingFamily(family.getName())) {
           continue;
@@ -362,7 +358,7 @@
       }
       StoreFile curfile = null;
       try {
-        curfile = new StoreFile(fs, p, blockcache, this.conf);
+        curfile = new StoreFile(fs, p, this.conf);
       } catch (IOException ioe) {
         LOG.warn("Failed open of " + p + "; presumption is that file was " +
           "corrupted at flush and lost edits picked up by commit log replay. " +
@@ -400,21 +396,6 @@
       lock.readLock().unlock();
     }
   }
-  
-  /**
-   * Adds a value to the memcache
-   * 
-   * @param kv
-   * @return memcache size delta
-   */
-  protected long delete(final KeyValue kv) {
-    lock.readLock().lock();
-    try {
-      return this.memcache.delete(kv);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
 
   /**
    * @return All store files.
@@ -495,7 +476,7 @@
     if (cache.size() == 0) {
       return null;
     }
-    long oldestTimestamp = System.currentTimeMillis() - ttl;
+    long now = System.currentTimeMillis();
     // TODO:  We can fail in the below block before we complete adding this
     // flush to list of store files.  Add cleanup of anything put on filesystem
     // if we fail.
@@ -505,7 +486,7 @@
       int entries = 0;
       try {
         for (KeyValue kv: cache) {
-          if (!isExpired(kv, oldestTimestamp)) {
+          if (!isExpired(kv, ttl, now)) {
             writer.append(kv);
             entries++;
             flushed += this.memcache.heapSize(kv, true);
@@ -518,8 +499,7 @@
         writer.close();
       }
     }
-    StoreFile sf = new StoreFile(this.fs, writer.getPath(), blockcache, 
-      this.conf);
+    StoreFile sf = new StoreFile(this.fs, writer.getPath(), this.conf);
     this.storeSize += sf.getReader().length();
     if(LOG.isDebugEnabled()) {
       LOG.debug("Added " + sf + ", entries=" + sf.getReader().getEntries() +
@@ -546,7 +526,7 @@
    */
   private HFile.Writer getWriter(final Path basedir) throws IOException {
     return StoreFile.getWriter(this.fs, basedir, this.blocksize,
-        this.compression, this.comparator.getRawComparator());
+        this.compression, this.comparator.getRawComparator(), this.bloomfilter);
   }
 
   /*
@@ -595,10 +575,8 @@
    * @param o Observer no longer interested in changes in set of Readers.
    */
   void deleteChangedReaderObserver(ChangedReadersObserver o) {
-    if(this.changedReaderObservers.size() > 0) {
-      if (!this.changedReaderObservers.remove(o)) {
-        LOG.warn("Not in set" + o);
-      }
+    if (!this.changedReaderObservers.remove(o)) {
+      LOG.warn("Not in set " + o);
     }
   }
 
@@ -815,49 +793,140 @@
     return result;
   }
 
-  /**
-   * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
+  /*
+   * @param r StoreFile list to reverse
+   * @return A new array of content of <code>readers</code>, reversed.
+   */
+  private StoreFile [] reverse(final List<StoreFile> r) {
+    List<StoreFile> copy = new ArrayList<StoreFile>(r);
+    Collections.reverse(copy);
+    // MapFile.Reader is an instance of StoreFileReader so this should be ok.
+    return copy.toArray(new StoreFile[0]);
+  }
+
+  /*
+   * @param rdrs List of StoreFiles
+   * @param keys Current keys
+   * @param done Which readers are done
+   * @return Index of the reader with the lowest current key in passed <code>rdrs</code>
+   */
+  private int getLowestKey(final HFileScanner [] rdrs, final KeyValue [] keys,
+      final boolean [] done) {
+    int lowestKey = -1;
+    for (int i = 0; i < rdrs.length; i++) {
+      if (done[i]) {
+        continue;
+      }
+      if (lowestKey < 0) {
+        lowestKey = i;
+      } else {
+        if (this.comparator.compare(keys[i], keys[lowestKey]) < 0) {
+          lowestKey = i;
+        }
+      }
+    }
+    return lowestKey;
+  }
+
+  /*
+   * Compact a list of StoreFiles.
    * 
-   * @param writer output writer
-   * @param filesToCompact which files to compact
-   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * We work by iterating through the readers in parallel looking at newest
+   * store file first. We always increment the lowest-ranked one. Updates to a
+   * single row/column will appear ranked by timestamp.
+   * @param compactedOut Where to write compaction.
+   * @param pReaders List of readers sorted oldest to newest.
+   * @param majorCompaction True to force a major compaction regardless of
+   * thresholds
    * @throws IOException
    */
-  private void compact(HFile.Writer writer,
-                       List<StoreFile> filesToCompact,
-                       boolean majorCompaction) throws IOException {
-    // for each file, obtain a scanner:
-    KeyValueScanner [] scanners = new KeyValueScanner[filesToCompact.size()];
-    // init:
-    for(int i = 0; i < filesToCompact.size(); ++i) {
-      // TODO open a new HFile.Reader w/o block cache.
-      scanners[i] = new StoreFileScanner(filesToCompact.get(i).getReader().getScanner());
-    }
-
-    InternalScanner scanner;
-    if (majorCompaction) {
-      Scan scan = new Scan();
-      scan.setMaxVersions(family.getMaxVersions());
-      // TODO pass in the scanners/store files.
-      scanner = new StoreScanner(this, scan, null);
-    } else {
-      scanner = new MinorCompactingStoreScanner(this, scanners);
+  private void compact(final HFile.Writer compactedOut,
+      final List<StoreFile> pReaders, final boolean majorCompaction)
+  throws IOException {
+    // Reverse order so newest store file is first.
+    StoreFile[] files = reverse(pReaders);
+    HFileScanner [] rdrs = new HFileScanner[files.length];
+    KeyValue [] kvs = new KeyValue[rdrs.length];
+    boolean[] done = new boolean[rdrs.length];
+    // Now, advance through the readers in order. This will have the
+    // effect of a run-time sort of the entire dataset.
+    int numDone = 0;
+    for (int i = 0; i < rdrs.length; i++) {
+      rdrs[i] = files[i].getReader().getScanner();
+      done[i] = !rdrs[i].seekTo();
+      if (done[i]) {
+        numDone++;
+      } else {
+        kvs[i] = rdrs[i].getKeyValue();
+      }
     }
 
-    // since scanner.next() can return 'false' but still be delivering data,
-    // we have to use a do/while loop.
-    ArrayList<KeyValue> row = new ArrayList<KeyValue>();
-    boolean more = true;
-    while ( more ) {
-      more = scanner.next(row);
-      // output to writer:
-      for (KeyValue kv : row) {
-        writer.append(kv);
+    long now = System.currentTimeMillis();
+    int timesSeen = 0;
+    KeyValue lastSeen = KeyValue.LOWESTKEY;
+    KeyValue lastDelete = null;
+    int maxVersions = family.getMaxVersions();
+    while (numDone < done.length) {
+      // Get lowest key in all store files.
+      int lowestKey = getLowestKey(rdrs, kvs, done);
+      KeyValue kv = kvs[lowestKey];
+      // If it's the same row and column as the last key, increment times seen.
+      if (this.comparator.matchingRowColumn(lastSeen, kv)) {
+        timesSeen++;
+        // Reset last delete if not exact timestamp -- lastDelete only stops
+        // exactly the same key making it out to the compacted store file.
+        if (lastDelete != null &&
+            lastDelete.getTimestamp() != kv.getTimestamp()) {
+          lastDelete = null;
+        }
+      } else {
+        timesSeen = 1;
+        lastDelete = null;
+      }
+
+      // Don't write empty rows or columns. Only remove cells on major
+      // compaction. Remove if expired or > VERSIONS
+      if (kv.nonNullRowAndColumn()) {
+        if (!majorCompaction) {
+          // Write out all values if not a major compaction.
+          compactedOut.append(kv);
+        } else {
+          boolean expired = false;
+          boolean deleted = false;
+          if (timesSeen <= maxVersions && !(expired = isExpired(kv, ttl, now))) {
+            // If this value key is same as a deleted key, skip
+            if (lastDelete != null &&
+                this.comparatorIgnoringType.compare(kv, lastDelete) == 0) {
+              deleted = true;
+            } else if (kv.isDeleteType()) {
+              // If a deleted value, skip
+              deleted = true;
+              lastDelete = kv;
+            } else {
+              compactedOut.append(kv);
+            }
+          }
+          if (expired || deleted) {
+            // HBASE-855 remove one from timesSeen because it did not make it
+            // past expired check -- don't count against max versions.
+            timesSeen--;
+          }
+        }
       }
-      row.clear();
-    }
 
-    scanner.close();
+      // Update last-seen items
+      lastSeen = kv;
+
+      // Advance the smallest key. If that reader's all finished, then
+      // mark it as done.
+      if (!rdrs[lowestKey].next()) {
+        done[lowestKey] = true;
+        rdrs[lowestKey] = null;
+        numDone++;
+      } else {
+        kvs[lowestKey] = rdrs[lowestKey].getKeyValue();
+      }
+    }
   }
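
The rewritten compact() is a k-way merge: every open scanner exposes its current KeyValue, getLowestKey picks the smallest, that cell is processed, and only that scanner advances. A self-contained sketch of the same control flow over generic iterators (all names illustrative):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class KWayMerge {
      // Emit elements from all sources in sorted order, advancing only the
      // source that supplied the smallest current element: the same shape
      // as the getLowestKey/next loop in compact() above.
      static <T extends Comparable<T>> List<T> merge(List<Iterator<T>> srcs) {
        List<T> out = new ArrayList<T>();
        List<T> heads = new ArrayList<T>();
        for (Iterator<T> it : srcs) {
          heads.add(it.hasNext() ? it.next() : null); // null means done
        }
        while (true) {
          int lowest = -1;
          for (int i = 0; i < heads.size(); i++) {
            if (heads.get(i) == null) continue;
            if (lowest < 0 || heads.get(i).compareTo(heads.get(lowest)) < 0) {
              lowest = i;
            }
          }
          if (lowest < 0) break; // every source exhausted
          out.add(heads.get(lowest));
          Iterator<T> it = srcs.get(lowest);
          heads.set(lowest, it.hasNext() ? it.next() : null);
        }
        return out;
      }
    }
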
 
   /*
@@ -893,8 +962,7 @@
       LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
       return;
     }
-    StoreFile finalCompactedFile = new StoreFile(this.fs, p, blockcache, 
-      this.conf);
+    StoreFile finalCompactedFile = new StoreFile(this.fs, p, this.conf);
     this.lock.writeLock().lock();
     try {
       try {
@@ -939,25 +1007,321 @@
   // Accessors.
   // (This is the only section that is directly useful!)
   //////////////////////////////////////////////////////////////////////////////
+  
   /**
-   * @return the number of files in this store
+   * Return all the available columns for the given key.  The key indicates a 
+   * row and timestamp, but not a column name.
+   *
+   * The returned object should map column names to Cells.
+   * @param key -  Where to start searching.  Specifies a row.
+   * Columns are specified in following arguments.
+   * @param columns Can be null which means get all
+   * @param columnPattern Can be null.
+   * @param numVersions
+   * @param versionsCounter Can be null.
+   * @param keyvalues
+   * @param now -  The current time in milliseconds; used for expiry checks.
+   * @throws IOException
    */
-  public int getNumberOfstorefiles() {
-    return this.storefiles.size();
+  public void getFull(KeyValue key, final NavigableSet<byte []> columns,
+      final Pattern columnPattern,
+      final int numVersions, Map<KeyValue, HRegion.Counter> versionsCounter,
+      List<KeyValue> keyvalues, final long now)
+  throws IOException {
+    // if the key is null, we're not even looking for anything. return.
+    if (key == null) {
+      return;
+    }
+    int versions = versionsToReturn(numVersions);
+    NavigableSet<KeyValue> deletes =
+      new TreeSet<KeyValue>(this.comparatorIgnoringType);
+    // Create a Map that has results by column so we can keep count of versions.
+    // It duplicates columns, but when checking columns we don't want to
+    // rebuild the column set each time.
+    this.lock.readLock().lock();
+    try {
+      // get from the memcache first.
+      if (this.memcache.getFull(key, columns, columnPattern, versions,
+          versionsCounter, deletes, keyvalues, now)) {
+        // May have gotten enough results, enough to return.
+        return;
+      }
+      Map<Long, StoreFile> m = this.storefiles.descendingMap();
+      for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
+          i.hasNext();) {
+        if (getFullFromStoreFile(i.next().getValue(), key, columns,
+            columnPattern, versions, versionsCounter, deletes, keyvalues)) {
+          return;
+        }
+      }
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * @param f
+   * @param key Where to start searching.  Specifies a row and timestamp.
+   * Columns are specified in following arguments.
+   * @param columns
+   * @param versions
+   * @param versionCounter
+   * @param deletes
+   * @param keyvalues
+   * @return True if we found enough results to satisfy the <code>versions</code>
+   * and <code>columns</code> passed.
+   * @throws IOException
+   */
+  private boolean getFullFromStoreFile(StoreFile f, KeyValue target, 
+    Set<byte []> columns, final Pattern columnPattern, int versions, 
+    Map<KeyValue, HRegion.Counter> versionCounter,
+    NavigableSet<KeyValue> deletes,
+    List<KeyValue> keyvalues) 
+  throws IOException {
+    long now = System.currentTimeMillis();
+    HFileScanner scanner = f.getReader().getScanner();
+    if (!getClosest(scanner, target)) {
+      return false;
+    }
+    boolean hasEnough = false;
+    do {
+      KeyValue kv = scanner.getKeyValue();
+      // Make sure we have not passed beyond the row.  If the target key has a
+      // column on it, then we are looking for an explicit key+column
+      // combination.  If we've passed it, also break.
+      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
+          !this.comparator.matchingRowColumn(target, kv)) {
+        break;
+      }
+      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
+        continue;
+      }
+      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
+          this.ttl, keyvalues, null)) {
+        hasEnough = true;
+        break;
+      }
+    } while (scanner.next());
+    return hasEnough;
+  }
+
+  /**
+   * Code shared by {@link Memcache#getFull(KeyValue, NavigableSet, Pattern, int, Map, NavigableSet, List, long)}
+   * and {@link #getFullFromStoreFile(StoreFile, KeyValue, Set, Pattern, int, Map, NavigableSet, List)}
+   * @param c
+   * @param target
+   * @param candidate
+   * @param columns
+   * @param columnPattern
+   * @return True if <code>candidate</code> matches column and timestamp.
+   */
+  static boolean getFullCheck(final KeyValue.KVComparator c,
+      final KeyValue target, final KeyValue candidate,
+      final Set<byte []> columns, final Pattern columnPattern) {
+    // Does column match?
+    if (!Store.matchingColumns(candidate, columns)) {
+      return false;
+    }
+    // if the column pattern is not null, we use it for column matching.
+    // we will skip the keys whose column doesn't match the pattern.
+    if (columnPattern != null) {
+      if (!(columnPattern.matcher(candidate.getColumnString()).matches())) {
+        return false;
+      }
+    }
+    if (c.compareTimestamps(target, candidate) > 0)  {
+      return false;
+    }
+    return true; 
   }
-  
 
   /*
    * @param wantedVersions How many versions were asked for.
    * @return wantedVersions or this families' VERSIONS.
    */
-  int versionsToReturn(final int wantedVersions) {
+  private int versionsToReturn(final int wantedVersions) {
     if (wantedVersions <= 0) {
       throw new IllegalArgumentException("Number of versions must be > 0");
     }
     // Make sure we do not return more than maximum versions for this store.
     int maxVersions = this.family.getMaxVersions();
-    return wantedVersions > maxVersions ? maxVersions: wantedVersions;
+    return wantedVersions > maxVersions &&
+      wantedVersions != HConstants.ALL_VERSIONS? maxVersions: wantedVersions;
+  }
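
Concretely, with a family configured for three versions: a request for 2 returns 2, a request for 10 is clamped to 3, and a request for HConstants.ALL_VERSIONS now passes through unclamped so callers can deliberately read every version. A sketch of the clamp in isolation (the ALL_VERSIONS sentinel value is assumed):

    // clamp(2, 3, ALL)   -> 2
    // clamp(10, 3, ALL)  -> 3
    // clamp(ALL, 3, ALL) -> ALL (unclamped by design)
    static int clamp(final int wanted, final int max, final int all) {
      if (wanted <= 0) throw new IllegalArgumentException("versions must be > 0");
      return wanted > max && wanted != all ? max : wanted;
    }
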
+  
+  /**
+   * Get the value for the indicated HStoreKey.  Grab the target value and the 
+   * previous <code>numVersions - 1</code> values, as well.
+   *
+   * Use {@link HConstants.ALL_VERSIONS} to retrieve all versions.
+   * @param key
+   * @param numVersions Number of versions to fetch.  Must be > 0.
+   * @return values for the specified versions
+   * @throws IOException
+   */
+  List<KeyValue> get(final KeyValue key, final int numVersions)
+  throws IOException {
+    // The code below is very close to the body of the getKeys method.  Any 
+    // changes in the flow below should also probably be done in getKeys.
+    // TODO: Refactor so same code used.
+    long now = System.currentTimeMillis();
+    int versions = versionsToReturn(numVersions);
+    // Keep a list of deleted cell keys.  We need this because as we go through
+    // the memcache and store files, the cell with the delete marker may be
+    // in one store and the old non-delete cell value in a later store.
+    // If we don't keep around the fact that the cell was deleted in a newer
+    // record, we end up returning the old value if user is asking for more
+    // than one version. This List of deletes should not be large since we
+    // are only keeping rows and columns that match those set on the get and
+    // which have delete values.  If memory usage becomes an issue, could
+    // redo as bloom filter.  Use sorted set because test for membership should
+    // be faster than calculating a hash.  Use a comparator that ignores ts.
+    NavigableSet<KeyValue> deletes =
+      new TreeSet<KeyValue>(this.comparatorIgnoringType);
+    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+    this.lock.readLock().lock();
+    try {
+      // Check the memcache
+      if (this.memcache.get(key, versions, keyvalues, deletes, now)) {
+        return keyvalues;
+      }
+      Map<Long, StoreFile> m = this.storefiles.descendingMap();
+      boolean hasEnough = false;
+      for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
+        StoreFile f = e.getValue();
+        HFileScanner scanner = f.getReader().getScanner();
+        if (!getClosest(scanner, key)) {
+          // Move to next file.
+          continue;
+        }
+        do {
+          KeyValue kv = scanner.getKeyValue();
+          // Make sure below matches what happens up in Memcache#get.
+          if (this.comparator.matchingRowColumn(kv, key)) {
+            if (doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues, null)) {
+              hasEnough = true;
+              break;
+            }
+          } else {
+            // Row and column don't match. Must have gone past. Move to next file.
+            break;
+          }
+        } while (scanner.next());
+        if (hasEnough) {
+          break; // Break out of files loop.
+        }
+      }
+      return keyvalues.isEmpty()? null: keyvalues;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
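
The deletes set threaded through this method is what lets a delete marker found in a newer store mask a live cell found later in an older store, per the long comment above. A toy sketch of that masking order, with string stand-ins for KeyValues and comparator subtleties elided:

    // Newer sources are visited first; markers collected there suppress
    // matching cells from older sources.
    NavigableSet<String> deletes = new TreeSet<String>();
    List<String> results = new ArrayList<String>();
    deletes.add("row1/colA");                 // marker seen in memcache
    for (String cell : new String[] {"row1/colA", "row1/colB"}) { // older file
      if (!deletes.contains(cell)) {
        results.add(cell);                    // colA masked, colB returned
      }
    }
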
+
+  /*
+   * Small method to check if we are over the max number of versions
+   * or we achieved this family's max versions.
+   * The latter happens when we have the situation described in HBASE-621.
+   * @param versions
+   * @param c
+   * @return 
+   */
+  static boolean hasEnoughVersions(final int versions, final List<KeyValue> c) {
+    return versions > 0 && !c.isEmpty() && c.size() >= versions;
+  }
+
+  /*
+   * Used when doing getFulls.
+   * @param kv
+   * @param versions
+   * @param versionCounter
+   * @param columns
+   * @param deletes
+   * @param now
+   * @param ttl
+   * @param keyvalues
+   * @param set
+   * @return True if enough versions.
+   */
+  static boolean doKeyValue(final KeyValue kv,
+      final int versions,
+      final Map<KeyValue, Counter> versionCounter,
+      final Set<byte []> columns,
+      final NavigableSet<KeyValue> deletes,
+      final long now, 
+      final long ttl,
+      final List<KeyValue> keyvalues,
+      final SortedSet<KeyValue> set) {
+    boolean hasEnough = false;
+    if (kv.isDeleteType()) {
+      if (!deletes.contains(kv)) {
+        deletes.add(kv);
+      }
+    } else if (!deletes.contains(kv)) {
+      // Skip expired cells
+      if (!isExpired(kv, ttl, now)) {
+        if (HRegion.okToAddResult(kv, versions, versionCounter)) {
+          HRegion.addResult(kv, versionCounter, keyvalues);
+          if (HRegion.hasEnoughVersions(versions, versionCounter, columns)) {
+            hasEnough = true;
+          }
+        }
+      } else {
+        // Remove the expired.
+        Store.expiredOrDeleted(set, kv);
+      }
+    }
+    return hasEnough;
+  }
+
+  /*
+   * Used when doing get.
+   * @param kv
+   * @param versions
+   * @param deletes
+   * @param now
+   * @param ttl
+   * @param keyvalues
+   * @param set
+   * @return True if enough versions.
+   */
+  static boolean doKeyValue(final KeyValue kv, final int versions,
+      final NavigableSet<KeyValue> deletes,
+      final long now,  final long ttl,
+      final List<KeyValue> keyvalues, final SortedSet<KeyValue> set) {
+    boolean hasEnough = false;
+    if (!kv.isDeleteType()) {
+      // Filter out expired results
+      if (notExpiredAndNotInDeletes(ttl, kv, now, deletes)) {
+        if (!keyvalues.contains(kv)) {
+          keyvalues.add(kv);
+          if (hasEnoughVersions(versions, keyvalues)) {
+            hasEnough = true;
+          }
+        }
+      } else {
+        if (set != null) {
+          expiredOrDeleted(set, kv);
+        }
+      }
+    } else {
+      // Cell holds a delete value.
+      deletes.add(kv);
+    }
+    return hasEnough;
+  }
+
+  /*
+   * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
+   * has an empty column, then it just tests row equivalence. Otherwise, it uses
+   * HStoreKey.matchesRowCol().
+   * @param c Comparator to use.
+   * @param origin Key we're testing against
+   * @param target Key we're testing
+   */
+  static boolean matchingRowColumn(final KeyValue.KVComparator c,
+      final KeyValue origin, final KeyValue target) {
+    return origin.isEmptyColumn()? c.matchingRows(target, origin):
+      c.matchingRowColumn(target, origin);
   }
 
   static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
@@ -1047,12 +1411,13 @@
    */
   static boolean notExpiredAndNotInDeletes(final long ttl,
       final KeyValue key, final long now, final Set<KeyValue> deletes) {
-    return !isExpired(key, now-ttl) && (deletes == null || deletes.isEmpty() ||
+    return !isExpired(key, ttl, now) && (deletes == null || deletes.isEmpty() ||
         !deletes.contains(key));
   }
 
-  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
-    return key.getTimestamp() < oldestTimestamp;
+  static boolean isExpired(final KeyValue key, final long ttl,
+      final long now) {
+    return ttl != HConstants.FOREVER && now > key.getTimestamp() + ttl;
   }
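
A quick worked check of the new predicate: with ttl = 1000 ms, a cell stamped t = 5000 is expired once now exceeds 6000, while ttl = FOREVER short-circuits and never expires anything. In isolation, with an assumed sentinel value:

    static final long FOREVER = -1L; // assumed sentinel for illustration
    static boolean isExpired(final long ts, final long ttl, final long now) {
      return ttl != FOREVER && now > ts + ttl;
    }
    // isExpired(5000, 1000, 6001)    -> true
    // isExpired(5000, 1000, 6000)    -> false (boundary is not past expiry)
    // isExpired(5000, FOREVER, 9999) -> false
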
 
   /* Find a candidate for row that is at or before passed key, searchkey, in hfile.
@@ -1328,12 +1693,13 @@
   /**
    * Return a scanner for both the memcache and the HStore files
    */
-  protected KeyValueScanner getScanner(Scan scan,
-      final NavigableSet<byte []> targetCols)
+  protected InternalScanner getScanner(long timestamp,
+      final NavigableSet<byte []> targetCols,
+      byte [] firstRow, RowFilterInterface filter)
   throws IOException {
     lock.readLock().lock();
     try {
-      return new StoreScanner(this, scan, targetCols);
+      return new StoreScanner(this, targetCols, firstRow, timestamp, filter);
     } finally {
       lock.readLock().unlock();
     }
@@ -1353,8 +1719,10 @@
 
   /**
    * @return The size of the store file indexes, in bytes.
+   * @throws IOException if there was a problem getting file sizes from the
+   * filesystem
    */
-  long getStorefilesIndexSize() {
+  long getStorefilesIndexSize() throws IOException {
     long size = 0;
     for (StoreFile s: storefiles.values())
       size += s.getReader().indexSize();
@@ -1437,114 +1805,4 @@
     }
     return false;
   }
-  
-  //
-  // HBASE-880/1249/1304
-  //
-  
-  /**
-   * Retrieve results from this store given the specified Get parameters.
-   * @param get Get operation
-   * @param columns List of columns to match, can be empty (not null)
-   * @param result List to add results to 
-   * @throws IOException
-   */
-  public void get(Get get, NavigableSet<byte[]> columns, List<KeyValue> result) 
-  throws IOException {
-    KeyComparator keyComparator = this.comparator.getRawComparator();
-
-    // Column matching and version enforcement
-    QueryMatcher matcher = new QueryMatcher(get, get.getRow(), 
-        this.family.getName(), columns, this.ttl, keyComparator,
-        versionsToReturn(get.getMaxVersions()));
-    
-    // Read from Memcache
-    if(this.memcache.get(matcher, result)) {
-      // Received early-out from memcache
-      return;
-    }
-    
-    // Check if we even have storefiles
-    if(this.storefiles.isEmpty()) {
-      return;
-    }
-    
-    // Get storefiles for this store
-    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
-    for(StoreFile sf : this.storefiles.descendingMap().values()) {
-      storefileScanners.add(sf.getReader().getScanner());
-    }
-    
-    // StoreFileGetScan will handle reading this store's storefiles
-    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
-    
-    // Run a GET scan and put results into the specified list 
-    scanner.get(result);
-  }
-  
-  /**
-   * Increments the value for the given row/family/qualifier
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param amount
-   * @return The new value.
-   * @throws IOException
-   */
-  public long incrementColumnValue(byte [] row, byte [] family,
-      byte [] qualifier, long amount) throws IOException{
-    long value = 0;
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    KeyComparator keyComparator = this.comparator.getRawComparator();
-
-    // Setting up the QueryMatcher
-    Get get = new Get(row);
-    NavigableSet<byte[]> qualifiers = 
-      new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qualifier);
-    QueryMatcher matcher = new QueryMatcher(get, row, family, qualifiers,
-        this.ttl, keyComparator, 1);
-    
-    // Read from Memcache
-    if(this.memcache.get(matcher, result)) {
-      // Received early-out from memcache
-      KeyValue kv = result.get(0);
-      byte [] buffer = kv.getBuffer();
-      int valueOffset = kv.getValueOffset();
-      value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
-      Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, 
-          Bytes.SIZEOF_LONG);
-      return value;
-    }
-    
-    // Check if we even have storefiles
-    if(this.storefiles.isEmpty()) {
-      return addNewKeyValue(row, family, qualifier, value, amount);
-    }
-    
-    // Get storefiles for this store
-    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
-    for(StoreFile sf : this.storefiles.descendingMap().values()) {
-      storefileScanners.add(sf.getReader().getScanner());
-    }
-    
-    // StoreFileGetScan will handle reading this store's storefiles
-    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
-    
-    // Run a GET scan and put results into the specified list 
-    scanner.get(result);
-    if(result.size() > 0) {
-      value = Bytes.toLong(result.get(0).getValue());
-    }
-    return addNewKeyValue(row, family, qualifier, value, amount);
-  }
-  
-  private long addNewKeyValue(byte [] row, byte [] family, byte [] qualifier, 
-      long value, long amount) {
-    long newValue = value + amount;
-    KeyValue newKv = new KeyValue(row, family, qualifier, Bytes.toBytes(newValue));
-    add(newKv);
-    return newValue;
-  }
-  
 }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Jun 14 21:34:13 2009
@@ -19,28 +19,31 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.HalfHFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import org.apache.hadoop.hbase.util.Hash;
+import org.apache.hadoop.io.RawComparator;
 
 /**
  * A Store data file.  Stores usually have one or more of these files.  They
@@ -55,7 +58,7 @@
 public class StoreFile implements HConstants {
   static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
 
-  private static final String HFILE_CACHE_SIZE_KEY = "hfile.block.cache.size";
+  public static final String HFILE_CACHE_SIZE_KEY = "hfile.block.cache.size";
 
   private static BlockCache hfileBlockCache = null;
   
@@ -70,9 +73,7 @@
   private Reference reference;
   // If this StoreFile references another, this is the other files path.
   private Path referencePath;
-  // Should the block cache be used or not.
-  private boolean blockcache;
-  
+
   // Keys for metadata stored in backing HFile.
   private static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
   // Set when we obtain a Reader.
@@ -83,7 +84,7 @@
   // If true, this file was product of a major compaction.  Its then set
   // whenever you get a Reader.
   private AtomicBoolean majorCompaction = null;
-  
+
   /*
    * Regex that will work for straight filenames and for reference names.
    * If reference, then the regex has more than just one group.  Group 1 is
@@ -99,27 +100,23 @@
   private final HBaseConfiguration conf;
 
   /**
-   * Constructor, loads a reader and it's indices, etc. May allocate a 
-   * substantial amount of ram depending on the underlying files (10-20MB?).
-   * 
-   * @param fs  The current file system to use.
-   * @param p  The path of the file.
-   * @param blockcache  <code>true</code> if the block cache is enabled.
-   * @param conf  The current configuration.
-   * @throws IOException When opening the reader fails.
+   * Constructor, loads a reader and its indices, etc. May allocate a substantial
+   * amount of ram depending on the underlying files (10-20MB?).
+   * @param fs
+   * @param p
+   * @param conf
+   * @throws IOException
    */
-  StoreFile(final FileSystem fs, final Path p, final boolean blockcache, 
-      final HBaseConfiguration conf) 
-  throws IOException {
+  StoreFile(final FileSystem fs, final Path p, final HBaseConfiguration conf) throws IOException {
     this.conf = conf;
     this.fs = fs;
     this.path = p;
-    this.blockcache = blockcache;
     if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
     }
     this.reader = open();
+
   }
 
   /**
@@ -211,12 +208,6 @@
     return this.sequenceid;
   }
 
-  /**
-   * Returns the block cache or <code>null</code> in case none should be used.
-   * 
-   * @param conf  The current configuration.
-   * @return The block cache or <code>null</code>.
-   */
   public static synchronized BlockCache getBlockCache(HBaseConfiguration conf) {
     if (hfileBlockCache != null)
       return hfileBlockCache;
@@ -230,11 +221,8 @@
     return hfileBlockCache;
   }
 
-  /**
-   * @return the blockcache
-   */
   public BlockCache getBlockCache() {
-    return blockcache ? getBlockCache(conf) : null;
+    return getBlockCache(conf);
   }
 
   /**
@@ -249,8 +237,8 @@
       throw new IllegalAccessError("Already open");
     }
     if (isReference()) {
-      this.reader = new HalfHFileReader(this.fs, this.referencePath, 
-          getBlockCache(), this.reference);
+      this.reader = new HalfHFileReader(this.fs, this.referencePath, getBlockCache(),
+        this.reference);
     } else {
       this.reader = new StoreFileReader(this.fs, this.path, getBlockCache());
     }
@@ -281,23 +269,13 @@
         this.majorCompaction.set(mc);
       }
     }
-
-    // TODO read in bloom filter here, ignore if the column family config says
-    // "no bloom filter" even if there is one in the hfile.
     return this.reader;
   }
-
+  
   /**
    * Override to add some customization on HFile.Reader
    */
   static class StoreFileReader extends HFile.Reader {
-    /**
-     * 
-     * @param fs
-     * @param path
-     * @param cache
-     * @throws IOException
-     */
     public StoreFileReader(FileSystem fs, Path path, BlockCache cache)
         throws IOException {
       super(fs, path, cache);
@@ -318,14 +296,6 @@
    * Override to add some customization on HalfHFileReader.
    */
   static class HalfStoreFileReader extends HalfHFileReader {
-    /**
-     * 
-     * @param fs
-     * @param p
-     * @param c
-     * @param r
-     * @throws IOException
-     */
     public HalfStoreFileReader(FileSystem fs, Path p, BlockCache c, Reference r)
         throws IOException {
       super(fs, p, c, r);
@@ -414,7 +384,7 @@
    */
   public static HFile.Writer getWriter(final FileSystem fs, final Path dir)
   throws IOException {
-    return getWriter(fs, dir, DEFAULT_BLOCKSIZE_SMALL, null, null);
+    return getWriter(fs, dir, DEFAULT_BLOCKSIZE_SMALL, null, null, false);
   }
 
   /**
@@ -427,12 +397,13 @@
    * @param blocksize
    * @param algorithm Pass null to get default.
    * @param c Pass null to get default.
+   * @param filter Whether to maintain a Bloom filter for this file.
    * @return HFile.Writer
    * @throws IOException
    */
   public static HFile.Writer getWriter(final FileSystem fs, final Path dir,
-                                       final int blocksize, final Compression.Algorithm algorithm,
-                                       final KeyValue.KeyComparator c)
+    final int blocksize, final Compression.Algorithm algorithm,
+    final KeyValue.KeyComparator c, final boolean filter)
   throws IOException {
     if (!fs.exists(dir)) {
       fs.mkdirs(dir);
@@ -440,7 +411,7 @@
     Path path = getUniqueFile(fs, dir);
     return new HFile.Writer(fs, path, blocksize,
       algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
-      c == null? KeyValue.KEY_COMPARATOR: c);
+      c == null? KeyValue.KEY_COMPARATOR: c, filter);
   }
 
   /**
@@ -474,6 +445,7 @@
    * @param dir
    * @param suffix
    * @return Path to a file that doesn't exist at time of this invocation.
    * @throws IOException
    */
   static Path getRandomFilename(final FileSystem fs, final Path dir,
@@ -493,8 +465,8 @@
    * Write file metadata.
    * Call before you call close on the passed <code>w</code> since its written
    * as metadata to that file.
-   * 
-   * @param w hfile writer
+   *
+   * @param w hfile writer
    * @param maxSequenceId Maximum sequence id.
    * @throws IOException
    */

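For context on the StoreFile changes above: getWriter now takes a trailing
boolean that tells HFile.Writer whether to maintain a Bloom filter, and the
two-argument convenience form passes false. A minimal caller sketch -- the
class name and store directory path are hypothetical, not from this commit:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.HFile;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    public class StoreFileWriterSketch {
      public static void main(String[] args) throws IOException {
        HBaseConfiguration conf = new HBaseConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical store directory for illustration only.
        Path dir = new Path("/hbase/SomeTable/1028785192/info");
        // The two-argument form defaults blocksize, compression and
        // comparator, and passes filter=false, i.e. no Bloom filter.
        HFile.Writer writer = StoreFile.getWriter(fs, dir);
        try {
          // append KeyValues in ascending key order here
        } finally {
          writer.close();
        }
      }
    }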
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sun Jun 14 21:34:13 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2008 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,81 +21,306 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * A KeyValue scanner that iterates over a single HFile
+ * A scanner that iterates through HStore files
  */
-class StoreFileScanner implements KeyValueScanner {
+class StoreFileScanner extends HAbstractScanner
+implements ChangedReadersObserver {
+  // Keys retrieved from the sources
+  private volatile KeyValue [] keys;
   
-  private HFileScanner hfs;
-  private KeyValue cur = null;
+  // Readers we go against.
+  private volatile HFileScanner [] scanners;
   
+  // Store this scanner came out of.
+  private final Store store;
+  
+  // Used around replacement of Readers if they change while we're scanning.
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private final long now = System.currentTimeMillis();
+
   /**
-   * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
-   * @param hfs HFile scanner
+   * @param store
+   * @param timestamp
+   * @param columns
+   * @param firstRow
+   * @throws IOException
    */
-  public StoreFileScanner(HFileScanner hfs) {
-    this.hfs = hfs;
-  }
-  
-  public KeyValue peek() {
-    return cur;
-  }
-  
-  public KeyValue next() {
-    KeyValue retKey = cur;
-    cur = hfs.getKeyValue();
+  public StoreFileScanner(final Store store, final long timestamp,
+    final NavigableSet<byte []> columns, final byte [] firstRow)
+  throws IOException {
+    super(timestamp, columns);
+    this.store = store;
+    this.store.addChangedReaderObserver(this);
     try {
-      hfs.next();
-    } catch(IOException e) {
-      // Only occurs if the scanner is not seeked, this is never the case
-      // as we seek immediately after construction in StoreScanner
+      openScanner(firstRow);
+    } catch (Exception ex) {
+      close();
+      IOException e = new IOException("StoreFileScanner failed construction");
+      e.initCause(ex);
+      throw e;
     }
-    return retKey;
   }
-  
-  public boolean seek(KeyValue key) {
-    try {
-      if(!seekAtOrAfter(hfs, key)) {
-        close();
-        return false;
+
+  /*
+   * Go open new scanners and cue them at <code>firstRow</code>.
+   * Closes existing Readers if any.
+   * @param firstRow
+   * @throws IOException
+   */
+  private void openScanner(final byte [] firstRow) throws IOException {
+    List<HFileScanner> s =
+      new ArrayList<HFileScanner>(this.store.getStorefiles().size());
+    Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
+    for (StoreFile f: map.values()) {
+      s.add(f.getReader().getScanner());
+    }
+    this.scanners = s.toArray(new HFileScanner [] {});
+    this.keys = new KeyValue[this.scanners.length];
+    // Advance the readers to the first pos.
+    KeyValue firstKey = (firstRow != null && firstRow.length > 0)?
+      new KeyValue(firstRow, HConstants.LATEST_TIMESTAMP): null;
+    for (int i = 0; i < this.scanners.length; i++) {
+      if (firstKey != null) {
+        if (seekTo(i, firstKey)) {
+          continue;
+        }
+      }
+      while (getNext(i)) {
+        if (columnMatch(i)) {
+          break;
+        }
       }
-      cur = hfs.getKeyValue();
-      hfs.next();
-      return true;
-    } catch(IOException ioe) {
-      close();
-      return false;
     }
   }
-  
-  public void close() {
-    // Nothing to close on HFileScanner?
-    cur = null;
+
+  /**
+   * For a particular column i, find all the matchers defined for the column.
+   * Compare the column family and column key using the matchers. The first one
+   * that matches returns true. If no matchers are successful, return false.
+   * 
+   * @param i index into the keys array
+   * @return true if any of the matchers for the column match the column family
+   * and the column key.
+   * @throws IOException
+   */
+  boolean columnMatch(int i) throws IOException {
+    return columnMatch(keys[i]);
   }
-  
+
   /**
+   * Get the next set of values for this scanner.
    * 
-   * @param s
-   * @param k
-   * @return
+   * @param results All the results found for the next row
+   * @return true if a match was found
    * @throws IOException
+   * 
+   * @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util.List)
    */
-  public static boolean seekAtOrAfter(HFileScanner s, KeyValue k)
+  @Override
+  public boolean next(List<KeyValue> results)
   throws IOException {
-    int result = s.seekTo(k.getBuffer(), k.getKeyOffset(), k.getKeyLength());
-    if(result < 0) {
-      // Passed KV is smaller than first KV in file, work from start of file
-      return s.seekTo();
-    } else if(result > 0) {
-      // Passed KV is larger than current KV in file, if there is a next
-      // it is the "after", if not then this scanner is done.
-      return s.next();
+    if (this.scannerClosed) {
+      return false;
+    }
+    this.lock.readLock().lock();
+    try {
+      // Find the next viable row label (and timestamp).
+      KeyValue viable = getNextViableRow();
+      if (viable == null) {
+        return false;
+      }
+
+      // Grab all the values that match this row/timestamp
+      boolean addedItem = false;
+      for (int i = 0; i < keys.length; i++) {
+        // Fetch the data
+        while ((keys[i] != null) &&
+            (this.store.comparator.compareRows(this.keys[i], viable) == 0)) {
+          // If we are doing a wild card match or there are multiple matchers
+          // per column, we need to scan all the older versions of this row
+          // to pick up the rest of the family members
+          if(!isWildcardScanner()
+              && !isMultipleMatchScanner()
+              && (keys[i].getTimestamp() != viable.getTimestamp())) {
+            break;
+          }
+          if (columnMatch(i)) {
+            // We only want the first result for any specific family member
+            // TODO: Do we have to keep a running list of column entries in
+            // the results across all of the StoreScanner?  Like we do
+            // doing getFull?
+            if (!results.contains(keys[i])) {
+              results.add(keys[i]);
+              addedItem = true;
+            }
+          }
+
+          if (!getNext(i)) {
+            closeSubScanner(i);
+          }
+        }
+        // Advance the current scanner beyond the chosen row, to
+        // a valid timestamp, so we're ready next time.
+        while ((keys[i] != null) &&
+            ((this.store.comparator.compareRows(this.keys[i], viable) <= 0) ||
+                (keys[i].getTimestamp() > this.timestamp) ||
+                !columnMatch(i))) {
+          getNext(i);
+        }
+      }
+      return addedItem;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /*
+   * @return The next viable row as a <code>KeyValue</code>, or null if none.
+   * @throws IOException
+   */
+  private KeyValue getNextViableRow() throws IOException {
+    // Find the next viable row label (and timestamp).
+    KeyValue viable = null;
+    long viableTimestamp = -1;
+    long ttl = store.ttl;
+    for (int i = 0; i < keys.length; i++) {
+      // The first key that we find that matches may have a timestamp greater
+      // than the one we're looking for. We have to advance to see if there
+      // is an older version present, since timestamps are sorted descending
+      while (keys[i] != null &&
+          keys[i].getTimestamp() > this.timestamp &&
+          columnMatch(i) &&
+          getNext(i)) {
+        if (columnMatch(i)) {
+          break;
+        }
+      }
+      if((keys[i] != null)
+          // If we get here and keys[i] is not null, we already know that the
+          // column matches and the timestamp of the row is less than or equal
+          // to this.timestamp, so we do not need to test that here
+          && ((viable == null) ||
+            (this.store.comparator.compareRows(this.keys[i], viable) < 0) ||
+            ((this.store.comparator.compareRows(this.keys[i], viable) == 0) &&
+              (keys[i].getTimestamp() > viableTimestamp)))) {
+        if (ttl == HConstants.FOREVER || now < keys[i].getTimestamp() + ttl) {
+          viable = keys[i];
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("getNextViableRow :" + keys[i] + ": expired, skipped");
+          }
+        }
+      }
+    }
+    return viable;
+  }
+
+  /*
+   * The user didn't want to start scanning at the first row. This method
+   * seeks to the requested row.
+   *
+   * @param i which scanner to advance
+   * @param firstKey seek to this key; if null, seek to the start of the file
+   * @return true if the scanner is primed on a good (unexpired) key, or if
+   * the key was not found and the scanner was exhausted and closed; false
+   * if the key found has expired.
+   */
+  private boolean seekTo(int i, final KeyValue firstKey)
+  throws IOException {
+    if (firstKey == null) {
+      if (!this.scanners[i].seekTo()) {
+        closeSubScanner(i);
+        return true;
+      }
+    } else {
+      // TODO: sort columns and pass in column as part of key so we get closer.
+      if (!Store.getClosest(this.scanners[i], firstKey)) {
+        closeSubScanner(i);
+        return true;
+      }
+    }
+    this.keys[i] = this.scanners[i].getKeyValue();
+    return isGoodKey(this.keys[i]);
+  }
+
+  /**
+   * Get the next value from the specified reader.
+   * 
+   * @param i which reader to fetch next value from
+   * @return true if there is more data available
+   */
+  private boolean getNext(int i) throws IOException {
+    boolean result = false;
+    while (true) {
+      if ((this.scanners[i].isSeeked() && !this.scanners[i].next()) ||
+          (!this.scanners[i].isSeeked() && !this.scanners[i].seekTo())) {
+        closeSubScanner(i);
+        break;
+      }
+      this.keys[i] = this.scanners[i].getKeyValue();
+      if (isGoodKey(this.keys[i])) {
+        result = true;
+        break;
+      }
+    }
+    return result;
+  }
+
+  /*
+   * @param kv
+   * @return True if good key candidate.
+   */
+  private boolean isGoodKey(final KeyValue kv) {
+    return !Store.isExpired(kv, this.store.ttl, this.now);
+  }
+
+  /** Close down the indicated reader. */
+  private void closeSubScanner(int i) {
+    this.scanners[i] = null;
+    this.keys[i] = null;
+  }
+
+  /** Shut it down! */
+  public void close() {
+    if (!this.scannerClosed) {
+      this.store.deleteChangedReaderObserver(this);
+      try {
+        for(int i = 0; i < this.scanners.length; i++) {
+          closeSubScanner(i);
+        }
+      } finally {
+        this.scannerClosed = true;
+      }
+    }
+  }
+
+  // Implementation of ChangedReadersObserver
+  
+  public void updateReaders() throws IOException {
+    this.lock.writeLock().lock();
+    try {
+      // The keys are currently lined up at the next row to fetch.  Pass in
+      // the current row as 'first' row and readers will be opened and cued
+      // up so a future call to next will start here.
+      KeyValue viable = getNextViableRow();
+      openScanner(viable == null? null: viable.getRow());
+      LOG.debug("Replaced Scanner Readers at row " +
+        (viable == null? "(none)": Bytes.toString(viable.getRow())));
+    } finally {
+      this.lock.writeLock().unlock();
     }
-    // Seeked to the exact key
-    return true;
   }
 }
\ No newline at end of file

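The rewritten StoreFileScanner above keeps one HFileScanner and one current
KeyValue per store file, and next()/getNextViableRow() repeatedly pick the
lowest-sorting row across those parallel cursors. A stripped-down sketch of
that selection step, using Strings in place of KeyValues (the class and
names are illustrative only, not part of the patch):

    public class ViableRowSketch {
      // Toy model of getNextViableRow(): one "current key" per sub-scanner,
      // null marks an exhausted scanner, and the smallest remaining key wins.
      static String nextViable(String[] keys) {
        String viable = null;
        for (String k : keys) {
          if (k != null && (viable == null || k.compareTo(viable) < 0)) {
            viable = k; // lower-sorting row becomes the new candidate
          }
        }
        return viable; // null means every sub-scanner is exhausted
      }

      public static void main(String[] args) {
        String[] keys = { "row2", null, "row1", "row3" };
        System.out.println(nextViable(keys)); // prints "row1"
      }
    }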
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Jun 14 21:34:13 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2008 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -25,238 +25,288 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Scanner scans both the memcache and the HStore. Coaleace KeyValue stream
- * into List<KeyValue> for a single row.
+ * Scanner scans both the memcache and the HStore
  */
-class StoreScanner implements KeyValueScanner, InternalScanner,
-ChangedReadersObserver {
+class StoreScanner implements InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
 
+  private InternalScanner [] scanners;
+  private List<KeyValue> [] resultSets;
+  private boolean wildcardMatch = false;
+  private boolean multipleMatchers = false;
+  private RowFilterInterface dataFilter;
   private Store store;
-
-  private ScanQueryMatcher matcher;
-
-  private KeyValueHeap heap;
-
+  private final long timestamp;
+  private final NavigableSet<byte []> columns;
+  
+  // Indices for memcache scanner and hstorefile scanner.
+  private static final int MEMS_INDEX = 0;
+  private static final int HSFS_INDEX = MEMS_INDEX + 1;
+  
   // Used around transition from no storefile to the first.
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   // Used to indicate that the scanner has closed (see HBASE-1107)
   private final AtomicBoolean closing = new AtomicBoolean(false);
 
-  /**
-   * Opens a scanner across memcache, snapshot, and all StoreFiles.
-   */
-  StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
+  /** Create a Scanner with a handle on the memcache and HStore files. */
+  @SuppressWarnings("unchecked")
+  StoreScanner(Store store, final NavigableSet<byte []> targetCols,
+    byte [] firstRow, long timestamp, RowFilterInterface filter) 
+  throws IOException {
     this.store = store;
-    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
-        columns, store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()));
-
-    List<KeyValueScanner> scanners = getStoreFileScanners();
-    scanners.add(store.memcache.getScanner());
-
-    // Seek all scanners to the initial key
-    for(KeyValueScanner scanner : scanners) {
-      scanner.seek(matcher.getStartKey());
+    this.dataFilter = filter;
+    if (null != dataFilter) {
+      dataFilter.reset();
+    }
+    this.scanners = new InternalScanner[2];
+    this.resultSets = new List[scanners.length];
+    // Save these args in case we need them later handling change in readers
+    // See updateReaders below.
+    this.timestamp = timestamp;
+    this.columns = targetCols;
+    try {
+      scanners[MEMS_INDEX] =
+        store.memcache.getScanner(timestamp, targetCols, firstRow);
+      scanners[HSFS_INDEX] =
+        new StoreFileScanner(store, timestamp, targetCols, firstRow);
+      for (int i = MEMS_INDEX; i < scanners.length; i++) {
+        checkScannerFlags(i);
+      }
+    } catch (IOException e) {
+      doClose();
+      throw e;
+    }
+    
+    // Advance to the first key in each scanner.
+    // All results will match the required column-set and scanTime.
+    for (int i = MEMS_INDEX; i < scanners.length; i++) {
+      setupScanner(i);
     }
-
-    // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(
-        scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);
-
     this.store.addChangedReaderObserver(this);
   }
-
-  // Constructor for testing.
-  StoreScanner(Scan scan, byte [] colFamily,
-      long ttl, KeyValue.KVComparator comparator,
-      final NavigableSet<byte[]> columns,
-      KeyValueScanner [] scanners) {
-    this.store = null;
-    this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, 
-        comparator.getRawComparator(), scan.getMaxVersions());
-
-    // Seek all scanners to the initial key
-    for(KeyValueScanner scanner : scanners) {
-      scanner.seek(matcher.getStartKey());
+  
+  /*
+   * @param i Index.
+   */
+  private void checkScannerFlags(final int i) {
+    if (this.scanners[i].isWildcardScanner()) {
+      this.wildcardMatch = true;
+    }
+    if (this.scanners[i].isMultipleMatchScanner()) {
+      this.multipleMatchers = true;
     }
-
-    heap = new KeyValueHeap(
-        scanners, comparator);
-  }
-
-  public KeyValue peek() {
-    return this.heap.peek();
   }
-
-  public KeyValue next() {
-    // throw runtime exception perhaps?
-    throw new RuntimeException("Never call StoreScanner.next()");
+  
+  /*
+   * Do scanner setup.
+   * @param i
+   * @throws IOException
+   */
+  private void setupScanner(final int i) throws IOException {
+    this.resultSets[i] = new ArrayList<KeyValue>();
+    if (this.scanners[i] != null && !this.scanners[i].next(this.resultSets[i])) {
+      closeScanner(i);
+    }
   }
 
-  public void close() {
-    this.closing.set(true);
-    // under test, we dont have a this.store
-    if (this.store != null)
-      this.store.deleteChangedReaderObserver(this);
-    this.heap.close();
+  /** @return true if the scanner is a wild card scanner */
+  public boolean isWildcardScanner() {
+    return this.wildcardMatch;
   }
 
-  public boolean seek(KeyValue key) {
-
-    return this.heap.seek(key);
+  /** @return true if the scanner is a multiple match scanner */
+  public boolean isMultipleMatchScanner() {
+    return this.multipleMatchers;
   }
 
-  /**
-   * Get the next row of values from this Store.
-   * @param result
-   * @return true if there are more rows, false if scanner is done
-   */
-  public boolean next(List<KeyValue> result) throws IOException {
-    // this wont get us the next row if the previous round hasn't iterated
-    // past all the cols from the previous row. Potential bug!
-    KeyValue peeked = this.heap.peek();
-    if (peeked == null) {
-      close();
-      return false;
-    }
-    matcher.setRow(peeked.getRow());
-    KeyValue kv;
-    while((kv = this.heap.peek()) != null) {
-      QueryMatcher.MatchCode mc = matcher.match(kv);
-      switch(mc) {
-        case INCLUDE:
-          KeyValue next = this.heap.next();
-          result.add(next);
+  public boolean next(List<KeyValue> results)
+  throws IOException {
+    this.lock.readLock().lock();
+    try {
+    // Filtered flag is set by filters.  If a cell has been 'filtered out'
+    // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
+    boolean filtered = true;
+    boolean moreToFollow = true;
+    while (filtered && moreToFollow) {
+      // Find the lowest-possible key.
+      KeyValue chosen = null;
+      long chosenTimestamp = -1;
+      for (int i = 0; i < this.scanners.length; i++) {
+        KeyValue kv = this.resultSets[i] == null || this.resultSets[i].isEmpty()?
+          null: this.resultSets[i].get(0);
+        if (kv == null) {
           continue;
-        case DONE:
-          // what happens if we have 0 results?
-          if (result.isEmpty()) {
-            // try the next one.
-            matcher.setRow(this.heap.peek().getRow());
-            continue;
-          }
-          if (matcher.filterEntireRow()) {
-            // wow, well, um, reset the result and continue.
-            result.clear();
-            matcher.setRow(heap.peek().getRow());
-            continue;
-          }
-
-          return true;
+        }
+        if (scanners[i] != null &&
+            (chosen == null ||
+              (this.store.comparator.compareRows(kv, chosen) < 0) ||
+              ((this.store.comparator.compareRows(kv, chosen) == 0) &&
+              (kv.getTimestamp() > chosenTimestamp)))) {
+          chosen = kv;
+          chosenTimestamp = chosen.getTimestamp();
+        }
+      }
 
-        case DONE_SCAN:
-          close();
-          return false;
-
-        case SEEK_NEXT_ROW:
-          // TODO see comments in SEEK_NEXT_COL
-          /*
-          KeyValue rowToSeek =
-              new KeyValue(kv.getRow(),
-                  0,
-                  KeyValue.Type.Minimum);
-          heap.seek(rowToSeek);
-           */
-          heap.next();
-          break;
-
-        case SEEK_NEXT_COL:
-          // TODO hfile needs 'hinted' seeking to prevent it from
-          // reseeking from the start of the block on every dang seek.
-          // We need that API and expose it the scanner chain.
-          /*
-          ColumnCount hint = matcher.getSeekColumn();
-          KeyValue colToSeek;
-          if (hint == null) {
-            // seek to the 'last' key on this column, this is defined
-            // as the key with the same row, fam, qualifier,
-            // smallest timestamp, largest type.
-            colToSeek =
-                new KeyValue(kv.getRow(),
-                    kv.getFamily(),
-                    kv.getColumn(),
-                    Long.MIN_VALUE,
-                    KeyValue.Type.Minimum);
-          } else {
-            // This is ugmo.  Move into KeyValue convience method.
-            // First key on a column is:
-            // same row, cf, qualifier, max_timestamp, max_type, no value.
-            colToSeek =
-                new KeyValue(kv.getRow(),
-                    0,
-                    kv.getRow().length,
-
-                    kv.getFamily(),
-                    0,
-                    kv.getFamily().length,
-
-                    hint.getBuffer(),
-                    hint.getOffset(),
-                    hint.getLength(),
-
-                    Long.MAX_VALUE,
-                    KeyValue.Type.Maximum,
-                    null,
-                    0,
-                    0);
+      // Filter whole row by row key?
+      filtered = dataFilter == null || chosen == null? false:
+        dataFilter.filterRowKey(chosen.getBuffer(), chosen.getRowOffset(),
+          chosen.getRowLength());
+
+      // Store results for each sub-scanner.
+      if (chosenTimestamp >= 0 && !filtered) {
+        NavigableSet<KeyValue> deletes =
+          new TreeSet<KeyValue>(this.store.comparatorIgnoringType);
+        for (int i = 0; i < scanners.length && !filtered; i++) {
+          if ((scanners[i] != null && !filtered && moreToFollow &&
+              this.resultSets[i] != null && !this.resultSets[i].isEmpty())) {
+            // Test this resultset is for the 'chosen' row.
+            KeyValue firstkv = resultSets[i].get(0);
+            if (!this.store.comparator.matchingRows(firstkv, chosen)) {
+              continue;
+            }
+            // Its for the 'chosen' row, work it.
+            for (KeyValue kv: resultSets[i]) {
+              if (kv.isDeleteType()) {
+                deletes.add(kv);
+              } else if ((deletes.isEmpty() || !deletes.contains(kv)) &&
+                  !filtered && moreToFollow && !results.contains(kv)) {
+                if (this.dataFilter != null) {
+                  // Filter whole row by column data?
+                  int rowlength = kv.getRowLength();
+                  int columnoffset = kv.getColumnOffset(rowlength);
+                  filtered = dataFilter.filterColumn(kv.getBuffer(),
+                      kv.getRowOffset(), rowlength,
+                    kv.getBuffer(), columnoffset, kv.getColumnLength(columnoffset),
+                    kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+                  if (filtered) {
+                    results.clear();
+                    break;
+                  }
+                }
+                results.add(kv);
+                /* REMOVING BECAUSE COULD BE BUNCH OF DELETES IN RESULTS
+                   AND WE WANT TO INCLUDE THEM -- below short-circuit is
+                   probably not wanted.
+                // If we are doing a wild card match or there are multiple
+                // matchers per column, we need to scan all the older versions of 
+                // this row to pick up the rest of the family members
+                if (!wildcardMatch && !multipleMatchers &&
+                    (kv.getTimestamp() != chosenTimestamp)) {
+                  break;
+                }
+                */
+              }
+            }
+            // Move on to next row.
+            resultSets[i].clear();
+            if (!scanners[i].next(resultSets[i])) {
+              closeScanner(i);
+            }
           }
-          heap.seek(colToSeek);
-           */
+        }
+      }
 
-          heap.next();
-          break;
+      moreToFollow = chosenTimestamp >= 0;
+      if (dataFilter != null) {
+        if (dataFilter.filterAllRemaining()) {
+          moreToFollow = false;
+        }
+      }
 
-        case SKIP:
-          this.heap.next();
-          break;
+      if (results.isEmpty() && !filtered) {
+        // There were no results found for this row.  Mark it as
+        // 'filtered'-out, otherwise we will not move on to the next row.
+        filtered = true;
+      }
+    }
+    
+    // If we got no results, then there is no more to follow.
+    if (results == null || results.isEmpty()) {
+      moreToFollow = false;
+    }
+    
+    // Make sure scanners closed if no more results
+    if (!moreToFollow) {
+      for (int i = 0; i < scanners.length; i++) {
+        if (null != scanners[i]) {
+          closeScanner(i);
+        }
       }
     }
-    if(result.size() > 0) {
-      return true;
+    
+    return moreToFollow;
+    } finally {
+      this.lock.readLock().unlock();
     }
-    // No more keys
-    close();
-    return false;
   }
 
-  private List<KeyValueScanner> getStoreFileScanners() {
-    List<HFileScanner> s =
-      new ArrayList<HFileScanner>(this.store.getStorefilesCount());
-    Map<Long, StoreFile> map = this.store.getStorefiles().descendingMap();
-    for(StoreFile sf : map.values()) {
-      s.add(sf.getReader().getScanner());
-    }
-    List<KeyValueScanner> scanners =
-      new ArrayList<KeyValueScanner>(s.size()+1);
-    for(HFileScanner hfs : s) {
-      scanners.add(new StoreFileScanner(hfs));
+  /** Shut down a single scanner */
+  void closeScanner(int i) {
+    try {
+      try {
+        scanners[i].close();
+      } catch (IOException e) {
+        LOG.warn(Bytes.toString(store.storeName) + " failed closing scanner " +
+          i, e);
+      }
+    } finally {
+      scanners[i] = null;
+      resultSets[i] = null;
     }
-    return scanners;
   }
 
+  public void close() {
+    this.closing.set(true);
+    this.store.deleteChangedReaderObserver(this);
+    doClose();
+  }
+  
+  private void doClose() {
+    for (int i = MEMS_INDEX; i < scanners.length; i++) {
+      if (scanners[i] != null) {
+        closeScanner(i);
+      }
+    }
+  }
+  
   // Implementation of ChangedReadersObserver
+  
   public void updateReaders() throws IOException {
     if (this.closing.get()) {
       return;
     }
     this.lock.writeLock().lock();
     try {
-      // Could do this pretty nicely with KeyValueHeap, but the existing
-      // implementation of this method only updated if no existing storefiles?
-      // Lets discuss.
-      return;
+      Map<Long, StoreFile> map = this.store.getStorefiles();
+      if (this.scanners[HSFS_INDEX] == null && map != null && map.size() > 0) {
+        // Presume that we went from no readers to at least one -- need to put
+        // a StoreFileScanner in place.
+        try {
+          // I think it's safe getting the key from mem at this stage -- it
+          // shouldn't have been flushed yet
+          // TODO: MAKE SURE WE UPDATE FROM TRUNK.
+          this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
+              this.timestamp, this.columns, this.resultSets[MEMS_INDEX].get(0).getRow());
+          checkScannerFlags(HSFS_INDEX);
+          setupScanner(HSFS_INDEX);
+          LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");
+        } catch (IOException e) {
+          doClose();
+          throw e;
+        }
+      }
     } finally {
       this.lock.writeLock().unlock();
     }



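Both scanners above guard their internals with a ReentrantReadWriteLock:
next() runs under the read lock, while updateReaders() takes the write lock
to swap scanners when the store's readers change. A self-contained sketch of
that discipline (class and names are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class ReaderSwapSketch {
      // Many concurrent next() calls share the read lock; updateReaders()
      // takes the write lock so no reader sees a half-replaced scanner array.
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private String[] scanners = { "memcache", "storefile-A" };

      String[] read() { // stands in for next()
        lock.readLock().lock();
        try {
          return scanners.clone();
        } finally {
          lock.readLock().unlock();
        }
      }

      void updateReaders(String[] replacement) { // stands in for updateReaders()
        lock.writeLock().lock();
        try {
          scanners = replacement;
        } finally {
          lock.writeLock().unlock();
        }
      }

      public static void main(String[] args) {
        ReaderSwapSketch s = new ReaderSwapSketch();
        s.updateReaders(new String[] { "memcache", "storefile-A", "storefile-B" });
        System.out.println(Arrays.toString(s.read()));
      }
    }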