hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r782445 [7/17] - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/trans...
Date Sun, 07 Jun 2009 19:57:43 GMT
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Jun  7 19:57:37 2009
@@ -1,5 +1,5 @@
  /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,12 +22,9 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,7 +33,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,7 +40,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,19 +49,16 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
-import org.apache.hadoop.hbase.ValueOverMaxLengthException;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -106,14 +98,15 @@
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
-public class HRegion implements HConstants {
+public class HRegion implements HConstants { // , Writable{
   static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
   final AtomicBoolean closed = new AtomicBoolean(false);
-  /* Closing can take some time; use the closing flag if there is stuff we don't want
-   * to do while in closing state; e.g. like offer this region up to the master as a region
-   * to close if the carrying regionserver is overloaded.  Once set, it is never cleared.
+  /* Closing can take some time; use the closing flag if there is stuff we don't 
+   * want to do while in closing state; e.g. like offer this region up to the 
+   * master as a region to close if the carrying regionserver is overloaded.
+   * Once set, it is never cleared.
    */
   private final AtomicBoolean closing = new AtomicBoolean(false);
   private final RegionHistorian historian;
@@ -126,6 +119,13 @@
     new ConcurrentHashMap<Integer, byte []>();
   protected final Map<byte [], Store> stores =
     new ConcurrentSkipListMap<byte [], Store>(KeyValue.FAMILY_COMPARATOR);
+  
+  //These variable are just used for getting data out of the region, to test on
+  //client side
+  // private int numStores = 0;
+  // private int [] storeSize = null;
+  // private byte [] name = null;
+  
   final AtomicLong memcacheSize = new AtomicLong(0);
 
   // This is the table subdirectory.
@@ -137,7 +137,6 @@
   final Path regiondir;
   private final Path regionCompactionDir;
   KeyValue.KVComparator comparator;
-  private KeyValue.KVComparator comparatorIgnoreTimestamp;
 
   /*
    * Set this when scheduling compaction if want the next compaction to be a
@@ -210,6 +209,24 @@
     Bytes.toBytes(REGIONINFO_FILE);
 
   /**
+   * Should only be used for testing purposes
+   */
+  public HRegion(){
+    this.basedir = null;
+    this.blockingMemcacheSize = 0;
+    this.conf = null;
+    this.flushListener = null;
+    this.fs = null;
+    this.historian = null;
+    this.memcacheFlushSize = 0;
+    this.log = null;
+    this.regionCompactionDir = null;
+    this.regiondir = null;
+    this.regionInfo = null;
+    this.threadWakeFrequency = 0L;
+  }
+  
+  /**
    * HRegion constructor.
    *
    * @param basedir qualified path of directory where region should be located,
@@ -234,8 +251,6 @@
       HRegionInfo regionInfo, FlushRequester flushListener) {
     this.basedir = basedir;
     this.comparator = regionInfo.getComparator();
-    this.comparatorIgnoreTimestamp =
-      this.comparator.getComparatorIgnoringTimestamps();
     this.log = log;
     this.fs = fs;
     this.conf = conf;
@@ -503,6 +518,11 @@
     return this.regionInfo.getRegionName();
   }
 
+  /** @return region name as string for logging */
+  public String getRegionNameAsString() {
+    return this.regionInfo.getRegionNameAsString();
+  }
+
   /** @return HTableDescriptor for this region */
   public HTableDescriptor getTableDesc() {
     return this.regionInfo.getTableDesc();
@@ -965,197 +985,6 @@
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Fetch multiple versions of a single data item, with timestamp.
-   *
-   * @param row
-   * @param column
-   * @param ts
-   * @param nv
-   * @return Results or null if none.
-   * @throws IOException
-   */
-  public List<KeyValue> get(final byte[] row, final byte[] column, final long ts,
-      final int nv) 
-  throws IOException {
-    long timestamp = ts == -1? HConstants.LATEST_TIMESTAMP : ts;
-    int numVersions = nv == -1? 1 : nv;
-    splitsAndClosesLock.readLock().lock();
-    try {
-      if (this.closed.get()) {
-        throw new IOException("Region " + this + " closed");
-      }
-      // Make sure this is a valid row and valid column
-      checkRow(row);
-      checkColumn(column);
-      // Don't need a row lock for a simple get
-      List<KeyValue> result = getStore(column).
-        get(KeyValue.createFirstOnRow(row, column, timestamp), numVersions);
-      // Guarantee that we return null instead of a zero-length array, 
-      // if there are no results to return.
-      return (result == null || result.isEmpty())? null : result;
-    } finally {
-      splitsAndClosesLock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Data structure with a counter that is accessible rather than create a
-   * new Integer every time we want to up the counter.  Initializes at count 1.
-   */
-  static class Counter {
-    int counter = 1;
-  }
-
-  /*
-   * Check to see if we've not gone over threshold for this particular
-   * column.
-   * @param kv
-   * @param versions
-   * @param versionsCount
-   * @return True if its ok to add current value.
-   */
-  static boolean okToAddResult(final KeyValue kv, final int versions,
-      final Map<KeyValue, HRegion.Counter> versionsCount) {
-    if (versionsCount == null) {
-      return true;
-    }
-    if (versionsCount.containsKey(kv)) {
-      if (versionsCount.get(kv).counter < versions) {
-        return true;
-      }
-    } else {
-      return true;
-    }
-    return false;
-  }
-
-  /*
-   * Used adding item found to list of results getting.
-   * @param kv
-   * @param versionsCount
-   * @param results
-   */
-  static void addResult(final KeyValue kv,
-      final Map<KeyValue, HRegion.Counter> versionsCount,
-      final List<KeyValue> results) {
-    // Don't add if already present; i.e. ignore second entry.
-    if (results.contains(kv)) return;
-    results.add(kv);
-    if (versionsCount == null) {
-      return;
-    }
-    if (!versionsCount.containsKey(kv)) {
-      versionsCount.put(kv, new HRegion.Counter());
-    } else {
-      versionsCount.get(kv).counter++;
-    }
-  }
-
-  /*
-   * @param versions Number of versions to get.
-   * @param versionsCount May be null.
-   * @param columns Columns we want to fetch.
-   * @return True if has enough versions.
-   */
-  static boolean hasEnoughVersions(final int versions,
-      final Map<KeyValue, HRegion.Counter> versionsCount,
-      final Set<byte []> columns) {
-    if (columns == null || versionsCount == null) {
-      // Wants all columns so just keep going
-      return false;
-    }
-    if (columns.size() > versionsCount.size()) {
-      return false;
-    }
-    if (versions == 1) {
-      return true;
-    }
-    // Need to look at each to make sure at least versions.
-    for (Map.Entry<KeyValue, HRegion.Counter> e: versionsCount.entrySet()) {
-      if (e.getValue().counter < versions) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Fetch all the columns for the indicated row at a specified timestamp.
-   * Returns a HbaseMapWritable that maps column names to values.
-   *
-   * We should eventually use Bloom filters here, to reduce running time.  If 
-   * the database has many column families and is very sparse, then we could be 
-   * checking many files needlessly.  A small Bloom for each row would help us 
-   * determine which column groups are useful for that row.  That would let us 
-   * avoid a bunch of disk activity.
-   *
-   * @param row
-   * @param columns Array of columns you'd like to retrieve. When null, get all.
-   * @param ts
-   * @param numVersions number of versions to retrieve
-   * @param lockid
-   * @return HbaseMapWritable<columnName, Cell> values
-   * @throws IOException
-   */
-  public HbaseMapWritable<byte [], Cell> getFull(final byte [] row,
-      final NavigableSet<byte []> columns, final long ts,
-      final int numVersions, final Integer lockid) 
-  throws IOException {
-    // Check columns passed
-    if (columns != null) {
-      for (byte [] column: columns) {
-        checkColumn(column);
-      }
-    }
-    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-    Map<KeyValue, Counter> versionCounter =
-      new TreeMap<KeyValue, Counter>(this.comparatorIgnoreTimestamp);
-    Integer lid = getLock(lockid,row);
-    HashSet<Store> storeSet = new HashSet<Store>();
-    try {
-      // Get the concerned columns or all of them
-      if (columns != null) {
-        for (byte[] bs : columns) {
-          Store store = stores.get(bs);
-          if (store != null) {
-            storeSet.add(store);
-          }
-        }
-      } else {
-        storeSet.addAll(stores.values());
-      }
-      long timestamp =
-        (ts == HConstants.LATEST_TIMESTAMP)? System.currentTimeMillis(): ts;
-      KeyValue key = KeyValue.createFirstOnRow(row, timestamp);
-      // For each column name that is just a column family, open the store
-      // related to it and fetch everything for that row. HBASE-631
-      // Also remove each store from storeSet so that these stores
-      // won't be opened for no reason. HBASE-783
-      if (columns != null) {
-        for (byte [] bs : columns) {
-          // TODO: Fix so we use comparator in KeyValue that looks at
-          // column family portion only.
-          if (KeyValue.getFamilyDelimiterIndex(bs, 0, bs.length) == (bs.length - 1)) {
-            Store store = stores.get(bs);
-            store.getFull(key, null, null, numVersions, versionCounter,
-              keyvalues, timestamp);
-            storeSet.remove(store);
-          }
-        }
-      }
-      for (Store targetStore: storeSet) {
-        targetStore.getFull(key, columns, null, numVersions, versionCounter,
-          keyvalues, timestamp);
-      }
-      
-      return Cell.createCells(keyvalues);
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
   /**
    * Return all the data for the row that matches <i>row</i> exactly, 
    * or the one that immediately preceeds it, at or immediately before 
@@ -1165,9 +994,9 @@
    * @return map of values
    * @throws IOException
    */
-  RowResult getClosestRowBefore(final byte [] row)
+  Result getClosestRowBefore(final byte [] row)
   throws IOException{
-    return getClosestRowBefore(row, HConstants.COLUMN_FAMILY);
+    return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
   }
 
   /**
@@ -1176,12 +1005,13 @@
    * <i>ts</i>.
    * 
    * @param row row key
+   * @param family
    * @param columnFamily Must include the column family delimiter character.
    * @return map of values
    * @throws IOException
    */
-  public RowResult getClosestRowBefore(final byte [] row,
-    final byte [] columnFamily)
+  public Result getClosestRowBefore(final byte [] row,
+    final byte [] family)
   throws IOException{
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
@@ -1189,9 +1019,10 @@
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
-      Store store = getStore(columnFamily);
+      Store store = getStore(family);
       KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
+      LOG.debug("getClosestRowBefore looking for: " + Bytes.toStringBinary(row));
       key = store.getRowKeyAtOrBefore(kv);
       if (key == null) {
         return null;
@@ -1202,52 +1033,45 @@
       if (!this.comparator.matchingRows(kv, key)) {
         kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
       }
-      store.getFull(kv, null, null, 1, null, results, System.currentTimeMillis());
-      // Convert to RowResult.  TODO: Remove need to do this.
-      return RowResult.createRowResult(results);
+      Get get = new Get(key.getRow());
+      store.get(get, null, results);
+      
+      return new Result(results);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
   }
 
+  //TODO
   /**
    * Return an iterator that scans over the HRegion, returning the indicated 
-   * columns for only the rows that match the data filter.  This Iterator must
-   * be closed by the caller.
+   * columns and rows specified by the {@link Scan}.
+   * <p>
+   * This Iterator must be closed by the caller.
    *
-   * @param cols columns to scan. If column name is a column family, all
-   * columns of the specified column family are returned.  Its also possible
-   * to pass a regex in the column qualifier. A column qualifier is judged to
-   * be a regex if it contains at least one of the following characters:
-   * <code>\+|^&*$[]]}{)(</code>.
-   * @param firstRow row which is the starting point of the scan
-   * @param timestamp only return rows whose timestamp is <= this value
-   * @param filter row filter
+   * @param scan configured {@link Scan}
    * @return InternalScanner
    * @throws IOException
    */
-  public InternalScanner getScanner(byte[][] cols, byte [] firstRow,
-    long timestamp, RowFilterInterface filter) 
+  public InternalScanner getScanner(Scan scan)
   throws IOException {
     newScannerLock.readLock().lock();
     try {
       if (this.closed.get()) {
         throw new IOException("Region " + this + " closed");
       }
-      HashSet<Store> storeSet = new HashSet<Store>();
-      NavigableSet<byte []> columns =
-        new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-      // Below we make up set of stores we want scanners on and we fill out the
-      // list of columns.
-      for (int i = 0; i < cols.length; i++) {
-        columns.add(cols[i]);
-        Store s = stores.get(cols[i]);
-        if (s != null) {
-          storeSet.add(s);
+      // Verify families are all valid
+      if(scan.hasFamilies()) {
+        for(byte [] family : scan.getFamilyMap().keySet()) {
+          checkFamily(family);
+        }
+      } else { // Adding all families to scanner
+        for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
+          scan.addFamily(family);
         }
       }
-      return new HScanner(columns, firstRow, timestamp,
-        storeSet.toArray(new Store [storeSet.size()]), filter);
+      return new RegionScanner(scan);
+      
     } finally {
       newScannerLock.readLock().unlock();
     }
@@ -1256,44 +1080,136 @@
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
+  /**
+   * @param delete
+   * @param lockid
+   * @param writeToWAL
+   * @throws IOException
+   */
+  public void delete(Delete delete, Integer lockid, boolean writeToWAL)
+  throws IOException {
+    checkReadOnly();
+    checkResources();
+    splitsAndClosesLock.readLock().lock();
+    Integer lid = null;
+    try {
+      byte [] row = delete.getRow();
+      // If we did not pass an existing row lock, obtain a new one
+      lid = getLock(lockid, row);
+
+      //Check to see if this is a deleteRow insert
+      if(delete.getFamilyMap().isEmpty()){
+        for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
+          delete.deleteFamily(family);
+        }
+      } else {
+        for(byte [] family : delete.getFamilyMap().keySet()) {
+          if(family == null) {
+            throw new NoSuchColumnFamilyException("Empty family is invalid");
+          }
+          checkFamily(family);
+        }
+      }
+      
+      for(Map.Entry<byte[], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
+        byte [] family = e.getKey();
+        delete(family, e.getValue(), writeToWAL);
+      }
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+      splitsAndClosesLock.readLock().unlock();
+    }
+  }
+  
+  
+  /**
+   * @param family
+   * @param kvs
+   * @param writeToWAL
+   * @throws IOException
+   */
+  public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
+  throws IOException {
+    long now = System.currentTimeMillis();
+    boolean flush = false;
+    this.updatesLock.readLock().lock();
+    try {
+      if (writeToWAL) {
+        this.log.append(regionInfo.getRegionName(),
+          regionInfo.getTableDesc().getName(), kvs,
+          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+      }
+      long size = 0;
+      Store store = getStore(family);
+      for (KeyValue kv: kvs) {
+        // Check if time is LATEST, change to time of most recent addition if so
+        // This is expensive.
+        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+          List<KeyValue> result = new ArrayList<KeyValue>(1);
+          Get g = new Get(kv.getRow());
+          NavigableSet<byte []> qualifiers =
+            new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+          qualifiers.add(kv.getQualifier());
+          get(store, g, qualifiers, result);
+          if (result.isEmpty()) {
+            // Nothing to delete
+            continue;
+          }
+          if (result.size() > 1) {
+            throw new RuntimeException("Unexpected size: " + result.size());
+          }
+          KeyValue getkv = result.get(0);
+          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+            getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+        }
+        size = this.memcacheSize.addAndGet(store.delete(kv));
+      }
+      flush = isFlushSize(size);
+    } finally {
+      this.updatesLock.readLock().unlock();
+    }
+    if (flush) {
+      // Request a cache flush.  Do it outside update lock.
+      requestFlush();
+    }
+  }
   
   /**
-   * @param b
+   * @param put
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b) throws IOException {
-    this.batchUpdate(b, null, true);
+  public void put(Put put) throws IOException {
+    this.put(put, null, true);
   }
   
   /**
-   * @param b
+   * @param put
    * @param writeToWAL
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException {
-    this.batchUpdate(b, null, writeToWAL);
+  public void put(Put put, boolean writeToWAL) throws IOException {
+    this.put(put, null, writeToWAL);
   }
 
-  
   /**
-   * @param b
+   * @param put
    * @param lockid
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException {
-    this.batchUpdate(b, lockid, true);
+  public void put(Put put, Integer lockid) throws IOException {
+    this.put(put, lockid, true);
   }
-  
+
   /**
-   * @param b
+   * @param put
    * @param lockid
-   * @param writeToWAL if true, then we write this update to the log
+   * @param writeToWAL
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
+  public void put(Put put, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
-    validateValuesLength(b);
+//    validateValuesLength(put);
 
     // Do a rough check that we have resources to accept a write.  The check is
     // 'rough' in that between the resource check and the call to obtain a 
@@ -1307,49 +1223,18 @@
       // #commit or #abort or if the HRegionServer lease on the lock expires.
       // See HRegionServer#RegionListener for how the expire on HRegionServer
       // invokes a HRegion#abort.
-      byte [] row = b.getRow();
+      byte [] row = put.getRow();
       // If we did not pass an existing row lock, obtain a new one
       Integer lid = getLock(lockid, row);
-      long now = System.currentTimeMillis();
-      long commitTime = b.getTimestamp() == LATEST_TIMESTAMP?
-        now: b.getTimestamp();
-      Set<byte []> latestTimestampDeletes = null;
-      List<KeyValue> edits = new ArrayList<KeyValue>();
+      byte [] now = Bytes.toBytes(System.currentTimeMillis());
       try {
-        for (BatchOperation op: b) {
-          byte [] column = op.getColumn();
-          checkColumn(column);
-          KeyValue kv = null;
-          if (op.isPut()) {
-            kv = new KeyValue(row, column, commitTime, op.getValue());
-          } else {
-            // Its a delete.
-            if (b.getTimestamp() == LATEST_TIMESTAMP) {
-              // Save off these deletes of the most recent thing added on the
-              // family.
-              if (latestTimestampDeletes == null) {
-                latestTimestampDeletes =
-                  new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
-              }
-              latestTimestampDeletes.add(op.getColumn());
-              continue;
-            }
-            // Its an explicit timestamp delete
-            kv = new KeyValue(row, column, commitTime, KeyValue.Type.Delete,
-              HConstants.EMPTY_BYTE_ARRAY);
-          }
-          edits.add(kv);
-        }
-        if (!edits.isEmpty()) {
-          update(edits, writeToWAL, now);
-        }
-        if (latestTimestampDeletes != null &&
-            !latestTimestampDeletes.isEmpty()) {
-          // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
-          // as edits.  Need to do individually after figuring which is latest
-          // timestamp to delete.
-          for (byte [] column: latestTimestampDeletes) {
-            deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
+        for(Map.Entry<byte[], List<KeyValue>> entry : 
+          put.getFamilyMap().entrySet()) {
+          byte [] family = entry.getKey();
+          checkFamily(family);
+          List<KeyValue> puts = entry.getValue();
+          if(updateKeys(puts, now)) {
+            put(family, puts, writeToWAL);
           }
         }
       } finally {
@@ -1360,127 +1245,119 @@
     }
   }
 
+  
+  //TODO, Think that gets/puts and deletes should be refactored a bit so that 
+  //the getting of the lock happens before, so that you would just pass it into
+  //the methods. So in the case of checkAndPut you could just do lockRow, 
+  //get, put, unlockRow or something
   /**
-   * Performs an atomic check and save operation. Checks if
-   * the specified expected values have changed, and if not
-   * applies the update.
    * 
-   * @param b the update to apply
-   * @param expectedValues the expected values to check
-   * @param lockid
-   * @param writeToWAL whether or not to write to the write ahead log
-   * @return true if update was applied
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param expectedValue
+   * @param put
+   * @param lockId
+   * @param writeToWAL
    * @throws IOException
+   * @return true if the new put was execute, false otherwise
    */
-  public boolean checkAndSave(BatchUpdate b,
-    HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
-    boolean writeToWAL)
-  throws IOException {
-    // This is basically a copy of batchUpdate with the atomic check and save
-    // added in. So you should read this method with batchUpdate. I will
-    // comment the areas that I have changed where I have not changed, you
-    // should read the comments from the batchUpdate method
-    boolean success = true;
+  public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier,
+      byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL) 
+  throws IOException{
     checkReadOnly();
-    validateValuesLength(b);
+    //TODO, add check for value length or maybe even better move this to the 
+    //client if this becomes a global setting
     checkResources();
     splitsAndClosesLock.readLock().lock();
     try {
-      byte[] row = b.getRow();
-      long now = System.currentTimeMillis();
-      Integer lid = getLock(lockid,row);
+      Get get = new Get(row, put.getRowLock());
+      checkFamily(family);
+      get.addColumn(family, qualifier);
+
+      byte [] now = Bytes.toBytes(System.currentTimeMillis());
+
+      // Lock row
+      Integer lid = getLock(lockId, get.getRow()); 
+      List<KeyValue> result = new ArrayList<KeyValue>();
       try {
-        NavigableSet<byte []> keySet =
-          new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        keySet.addAll(expectedValues.keySet());
-        Map<byte[],Cell> actualValues = getFull(row, keySet,
-          HConstants.LATEST_TIMESTAMP, 1,lid);
-        for (byte[] key : keySet) {
-	  // If test fails exit
-	  Cell cell = actualValues.get(key);
-	  byte[] actualValue = new byte[] {};
-	  if (cell != null) 
-	    actualValue = cell.getValue();
-	  if(!Bytes.equals(actualValue,
-			   expectedValues.get(key))) {
-	    success = false;
-	    break;
-	  }
-	}
-        if (success) {
-          long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
-            now: b.getTimestamp();
-          Set<byte []> latestTimestampDeletes = null;
-          List<KeyValue> edits = new ArrayList<KeyValue>();
-          for (BatchOperation op: b) {
-            byte [] column = op.getColumn();
-            KeyValue kv = null;
-            if (op.isPut()) {
-              kv = new KeyValue(row, column, commitTime, op.getValue());
-            } else {
-              // Its a delete.
-              if (b.getTimestamp() == LATEST_TIMESTAMP) {
-                // Save off these deletes of the most recent thing added on
-                // the family.
-                if (latestTimestampDeletes == null) {
-                  latestTimestampDeletes =
-                    new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
-                }
-                latestTimestampDeletes.add(op.getColumn());
-              } else {
-                // Its an explicit timestamp delete
-                kv = new KeyValue(row, column, commitTime,
-                  KeyValue.Type.Delete, HConstants.EMPTY_BYTE_ARRAY);
-              }
-            }
-            edits.add(kv);
-          }
-          if (!edits.isEmpty()) {
-            update(edits, writeToWAL, now);
-          }
-          if (latestTimestampDeletes != null &&
-              !latestTimestampDeletes.isEmpty()) {
-            // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
-            // as edits.  Need to do individually after figuring which is latest
-            // timestamp to delete.
-            for (byte [] column: latestTimestampDeletes) {
-              deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
+        //Getting data
+        for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
+          get.getFamilyMap().entrySet()) {
+          get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
+        }
+        boolean matches = false;
+        if (result.size() == 0 && expectedValue.length == 0) {
+          matches = true;
+        } else if(result.size() == 1) {
+          //Compare the expected value with the actual value
+          byte [] actualValue = result.get(0).getValue();
+          matches = Bytes.equals(expectedValue, actualValue);
+        }
+        //If matches put the new put
+        if(matches) {
+          for(Map.Entry<byte[], List<KeyValue>> entry :
+            put.getFamilyMap().entrySet()) {
+            byte [] fam = entry.getKey();
+            checkFamily(fam);
+            List<KeyValue> puts = entry.getValue();
+            if(updateKeys(puts, now)) {
+              put(fam, puts, writeToWAL);
             }
           }
+          return true;  
         }
+        return false;
       } finally {
-        if(lockid == null) releaseRowLock(lid);
+        if(lockId == null) releaseRowLock(lid);
       }
     } finally {
       splitsAndClosesLock.readLock().unlock();
-    }
-    return success;
+    }    
   }
-
-  /*
-   * Utility method to verify values length
-   * @param batchUpdate The update to verify
-   * @throws IOException Thrown if a value is too long
-   */
-  private void validateValuesLength(BatchUpdate batchUpdate)
-  throws IOException {
-    for (Iterator<BatchOperation> iter = 
-      batchUpdate.iterator(); iter.hasNext();) {
-      BatchOperation operation = iter.next();
-      if (operation.getValue() != null) {
-        HColumnDescriptor fam = this.regionInfo.getTableDesc().
-          getFamily(operation.getColumn());
-        if (fam != null) {
-          int maxLength = fam.getMaxValueLength();
-          if (operation.getValue().length > maxLength) {
-            throw new ValueOverMaxLengthException("Value in column "
-                + Bytes.toString(operation.getColumn()) + " is too long. "
-                + operation.getValue().length + " instead of " + maxLength);
-          }
-        }
-      }
+      
+  
+  /**
+   * Checks if any stamps are > now.  If so, sets them to now.
+   * <p>
+   * This acts to be prevent users from inserting future stamps as well as
+   * to replace LATEST_TIMESTAMP with now.
+   * @param keys
+   * @param now
+   * @return
+   */
+  private boolean updateKeys(List<KeyValue> keys, byte [] now) {
+    if(keys == null || keys.isEmpty()) {
+      return false;
     }
+    for(KeyValue key : keys) {
+      key.updateLatestStamp(now);
+    }
+    return true;
   }
+  
+
+//  /*
+//   * Utility method to verify values length. 
+//   * @param batchUpdate The update to verify
+//   * @throws IOException Thrown if a value is too long
+//   */
+//  private void validateValuesLength(Put put)
+//  throws IOException {
+//    Map<byte[], List<KeyValue>> families = put.getFamilyMap();
+//    for(Map.Entry<byte[], List<KeyValue>> entry : families.entrySet()) {
+//      HColumnDescriptor hcd = 
+//        this.regionInfo.getTableDesc().getFamily(entry.getKey());
+//      int maxLen = hcd.getMaxValueLength();
+//      for(KeyValue kv : entry.getValue()) {
+//        if(kv.getValueLength() > maxLen) {
+//          throw new ValueOverMaxLengthException("Value in column "
+//            + Bytes.toString(kv.getColumn()) + " is too long. "
+//            + kv.getValueLength() + " > " + maxLen);
+//        }
+//      }
+//    }
+//  }
 
   /*
    * Check if resources to support an update.
@@ -1517,230 +1394,6 @@
           + Thread.currentThread().getName() + "'");
     }
   }
-  
-  /**
-   * Delete all cells of the same age as the passed timestamp or older.
-   * @param row
-   * @param column
-   * @param ts Delete all entries that have this timestamp or older
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteAll(final byte [] row, final byte [] column, final long ts,
-      final Integer lockid)
-  throws IOException {
-    checkColumn(column);
-    checkReadOnly();
-    Integer lid = getLock(lockid,row);
-    try {
-      // Delete ALL versions rather than column family VERSIONS.  If we just did
-      // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
-      deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
-  /**
-   * Delete all cells of the same age as the passed timestamp or older.
-   * @param row
-   * @param ts Delete all entries that have this timestamp or older
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteAll(final byte [] row, final long ts, final Integer lockid)
-  throws IOException {
-    checkReadOnly();
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    long time = ts;
-    if (ts == HConstants.LATEST_TIMESTAMP) {
-      time = now;
-    }
-    KeyValue kv = KeyValue.createFirstOnRow(row, time);
-    try {
-      for (Store store : stores.values()) {
-        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, time);
-        List<KeyValue> edits = new ArrayList<KeyValue>();
-        for (KeyValue key: keyvalues) {
-          // This is UGLY. COPY OF KEY PART OF KeyValue.
-          edits.add(key.cloneDelete());
-        }
-        update(edits, now);
-      }
-    } finally {
-      if (lockid == null) releaseRowLock(lid);
-    }
-  }
-  
-  /**
-   * Delete all cells for a row with matching columns with timestamps
-   * less than or equal to <i>timestamp</i>. 
-   * 
-   * @param row The row to operate on
-   * @param columnRegex The column regex 
-   * @param timestamp Timestamp to match
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteAllByRegex(final byte [] row, final String columnRegex, 
-      final long timestamp, final Integer lockid) throws IOException {
-    checkReadOnly();
-    Pattern columnPattern = Pattern.compile(columnRegex);
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    KeyValue kv = new KeyValue(row, timestamp);
-    try {
-      for (Store store : stores.values()) {
-        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-        store.getFull(kv, null, columnPattern, ALL_VERSIONS, null, keyvalues,
-          now);
-        List<KeyValue> edits = new ArrayList<KeyValue>();
-        for (KeyValue key: keyvalues) {
-          edits.add(key.cloneDelete());
-        }
-        update(edits, now);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
-  /**
-   * Delete all cells for a row with matching column family with timestamps
-   * less than or equal to <i>timestamp</i>.
-   *
-   * @param row The row to operate on
-   * @param family The column family to match
-   * @param timestamp Timestamp to match
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteFamily(byte [] row, byte [] family, long timestamp,
-      final Integer lockid)
-  throws IOException{
-    checkReadOnly();
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    try {
-      // find the HStore for the column family
-      Store store = getStore(family);
-      // find all the keys that match our criteria
-      List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-      store.getFull(new KeyValue(row, timestamp), null, null, ALL_VERSIONS,
-        null, keyvalues, now);
-      // delete all the cells
-      List<KeyValue> edits = new ArrayList<KeyValue>();
-      for (KeyValue kv: keyvalues) {
-        edits.add(kv.cloneDelete());
-      }
-      update(edits, now);
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
-  /**
-   * Delete all cells for a row with all the matching column families by
-   * familyRegex with timestamps less than or equal to <i>timestamp</i>.
-   * 
-   * @param row The row to operate on
-   * @param familyRegex The column family regex for matching. This regex
-   * expression just matches the family name; it does not include <code>:</code>
-   * @param timestamp Timestamp to match
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteFamilyByRegex(byte [] row, String familyRegex,
-      final long timestamp, final Integer lockid)
-  throws IOException {
-    checkReadOnly();
-    // construct the family regex pattern
-    Pattern familyPattern = Pattern.compile(familyRegex);
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    KeyValue kv = new KeyValue(row, timestamp);
-    try {
-      for(Store store: stores.values()) {
-        String familyName = Bytes.toString(store.getFamily().getName());
-        // check the family name match the family pattern.
-        if(!(familyPattern.matcher(familyName).matches())) 
-          continue;
-        
-        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, now);
-        List<KeyValue> edits = new ArrayList<KeyValue>();
-        for (KeyValue k: keyvalues) {
-          edits.add(k.cloneDelete());
-        }
-        update(edits, now);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-  
-  /*
-   * Delete one or many cells.
-   * Used to support {@link #deleteAll(byte [], byte [], long)} and deletion of
-   * latest cell.
-   * @param row
-   * @param column
-   * @param ts Timestamp to start search on.
-   * @param versions How many versions to delete. Pass
-   * {@link HConstants#ALL_VERSIONS} to delete all.
-   * @param now
-   * @throws IOException
-   */
-  private void deleteMultiple(final byte [] row, final byte [] column,
-      final long ts, final int versions, final long now)
-  throws IOException {
-    checkReadOnly();
-    // We used to have a getKeys method that purportedly only got the keys and
-    // not the keys and values.  We now just do getFull.  For memcache values,
-    // shouldn't matter if we get key and value since it'll be the entry that
-    // is in memcache.  For the keyvalues from storefile, could be saving if
-    // we only returned key component. TODO.
-    List<KeyValue> keys = get(row, column, ts, versions);
-    if (keys != null && keys.size() > 0) {
-      // I think the below edits don't have to be sorted.  Its deletes.
-      // They don't have to go in in exact sorted order (we don't have to worry
-      // about the meta or root sort comparator here).
-      List<KeyValue> edits = new ArrayList<KeyValue>();
-      for (KeyValue key: keys) {
-        edits.add(key.cloneDelete());
-      }
-      update(edits, now);
-    }
-  }
-
-  /**
-   * Tests for the existence of any cells for a given coordinate.
-   * 
-   * @param row the row
-   * @param column the column, or null
-   * @param timestamp the timestamp, or HConstants.LATEST_VERSION for any
-   * @param lockid the existing lock, or null 
-   * @return true if cells exist for the row, false otherwise
-   * @throws IOException
-   */
-  public boolean exists(final byte[] row, final byte[] column, 
-    final long timestamp, final Integer lockid) 
-  throws IOException {
-    checkRow(row);
-    Integer lid = getLock(lockid, row);
-    try {
-      NavigableSet<byte []> columns = null;
-      if (column != null) {
-        columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        columns.add(column);
-      }
-      return !getFull(row, columns, timestamp, 1, lid).isEmpty();
-    } finally {
-      if (lockid == null) releaseRowLock(lid);
-    }
-  }
 
   /**
    * @throws IOException Throws exception if region is in read-only mode.
@@ -1758,9 +1411,9 @@
   * @param now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits, final long now)
+  private void put(final byte [] family, final List<KeyValue> edits)
   throws IOException {
-    this.update(edits, true, now);
+    this.put(family, edits, true);
   }
 
   /** 
@@ -1771,9 +1424,8 @@
    * @param now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits, boolean writeToWAL,
-    final long now)
-  throws IOException {
+  private void put(final byte [] family, final List<KeyValue> edits, 
+      boolean writeToWAL) throws IOException {
     if (edits == null || edits.isEmpty()) {
       return;
     }
@@ -1781,14 +1433,15 @@
     this.updatesLock.readLock().lock();
     try {
       if (writeToWAL) {
+        long now = System.currentTimeMillis();
         this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), edits,
           (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
       }
       long size = 0;
+      Store store = getStore(family);
       for (KeyValue kv: edits) {
-        // TODO: Fix -- do I have to do a getColumn here?
-        size = this.memcacheSize.addAndGet(getStore(kv.getColumn()).add(kv));
+        size = this.memcacheSize.addAndGet(store.add(kv));
       }
       flush = isFlushSize(size);
     } finally {
@@ -1826,7 +1479,6 @@
   }
   
   // Do any reconstruction needed from the log
-  @SuppressWarnings("unused")
   protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
     Progressable reporter)
   throws UnsupportedEncodingException, IOException {
@@ -1865,23 +1517,6 @@
           Bytes.toString(row) + "'");
     }
   }
-  
-  /*
-   * Make sure this is a valid column for the current table
-   * @param columnName
-   * @throws NoSuchColumnFamilyException
-   */
-  private void checkColumn(final byte [] column)
-  throws NoSuchColumnFamilyException {
-    if (column == null) {
-      return;
-    }
-    if (!regionInfo.getTableDesc().hasFamily(column)) {
-      throw new NoSuchColumnFamilyException("Column family on " +
-        Bytes.toString(column) + " does not exist in region " + this
-          + " in table " + regionInfo.getTableDesc());
-    }
-  }
 
   /**
    * Obtain a lock on the given row.  Blocks until success.
@@ -1906,7 +1541,7 @@
    * @throws IOException
    * @return The id of the held lock.
    */
-  Integer obtainRowLock(final byte [] row) throws IOException {
+  public Integer obtainRowLock(final byte [] row) throws IOException {
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
@@ -2018,161 +1653,86 @@
     return this.basedir;
   }
 
+  
+  //TODO
   /**
-   * HScanner is an iterator through a bunch of rows in an HRegion.
+   * RegionScanner is an iterator through a bunch of rows in an HRegion.
+   * <p>
+   * It is used to combine scanners from multiple Stores (aka column families).
    */
-  private class HScanner implements InternalScanner {
-    private InternalScanner[] scanners;
-    private List<KeyValue> [] resultSets;
-    private RowFilterInterface filter;
-
-    /** Create an HScanner with a handle on many HStores. */
-    @SuppressWarnings("unchecked")
-    HScanner(final NavigableSet<byte []> columns, byte [] firstRow,
-      long timestamp, final Store [] stores, final RowFilterInterface filter)
-    throws IOException {
-      this.filter = filter;
-      this.scanners = new InternalScanner[stores.length];
+  class RegionScanner implements InternalScanner {
+    
+    private KeyValueHeap storeHeap;
+    private byte [] stopRow;
+    
+    RegionScanner(Scan scan) throws IOException {
+      if(Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+        this.stopRow = null;
+      } else {
+        this.stopRow = scan.getStopRow();
+      }
+      
+      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
       try {
-        for (int i = 0; i < stores.length; i++) {
-          // Only pass relevant columns to each store
-          NavigableSet<byte[]> columnSubset =
-            new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-          for (byte [] c: columns) {
-            if (KeyValue.FAMILY_COMPARATOR.compare(stores[i].storeName, c) == 0) {
-              columnSubset.add(c);
-            }
-          }
-          RowFilterInterface f = filter;
-          if (f != null) {
-            // Need to replicate filters.
-            // At least WhileMatchRowFilter will mess up the scan if only
-            // one shared across many rows. See HADOOP-2467.
-            f = (RowFilterInterface) WritableUtils.clone(filter, conf);
-          }
-          scanners[i] = stores[i].getScanner(timestamp, columnSubset, firstRow, f);
+        for(Map.Entry<byte[], NavigableSet<byte[]>> entry : 
+          scan.getFamilyMap().entrySet()) {
+          Store store = stores.get(entry.getKey());
+          scanners.add(store.getScanner(scan, entry.getValue()));
         }
       } catch (IOException e) {
-        for (int i = 0; i < this.scanners.length; i++) {
-          if (scanners[i] != null) {
-            closeScanner(i);
+        for(KeyValueScanner scanner : scanners) {
+          if(scanner != null) {
+            close(scanner);
           }
         }
         throw e;
       }
-
-      // Advance to the first key in each store.
-      // All results will match the required column-set and scanTime.
-      this.resultSets = new List[scanners.length];
-      for (int i = 0; i < scanners.length; i++) {
-        resultSets[i] = new ArrayList<KeyValue>();
-        if(scanners[i] != null && !scanners[i].next(resultSets[i])) {
-          closeScanner(i);
-        }
-      }
-
+      
+      this.storeHeap = 
+        new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
+      
       // As we have now successfully completed initialization, increment the
       // activeScanner count.
       activeScannerCount.incrementAndGet();
     }
 
+    /**
+     * Get the next row of results from this region.
+     * @param results list to append results to
+     * @return true if there are more rows, false if scanner is done
+     */
     public boolean next(List<KeyValue> results)
     throws IOException {
-      boolean moreToFollow = false;
-      boolean filtered = false;
-      do {
-        // Find the lowest key across all stores.
-        KeyValue chosen = null;
-        long chosenTimestamp = -1;
-        for (int i = 0; i < this.scanners.length; i++) {
-          if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
-            continue;
-          }
-          KeyValue kv = this.resultSets[i].get(0);
-          if (chosen == null ||
-               (comparator.compareRows(kv, chosen) < 0) ||
-               ((comparator.compareRows(kv, chosen) == 0) &&
-                 (kv.getTimestamp() > chosenTimestamp))) {
-            chosen = kv;
-            chosenTimestamp = chosen.getTimestamp();
-          }
-        }
-
-        // Store results from each sub-scanner.
-        if (chosenTimestamp >= 0) {
-          for (int i = 0; i < scanners.length; i++) {
-            if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
-              continue;
-            }
-            KeyValue kv = this.resultSets[i].get(0);
-            if (comparator.compareRows(kv, chosen) == 0) {
-              results.addAll(this.resultSets[i]);
-              resultSets[i].clear();
-              if (!scanners[i].next(resultSets[i])) {
-                closeScanner(i);
-              }
-            }
-          }
-        }
-
-        moreToFollow = chosenTimestamp >= 0;
-        if (results == null || results.size() <= 0) {
-          // If we got no results, then there is no more to follow.
-          moreToFollow = false;
-        }
-
-        filtered = filter == null ? false : filter.filterRow(results);
-        if (filter != null && filter.filterAllRemaining()) {
-          moreToFollow = false;
-        }
-        
-        if (moreToFollow) {
-          if (filter != null) {
-            filter.rowProcessed(filtered, chosen.getBuffer(), chosen.getRowOffset(),
-              chosen.getRowLength());
-          }
-          if (filtered) {
-            results.clear();
-          }
-        }
-      } while(filtered && moreToFollow);
-
-      // Make sure scanners closed if no more results
-      if (!moreToFollow) {
-        for (int i = 0; i < scanners.length; i++) {
-          if (null != scanners[i]) {
-            closeScanner(i);
-          }
-        }
+      // This method should probably be reorganized a bit... has gotten messy
+      KeyValue kv = this.storeHeap.peek();
+      if(kv == null) {
+        return false;
       }
-      
-      return moreToFollow;
-    }
-
-    /** Shut down a single scanner */
-    void closeScanner(int i) {
-      try {
-        try {
-          scanners[i].close();
-        } catch (IOException e) {
-          LOG.warn("Failed closing scanner " + i, e);
-        }
-      } finally {
-        scanners[i] = null;
-        // These data members can be null if exception in constructor
-        if (resultSets != null) {
-          resultSets[i] = null;
+      byte [] currentRow = kv.getRow();
+      // See if we passed stopRow
+      if(stopRow != null &&
+          comparator.compareRows(stopRow, 0, stopRow.length, 
+              currentRow, 0, currentRow.length)
+          <= 0){
+        return false;
+      }
+      this.storeHeap.next(results);
+      while(true) {
+        kv = this.storeHeap.peek();
+        if(kv == null) {
+          return false;
+        }
+        byte [] row = kv.getRow();
+        if(!Bytes.equals(currentRow, row)) {
+          return true;
         }
+        this.storeHeap.next(results);
       }
     }
 
     public void close() {
       try {
-        for(int i = 0; i < scanners.length; i++) {
-          if(scanners[i] != null) {
-            closeScanner(i);
-          }
-        }
+        storeHeap.close();
       } finally {
         synchronized (activeScannerCount) {
           int count = activeScannerCount.decrementAndGet();
@@ -2188,14 +1748,22 @@
         }
       }
     }
-
-    public boolean isWildcardScanner() {
-      throw new UnsupportedOperationException("Unimplemented on HScanner");
+    /**
+     * 
+     * @param scanner to be closed
+     */
+    public void close(KeyValueScanner scanner) {
+      try {
+        scanner.close();
+      } catch(NullPointerException npe) {}
+    }
+    
+    /**
+     * @return the current storeHeap
+     */
+    public KeyValueHeap getStoreHeap() {
+      return this.storeHeap;
     }
-
-    public boolean isMultipleMatchScanner() {
-      throw new UnsupportedOperationException("Unimplemented on HScanner");
-    }  
   }
   
   // Utility methods
@@ -2282,9 +1850,9 @@
     Integer lid = meta.obtainRowLock(row);
     try {
       List<KeyValue> edits = new ArrayList<KeyValue>();
-      edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
-        Writables.getBytes(r.getRegionInfo())));
-      meta.update(edits, System.currentTimeMillis());
+      edits.add(new KeyValue(row, CATALOG_FAMILY, REGIONINFO_QUALIFIER,
+          System.currentTimeMillis(), Writables.getBytes(r.getRegionInfo())));
+      meta.put(HConstants.CATALOG_FAMILY, edits);
     } finally {
       meta.releaseRowLock(lid);
     }
@@ -2304,8 +1872,9 @@
   public static void removeRegionFromMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    srvr.deleteFamily(metaRegionName, regionName, HConstants.COLUMN_FAMILY,
-      HConstants.LATEST_TIMESTAMP, -1L);
+    Delete delete = new Delete(regionName);
+    delete.deleteFamily(HConstants.CATALOG_FAMILY);
+    srvr.delete(metaRegionName, delete);
   }
 
   /**
@@ -2319,14 +1888,18 @@
   public static void offlineRegionInMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
-    BatchUpdate b = new BatchUpdate(info.getRegionName());
+    // Puts and Deletes used to be "atomic" here.  We can use row locks if
+    // we need to keep that property, or we can expand Puts and Deletes to
+    // allow them to be committed at once.
+    byte [] row = info.getRegionName();
+    Put put = new Put(row);
     info.setOffline(true);
-    b.put(COL_REGIONINFO, Writables.getBytes(info));
-    b.delete(COL_SERVER);
-    b.delete(COL_STARTCODE);
-    // If carrying splits, they'll be in place when we show up on new
-    // server.
-    srvr.batchUpdate(metaRegionName, b, -1L);
+    put.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER, Writables.getBytes(info));
+    srvr.put(metaRegionName, put);
+    Delete del = new Delete(row);
+    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
+    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+    srvr.delete(metaRegionName, del);
   }
   
   /**
@@ -2340,12 +1913,10 @@
   public static void cleanRegionInMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
-    BatchUpdate b = new BatchUpdate(info.getRegionName());
-    b.delete(COL_SERVER);
-    b.delete(COL_STARTCODE);
-    // If carrying splits, they'll be in place when we show up on new
-    // server.
-    srvr.batchUpdate(metaRegionName, b, LATEST_TIMESTAMP);
+    Delete del = new Delete(info.getRegionName());
+    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
+    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+    srvr.delete(metaRegionName, del);
   }
 
   /**
@@ -2638,67 +2209,127 @@
     }
   }
 
-  public long incrementColumnValue(byte[] row, byte[] column, long amount)
+  
+  //
+  // HBASE-880
+  //
+  /**
+   * @param get
+   * @param lockid
+   * @return result
+   * @throws IOException
+   */
+  public Result get(final Get get, final Integer lockid) throws IOException {
+    // Verify families are all valid
+    if(get.hasFamilies()) {
+      for(byte [] family : get.familySet()) {
+        checkFamily(family);
+      }
+    } else { // Adding all families to scanner
+      for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
+        get.addFamily(family);
+      }
+    }
+    // Lock row
+    Integer lid = getLock(lockid, get.getRow()); 
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    try {
+      for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
+          get.getFamilyMap().entrySet()) {
+        get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
+      }
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+    return new Result(result);
+  }
+
+  private void get(final Store store, final Get get,
+    final NavigableSet<byte []> qualifiers, List<KeyValue> result)
+  throws IOException {
+    store.get(get, qualifiers, result);
+  }
+
+  /**
+   * 
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param amount
+   * @return the new value of the column after incrementing by
+   *   <code>amount</code>
+   * @throws IOException
+   */
+  public long incrementColumnValue(byte [] row, byte [] family,
+      byte [] qualifier, long amount)
   throws IOException {
     checkRow(row);
-    checkColumn(column);
     
+    // Lock row
     Integer lid = obtainRowLock(row);
-    splitsAndClosesLock.readLock().lock();
+    long result = 0L;
     try {
-      KeyValue kv = new KeyValue(row, column);
-      long ts = System.currentTimeMillis();
-      byte [] value = null;
-
-      Store store = getStore(column);
-
-      List<KeyValue> c;
-      // Try the memcache first.
-      store.lock.readLock().lock();
-      try {
-        c = store.memcache.get(kv, 1);
-      } finally {
-        store.lock.readLock().unlock();
-      }
-      // Pick the latest value out of List<Cell> c:
-      if (c.size() >= 1) {
-        // Use the memcache timestamp value.
-        LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) +
-          "/" + Bytes.toString(column));
-        ts = c.get(0).getTimestamp();
-        value = c.get(0).getValue();
-      }
-
-      if (value == null) {
-        // Check the store (including disk) for the previous value.
-        c = store.get(kv, 1);
-        if (c != null && c.size() == 1) {
-          LOG.debug("Using HFile previous value for " + Bytes.toString(row) +
-            "/" + Bytes.toString(column));
-          value = c.get(0).getValue();
-        } else if (c != null && c.size() > 1) {
-          throw new DoNotRetryIOException("more than 1 value returned in " +
-            "incrementColumnValue from Store");
-        }
-      }
-      
-      if (value == null) {
-        // Doesn't exist
-        LOG.debug("Creating new counter value for " + Bytes.toString(row) +
-          "/"+ Bytes.toString(column));
-        value = Bytes.toBytes(amount);
-      } else {
-        if (amount == 0) return Bytes.toLong(value);
-        value = Bytes.incrementBytes(value, amount);
-      }
-
-      BatchUpdate b = new BatchUpdate(row, ts);
-      b.put(column, value);
-      batchUpdate(b, lid, true);
-      return Bytes.toLong(value);
+      Store store = stores.get(family);
+      result = store.incrementColumnValue(row, family, qualifier, amount);
     } finally {
-      splitsAndClosesLock.readLock().unlock();
-      releaseRowLock(lid);
+      if(lid == null) {
+        releaseRowLock(lid);
+      }
+    }
+    return result;
+  }
+    
+  
+  //
+  // New HBASE-880 Helpers
+  //
+  
+  private void checkFamily(final byte [] family) 
+  throws NoSuchColumnFamilyException {
+    if(!regionInfo.getTableDesc().hasFamily(family)) {
+      throw new NoSuchColumnFamilyException("Column family " +
+          Bytes.toString(family) + " does not exist in region " + this
+            + " in table " + regionInfo.getTableDesc());
     }
   }
-}
+  
+  
+//  //HBaseAdmin Debugging 
+//  /**
+//   * @return number of stores in the region
+//   */
+//  public int getNumStores() {
+//    return this.numStores;
+//  }
+//  /**
+//   * @return the name of the region
+//   */
+//  public byte [] getRegionsName() {
+//    return this.name;
+//  }
+//  /**
+//   * @return the number of files in every store
+//   */
+//  public int [] getStoresSize() {
+//    return this.storeSize;
+//  }
+//  
+//  //Writable, used for debugging purposes only
+//  public void readFields(final DataInput in)
+//  throws IOException {
+//    this.name = Bytes.readByteArray(in);
+//    this.numStores = in.readInt();
+//    this.storeSize = new int [numStores];
+//    for(int i=0; i<this.numStores; i++) {
+//      this.storeSize[i] = in.readInt();
+//    }
+//  }
+//
+//  public void write(final DataOutput out)
+//  throws IOException {
+//    Bytes.writeByteArray(out, this.regionInfo.getRegionName());
+//    out.writeInt(this.stores.size());
+//    for(Store store : this.stores.values()) {
+//      out.writeInt(store.getNumberOfstorefiles());
+//    }
+//  }
+}
\ No newline at end of file

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Jun  7 19:57:37 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,7 +29,6 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -38,7 +37,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
@@ -79,13 +77,13 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -224,8 +222,6 @@
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
 
-  private final long rpcTimeout;
-
   // Address passed in to constructor.
   private final HServerAddress address;
 
@@ -277,8 +273,6 @@
     this.numRegionsToReport =                                        
       conf.getInt("hbase.regionserver.numregionstoreport", 10);      
 
-    this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
-
     reinitialize();
   }
 
@@ -758,8 +752,7 @@
    * @return RegionLoad instance.
    * @throws IOException
    */
-  private HServerLoad.RegionLoad createRegionLoad(final HRegion r)
-  throws IOException {
+  private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
     byte[] name = r.getRegionName();
     int stores = 0;
     int storefiles = 0;
@@ -782,8 +775,7 @@
    * @return An instance of RegionLoad.
    * @throws IOException
    */
-  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName)
-  throws IOException {
+  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName) {
     return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
   }
 
@@ -1080,12 +1072,7 @@
           for(Map.Entry<byte [], Store> ee: r.stores.entrySet()) {
             Store store = ee.getValue(); 
             storefiles += store.getStorefilesCount();
-            try {
-              storefileIndexSize += store.getStorefilesIndexSize();
-            } catch (IOException ex) {
-              LOG.warn("error getting store file index size for " + store +
-                ": " + StringUtils.stringifyException(ex));  
-            }
+            storefileIndexSize += store.getStorefilesIndexSize();
           }
         }
       }
@@ -1630,7 +1617,7 @@
       super(Thread.currentThread().getName() + ".regionCloser." + r.toString());
       this.r = r;
     }
-
+    
     @Override
     public void run() {
       try {
@@ -1701,96 +1688,51 @@
     return getRegion(regionName).getRegionInfo();
   }
 
-  public Cell [] get(final byte [] regionName, final byte [] row,
-    final byte [] column, final long timestamp, final int numVersions) 
-  throws IOException {
-    checkOpen();
-    requestCount.incrementAndGet();
-    try {
-      List<KeyValue> results =
-        getRegion(regionName).get(row, column, timestamp, numVersions);
-      return Cell.createSingleCellArray(results);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
-  }
 
-  public RowResult getRow(final byte [] regionName, final byte [] row, 
-    final byte [][] columns, final long ts,
-    final int numVersions, final long lockId)
+  public Result getClosestRowBefore(final byte [] regionName, 
+    final byte [] row, final byte [] family)
   throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
-      // convert the columns array into a set so it's easy to check later.
-      NavigableSet<byte []> columnSet = null;
-      if (columns != null) {
-        columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        columnSet.addAll(Arrays.asList(columns));
-      }
+      // locate the region we're operating on
       HRegion region = getRegion(regionName);
-      HbaseMapWritable<byte [], Cell> result =
-        region.getFull(row, columnSet, ts, numVersions, getLockFromId(lockId));
-      if (result == null || result.isEmpty())
-        return null;
-      return new RowResult(row, result);
+      // ask the region for all the data 
+      
+      Result r = region.getClosestRowBefore(row, family);
+      return r;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public RowResult getClosestRowBefore(final byte [] regionName, 
-    final byte [] row, final byte [] columnFamily)
-  throws IOException {
+  /** {@inheritDoc} */
+  public Result get(byte [] regionName, Get get) throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
-      // locate the region we're operating on
       HRegion region = getRegion(regionName);
-      // ask the region for all the data 
-      RowResult rr = region.getClosestRowBefore(row, columnFamily);
-      return rr;
-    } catch (Throwable t) {
+      return region.get(get, getLockFromId(get.getLockId()));
+    } catch(Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
-  
-  public RowResult next(final long scannerId) throws IOException {
-    RowResult[] rrs = next(scannerId, 1);
-    return rrs.length == 0 ? null : rrs[0];
-  }
 
-  public RowResult [] next(final long scannerId, int nbRows) throws IOException {
+  public boolean exists(byte [] regionName, Get get) throws IOException {
     checkOpen();
-    List<List<KeyValue>> results = new ArrayList<List<KeyValue>>();
+    requestCount.incrementAndGet();
     try {
-      String scannerName = String.valueOf(scannerId);
-      InternalScanner s = scanners.get(scannerName);
-      if (s == null) {
-        throw new UnknownScannerException("Name: " + scannerName);
-      }
-      this.leases.renewLease(scannerName);
-      for (int i = 0; i < nbRows; i++) {
-        requestCount.incrementAndGet();
-        // Collect values to be returned here
-        List<KeyValue> values = new ArrayList<KeyValue>();
-        while (s.next(values)) {
-          if (!values.isEmpty()) {
-            // Row has something in it. Return the value.
-            results.add(values);
-            break;
-          }
-        }
-      }
-      return RowResult.createRowResultArray(results);
-    } catch (Throwable t) {
+      HRegion region = getRegion(regionName);
+      Result r = region.get(get, getLockFromId(get.getLockId()));
+      return r != null && !r.isEmpty();
+    } catch(Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
+  public void put(final byte [] regionName, final Put put)
   throws IOException {
-    if (b.getRow() == null)
+    if (put.getRow() == null)
       throw new IllegalArgumentException("update has null row");
     
     checkOpen();
@@ -1798,24 +1740,24 @@
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      region.batchUpdate(b, getLockFromId(b.getRowLock()));
+      region.put(put, getLockFromId(put.getLockId()));
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
   
-  public int batchUpdates(final byte[] regionName, final BatchUpdate [] b)
+  public int put(final byte[] regionName, final Put [] puts)
   throws IOException {
     int i = 0;
     checkOpen();
     try {
       HRegion region = getRegion(regionName);
       this.cacheFlusher.reclaimMemcacheMemory();
-      Integer[] locks = new Integer[b.length];
-      for (i = 0; i < b.length; i++) {
+      Integer[] locks = new Integer[puts.length];
+      for (i = 0; i < puts.length; i++) {
         this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(b[i].getRowLock());
-        region.batchUpdate(b[i], locks[i]);
+        locks[i] = getLockFromId(puts[i].getLockId());
+        region.put(puts[i], locks[i]);
       }
     } catch(WrongRegionException ex) {
       return i;
@@ -1827,38 +1769,49 @@
     return -1;
   }
   
-  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
-      final HbaseMapWritable<byte[],byte[]> expectedValues)
-  throws IOException {
-    if (b.getRow() == null)
-      throw new IllegalArgumentException("update has null row");
+
+  /**
+   * 
+   * @param regionName
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param value the expected value
+   * @param put
+   * @throws IOException
+   * @return true if the new put was executed, false otherwise
+   */
+  public boolean checkAndPut(final byte[] regionName, final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value, 
+      final Put put) throws IOException{
+    //Getting actual value
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      return region.checkAndSave(b,
-        expectedValues,getLockFromId(b.getRowLock()), true);
+      return region.checkAndPut(row, family, qualifier, value, put,
+          getLockFromId(put.getLockId()), true);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
-
+  
   //
   // remote scanner interface
   //
 
-  public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow,
-    final long timestamp, final RowFilterInterface filter)
+  public long openScanner(byte [] regionName, Scan scan)
   throws IOException {
     checkOpen();
     NullPointerException npe = null;
     if (regionName == null) {
       npe = new NullPointerException("regionName is null");
-    } else if (cols == null) {
-      npe = new NullPointerException("columns to scan is null");
-    } else if (firstRow == null) {
-      npe = new NullPointerException("firstRow for scanner is null");
+    } else if (scan == null) {
+      npe = new NullPointerException("scan is null");
     }
     if (npe != null) {
       throw new IOException("Invalid arguments to openScanner", npe);
@@ -1866,8 +1819,7 @@
     requestCount.incrementAndGet();
     try {
       HRegion r = getRegion(regionName);
-      InternalScanner s =
-        r.getScanner(cols, firstRow, timestamp, filter);
+      InternalScanner s = r.getScanner(scan);
       long scannerId = addScanner(s);
       return scannerId;
     } catch (Throwable t) {
@@ -1886,6 +1838,47 @@
       createLease(scannerName, new ScannerListener(scannerName));
     return scannerId;
   }
+
+  public Result next(final long scannerId) throws IOException {
+    Result [] res = next(scannerId, 1);
+    if(res == null || res.length == 0) {
+      return null;
+    }
+    return res[0];
+  }
+
+  public Result [] next(final long scannerId, int nbRows) throws IOException {
+    checkOpen();
+    List<Result> results = new ArrayList<Result>();
+    try {
+      long start = System.currentTimeMillis();
+      String scannerName = String.valueOf(scannerId);
+      InternalScanner s = scanners.get(scannerName);
+      if (s == null) {
+        throw new UnknownScannerException("Name: " + scannerName);
+      }
+      this.leases.renewLease(scannerName);
+      for (int i = 0; i < nbRows; i++) {
+        requestCount.incrementAndGet();
+        // Collect values to be returned here
+        List<KeyValue> values = new ArrayList<KeyValue>();
+        boolean moreRows = s.next(values);
+        if(!values.isEmpty()) {
+          results.add(new Result(values));
+        }
+        if(!moreRows) {
+          break;
+        }
+      }
+      LOG.debug("Result[]next time: " + (System.currentTimeMillis()-start) + " (ms)");
+      return results.toArray(new Result[0]);
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  } 
+  
+  
+  
   
   public void close(final long scannerId) throws IOException {
     try {
@@ -1937,45 +1930,23 @@
   // Methods that do the actual work for the remote API
   //
   
-  public void deleteAll(final byte [] regionName, final byte [] row,
-      final byte [] column, final long timestamp, final long lockId) 
+  public void delete(final byte [] regionName, final Delete delete)
   throws IOException {
-    HRegion region = getRegion(regionName);
-    region.deleteAll(row, column, timestamp, getLockFromId(lockId));
-  }
-
-  public void deleteAll(final byte [] regionName, final byte [] row,
-      final long timestamp, final long lockId) 
-  throws IOException {
-    HRegion region = getRegion(regionName);
-    region.deleteAll(row, timestamp, getLockFromId(lockId));
-  }
-
-  public void deleteAllByRegex(byte[] regionName, byte[] row, String colRegex,
-      long timestamp, long lockId) throws IOException {
-    getRegion(regionName).deleteAllByRegex(row, colRegex, timestamp, 
-        getLockFromId(lockId));
-  }
-
-  public void deleteFamily(byte [] regionName, byte [] row, byte [] family, 
-    long timestamp, final long lockId)
-  throws IOException{
-    getRegion(regionName).deleteFamily(row, family, timestamp,
-        getLockFromId(lockId));
-  }
-
-  public void deleteFamilyByRegex(byte[] regionName, byte[] row, String familyRegex,
-      long timestamp, long lockId) throws IOException {
-    getRegion(regionName).deleteFamilyByRegex(row, familyRegex, timestamp, 
-        getLockFromId(lockId));
-  }
-
-  public boolean exists(byte[] regionName, byte[] row, byte[] column,
-      long timestamp, long lockId)
-  throws IOException {
-    return getRegion(regionName).exists(row, column, timestamp, 
-      getLockFromId(lockId));
+    checkOpen();
+    try {
+      boolean writeToWAL = true;
+      this.cacheFlusher.reclaimMemcacheMemory();
+      this.requestCount.incrementAndGet();
+      Integer lock = getLockFromId(delete.getLockId());
+      HRegion region = getRegion(regionName);
+      region.delete(delete, lock, writeToWAL);
+    } catch(WrongRegionException ex) {
+    } catch (NotServingRegionException ex) {
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
   }
+  
 
   public long lockRow(byte [] regionName, byte [] row)
   throws IOException {
@@ -2023,7 +1994,7 @@
    * @return intId Integer row lock used internally in HRegion
    * @throws IOException Thrown if this is not a valid client lock id.
    */
-  private Integer getLockFromId(long lockId)
+  Integer getLockFromId(long lockId)
   throws IOException {
     if(lockId == -1L) {
       return null;
@@ -2147,6 +2118,10 @@
     return Collections.unmodifiableCollection(onlineRegions.values());
   }
 
+  public HRegion [] getOnlineRegionsAsArray() {
+    return getOnlineRegions().toArray(new HRegion[0]);
+  }
+  
   /**
    * @return The HRegionInfos from online regions sorted
    */
@@ -2410,7 +2385,6 @@
         } catch (Throwable t) {
           LOG.error( "Can not start region server because "+
               StringUtils.stringifyException(t) );
-          System.exit(-1);
         }
         break;
       }
@@ -2426,39 +2400,20 @@
     }
   }
   
-  /**
-   * @param args
-   */
-  public static void main(String [] args) {
-    Configuration conf = new HBaseConfiguration();
-    @SuppressWarnings("unchecked")
-    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
-        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
-    doMain(args, regionServerClass);
-  }
-
   /** {@inheritDoc} */
-  public long incrementColumnValue(byte[] regionName, byte[] row,
-      byte[] column, long amount) throws IOException {
+  public long incrementColumnValue(byte [] regionName, byte [] row, 
+      byte [] family, byte [] qualifier, long amount)
+  throws IOException {
     checkOpen();
-    
-    NullPointerException npe = null;
+
     if (regionName == null) {
-      npe = new NullPointerException("regionName is null");
-    } else if (row == null) {
-      npe = new NullPointerException("row is null");
-    } else if (column == null) {
-      npe = new NullPointerException("column is null");
-    }
-    if (npe != null) {
-      IOException io = new IOException(
-          "Invalid arguments to incrementColumnValue", npe);
-      throw io;
+      throw new IOException("Invalid arguments to incrementColumnValue " + 
+      "regionName is null");
     }
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      return region.incrementColumnValue(row, column, amount);
+      return region.incrementColumnValue(row, family, qualifier, amount);
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -2479,4 +2434,17 @@
   public HServerInfo getHServerInfo() throws IOException {
     return serverInfo;
   }
+  
+  
+  /**
+   * @param args
+   */
+  public static void main(String [] args) {
+    Configuration conf = new HBaseConfiguration();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    doMain(args, regionServerClass);
+  }
+
 }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Sun Jun  7 19:57:37 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2007 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -41,11 +41,9 @@
  */
 public interface InternalScanner extends Closeable {
   /**
-   * Grab the next row's worth of values. The scanner will return the most
-   * recent data value for each row that is not newer than the target time
-   * passed when the scanner was created.
+   * Grab the next row's worth of values.
    * @param results
-   * @return true if data was returned
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException
    */
   public boolean next(List<KeyValue> results)
@@ -55,11 +53,5 @@
    * Closes the scanner and releases any resources it has allocated
    * @throws IOException
    */
-  public void close() throws IOException;  
-  
-  /** @return true if the scanner is matching a column family or regex */
-  public boolean isWildcardScanner();
-  
-  /** @return true if the scanner is matching multiple column family members */
-  public boolean isMultipleMatchScanner();
+  public void close() throws IOException;
 }
\ No newline at end of file

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,197 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+
+/**
+ * Implements a heap merge across any number of KeyValueScanners.
+ * <p>
+ * Implements KeyValueScanner itself.
+ * <p>
+ * This class is used at the Region level to merge across Stores
+ * and at the Store level to merge across the Memcache and StoreFiles.
+ * <p>
+ * In the Region case, we also need InternalScanner.next(List), so this class
+ * also implements InternalScanner.  WARNING: As is, if you try to use this
+ * as an InternalScanner at the Store level, you will get runtime exceptions. 
+ */
+public class KeyValueHeap implements KeyValueScanner, InternalScanner {
+  
+  private PriorityQueue<KeyValueScanner> heap;
+  
+  private KeyValueScanner current = null;
+  
+  private KVScannerComparator comparator;
+  
+  /**
+   * Constructor
+   * @param scanners
+   * @param comparator
+   */
+  public KeyValueHeap(KeyValueScanner [] scanners, KVComparator comparator) {
+    this.comparator = new KVScannerComparator(comparator);
+    this.heap = new PriorityQueue<KeyValueScanner>(scanners.length, 
+        this.comparator);
+    for(KeyValueScanner scanner : scanners) {
+      if(scanner.peek() != null) {
+        this.heap.add(scanner);
+      }
+    }
+    this.current = heap.poll();
+  }
+  
+  public KeyValue peek() {
+    if(this.current == null) {
+      return null;
+    }
+    return this.current.peek();
+  }
+  
+  public KeyValue next()  {
+    if(this.current == null) {
+      return null;
+    }
+    KeyValue kvReturn = this.current.next();
+    KeyValue kvNext = this.current.peek();
+    if(kvNext == null) {
+      this.current.close();
+      this.current = this.heap.poll();
+    } else {
+      KeyValueScanner topScanner = this.heap.peek();
+      if(topScanner == null ||
+          this.comparator.compare(kvNext, topScanner.peek()) > 0) {
+        this.heap.add(this.current);
+        this.current = this.heap.poll();
+      }
+    }
+    return kvReturn;
+  }
+  
+  /**
+   * Gets the next row of keys from the top-most scanner.
+   * <p>
+   * This method takes care of updating the heap.
+   * <p>
+   * This can ONLY be called when you are using Scanners that implement
+   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
+   * @return true if there are more keys, false if all scanners are done 
+   */
+  public boolean next(List<KeyValue> result) throws IOException {
+    InternalScanner currentAsInternal = (InternalScanner)this.current;
+    currentAsInternal.next(result);
+    KeyValue pee = this.current.peek();
+    if(pee == null) {
+      this.current.close();
+    } else {
+      this.heap.add(this.current);
+    }
+    this.current = this.heap.poll();
+    return (this.current != null);
+  }
+  
+  private class KVScannerComparator implements Comparator<KeyValueScanner> {
+    private KVComparator kvComparator;
+    /**
+     * Constructor
+     * @param kvComparator
+     */
+    public KVScannerComparator(KVComparator kvComparator) {
+      this.kvComparator = kvComparator;
+    }
+    public int compare(KeyValueScanner left, KeyValueScanner right) {
+      return compare(left.peek(), right.peek());
+    }
+    /**
+     * Compares two KeyValue
+     * @param left
+     * @param right
+     * @return a negative integer if left is smaller, 0 if equal, a positive integer otherwise
+     */
+    public int compare(KeyValue left, KeyValue right) {
+      return this.kvComparator.compare(left, right);
+    }
+    /**
+     * @return KVComparator
+     */
+    public KVComparator getComparator() {
+      return this.kvComparator;
+    }
+  }
+
+  public void close() {
+    if(this.current != null) {
+      this.current.close();
+    }
+    KeyValueScanner scanner;
+    while((scanner = this.heap.poll()) != null) {
+      scanner.close();
+    }
+  }
+  
+  /**
+   * Seeks all scanners at or below the specified seek key.  If we earlied-out 
+   * of a row, we may end up skipping values that were never reached yet.
+   * Rather than iterating down, we want to give the opportunity to re-seek.
+   * <p>
+   * As individual scanners may run past their ends, those scanners are
+   * automatically closed and removed from the heap.
+   * @param seekKey KeyValue to seek at or after
+   * @return true if KeyValues exist at or after specified key, false if not
+   */
+  public boolean seek(KeyValue seekKey) {
+    if(this.current == null) {
+      return false;
+    }
+    this.heap.add(this.current);
+    this.current = null;
+
+    KeyValueScanner scanner;
+    while((scanner = this.heap.poll()) != null) {
+      KeyValue topKey = scanner.peek();
+      if(comparator.getComparator().compare(seekKey, topKey) <= 0) { // Correct?
+        // Top KeyValue is at-or-after Seek KeyValue
+        this.current = scanner;
+        return true;
+      }
+      if(!scanner.seek(seekKey)) {
+        scanner.close();
+      } else {
+        this.heap.add(scanner);
+      }
+    }
+    // Heap is returning empty, scanner is done
+    return false;
+  }
+
+  /**
+   * @return the current Heap
+   */
+  public PriorityQueue<KeyValueScanner> getHeap() {
+    return this.heap;
+  }
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Scanner that returns the next KeyValue.
+ */
+public interface KeyValueScanner {
+  /**
+   * Look at the next KeyValue in this scanner, but do not iterate scanner.
+   * @return the next KeyValue
+   */
+  public KeyValue peek();
+  
+  /**
+   * Return the next KeyValue in this scanner, iterating the scanner 
+   * @return the next KeyValue
+   */
+  public KeyValue next();
+  
+  /**
+   * Seek the scanner at or after the specified KeyValue.
+   * @param key
+   * @return true if scanner has values left, false if end of scanner
+   */
+  public boolean seek(KeyValue key);
+  
+  /**
+   * Close the KeyValue scanner.
+   */
+  public void close();
+}
\ No newline at end of file



Mime
View raw message