hbase-commits mailing list archives

From: raw...@apache.org
Subject: svn commit: r782178 [7/16] - in /hadoop/hbase/trunk: bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/o...
Date: Sat, 06 Jun 2009 01:26:27 GMT
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Jun  6 01:26:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,7 +29,6 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -38,7 +37,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
@@ -79,13 +77,13 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -758,8 +756,7 @@
    * @return RegionLoad instance.
    * @throws IOException
    */
-  private HServerLoad.RegionLoad createRegionLoad(final HRegion r)
-  throws IOException {
+  private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
     byte[] name = r.getRegionName();
     int stores = 0;
     int storefiles = 0;
@@ -782,8 +779,7 @@
    * @return An instance of RegionLoad.
    * @throws IOException
    */
-  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName)
-  throws IOException {
+  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName) {
     return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
   }
 
@@ -1080,12 +1076,7 @@
           for(Map.Entry<byte [], Store> ee: r.stores.entrySet()) {
             Store store = ee.getValue(); 
             storefiles += store.getStorefilesCount();
-            try {
-              storefileIndexSize += store.getStorefilesIndexSize();
-            } catch (IOException ex) {
-              LOG.warn("error getting store file index size for " + store +
-                ": " + StringUtils.stringifyException(ex));  
-            }
+            storefileIndexSize += store.getStorefilesIndexSize();
           }
         }
       }
@@ -1630,7 +1621,7 @@
       super(Thread.currentThread().getName() + ".regionCloser." + r.toString());
       this.r = r;
     }
-
+    
     @Override
     public void run() {
       try {
@@ -1701,96 +1692,51 @@
     return getRegion(regionName).getRegionInfo();
   }
 
-  public Cell [] get(final byte [] regionName, final byte [] row,
-    final byte [] column, final long timestamp, final int numVersions) 
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      List<KeyValue> results =
-        getRegion(regionName).get(row, column, timestamp, numVersions);
-      return Cell.createSingleCellArray(results);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
-  }
 
-  public RowResult getRow(final byte [] regionName, final byte [] row, 
-    final byte [][] columns, final long ts,
-    final int numVersions, final long lockId)
+  public Result getClosestRowBefore(final byte [] regionName, 
+    final byte [] row, final byte [] family)
   throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
-      // convert the columns array into a set so it's easy to check later.
-      NavigableSet<byte []> columnSet = null;
-      if (columns != null) {
-        columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        columnSet.addAll(Arrays.asList(columns));
-      }
+      // locate the region we're operating on
       HRegion region = getRegion(regionName);
-      HbaseMapWritable<byte [], Cell> result =
-        region.getFull(row, columnSet, ts, numVersions, getLockFromId(lockId));
-      if (result == null || result.isEmpty())
-        return null;
-      return new RowResult(row, result);
+      // ask the region for all the data
+      Result r = region.getClosestRowBefore(row, family);
+      return r;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public RowResult getClosestRowBefore(final byte [] regionName, 
-    final byte [] row, final byte [] columnFamily)
-  throws IOException {
+  /** {@inheritDoc} */
+  public Result get(byte [] regionName, Get get) throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
-      // locate the region we're operating on
       HRegion region = getRegion(regionName);
-      // ask the region for all the data 
-      RowResult rr = region.getClosestRowBefore(row, columnFamily);
-      return rr;
-    } catch (Throwable t) {
+      return region.get(get, getLockFromId(get.getLockId()));
+    } catch(Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
-  
-  public RowResult next(final long scannerId) throws IOException {
-    RowResult[] rrs = next(scannerId, 1);
-    return rrs.length == 0 ? null : rrs[0];
-  }
 
-  public RowResult [] next(final long scannerId, int nbRows) throws IOException {
+  public boolean exists(byte [] regionName, Get get) throws IOException {
     checkOpen();
-    List<List<KeyValue>> results = new ArrayList<List<KeyValue>>();
+    requestCount.incrementAndGet();
     try {
-      String scannerName = String.valueOf(scannerId);
-      InternalScanner s = scanners.get(scannerName);
-      if (s == null) {
-        throw new UnknownScannerException("Name: " + scannerName);
-      }
-      this.leases.renewLease(scannerName);
-      for (int i = 0; i < nbRows; i++) {
-        requestCount.incrementAndGet();
-        // Collect values to be returned here
-        List<KeyValue> values = new ArrayList<KeyValue>();
-        while (s.next(values)) {
-          if (!values.isEmpty()) {
-            // Row has something in it. Return the value.
-            results.add(values);
-            break;
-          }
-        }
-      }
-      return RowResult.createRowResultArray(results);
-    } catch (Throwable t) {
+      HRegion region = getRegion(regionName);
+      Result r = region.get(get, getLockFromId(get.getLockId()));
+      return r != null && !r.isEmpty();
+    } catch(Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
+  public void put(final byte [] regionName, final Put put)
   throws IOException {
-    if (b.getRow() == null)
+    if (put.getRow() == null)
       throw new IllegalArgumentException("update has null row");
     
     checkOpen();
@@ -1798,24 +1744,24 @@
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      region.batchUpdate(b, getLockFromId(b.getRowLock()));
+      region.put(put, getLockFromId(put.getLockId()));
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
   
-  public int batchUpdates(final byte[] regionName, final BatchUpdate [] b)
+  public int put(final byte[] regionName, final Put [] puts)
   throws IOException {
     int i = 0;
     checkOpen();
     try {
       HRegion region = getRegion(regionName);
       this.cacheFlusher.reclaimMemcacheMemory();
-      Integer[] locks = new Integer[b.length];
-      for (i = 0; i < b.length; i++) {
+      Integer[] locks = new Integer[puts.length];
+      for (i = 0; i < puts.length; i++) {
         this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(b[i].getRowLock());
-        region.batchUpdate(b[i], locks[i]);
+        locks[i] = getLockFromId(puts[i].getLockId());
+        region.put(puts[i], locks[i]);
       }
     } catch(WrongRegionException ex) {
       return i;
@@ -1827,38 +1773,49 @@
     return -1;
   }
   
-  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
-      final HbaseMapWritable<byte[],byte[]> expectedValues)
-  throws IOException {
-    if (b.getRow() == null)
-      throw new IllegalArgumentException("update has null row");
+
+  /**
+   * Atomically checks that the current value of the specified
+   * row/family/qualifier matches the expected value and, if so,
+   * applies the passed Put.
+   * @param regionName
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param value the expected value
+   * @param put
+   * @throws IOException
+   * @return true if the new put was executed, false otherwise
+   */
+  public boolean checkAndPut(final byte[] regionName, final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value, 
+      final Put put) throws IOException{
+    //Getting actual value
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      return region.checkAndSave(b,
-        expectedValues,getLockFromId(b.getRowLock()), true);
+      return region.checkAndPut(row, family, qualifier, value, put,
+          getLockFromId(put.getLockId()), true);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
-
+  
   //
   // remote scanner interface
   //
 
-  public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow,
-    final long timestamp, final RowFilterInterface filter)
+  public long openScanner(byte [] regionName, Scan scan)
   throws IOException {
     checkOpen();
     NullPointerException npe = null;
     if (regionName == null) {
       npe = new NullPointerException("regionName is null");
-    } else if (cols == null) {
-      npe = new NullPointerException("columns to scan is null");
-    } else if (firstRow == null) {
-      npe = new NullPointerException("firstRow for scanner is null");
+    } else if (scan == null) {
+      npe = new NullPointerException("scan is null");
     }
     if (npe != null) {
       throw new IOException("Invalid arguments to openScanner", npe);
@@ -1866,8 +1823,7 @@
     requestCount.incrementAndGet();
     try {
       HRegion r = getRegion(regionName);
-      InternalScanner s =
-        r.getScanner(cols, firstRow, timestamp, filter);
+      InternalScanner s = r.getScanner(scan);
       long scannerId = addScanner(s);
       return scannerId;
     } catch (Throwable t) {
@@ -1886,6 +1842,47 @@
       createLease(scannerName, new ScannerListener(scannerName));
     return scannerId;
   }
+
+  public Result next(final long scannerId) throws IOException {
+    Result [] res = next(scannerId, 1);
+    if(res == null || res.length == 0) {
+      return null;
+    }
+    return res[0];
+  }
+
+  public Result [] next(final long scannerId, int nbRows) throws IOException {
+    checkOpen();
+    List<Result> results = new ArrayList<Result>();
+    try {
+      long start = System.currentTimeMillis();
+      String scannerName = String.valueOf(scannerId);
+      InternalScanner s = scanners.get(scannerName);
+      if (s == null) {
+        throw new UnknownScannerException("Name: " + scannerName);
+      }
+      this.leases.renewLease(scannerName);
+      for (int i = 0; i < nbRows; i++) {
+        requestCount.incrementAndGet();
+        // Collect values to be returned here
+        List<KeyValue> values = new ArrayList<KeyValue>();
+        boolean moreRows = s.next(values);
+        if(!values.isEmpty()) {
+          results.add(new Result(values));
+        }
+        if(!moreRows) {
+          break;
+        }
+      }
+      LOG.debug("Result[] next time: " + (System.currentTimeMillis() - start) + " (ms)");
+      return results.toArray(new Result[0]);
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
   
   public void close(final long scannerId) throws IOException {
     try {
@@ -1937,45 +1934,23 @@
   // Methods that do the actual work for the remote API
   //
   
-  public void deleteAll(final byte [] regionName, final byte [] row,
-      final byte [] column, final long timestamp, final long lockId) 
-  throws IOException {
-    HRegion region = getRegion(regionName);
-    region.deleteAll(row, column, timestamp, getLockFromId(lockId));
-  }
-
-  public void deleteAll(final byte [] regionName, final byte [] row,
-      final long timestamp, final long lockId) 
-  throws IOException {
-    HRegion region = getRegion(regionName);
-    region.deleteAll(row, timestamp, getLockFromId(lockId));
-  }
-
-  public void deleteAllByRegex(byte[] regionName, byte[] row, String colRegex,
-      long timestamp, long lockId) throws IOException {
-    getRegion(regionName).deleteAllByRegex(row, colRegex, timestamp, 
-        getLockFromId(lockId));
-  }
-
-  public void deleteFamily(byte [] regionName, byte [] row, byte [] family, 
-    long timestamp, final long lockId)
-  throws IOException{
-    getRegion(regionName).deleteFamily(row, family, timestamp,
-        getLockFromId(lockId));
-  }
-
-  public void deleteFamilyByRegex(byte[] regionName, byte[] row, String familyRegex,
-      long timestamp, long lockId) throws IOException {
-    getRegion(regionName).deleteFamilyByRegex(row, familyRegex, timestamp, 
-        getLockFromId(lockId));
-  }
-
-  public boolean exists(byte[] regionName, byte[] row, byte[] column,
-      long timestamp, long lockId)
+  public void delete(final byte [] regionName, final Delete delete)
   throws IOException {
-    return getRegion(regionName).exists(row, column, timestamp, 
-      getLockFromId(lockId));
+    checkOpen();
+    try {
+      boolean writeToWAL = true;
+      this.cacheFlusher.reclaimMemcacheMemory();
+      this.requestCount.incrementAndGet();
+      Integer lock = getLockFromId(delete.getLockId());
+      HRegion region = getRegion(regionName);
+      region.delete(delete, lock, writeToWAL);
+    } catch(WrongRegionException ex) {
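+      // Ignored: the row is not in this region.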
+    } catch (NotServingRegionException ex) {
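+      // Ignored: this server no longer serves the region.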
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
   }
+  
 
   public long lockRow(byte [] regionName, byte [] row)
   throws IOException {
@@ -2023,7 +1998,7 @@
    * @return intId Integer row lock used internally in HRegion
    * @throws IOException Thrown if this is not a valid client lock id.
    */
-  private Integer getLockFromId(long lockId)
+  Integer getLockFromId(long lockId)
   throws IOException {
     if(lockId == -1L) {
       return null;
@@ -2147,6 +2122,10 @@
     return Collections.unmodifiableCollection(onlineRegions.values());
   }
 
+  public HRegion [] getOnlineRegionsAsArray() {
+    return getOnlineRegions().toArray(new HRegion[0]);
+  }
+  
   /**
    * @return The HRegionInfos from online regions sorted
    */
@@ -2410,7 +2389,6 @@
         } catch (Throwable t) {
           LOG.error( "Can not start region server because "+
               StringUtils.stringifyException(t) );
-          System.exit(-1);
         }
         break;
       }
@@ -2426,39 +2404,20 @@
     }
   }
   
-  /**
-   * @param args
-   */
-  public static void main(String [] args) {
-    Configuration conf = new HBaseConfiguration();
-    @SuppressWarnings("unchecked")
-    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
-        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
-    doMain(args, regionServerClass);
-  }
-
   /** {@inheritDoc} */
-  public long incrementColumnValue(byte[] regionName, byte[] row,
-      byte[] column, long amount) throws IOException {
+  public long incrementColumnValue(byte [] regionName, byte [] row, 
+      byte [] family, byte [] qualifier, long amount)
+  throws IOException {
     checkOpen();
-    
-    NullPointerException npe = null;
+
     if (regionName == null) {
-      npe = new NullPointerException("regionName is null");
-    } else if (row == null) {
-      npe = new NullPointerException("row is null");
-    } else if (column == null) {
-      npe = new NullPointerException("column is null");
-    }
-    if (npe != null) {
-      IOException io = new IOException(
-          "Invalid arguments to incrementColumnValue", npe);
-      throw io;
+      throw new IOException("Invalid arguments to incrementColumnValue " + 
+      "regionName is null");
     }
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      return region.incrementColumnValue(row, column, amount);
+      return region.incrementColumnValue(row, family, qualifier, amount);
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -2479,4 +2438,17 @@
   public HServerInfo getHServerInfo() throws IOException {
     return serverInfo;
   }
+  
+  
+  /**
+   * @param args
+   */
+  public static void main(String [] args) {
+    Configuration conf = new HBaseConfiguration();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    doMain(args, regionServerClass);
+  }
+
 }
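
Below is a minimal sketch of the client-side API this commit migrates the
region server onto: Get/Put/Result in place of the removed Cell, RowResult
and BatchUpdate. It is illustrative only; the table, family and qualifier
names are hypothetical, and it assumes the HTable client class from the
same org.apache.hadoop.hbase.client package.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class NewApiSketch {
      public static void main(String [] args) throws Exception {
        HTable table = new HTable(new HBaseConfiguration(), "mytable");
        // Write one cell with the new Put class.
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
        table.put(put);
        // Read it back with the new Get/Result classes.
        Get get = new Get(Bytes.toBytes("row1"));
        get.addColumn(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
        Result result = table.get(get);
        byte [] value = result.getValue(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
        System.out.println(Bytes.toString(value));
      }
    }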

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Sat Jun  6 01:26:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -41,11 +41,9 @@
  */
 public interface InternalScanner extends Closeable {
   /**
-   * Grab the next row's worth of values. The scanner will return the most
-   * recent data value for each row that is not newer than the target time
-   * passed when the scanner was created.
+   * Grab the next row's worth of values.
    * @param results
-   * @return true if data was returned
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException
    */
   public boolean next(List<KeyValue> results)
@@ -55,11 +53,5 @@
    * Closes the scanner and releases any resources it has allocated
    * @throws IOException
    */
-  public void close() throws IOException;  
-  
-  /** @return true if the scanner is matching a column family or regex */
-  public boolean isWildcardScanner();
-  
-  /** @return true if the scanner is matching multiple column family members */
-  public boolean isMultipleMatchScanner();
+  public void close() throws IOException;
 }
\ No newline at end of file
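
The revised next(List) contract above changes how callers drive a scanner:
the boolean now answers "do more rows exist after this one" rather than
"was data returned", so an empty batch no longer means end-of-scan. A
hedged sketch of the resulting loop, mirroring the pattern used in
HRegionServer.next(long, int):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;

    public class ScanLoopSketch {
      // Drains a scanner under the new contract; the returned boolean means
      // "more rows remain", so the batch must be checked separately.
      static int countRows(InternalScanner scanner) throws IOException {
        List<KeyValue> values = new ArrayList<KeyValue>();
        int rows = 0;
        boolean moreRows;
        do {
          values.clear();
          moreRows = scanner.next(values);
          if (!values.isEmpty()) {
            rows++;
          }
        } while (moreRows);
        scanner.close();
        return rows;
      }
    }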

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+
+/**
+ * Implements a heap merge across any number of KeyValueScanners.
+ * <p>
+ * Implements KeyValueScanner itself.
+ * <p>
+ * This class is used at the Region level to merge across Stores
+ * and at the Store level to merge across the Memcache and StoreFiles.
+ * <p>
+ * In the Region case, we also need InternalScanner.next(List), so this class
+ * also implements InternalScanner.  WARNING: As is, if you try to use this
+ * as an InternalScanner at the Store level, you will get runtime exceptions. 
+ */
+public class KeyValueHeap implements KeyValueScanner, InternalScanner {
+  
+  private PriorityQueue<KeyValueScanner> heap;
+  
+  private KeyValueScanner current = null;
+  
+  private KVScannerComparator comparator;
+  
+  /**
+   * Constructor
+   * @param scanners
+   * @param comparator
+   */
+  public KeyValueHeap(KeyValueScanner [] scanners, KVComparator comparator) {
+    this.comparator = new KVScannerComparator(comparator);
+    this.heap = new PriorityQueue<KeyValueScanner>(scanners.length, 
+        this.comparator);
+    for(KeyValueScanner scanner : scanners) {
+      if(scanner.peek() != null) {
+        this.heap.add(scanner);
+      }
+    }
+    this.current = heap.poll();
+  }
+  
+  public KeyValue peek() {
+    if(this.current == null) {
+      return null;
+    }
+    return this.current.peek();
+  }
+  
+  public KeyValue next()  {
+    if(this.current == null) {
+      return null;
+    }
+    KeyValue kvReturn = this.current.next();
+    KeyValue kvNext = this.current.peek();
+    if(kvNext == null) {
+      this.current.close();
+      this.current = this.heap.poll();
+    } else {
+      KeyValueScanner topScanner = this.heap.peek();
+      if(topScanner == null ||
+          this.comparator.compare(kvNext, topScanner.peek()) > 0) {
+        this.heap.add(this.current);
+        this.current = this.heap.poll();
+      }
+    }
+    return kvReturn;
+  }
+  
+  /**
+   * Gets the next row of keys from the top-most scanner.
+   * <p>
+   * This method takes care of updating the heap.
+   * <p>
+   * This can ONLY be called when you are using Scanners that implement
+   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * @return true if there are more keys, false if all scanners are done 
+   */
+  public boolean next(List<KeyValue> result) throws IOException {
+    InternalScanner currentAsInternal = (InternalScanner)this.current;
+    currentAsInternal.next(result);
+    KeyValue pee = this.current.peek();
+    if(pee == null) {
+      this.current.close();
+    } else {
+      this.heap.add(this.current);
+    }
+    this.current = this.heap.poll();
+    return (this.current != null);
+  }
+  
+  private class KVScannerComparator implements Comparator<KeyValueScanner> {
+    private KVComparator kvComparator;
+    /**
+     * Constructor
+     * @param kvComparator
+     */
+    public KVScannerComparator(KVComparator kvComparator) {
+      this.kvComparator = kvComparator;
+    }
+    public int compare(KeyValueScanner left, KeyValueScanner right) {
+      return compare(left.peek(), right.peek());
+    }
+    /**
+     * Compares two KeyValues
+     * @param left
+     * @param right
+     * @return negative if left is smaller, zero if equal, positive if greater
+     */
+    public int compare(KeyValue left, KeyValue right) {
+      return this.kvComparator.compare(left, right);
+    }
+    /**
+     * @return KVComparator
+     */
+    public KVComparator getComparator() {
+      return this.kvComparator;
+    }
+  }
+
+  public void close() {
+    if(this.current != null) {
+      this.current.close();
+    }
+    KeyValueScanner scanner;
+    while((scanner = this.heap.poll()) != null) {
+      scanner.close();
+    }
+  }
+  
+  /**
+   * Seeks all scanners to the specified key, or the first key after it.  If
+   * we early-out of a row, scanners may be left positioned on values that
+   * were never returned; rather than iterating past them, each scanner is
+   * given the opportunity to re-seek.
+   * <p>
+   * As individual scanners may run past their ends, those scanners are
+   * automatically closed and removed from the heap.
+   * @param seekKey KeyValue to seek at or after
+   * @return true if KeyValues exist at or after specified key, false if not
+   */
+  public boolean seek(KeyValue seekKey) {
+    if(this.current == null) {
+      return false;
+    }
+    this.heap.add(this.current);
+    this.current = null;
+
+    KeyValueScanner scanner;
+    while((scanner = this.heap.poll()) != null) {
+      KeyValue topKey = scanner.peek();
+      if(comparator.getComparator().compare(seekKey, topKey) <= 0) { // Correct?
+        // Top KeyValue is at-or-after Seek KeyValue
+        this.current = scanner;
+        return true;
+      }
+      if(!scanner.seek(seekKey)) {
+        scanner.close();
+      } else {
+        this.heap.add(scanner);
+      }
+    }
+    // Heap is returning empty, scanner is done
+    return false;
+  }
+
+  /**
+   * @return the current Heap
+   */
+  public PriorityQueue<KeyValueScanner> getHeap() {
+    return this.heap;
+  }
+}
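
KeyValueHeap above is the classic k-way merge: a priority queue keyed on
each scanner's next element yields one sorted stream, with the extra twist
that the winning scanner is kept out of the heap between calls and only
re-inserted when its next element no longer beats the heap's top. The
following JDK-only sketch shows the same pattern over plain sorted lists
(simplified: it re-inserts the winner every time):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class HeapMergeSketch {
      // One buffered source: an iterator plus its peeked head element.
      static class Source {
        Iterator<Integer> it;
        Integer head;
        Source(Iterator<Integer> it) {
          this.it = it;
          this.head = it.hasNext() ? it.next() : null;
        }
        Integer next() {
          Integer r = head;
          head = it.hasNext() ? it.next() : null;
          return r;
        }
      }

      public static List<Integer> merge(List<List<Integer>> sortedLists) {
        PriorityQueue<Source> heap = new PriorityQueue<Source>(
            Math.max(1, sortedLists.size()), new Comparator<Source>() {
              public int compare(Source a, Source b) {
                return a.head.compareTo(b.head);
              }
            });
        for (List<Integer> list : sortedLists) {
          Source s = new Source(list.iterator());
          if (s.head != null) {
            heap.add(s);   // mirror KeyValueHeap: empty scanners never enter
          }
        }
        List<Integer> out = new ArrayList<Integer>();
        Source current;
        while ((current = heap.poll()) != null) {  // smallest head wins
          out.add(current.next());
          if (current.head != null) {
            heap.add(current);   // re-seat the source if anything remains
          }
        }
        return out;
      }
    }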

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Scanner that iterates over KeyValues in sorted order.
+ */
+public interface KeyValueScanner {
+  /**
+   * Look at the next KeyValue in this scanner, but do not advance the scanner.
+   * @return the next KeyValue
+   */
+  public KeyValue peek();
+  
+  /**
+   * Return the next KeyValue in this scanner, advancing the scanner.
+   * @return the next KeyValue
+   */
+  public KeyValue next();
+  
+  /**
+   * Seek the scanner at or after the specified KeyValue.
+   * @param key
+   * @return true if scanner has values left, false if end of scanner
+   */
+  public boolean seek(KeyValue key);
+  
+  /**
+   * Close the KeyValue scanner.
+   */
+  public void close();
+}
\ No newline at end of file
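
As a sketch of the contract above, here is a minimal KeyValueScanner over
a pre-sorted in-memory list, the same shape the MemcacheScanner below
takes. It assumes HBase's KeyValue and KVComparator on the classpath and
is illustrative, not production code:

    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValue.KVComparator;
    import org.apache.hadoop.hbase.regionserver.KeyValueScanner;

    public class ListKeyValueScanner implements KeyValueScanner {
      private final List<KeyValue> kvs;   // must already be sorted
      private final KVComparator comparator;
      private int idx = 0;

      public ListKeyValueScanner(List<KeyValue> sortedKvs, KVComparator c) {
        this.kvs = sortedKvs;
        this.comparator = c;
      }

      public KeyValue peek() {
        return idx < kvs.size() ? kvs.get(idx) : null;
      }

      public KeyValue next() {
        return idx < kvs.size() ? kvs.get(idx++) : null;
      }

      // Position at the first KeyValue at or after the given key.
      public boolean seek(KeyValue key) {
        while (idx < kvs.size() && comparator.compare(kvs.get(idx), key) < 0) {
          idx++;
        }
        return idx < kvs.size();
      }

      public void close() {
        idx = kvs.size();
      }
    }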

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Sat Jun  6 01:26:21 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -27,22 +27,19 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
+import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -82,7 +79,7 @@
 
   // TODO: Fix this guess by studying jprofiler
   private final static int ESTIMATED_KV_HEAP_TAX = 60;
-
+  
   /**
    * Default constructor. Used for tests.
    */
@@ -202,7 +199,86 @@
     }
     return size;
   }
+  
+  /** 
+   * Write a delete
+   * @param delete
+   * @return approximate change in the memcache heap size; may be negative
+   * when existing entries are removed.
+   */
+  long delete(final KeyValue delete) {
+    long size = 0;
+    this.lock.readLock().lock();
+    //Still need to settle on the fastest way of removing entries that are
+    //masked by a delete. Actions that take place here:
+    //1. Insert the delete and remove all affected entries already in memcache
+    //2. If the matching put for a Delete is found, don't insert the delete
+    //TODO Would be nice if we had an iterator for this, so we could remove
+    //entries that need removing while iterating instead of going back and
+    //doing it afterwards
+    
+    try {
+      boolean notpresent = false;
+      List<KeyValue> deletes = new ArrayList<KeyValue>();
+      SortedSet<KeyValue> tailSet = this.memcache.tailSet(delete);
+
+      //Parse the delete, so that it is only done once
+      byte [] deleteBuffer = delete.getBuffer();
+      int deleteOffset = delete.getOffset();
+  
+      int deleteKeyLen = Bytes.toInt(deleteBuffer, deleteOffset);
+      deleteOffset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;
+  
+      short deleteRowLen = Bytes.toShort(deleteBuffer, deleteOffset);
+      deleteOffset += Bytes.SIZEOF_SHORT;
+      int deleteRowOffset = deleteOffset;
+  
+      deleteOffset += deleteRowLen;
+  
+      byte deleteFamLen = deleteBuffer[deleteOffset];
+      deleteOffset += Bytes.SIZEOF_BYTE + deleteFamLen;
+  
+      int deleteQualifierOffset = deleteOffset;
+      int deleteQualifierLen = deleteKeyLen - deleteRowLen - deleteFamLen -
+        Bytes.SIZEOF_SHORT - Bytes.SIZEOF_BYTE - Bytes.SIZEOF_LONG - 
+        Bytes.SIZEOF_BYTE;
+      
+      deleteOffset += deleteQualifierLen;
+  
+      int deleteTimestampOffset = deleteOffset;
+      deleteOffset += Bytes.SIZEOF_LONG;
+      byte deleteType = deleteBuffer[deleteOffset];
+      
+      //Comparing with tail from memcache
+      for(KeyValue mem : tailSet) {
+        
+        DeleteCode res = DeleteCompare.deleteCompare(mem, deleteBuffer, 
+            deleteRowOffset, deleteRowLen, deleteQualifierOffset, 
+            deleteQualifierLen, deleteTimestampOffset, deleteType,
+            comparator.getRawComparator());
+        if(res == DeleteCode.DONE) {
+          break;
+        } else if (res == DeleteCode.DELETE) {
+          deletes.add(mem);
+        } // SKIP
+      }
 
+      //Delete all the entries affected by the last added delete
+      for(KeyValue del : deletes) {
+        notpresent = this.memcache.remove(del);
+        size -= heapSize(del, notpresent);
+      }
+      
+      //Adding the delete to memcache
+      notpresent = this.memcache.add(delete);
+      size += heapSize(delete, notpresent);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return size;
+  }
+  
   /*
    * Calculate how the memcache size has changed, approximately.  Be careful.
    * If class changes, be sure to change the size calculation.
@@ -219,43 +295,6 @@
   }
 
   /**
-   * Look back through all the backlog TreeMaps to find the target.
-   * @param kv
-   * @param numVersions
-   * @return Set of KeyValues. Empty size not null if no results.
-   */
-  List<KeyValue> get(final KeyValue kv, final int numVersions) {
-    List<KeyValue> results = new ArrayList<KeyValue>();
-    get(kv, numVersions, results,
-      new TreeSet<KeyValue>(this.comparatorIgnoreType),
-      System.currentTimeMillis());
-    return results;
-  }
-
-  /**
-   * Look back through all the backlog TreeMaps to find the target.
-   * @param key
-   * @param versions
-   * @param results
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param now
-   * @return True if enough versions.
-   */
-  boolean get(final KeyValue key, final int versions,
-      List<KeyValue> results, final NavigableSet<KeyValue> deletes,
-      final long now) {
-    this.lock.readLock().lock();
-    try {
-      if (get(this.memcache, key, versions, results, deletes, now)) {
-        return true;
-      }
-      return get(this.snapshot, key, versions , results, deletes, now);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
    * @param kv Find the row that comes after this one.  If null, we return the
    * first.
    * @return Next row or null if none found.
@@ -307,86 +346,6 @@
     return result;
   }
 
-  /**
-   * Return all the available columns for the given key.  The key indicates a 
-   * row and timestamp, but not a column name.
-   * @param origin Where to start searching.  Specifies a row and timestamp.
-   * Columns are specified in following arguments.
-   * @param columns Pass null for all columns else the wanted subset.
-   * @param columnPattern Column pattern to match.
-   * @param numVersions number of versions to retrieve
-   * @param versionsCount Map of KV to Count.  Uses a Comparator that doesn't
-   * look at timestamps so only Row/Column are compared.
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param results Where to stick row results found.
-   * @return True if we found enough results for passed <code>columns</code>
-   * and <code>numVersions</code>.
-   */
-  boolean getFull(final KeyValue key, NavigableSet<byte []> columns,
-      final Pattern columnPattern,
-      int numVersions, final Map<KeyValue, HRegion.Counter> versionsCount,
-      final NavigableSet<KeyValue> deletes,
-      final List<KeyValue> results, final long now) {
-    this.lock.readLock().lock();
-    try {
-      // Used to be synchronized but now with weak iteration, no longer needed.
-      if (getFull(this.memcache, key, columns, columnPattern, numVersions,
-        versionsCount, deletes, results, now)) {
-        // Has enough results.
-        return true;
-      }
-      return getFull(this.snapshot, key, columns, columnPattern, numVersions,
-        versionsCount, deletes, results, now);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /*
-   * @param set
-   * @param target Where to start searching.
-   * @param columns
-   * @param versions
-   * @param versionCounter
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param keyvalues
-   * @return True if enough results found.
-   */
-  private boolean getFull(final ConcurrentSkipListSet<KeyValue> set,
-      final KeyValue target, final Set<byte []> columns,
-      final Pattern columnPattern,
-      final int versions, final Map<KeyValue, HRegion.Counter> versionCounter,
-      final NavigableSet<KeyValue> deletes, List<KeyValue> keyvalues,
-      final long now) {
-    boolean hasEnough = false;
-    if (target == null) {
-      return hasEnough;
-    }
-    NavigableSet<KeyValue> tailset = set.tailSet(target);
-    if (tailset == null || tailset.isEmpty()) {
-      return hasEnough;
-    }
-    // TODO: This loop same as in HStore.getFullFromStoreFile.  Make sure they
-    // are the same.
-    for (KeyValue kv: tailset) {
-      // Make sure we have not passed out the row.  If target key has a
-      // column on it, then we are looking explicit key+column combination.  If
-      // we've passed it out, also break.
-      if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
-          !this.comparator.matchingRowColumn(target, kv)) {
-        break;
-      }
-      if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
-        continue;
-      }
-      if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
-          this.ttl, keyvalues, tailset)) {
-        hasEnough = true;
-        break;
-      }
-    }
-    return hasEnough;
-  }
 
   /**
    * @param row Row to look for.
@@ -554,45 +513,6 @@
     }
   }
 
-  /*
-   * Examine a single map for the desired key.
-   *
-   * TODO - This is kinda slow.  We need a data structure that allows for 
-   * proximity-searches, not just precise-matches.
-   * 
-   * @param set
-   * @param key
-   * @param results
-   * @param versions
-   * @param keyvalues
-   * @param deletes Pass a Set that has a Comparator that ignores key type.
-   * @param now
-   * @return True if enough versions.
-   */
-  private boolean get(final ConcurrentSkipListSet<KeyValue> set,
-      final KeyValue key, final int versions,
-      final List<KeyValue> keyvalues,
-      final NavigableSet<KeyValue> deletes,
-      final long now) {
-    NavigableSet<KeyValue> tailset = set.tailSet(key);
-    if (tailset.isEmpty()) {
-      return false;
-    }
-    boolean enoughVersions = false;
-    for (KeyValue kv : tailset) {
-      if (this.comparator.matchingRowColumn(kv, key)) {
-        if (Store.doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues,
-            tailset)) {
-          enoughVersions = true;
-          break;
-        }
-      } else {
-        // By L.N. HBASE-684, map is sorted, so we can't find match any more.
-        break;
-      }
-    }
-    return enoughVersions;
-  }
 
   /*
    * @param set
@@ -621,93 +541,160 @@
   /**
    * @return a scanner over the keys in the Memcache
    */
-  InternalScanner getScanner(long timestamp,
-    final NavigableSet<byte []> targetCols, final byte [] firstRow)
-  throws IOException {
+  KeyValueScanner getScanner() {
     this.lock.readLock().lock();
     try {
-      return new MemcacheScanner(timestamp, targetCols, firstRow);
+      return new MemcacheScanner();
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
+  //
+  // HBASE-880/1249/1304
+  //
+  
+  /**
+   * Perform a single-row Get on the memcache and snapshot, placing results
+   * into the specified KV list.
+   * <p>
+   * This will return true if it is determined that the query is complete
+   * and it is not necessary to check any storefiles after this.
+   * <p>
+   * Otherwise, it will return false and you should continue on.
+   * @param matcher Column matcher
+   * @param result List to add results to
+   * @return true if done with store (early-out), false if not
+   * @throws IOException
+   */
+  public boolean get(QueryMatcher matcher, List<KeyValue> result)
+  throws IOException {
+    this.lock.readLock().lock();
+    try {
+      if(internalGet(this.memcache, matcher, result) || matcher.isDone()) {
+        return true;
+      }
+      matcher.update();
+      if(internalGet(this.snapshot, matcher, result) || matcher.isDone()) {
+        return true;
+      }
+      return false;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+  
+  /**
+   * Runs the matcher over a single KeyValue set (memcache or snapshot).
+   * @param set memcache or snapshot
+   * @param matcher query matcher
+   * @param result list to add results to
+   * @return true if done with store (early-out), false if not
+   * @throws IOException
+   */
+  private boolean internalGet(SortedSet<KeyValue> set, QueryMatcher matcher,
+      List<KeyValue> result) throws IOException {
+    if(set.isEmpty()) return false;
+    // Seek to startKey
+    SortedSet<KeyValue> tailSet = set.tailSet(matcher.getStartKey());
+    
+    for (KeyValue kv : tailSet) {
+      QueryMatcher.MatchCode res = matcher.match(kv);
+      switch(res) {
+        case INCLUDE:
+          result.add(kv);
+          break;
+        case SKIP:
+          break;
+        case NEXT:
+          return false;
+        case DONE:
+          return true;
+        default:
+          throw new RuntimeException("Unexpected " + res);
+      }
+    }
+    return false;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
-  // MemcacheScanner implements the InternalScanner.
+  // MemcacheScanner implements the KeyValueScanner.
   // It lets the caller scan the contents of the Memcache.
+  // This behaves as if it were a real scanner but does not maintain position
+  // in the Memcache tree.
   //////////////////////////////////////////////////////////////////////////////
 
-  private class MemcacheScanner extends HAbstractScanner {
-    private KeyValue current;
-    private final NavigableSet<byte []> columns;
-    private final NavigableSet<KeyValue> deletes;
-    private final Map<KeyValue, Counter> versionCounter;
-    private final long now = System.currentTimeMillis();
-
-    MemcacheScanner(final long timestamp, final NavigableSet<byte []> columns,
-      final byte [] firstRow)
-    throws IOException {
-      // Call to super will create ColumnMatchers and whether this is a regex
-      // scanner or not.  Will also save away timestamp.  Also sorts rows.
-      super(timestamp, columns);
-      this.deletes = new TreeSet<KeyValue>(comparatorIgnoreType);
-      this.versionCounter =
-        new TreeMap<KeyValue, Counter>(comparatorIgnoreTimestamp);
-      this.current = KeyValue.createFirstOnRow(firstRow, timestamp);
-      // If we're being asked to scan explicit columns rather than all in 
-      // a family or columns that match regexes, cache the sorted array of
-      // columns.
-      this.columns = isWildcardScanner()? null: columns;
-    }
-
-    @Override
-    public boolean next(final List<KeyValue> keyvalues)
-    throws IOException {
-      if (this.scannerClosed) {
+  protected class MemcacheScanner implements KeyValueScanner {
+    private KeyValue current = null;
+    private List<KeyValue> result = new ArrayList<KeyValue>();
+    private int idx = 0;
+    
+    MemcacheScanner() {}
+    
+    public boolean seek(KeyValue key) {
+      try {
+        if(key == null) {
+          close();
+          return false;
+        }
+        current = key;
+        return cacheNextRow();
+      } catch(Exception e) {
+        close();
         return false;
       }
-      while (keyvalues.isEmpty() && this.current != null) {
-        // Deletes are per row.
-        if (!deletes.isEmpty()) {
-          deletes.clear();
-        }
-        if (!versionCounter.isEmpty()) {
-          versionCounter.clear();
+    }
+    
+    public KeyValue peek() {
+      if(idx >= result.size()) {
+        if(!cacheNextRow()) {
+          return null;
         }
-        // The getFull will take care of expired and deletes inside memcache.
-        // The first getFull when row is the special empty bytes will return
-        // nothing so we go around again.  Alternative is calling a getNextRow
-        // if row is null but that looks like it would take same amount of work
-        // so leave it for now.
-        getFull(this.current, isWildcardScanner()? null: this.columns, null, 1,
-          versionCounter, deletes, keyvalues, this.now);
-        for (KeyValue bb: keyvalues) {
-          if (isWildcardScanner()) {
-            // Check the results match.  We only check columns, not timestamps.
-            // We presume that timestamps have been handled properly when we
-            // called getFull.
-            if (!columnMatch(bb)) {
-              keyvalues.remove(bb);
-            }
-          }
+        return peek();
+      }
+      return result.get(idx);
+    }
+    
+    public KeyValue next() {
+      if(idx >= result.size()) {
+        if(!cacheNextRow()) {
+          return null;
         }
-        // Add any deletes found so they are available to the StoreScanner#next.
-        if (!this.deletes.isEmpty()) {
-          keyvalues.addAll(deletes);
+        return next();
+      }
+      return result.get(idx++);
+    }
+    
+    boolean cacheNextRow() {
+      NavigableSet<KeyValue> keys;
+      try {
+        keys = memcache.tailSet(current);
+      } catch(Exception e) {
+        close();
+        return false;
+      }
+      if(keys == null || keys.isEmpty()) {
+        close();
+        return false;
+      }
+      current = null;
+      byte [] row = keys.first().getRow();
+      for(KeyValue key : keys) {
+        if(comparator.compareRows(key, row) != 0) {
+          current = key;
+          break;
         }
-        this.current = getNextRow(this.current);
-        // Change current to be column-less and to have the scanners' now.  We
-        // do this because first item on 'next row' may not have the scanners'
-        // now time which will cause trouble down in getFull; same reason no
-        // column.
-        if (this.current != null) this.current = this.current.cloneRow(this.now);
+        result.add(key);
       }
-      return !keyvalues.isEmpty();
+      return true;
     }
 
     public void close() {
-      if (!scannerClosed) {
-        scannerClosed = true;
+      current = null;
+      idx = 0;
+      if(!result.isEmpty()) {
+        result.clear();
       }
     }
   }
@@ -721,8 +708,7 @@
    * @throws InterruptedException
    * @throws IOException 
    */
-  public static void main(String [] args)
-  throws InterruptedException, IOException {
+  public static void main(String [] args) {
     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
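
The offset arithmetic in Memcache.delete() above walks the serialized
KeyValue key directly. As inferred from those offsets, the layout is:
keylength (int), valuelength (int), rowlength (short), row, familylength
(byte), family, qualifier, timestamp (long), type (byte). A hedged sketch
extracting the row the same way, using only calls visible in this diff:

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyValueLayoutSketch {
      // Extracts the row bytes from a KeyValue's backing buffer, mirroring
      // the parsing in Memcache.delete().
      public static byte [] extractRow(KeyValue kv) {
        byte [] buf = kv.getBuffer();
        int offset = kv.getOffset();
        offset += Bytes.SIZEOF_INT + Bytes.SIZEOF_INT;  // skip key/value lengths
        short rowLen = Bytes.toShort(buf, offset);
        offset += Bytes.SIZEOF_SHORT;
        byte [] row = new byte[rowLen];
        System.arraycopy(buf, offset, row, 0, rowLen);
        return row;
      }
    }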

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Sat Jun  6 01:26:21 2009
@@ -62,11 +62,11 @@
   protected final long globalMemcacheLimit;
   protected final long globalMemcacheLimitLowMark;
   
-  public static final float DEFAULT_UPPER = 0.4f;
-  public static final float DEFAULT_LOWER = 0.25f;
-  public static final String UPPER_KEY =
+  private static final float DEFAULT_UPPER = 0.4f;
+  private static final float DEFAULT_LOWER = 0.25f;
+  private static final String UPPER_KEY =
     "hbase.regionserver.globalMemcache.upperLimit";
-  public static final String LOWER_KEY =
+  private static final String LOWER_KEY =
     "hbase.regionserver.globalMemcache.lowerLimit";
   private long blockingStoreFilesNumber;
   private long blockingWaitTime;
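
The hunk above narrows the visibility of the global memcache limit keys
and their defaults (upper 0.4, lower 0.25 of heap). A hedged sketch of how
such a fraction is read from configuration; the multiplication against the
maximum heap size is an assumption about how the flusher applies it:

    import org.apache.hadoop.conf.Configuration;

    public class MemcacheLimitSketch {
      // Derive the global memcache byte limit from the configured fraction.
      static long globalLimit(Configuration conf, long maxHeapBytes) {
        float upper = conf.getFloat(
            "hbase.regionserver.globalMemcache.upperLimit", 0.4f);
        return (long)(maxHeapBytes * upper);
      }
    }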

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.List;
+import java.io.IOException;
+
+/**
+ * A scanner that does a minor compaction at the same time.  Doesn't need to
+ * implement ChangedReadersObserver, since it doesn't scan memcache, only store files
+ * and optionally the memcache-snapshot.
+ */
+public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
+  private QueryMatcher matcher;
+
+  private KeyValueHeap heap;
+
+
+  MinorCompactingStoreScanner(Store store,
+                              KeyValueScanner [] scanners) {
+    Scan scan = new Scan();
+
+    // No max version, no ttl matching, start at first row, all columns.
+    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
+        null, Long.MAX_VALUE, store.comparator.getRawComparator(),
+        store.versionsToReturn(Integer.MAX_VALUE));
+
+    for (KeyValueScanner scanner : scanners ) {
+      scanner.seek(matcher.getStartKey());
+    }
+
+    heap = new KeyValueHeap(scanners, store.comparator);
+  }
+
+  MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
+                              KeyValueScanner [] scanners) {
+    Scan scan = new Scan();
+    matcher = new ScanQueryMatcher(scan, Bytes.toBytes(cfName),
+        null, Long.MAX_VALUE, comparator.getRawComparator(),
+        Integer.MAX_VALUE);
+
+    for (KeyValueScanner scanner : scanners ) {
+      scanner.seek(matcher.getStartKey());
+    }
+
+    heap = new KeyValueHeap(scanners, comparator);
+  }
+
+  public KeyValue peek() {
+    return heap.peek();
+  }
+
+  public KeyValue next() {
+    return heap.next();
+  }
+
+  @Override
+  public boolean seek(KeyValue key) {
+    // Can't seek.
+    throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
+  }
+
+  @Override
+  public boolean next(List<KeyValue> results) throws IOException {
+    KeyValue peeked = heap.peek();
+    if (peeked == null) {
+      close();
+      return false;
+    }
+    matcher.setRow(peeked.getRow());
+    KeyValue kv;
+    while ((kv = heap.peek()) != null) {
+      // if delete type, output no matter what:
+      if (kv.getType() != KeyValue.Type.Put.getCode())
+        results.add(kv);
+
+      switch (matcher.match(kv)) {
+        case INCLUDE:
+          results.add(heap.next());
+          continue;
+        case DONE:
+          if (results.isEmpty()) {
+            matcher.setRow(heap.peek().getRow());
+            continue;
+          }
+          return true;
+      }
+      heap.next();
+    }
+    close();
+    return false;
+  }
+
+  public void close() {
+    heap.close();
+  }
+}
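
A hedged sketch of driving the compacting scanner above: each call to
next(List) yields one row's surviving KeyValues (deletes are carried
through so they stay visible to later compactions). The sink stands in
for the new store file writer and is hypothetical:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.KeyValue;

    public class CompactionDriveSketch {
      static void compact(MinorCompactingStoreScanner scanner,
          List<KeyValue> sink) throws IOException {
        List<KeyValue> row = new ArrayList<KeyValue>();
        boolean more;
        do {
          row.clear();
          more = scanner.next(row);
          sink.addAll(row);   // stand-in for writing to the compacted file
        } while (more);
        // The scanner closes itself when exhausted; close() is also safe.
      }
    }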

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/QueryMatcher.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.NavigableSet;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This is the primary class used to process KeyValues during a Get or Scan
+ * operation.
+ * <p>
+ * It encapsulates the handling of the column and version input parameters to 
+ * the query through a {@link ColumnTracker}.
+ * <p>
+ * Deletes are handled using the {@link DeleteTracker}.
+ * <p>
+ * All other query parameters are accessed from the client-specified Get.
+ * <p>
+ * The primary method used is {@link match} with the current KeyValue.  It
+ * returns a {@link MatchCode} telling the caller whether to include, skip,
+ * or stop at the current KeyValue, taking deletes and versions into account.
+ */
+public class QueryMatcher {
+  
+  /**
+   * {@link match} return codes.  These instruct the scanner moving through
+   * Memcaches and StoreFiles what to do with the current KeyValue.
+   * <p>
+   * Additionally, this contains "early-out" language to tell the scanner to
+   * move on to the next File (Memcache or Storefile), or to return immediately.
+   */
+  static enum MatchCode {
+    /**
+     * Include KeyValue in the returned result
+     */
+    INCLUDE,
+    
+    /**
+     * Do not include KeyValue in the returned result
+     */
+    SKIP,
+    
+    /**
+     * Do not include, jump to next StoreFile or Memcache (in time order)
+     */
+    NEXT,
+    
+    /**
+     * Do not include, return current result
+     */
+    DONE,
+
+    /**
+     * These codes are used by the ScanQueryMatcher
+     */
+
+    /**
+     * Done with the row, seek there.
+     */
+    SEEK_NEXT_ROW,
+    /**
+     * Done with column, seek to next.
+     */
+    SEEK_NEXT_COL,
+
+    /**
+     * Done with scan, thanks to the row filter.
+     */
+    DONE_SCAN,
+  }
+  
+  /** Keeps track of deletes */
+  protected DeleteTracker deletes;
+  
+  /** Keeps track of columns and versions */
+  protected ColumnTracker columns;
+  
+  /** Key to seek to in Memcache and StoreFiles */
+  protected KeyValue startKey;
+  
+  /** Row comparator for the region this query is for */
+  KeyComparator rowComparator;
+  
+  /** Row the query is on */
+  protected byte [] row;
+  
+  /** TimeRange the query is for */
+  protected TimeRange tr;
+  
+  /** Oldest allowed version stamp for TTL enforcement */
+  protected long oldestStamp;
+  
+  /**
+   * Constructs a QueryMatcher for a Get.
+   * @param get the Get to match against
+   * @param row row the Get is for
+   * @param family column family being queried
+   * @param columns columns to match, or null/empty for all in the family
+   * @param ttl time-to-live for the family, in milliseconds
+   * @param rowComparator comparator for rows in this region
+   * @param maxVersions maximum number of versions to return per column
+   */
+  public QueryMatcher(Get get, byte [] row, byte [] family, 
+      NavigableSet<byte[]> columns, long ttl, KeyComparator rowComparator,
+      int maxVersions) {
+    this.row = row;
+    this.tr = get.getTimeRange();
+    this.oldestStamp = System.currentTimeMillis() - ttl;
+    this.rowComparator = rowComparator;
+    this.deletes =  new GetDeleteTracker(rowComparator);
+    this.startKey = KeyValue.createFirstOnRow(row);
+    // Single branch to deal with two types of Gets (columns vs all in family)
+    if(columns == null || columns.size() == 0) {
+      this.columns = new WildcardColumnTracker(maxVersions);
+    } else {
+      this.columns = new ExplicitColumnTracker(columns, maxVersions);
+    }
+  }
+
+  // For the subclasses.
+  protected QueryMatcher() {
+  }
+
+  /**
+   * Constructs a copy of an existing QueryMatcher with a new row.
+   * @param matcher
+   * @param row
+   */
+  public QueryMatcher(QueryMatcher matcher, byte [] row) {
+    this.row = row;
+    this.tr = matcher.getTimeRange();
+    this.oldestStamp = matcher.getOldestStamp();
+    this.rowComparator = matcher.getRowComparator();
+    this.columns = matcher.getColumnTracker();
+    this.deletes = matcher.getDeleteTracker();
+    this.startKey = matcher.getStartKey();
+    reset();
+  }
+  
+  /**
+   * Main method of the matcher.
+   * <p>
+   * Determines whether the specified KeyValue should be included in the
+   * result or not.
+   * <p>
+   * Contains additional language to early-out of the current file or to
+   * return immediately.
+   * <p>
+   * Things to be checked:<ul>
+   * <li>Row
+   * <li>TTL
+   * <li>Type
+   * <li>TimeRange
+   * <li>Deletes
+   * <li>Column
+   * <li>Versions
+   * </ul>
+   * @param kv KeyValue to check
+   * @return MatchCode: include, skip, next, done
+   */
+  public MatchCode match(KeyValue kv) {
+    if(this.columns.done()) {
+      return MatchCode.DONE;  // done_row
+    }
+    
+    // Directly act on KV buffer
+    byte [] bytes = kv.getBuffer();
+    int offset = kv.getOffset();
+    
+    int keyLength = Bytes.toInt(bytes, offset);
+    offset += KeyValue.ROW_OFFSET;
+    
+    short rowLength = Bytes.toShort(bytes, offset);
+    offset += Bytes.SIZEOF_SHORT;
+
+    // Scanners rely on us to check the row first, and to return
+    // "NEXT" once we have moved past it.
+    /* Check ROW
+     * If past query's row, go to next StoreFile
+     * If not reached query's row, go to next KeyValue
+     */ 
+    int ret = this.rowComparator.compareRows(row, 0, row.length,
+        bytes, offset, rowLength);
+    if(ret <= -1) {
+      // Have reached the next row
+      return MatchCode.NEXT;  // got_to_next_row (end)
+    } else if(ret >= 1) {
+      // At a previous row
+      return MatchCode.SKIP;  // skip_to_cur_row
+    }
+    offset += rowLength;
+    
+    byte familyLength = bytes[offset];
+    offset += Bytes.SIZEOF_BYTE + familyLength;
+    
+    int columnLength = keyLength + KeyValue.ROW_OFFSET -
+      (offset - kv.getOffset()) - KeyValue.TIMESTAMP_TYPE_SIZE;
+    int columnOffset = offset;
+    offset += columnLength;
+    
+    /* Check TTL
+     * If expired, go to next KeyValue
+     */
+    long timestamp = Bytes.toLong(bytes, offset);
+    if(isExpired(timestamp)) {
+      // Reached the expired portion; everything remaining is older, so expired too.
+      return MatchCode.NEXT;  // done_row
+    }
+    offset += Bytes.SIZEOF_LONG;
+    
+    /* Check TYPE
+     * If a delete within (or after) time range, add to deletes
+     * Move to next KeyValue
+     */
+    byte type = bytes[offset];
+    // if delete type == delete family, return done_row
+    
+    if(isDelete(type)) {
+      if(tr.withinOrAfterTimeRange(timestamp)) {
+        this.deletes.add(bytes, columnOffset, columnLength, timestamp, type);
+      }
+      return MatchCode.SKIP;  // skip the delete cell.
+    }
+    
+    /* Check TimeRange
+     * If outside of range, move to next KeyValue
+     */
+    if(!tr.withinTimeRange(timestamp)) {
+      return MatchCode.SKIP;  // optimization chances here.
+    }
+    
+    /* Check Deletes
+     * If deleted, move to next KeyValue 
+     */
+    if(!deletes.isEmpty() && deletes.isDeleted(bytes, columnOffset,
+        columnLength, timestamp)) {
+      // Two kinds of deletes can match here:
+      // - delete of one cell or one column: just skip the KeyValues.
+      // - delete of a whole family: we could skip to the next row instead.
+      return MatchCode.SKIP;
+    }
+    
+    /* Check Column and Versions
+     * Returns a MatchCode directly, using the same language as above:
+     * If matched column without enough versions, include
+     * If enough versions of this column, or it does not match, skip
+     * If we have moved past all requested columns, go to the next file
+     * If enough versions of everything, we are done
+     */
+    return columns.checkColumn(bytes, columnOffset, columnLength);
+  }
+
+  // should be in KeyValue.
+  protected boolean isDelete(byte type) {
+    return (type != KeyValue.Type.Put.getCode());
+  }
+  
+  protected boolean isExpired(long timestamp) {
+    return (timestamp < oldestStamp);
+  }
+
+  /**
+   * If the matcher returns SEEK_NEXT_COL, call this to get a hint of the
+   * next column to seek to.
+   *
+   * @return the next column we want, or null if there is no hint; only
+   *  meaningful immediately after match returns SEEK_NEXT_COL
+   */
+  public ColumnCount getSeekColumn() {
+    return this.columns.getColumnHint();
+  }
+  
+  /**
+   * Called after reading each section (memcache, snapshot, storefiles).
+   * <p>
+   * This method will update the internal structures to be accurate for
+   * the next section. 
+   */
+  public void update() {
+    this.deletes.update();
+    this.columns.update();
+  }
+  
+  /**
+   * Resets the current columns and deletes
+   */
+  public void reset() {
+    this.deletes.reset();
+    this.columns.reset();
+  }
+
+  /**
+   * Set current row
+   * @param row
+   */
+  public void setRow(byte [] row) {
+    this.row = row;
+  }
+
+  /**
+   * @return the start key
+   */
+  public KeyValue getStartKey() {
+    return this.startKey;
+  }
+  
+  /**
+   * @return the TimeRange
+   */
+  public TimeRange getTimeRange() {
+    return this.tr;
+  }
+  
+  /**
+   * @return the oldest stamp
+   */
+  public long getOldestStamp() {
+    return this.oldestStamp;
+  }
+  
+  /**
+   * @return current KeyComparator
+   */
+  public KeyComparator getRowComparator() {
+    return this.rowComparator;
+  }
+  
+  /**
+   * @return ColumnTracker
+   */
+  public ColumnTracker getColumnTracker() {
+    return this.columns;
+  }
+  
+  /**
+   * @return DeleteTracker
+   */
+  public DeleteTracker getDeleteTracker() {
+    return this.deletes;
+  }
+
+  /**
+   * @return true if the query has been satisfied (all requested columns done)
+   */
+  public boolean isDone() {
+    return this.columns.done();
+  }
+}
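
As a usage sketch (not part of this commit), this is roughly how a store-level
get could consult the matcher; the get/row/family/ttl/comparator values and
the kvs iterable are assumed:

    QueryMatcher matcher = new QueryMatcher(get, row, family,
        null /* all columns in family */, ttl, comparator, 1 /* maxVersions */);
    List<KeyValue> result = new ArrayList<KeyValue>();
    read:
    for (KeyValue kv : kvs) {                 // one memcache or storefile, in order
      switch (matcher.match(kv)) {
        case INCLUDE: result.add(kv); break;  // wanted version
        case SKIP:    break;                  // keep reading this file
        case NEXT:    break read;             // past the row in this file
        case DONE:    break read;             // query fully satisfied
      }
    }
    matcher.update();                         // before moving to the next section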

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * This class is responsible for the tracking and enforcement of Deletes
+ * during the course of a Scan operation.
+ *
+ * It enforces Delete, DeleteColumn, and DeleteFamily, the latter through a
+ * per-row family delete timestamp.
+ *
+ * <p>
+ * This class is utilized through three methods:
+ * <ul><li>{@link add} when encountering a Delete or DeleteColumn
+ * <li>{@link isDeleted} when checking if a Put KeyValue has been deleted
+ * <li>{@link update} when reaching the end of a StoreFile or row for scans
+ * </ul>
+ * <p>
+ * This class is NOT thread-safe, as queries are never multi-threaded.
+ */
+public class ScanDeleteTracker implements DeleteTracker {
+
+  private long familyStamp = 0L;
+  private byte [] deleteBuffer = null;
+  private int deleteOffset = 0;
+  private int deleteLength = 0;
+  private byte deleteType = 0;
+  private long deleteTimestamp = 0L;
+
+  private KeyValue.KeyComparator comparator;
+  
+  /**
+   * Constructor for ScanDeleteTracker
+   * @param comparator comparator used to compare column qualifiers
+   */
+  public ScanDeleteTracker(KeyValue.KeyComparator comparator) {
+    this.comparator = comparator;
+  }
+  
+  /**
+   * Add the specified KeyValue to the list of deletes to check against for
+   * this row operation.
+   * <p>
+   * This is called when a Delete is encountered in a StoreFile.
+   * @param buffer KeyValue buffer
+   * @param qualifierOffset column qualifier offset
+   * @param qualifierLength column qualifier length
+   * @param timestamp timestamp
+   * @param type delete type as byte
+   */
+  @Override
+  public void add(byte[] buffer, int qualifierOffset, int qualifierLength,
+      long timestamp, byte type) {
+    if(timestamp > familyStamp) {
+      if(type == KeyValue.Type.DeleteFamily.getCode()) {
+        familyStamp = timestamp;
+        return;
+      }
+
+      if(deleteBuffer != null && type < deleteType) {
+        // same column, so ignore less specific delete
+        if(comparator.compareRows(deleteBuffer, deleteOffset, deleteLength,
+            buffer, qualifierOffset, qualifierLength) == 0){
+          return;
+        }
+      }
+      // new column, or more general delete type
+      deleteBuffer = buffer;
+      deleteOffset = qualifierOffset;
+      deleteLength = qualifierLength;
+      deleteType = type;
+      deleteTimestamp = timestamp;
+    }
+    // else: this delete is already masked by an existing DeleteFamily stamp.
+  }
+
+  /** 
+   * Check if the specified KeyValue buffer has been deleted by a previously
+   * seen delete.
+   *
+   * @param buffer KeyValue buffer
+   * @param qualifierOffset column qualifier offset
+   * @param qualifierLength column qualifier length
+   * @param timestamp timestamp
+   * @return true if the specified KeyValue is deleted, false if not
+   */
+  @Override
+  public boolean isDeleted(byte [] buffer, int qualifierOffset,
+      int qualifierLength, long timestamp) {
+    if(timestamp < familyStamp) {
+      return true;
+    }
+    
+    if(deleteBuffer != null) {
+      // TODO ryan use a specific comparator
+      int ret = comparator.compareRows(deleteBuffer, deleteOffset, deleteLength,
+          buffer, qualifierOffset, qualifierLength);
+
+      if(ret == 0) {
+        if(deleteType == KeyValue.Type.DeleteColumn.getCode()) {
+          return true;
+        }
+        // Delete (aka DeleteVersion)
+        // If the timestamp is the same, keep this one
+        if (timestamp == deleteTimestamp) {
+          return true;
+        }
+        // KeyValues arrive newest-first within a column, so anything seen
+        // after the tracked delete must be strictly older.
+        assert timestamp < deleteTimestamp;
+
+        // different timestamp, let's clear the buffer.
+        deleteBuffer = null;
+      } else if(ret < 0){
+        // Next column case.
+        deleteBuffer = null;
+      } else {
+        // ret > 0: the tracked delete sorts after the current column, which
+        // cannot happen with row-sorted input; flag the programming error.
+        throw new IllegalStateException("ScanDeleteTracker.isDeleted: " +
+            "current qualifier sorts before the tracked delete");
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return deleteBuffer == null && familyStamp == 0;
+  }
+
+  // Called between every row.
+  @Override
+  public void reset() {
+    familyStamp = 0L;
+    deleteBuffer = null;
+  }
+
+  // Scans reset the tracker per row via reset(); update() is not expected
+  // to be called, but delegates there to stay safe.
+  @Override
+  public void update() {
+    this.reset();
+  }
+
+}
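
To illustrate the intended semantics (a hedged sketch, not part of this
commit; the comparator construction and the qualifier are assumed), recall
that KeyValues arrive newest-timestamp-first within a column:

    ScanDeleteTracker deletes = new ScanDeleteTracker(comparator);
    byte [] q = Bytes.toBytes("q1");
    // Delete (a single version) at t=100:
    deletes.add(q, 0, q.length, 100L, KeyValue.Type.Delete.getCode());
    deletes.isDeleted(q, 0, q.length, 100L);  // true: exactly that version
    deletes.isDeleted(q, 0, q.length, 90L);   // false: older versions survive
    // DeleteColumn masks every remaining (older) version of the column:
    deletes.add(q, 0, q.length, 100L, KeyValue.Type.DeleteColumn.getCode());
    deletes.isDeleted(q, 0, q.length, 90L);   // true
    deletes.reset();                          // called between rows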

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+
+import java.util.NavigableSet;
+
+/**
+ * A query matcher that is specifically designed for the scan case.
+ */
+public class ScanQueryMatcher extends QueryMatcher {
+
+  private Filter filter;
+  // have to support old style filter for now.
+  private RowFilterInterface oldFilter;
+  // Optimization so we can skip lots of compares when we decide to skip
+  // to the next row.
+  private boolean stickyNextRow;
+  private KeyValue stopKey = null;
+
+  /**
+   * Constructs a QueryMatcher for a Scan.
+   * @param scan the Scan to match against
+   * @param family column family being scanned
+   * @param columns columns to match, or null/empty for all in the family
+   * @param ttl time-to-live for the family, in milliseconds
+   * @param rowComparator comparator for rows in this region
+   * @param maxVersions maximum number of versions to return per column
+   */
+  public ScanQueryMatcher(Scan scan, byte [] family,
+      NavigableSet<byte[]> columns, long ttl, 
+      KeyValue.KeyComparator rowComparator, int maxVersions) {
+    // The current row is supplied later via setRow(), before matching begins.
+    this.tr = scan.getTimeRange();
+    this.oldestStamp = System.currentTimeMillis() - ttl;
+    this.rowComparator = rowComparator;
+    // Scans use the scan-specific delete tracker.
+    this.deletes =  new ScanDeleteTracker(rowComparator);
+    this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
+    this.stopKey = KeyValue.createFirstOnRow(scan.getStopRow());
+    this.filter = scan.getFilter();
+    this.oldFilter = scan.getOldFilter();
+    
+    // Single branch to deal with two types of reads (columns vs all in family)
+    if(columns == null || columns.size() == 0) {
+      // use a specialized scan for wildcard column tracker.
+      this.columns = new ScanWildcardColumnTracker(maxVersions);
+    } else {
+      // We can share the ExplicitColumnTracker, diff is we reset
+      // between rows, not between storefiles.
+      this.columns = new ExplicitColumnTracker(columns,maxVersions);
+    }
+  }
+
+  /**
+   * Determines if the caller should do one of several things:
+   * - seek/skip to the next row (MatchCode.SEEK_NEXT_ROW)
+   * - seek/skip to the next column (MatchCode.SEEK_NEXT_COL)
+   * - include the current KeyValue (MatchCode.INCLUDE)
+   * - ignore the current KeyValue (MatchCode.SKIP)
+   * - go to the next row (MatchCode.DONE)
+   *
+   * @param kv KeyValue to check
+   * @return the MatchCode instructing the caller what to do next
+   */
+  public MatchCode match(KeyValue kv) {
+    if (filter != null && filter.filterAllRemaining()) {
+      return MatchCode.DONE_SCAN;
+    } else if (oldFilter != null && oldFilter.filterAllRemaining()) {
+      // the old filter runs only if the other filter didnt work.
+      return MatchCode.DONE_SCAN;
+    }
+
+    byte [] bytes = kv.getBuffer();
+    int offset = kv.getOffset();
+    int initialOffset = offset;
+
+    int keyLength = Bytes.toInt(bytes, offset, Bytes.SIZEOF_INT);
+    offset += KeyValue.ROW_OFFSET;
+    
+    short rowLength = Bytes.toShort(bytes, offset, Bytes.SIZEOF_SHORT);
+    offset += Bytes.SIZEOF_SHORT;
+    
+    int ret = this.rowComparator.compareRows(row, 0, row.length,
+        bytes, offset, rowLength);
+    if (ret <= -1) {
+      return MatchCode.DONE;
+    } else if (ret >= 1) {
+      // could optimize this, if necessary?
+      // Could also be called SEEK_TO_CURRENT_ROW, but this
+      // should be rare/never happens.
+      return MatchCode.SKIP;
+    }
+
+    // optimize case.
+    if (this.stickyNextRow)
+        return MatchCode.SEEK_NEXT_ROW;
+
+    // Give the row filter a chance to do its job.
+    if (filter != null && filter.filterRowKey(bytes, offset, rowLength)) {
+      stickyNextRow = true; // optimize to keep from calling the filter too much.
+      return MatchCode.SEEK_NEXT_ROW;
+    } else if (oldFilter != null && oldFilter.filterRowKey(bytes, offset, rowLength)) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    if (this.columns.done()) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+    
+    // Step past the row bytes
+    offset += rowLength;
+
+    // Step past the family
+    byte familyLength = bytes[offset];
+    offset += familyLength + 1;
+    
+    int qualLength = keyLength + KeyValue.ROW_OFFSET -
+      (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
+    
+    long timestamp = kv.getTimestamp();
+    if (isExpired(timestamp)) {
+      // done; since KeyValues come newest-first, the rest are expired too.
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    byte type = kv.getType();
+    if (isDelete(type)) {
+      if (tr.withinOrAfterTimeRange(timestamp)) {
+        this.deletes.add(bytes, offset, qualLength, timestamp, type);
+        // Can't early out now, because DelFam come before any other keys
+      }
+      // We may be able to optimize this SKIP later: after a DeleteFamily we
+      // could seek to the next row, after a DeleteColumn to the next column.
+      // Doing so requires more information out of isDelete(), and matters
+      // most for very wide (million-column) rows.
+      return MatchCode.SKIP;
+    }
+
+    if (!tr.withinTimeRange(timestamp)) {
+      return MatchCode.SKIP;
+    }
+
+    if (deletes.isDeleted(bytes, offset,
+        qualLength, timestamp)) {
+      return MatchCode.SKIP;
+    }
+    
+    MatchCode colChecker =
+        columns.checkColumn(bytes, offset, qualLength);
+
+    // if SKIP -> SEEK_NEXT_COL
+    // if (NEXT,DONE) -> SEEK_NEXT_ROW
+    // if (INCLUDE) -> INCLUDE
+    if (colChecker == MatchCode.SKIP) {
+      return MatchCode.SEEK_NEXT_COL;
+    } else if (colChecker == MatchCode.NEXT || colChecker == MatchCode.DONE) {
+      stickyNextRow = true;
+      return MatchCode.SEEK_NEXT_ROW;
+    }
+
+    // else INCLUDE
+    // if (colChecker == MatchCode.INCLUDE)
+    // give the filter a chance to run.
+    if (filter == null)
+      return MatchCode.INCLUDE;
+
+    ReturnCode filterResponse = filter.filterKeyValue(kv);
+    if (filterResponse == ReturnCode.INCLUDE)
+      return MatchCode.INCLUDE;
+
+    if (filterResponse == ReturnCode.SKIP)
+      return MatchCode.SKIP;
+
+    // else
+    //if (filterResponse == ReturnCode.NEXT_ROW)
+    stickyNextRow = true;
+    return MatchCode.SEEK_NEXT_ROW;
+  }
+
+  /**
+   * If the row was otherwise going to be included, call this for a
+   * last-minute check by the filter.
+   * @return true if the filter excludes the entire row
+   */
+  public boolean filterEntireRow() {
+    if (filter == null)
+      return false;
+    return filter.filterRow();
+  }
+
+  /**
+   * Set current row
+   * @param row
+   */
+  @Override
+  public void setRow(byte [] row) {
+    this.row = row;
+    reset();
+  }
+  
+  @Override
+  public void reset() {
+    super.reset();
+
+    stickyNextRow = false;
+    if (filter != null)
+      filter.reset();
+  }
+}
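
For illustration, a hedged sketch (not part of this commit) of how a scan
loop might act on the extended codes; the heap, matcher, results, and the
seek/return helpers are assumed, hypothetical names:

    KeyValue kv = heap.peek();
    switch (matcher.match(kv)) {
      case INCLUDE:       results.add(heap.next()); break;
      case SKIP:          heap.next(); break;             // discard this KeyValue
      case SEEK_NEXT_COL: seekToNextColumn(heap); break;  // hypothetical helper
      case SEEK_NEXT_ROW: seekToNextRow(heap); break;     // hypothetical helper
      case DONE:          returnRow(results); break;      // row complete
      case DONE_SCAN:     close(); break;                 // filter ended the scan
    }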

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Keeps track of the columns for a scan if they are not explicitly specified
+ */
+public class ScanWildcardColumnTracker implements ColumnTracker {
+  private byte [] columnBuffer = null;
+  private int columnOffset = 0;
+  private int columnLength = 0;
+  private int currentCount = 0;
+  private int maxVersions;
+
+  /**
+   * Tracks columns as they appear, returning up to maxVersion versions of
+   * each column encountered.
+   * @param maxVersion maximum number of versions to return per column
+   */
+  public ScanWildcardColumnTracker(int maxVersion) {
+    this.maxVersions = maxVersion;
+  }
+
+  /**
+   * Can only return INCLUDE or SKIP, since returning "NEXT" or
+   * "DONE" would imply we have finished with this row, which
+   * this class cannot determine on its own.
+   *
+   * @param bytes buffer holding the column qualifier
+   * @param offset qualifier offset in the buffer
+   * @param length qualifier length
+   * @return INCLUDE if under maxVersions for this column, else SKIP
+   */
+  @Override
+  public MatchCode checkColumn(byte[] bytes, int offset, int length) {
+    if (columnBuffer == null) {
+      // first iteration.
+      columnBuffer = bytes;
+      columnOffset = offset;
+      columnLength = length;
+      currentCount = 0;
+
+      if (++currentCount > maxVersions)
+        return MatchCode.SKIP;
+      return MatchCode.INCLUDE;
+    }
+    int cmp = Bytes.compareTo(bytes, offset, length,
+        columnBuffer, columnOffset, columnLength);
+    if (cmp == 0) {
+      if (++currentCount > maxVersions)
+        return MatchCode.SKIP; // skip to next col
+      return MatchCode.INCLUDE;
+    }
+
+    // new col > old col
+    if (cmp > 0) {
+      // switched columns; start counting versions for the new column.
+      columnBuffer = bytes;
+      columnOffset = offset;
+      columnLength = length;
+      currentCount = 0;
+      
+      if (++currentCount > maxVersions)
+        return MatchCode.SKIP;
+      return MatchCode.INCLUDE;
+    }
+    // new col < old col: input is out of order, which should be impossible.
+    throw new RuntimeException("ScanWildcardColumnTracker.checkColumn ran " +
+        "into a column actually smaller than the previous column!");
+  }
+
+  @Override
+  public void update() {
+    // Scans reset per row instead of per StoreFile, so update() is unused.
+    throw new UnsupportedOperationException(
+        "ScanWildcardColumnTracker.update should never be called!");
+  }
+
+  @Override
+  public void reset() {
+    columnBuffer = null;
+  }
+
+  /**
+   * Used by matcher and scan/get to get a hint of the next column
+   * to seek to after checkColumn() returns SKIP.
+   * @return null always, since a wildcard scanner cannot know the next column
+   */
+  public ColumnCount getColumnHint() {
+    return null;
+  }
+
+  /**
+   * We can never know a-priori if we are done, so always return false. 
+   * @return false
+   */
+  @Override
+  public boolean done() {
+    return false;
+  }
+}
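
A quick illustration (not part of this commit) of the version counting, with
maxVersions set to 2 and a made-up qualifier:

    ScanWildcardColumnTracker tracker = new ScanWildcardColumnTracker(2);
    byte [] q = Bytes.toBytes("q");
    tracker.checkColumn(q, 0, q.length);  // INCLUDE: first version of "q"
    tracker.checkColumn(q, 0, q.length);  // INCLUDE: second version
    tracker.checkColumn(q, 0, q.length);  // SKIP: over maxVersions
    tracker.reset();                      // new row: counting starts over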


