hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r784618 [5/11] - in /hadoop/hbase/trunk_on_hadoop-0.18.3/src: java/ java/org/apache/hadoop/hbase/ java/org/apache/hadoop/hbase/client/ java/org/apache/hadoop/hbase/client/tableindexed/ java/org/apache/hadoop/hbase/filter/ java/org/apache/ha...
Date Sun, 14 Jun 2009 21:34:19 GMT
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java Sun Jun 14 21:34:13 2009
@@ -28,16 +28,10 @@
 class FailedLogCloseException extends IOException {
   private static final long serialVersionUID = 1759152841462990925L;
 
-  /**
-   * 
-   */
   public FailedLogCloseException() {
     super();
   }
 
-  /**
-   * @param arg0
-   */
   public FailedLogCloseException(String arg0) {
     super(arg0);
   }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sun Jun 14 21:34:13 2009
@@ -100,7 +100,7 @@
 public class HLog implements HConstants, Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
   private static final String HLOG_DATFILE = "hlog.dat.";
-  static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
+  static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
   private final FileSystem fs;
   private final Path dir;
@@ -701,8 +701,8 @@
   }
 
   private KeyValue completeCacheFlushLogEdit() {
-    return new KeyValue(METAROW, METAFAMILY, null,
-      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
+    return new KeyValue(METAROW, METACOLUMN, System.currentTimeMillis(),
+      COMPLETE_CACHE_FLUSH);
   }
 
   /**
@@ -716,11 +716,11 @@
   }
 
   /**
-   * @param family
+   * @param column
    * @return true if the column is a meta column
    */
-  public static boolean isMetaFamily(byte [] family) {
-    return Bytes.equals(METAFAMILY, family);
+  public static boolean isMetaColumn(byte [] column) {
+    return Bytes.equals(METACOLUMN, column);
   }
   
   /**
@@ -870,7 +870,6 @@
           Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
         for (final byte[] key : logEntries.keySet()) {
           Thread thread = new Thread(Bytes.toString(key)) {
-            @Override
             public void run() {
               LinkedList<HLogEntry> entries = logEntries.get(key);
               LOG.debug("Thread got " + entries.size() + " to process");

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sun Jun 14 21:34:13 2009
@@ -87,9 +87,6 @@
     return logSeqNum;
   }
 
-  /**
-   * @return the write time
-   */
   public long getWriteTime() {
     return this.writeTime;
   }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sun Jun 14 21:34:13 2009
@@ -1,5 +1,5 @@
  /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2008 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,9 +22,12 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +36,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +44,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -49,16 +54,19 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ValueOverMaxLengthException;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -98,15 +106,14 @@
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
-public class HRegion implements HConstants { // , Writable{
+public class HRegion implements HConstants {
   static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
   final AtomicBoolean closed = new AtomicBoolean(false);
-  /* Closing can take some time; use the closing flag if there is stuff we don't 
-   * want to do while in closing state; e.g. like offer this region up to the 
-   * master as a region to close if the carrying regionserver is overloaded.
-   * Once set, it is never cleared.
+  /* Closing can take some time; use the closing flag if there is stuff we don't want
+   * to do while in closing state; e.g. like offer this region up to the master as a region
+   * to close if the carrying regionserver is overloaded.  Once set, it is never cleared.
    */
   private final AtomicBoolean closing = new AtomicBoolean(false);
   private final RegionHistorian historian;
@@ -119,13 +126,6 @@
     new ConcurrentHashMap<Integer, byte []>();
   protected final Map<byte [], Store> stores =
     new ConcurrentSkipListMap<byte [], Store>(KeyValue.FAMILY_COMPARATOR);
-  
-  //These variable are just used for getting data out of the region, to test on
-  //client side
-  // private int numStores = 0;
-  // private int [] storeSize = null;
-  // private byte [] name = null;
-  
   final AtomicLong memcacheSize = new AtomicLong(0);
 
   // This is the table subdirectory.
@@ -137,6 +137,7 @@
   final Path regiondir;
   private final Path regionCompactionDir;
   KeyValue.KVComparator comparator;
+  private KeyValue.KVComparator comparatorIgnoreTimestamp;
 
   /*
    * Set this when scheduling compaction if want the next compaction to be a
@@ -209,24 +210,6 @@
     Bytes.toBytes(REGIONINFO_FILE);
 
   /**
-   * Should only be used for testing purposes
-   */
-  public HRegion(){
-    this.basedir = null;
-    this.blockingMemcacheSize = 0;
-    this.conf = null;
-    this.flushListener = null;
-    this.fs = null;
-    this.historian = null;
-    this.memcacheFlushSize = 0;
-    this.log = null;
-    this.regionCompactionDir = null;
-    this.regiondir = null;
-    this.regionInfo = null;
-    this.threadWakeFrequency = 0L;
-  }
-  
-  /**
    * HRegion constructor.
    *
    * @param basedir qualified path of directory where region should be located,
@@ -245,11 +228,14 @@
    * @param flushListener an object that implements CacheFlushListener or null
    * making progress to master -- otherwise master might think region deploy
    * failed.  Can be null.
+   * @throws IOException 
    */
   public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, 
       HRegionInfo regionInfo, FlushRequester flushListener) {
     this.basedir = basedir;
     this.comparator = regionInfo.getComparator();
+    this.comparatorIgnoreTimestamp =
+      this.comparator.getComparatorIgnoringTimestamps();
     this.log = log;
     this.fs = fs;
     this.conf = conf;
@@ -517,11 +503,6 @@
     return this.regionInfo.getRegionName();
   }
 
-  /** @return region name as string for logging */
-  public String getRegionNameAsString() {
-    return this.regionInfo.getRegionNameAsString();
-  }
-
   /** @return HTableDescriptor for this region */
   public HTableDescriptor getTableDesc() {
     return this.regionInfo.getTableDesc();
@@ -984,6 +965,197 @@
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Fetch multiple versions of a single data item, with timestamp.
+   *
+   * @param row
+   * @param column
+   * @param ts
+   * @param nv
+   * @return Results or null if none.
+   * @throws IOException
+   */
+  public List<KeyValue> get(final byte[] row, final byte[] column, final long ts,
+      final int nv) 
+  throws IOException {
+    long timestamp = ts == -1? HConstants.LATEST_TIMESTAMP : ts;
+    int numVersions = nv == -1? 1 : nv;
+    splitsAndClosesLock.readLock().lock();
+    try {
+      if (this.closed.get()) {
+        throw new IOException("Region " + this + " closed");
+      }
+      // Make sure this is a valid row and valid column
+      checkRow(row);
+      checkColumn(column);
+      // Don't need a row lock for a simple get
+      List<KeyValue> result = getStore(column).
+        get(KeyValue.createFirstOnRow(row, column, timestamp), numVersions);
+      // Guarantee that we return null instead of a zero-length array, 
+      // if there are no results to return.
+      return (result == null || result.isEmpty())? null : result;
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Data structure with a counter that is accessible rather than create a
+   * new Integer every time we want to up the counter.  Initializes at count 1.
+   */
+  static class Counter {
+    int counter = 1;
+  }
+
+  /*
+   * Check to see if we've not gone over threshold for this particular
+   * column.
+   * @param kv
+   * @param versions
+   * @param versionsCount
+   * @return True if its ok to add current value.
+   */
+  static boolean okToAddResult(final KeyValue kv, final int versions,
+      final Map<KeyValue, HRegion.Counter> versionsCount) {
+    if (versionsCount == null) {
+      return true;
+    }
+    if (versionsCount.containsKey(kv)) {
+      if (versionsCount.get(kv).counter < versions) {
+        return true;
+      }
+    } else {
+      return true;
+    }
+    return false;
+  }
+
+  /*
+   * Used adding item found to list of results getting.
+   * @param kv
+   * @param versionsCount
+   * @param results
+   */
+  static void addResult(final KeyValue kv,
+      final Map<KeyValue, HRegion.Counter> versionsCount,
+      final List<KeyValue> results) {
+    // Don't add if already present; i.e. ignore second entry.
+    if (results.contains(kv)) return;
+    results.add(kv);
+    if (versionsCount == null) {
+      return;
+    }
+    if (!versionsCount.containsKey(kv)) {
+      versionsCount.put(kv, new HRegion.Counter());
+    } else {
+      versionsCount.get(kv).counter++;
+    }
+  }
+
+  /*
+   * @param versions Number of versions to get.
+   * @param versionsCount May be null.
+   * @param columns Columns we want to fetch.
+   * @return True if has enough versions.
+   */
+  static boolean hasEnoughVersions(final int versions,
+      final Map<KeyValue, HRegion.Counter> versionsCount,
+      final Set<byte []> columns) {
+    if (columns == null || versionsCount == null) {
+      // Wants all columns so just keep going
+      return false;
+    }
+    if (columns.size() > versionsCount.size()) {
+      return false;
+    }
+    if (versions == 1) {
+      return true;
+    }
+    // Need to look at each to make sure at least versions.
+    for (Map.Entry<KeyValue, HRegion.Counter> e: versionsCount.entrySet()) {
+      if (e.getValue().counter < versions) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Fetch all the columns for the indicated row at a specified timestamp.
+   * Returns a HbaseMapWritable that maps column names to values.
+   *
+   * We should eventually use Bloom filters here, to reduce running time.  If 
+   * the database has many column families and is very sparse, then we could be 
+   * checking many files needlessly.  A small Bloom for each row would help us 
+   * determine which column groups are useful for that row.  That would let us 
+   * avoid a bunch of disk activity.
+   *
+   * @param row
+   * @param columns Array of columns you'd like to retrieve. When null, get all.
+   * @param ts
+   * @param numVersions number of versions to retrieve
+   * @param lockid
+   * @return HbaseMapWritable<columnName, Cell> values
+   * @throws IOException
+   */
+  public HbaseMapWritable<byte [], Cell> getFull(final byte [] row,
+      final NavigableSet<byte []> columns, final long ts,
+      final int numVersions, final Integer lockid) 
+  throws IOException {
+    // Check columns passed
+    if (columns != null) {
+      for (byte [] column: columns) {
+        checkColumn(column);
+      }
+    }
+    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+    Map<KeyValue, Counter> versionCounter =
+      new TreeMap<KeyValue, Counter>(this.comparatorIgnoreTimestamp);
+    Integer lid = getLock(lockid,row);
+    HashSet<Store> storeSet = new HashSet<Store>();
+    try {
+      // Get the concerned columns or all of them
+      if (columns != null) {
+        for (byte[] bs : columns) {
+          Store store = stores.get(bs);
+          if (store != null) {
+            storeSet.add(store);
+          }
+        }
+      } else {
+        storeSet.addAll(stores.values());
+      }
+      long timestamp =
+        (ts == HConstants.LATEST_TIMESTAMP)? System.currentTimeMillis(): ts;
+      KeyValue key = KeyValue.createFirstOnRow(row, timestamp);
+      // For each column name that is just a column family, open the store
+      // related to it and fetch everything for that row. HBASE-631
+      // Also remove each store from storeSet so that these stores
+      // won't be opened for no reason. HBASE-783
+      if (columns != null) {
+        for (byte [] bs : columns) {
+          // TODO: Fix so we use comparator in KeyValue that looks at
+          // column family portion only.
+          if (KeyValue.getFamilyDelimiterIndex(bs, 0, bs.length) == (bs.length - 1)) {
+            Store store = stores.get(bs);
+            store.getFull(key, null, null, numVersions, versionCounter,
+              keyvalues, timestamp);
+            storeSet.remove(store);
+          }
+        }
+      }
+      for (Store targetStore: storeSet) {
+        targetStore.getFull(key, columns, null, numVersions, versionCounter,
+          keyvalues, timestamp);
+      }
+      
+      return Cell.createCells(keyvalues);
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+  }
+
   /**
    * Return all the data for the row that matches <i>row</i> exactly, 
   * or the one that immediately precedes it, at or immediately before 
@@ -993,9 +1165,9 @@
    * @return map of values
    * @throws IOException
    */
-  Result getClosestRowBefore(final byte [] row)
+  RowResult getClosestRowBefore(final byte [] row)
   throws IOException{
-    return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
+    return getClosestRowBefore(row, HConstants.COLUMN_FAMILY);
   }
 
   /**
@@ -1004,12 +1176,12 @@
    * <i>ts</i>.
    * 
    * @param row row key
-   * @param family
+   * @param columnFamily Must include the column family delimiter character.
    * @return map of values
    * @throws IOException
    */
-  public Result getClosestRowBefore(final byte [] row,
-    final byte [] family)
+  public RowResult getClosestRowBefore(final byte [] row,
+    final byte [] columnFamily)
   throws IOException{
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
@@ -1017,7 +1189,7 @@
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
-      Store store = getStore(family);
+      Store store = getStore(columnFamily);
       KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
       key = store.getRowKeyAtOrBefore(kv);
@@ -1030,45 +1202,52 @@
       if (!this.comparator.matchingRows(kv, key)) {
         kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
       }
-      Get get = new Get(key.getRow());
-      store.get(get, null, results);
-      
-      return new Result(results);
+      store.getFull(kv, null, null, 1, null, results, System.currentTimeMillis());
+      // Convert to RowResult.  TODO: Remove need to do this.
+      return RowResult.createRowResult(results);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
   }
 
-  //TODO
   /**
    * Return an iterator that scans over the HRegion, returning the indicated 
-   * columns and rows specified by the {@link Scan}.
-   * <p>
-   * This Iterator must be closed by the caller.
+   * columns for only the rows that match the data filter.  This Iterator must
+   * be closed by the caller.
    *
-   * @param scan configured {@link Scan}
+   * @param cols columns to scan. If column name is a column family, all
+   * columns of the specified column family are returned.  It's also possible
+   * to pass a regex in the column qualifier. A column qualifier is judged to
+   * be a regex if it contains at least one of the following characters:
+   * <code>\+|^&*$[]]}{)(</code>.
+   * @param firstRow row which is the starting point of the scan
+   * @param timestamp only return rows whose timestamp is <= this value
+   * @param filter row filter
    * @return InternalScanner
    * @throws IOException
    */
-  public InternalScanner getScanner(Scan scan)
+  public InternalScanner getScanner(byte[][] cols, byte [] firstRow,
+    long timestamp, RowFilterInterface filter) 
   throws IOException {
     newScannerLock.readLock().lock();
     try {
       if (this.closed.get()) {
         throw new IOException("Region " + this + " closed");
       }
-      // Verify families are all valid
-      if(scan.hasFamilies()) {
-        for(byte [] family : scan.getFamilyMap().keySet()) {
-          checkFamily(family);
-        }
-      } else { // Adding all families to scanner
-        for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
-          scan.addFamily(family);
+      HashSet<Store> storeSet = new HashSet<Store>();
+      NavigableSet<byte []> columns =
+        new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+      // Below we make up set of stores we want scanners on and we fill out the
+      // list of columns.
+      for (int i = 0; i < cols.length; i++) {
+        columns.add(cols[i]);
+        Store s = stores.get(cols[i]);
+        if (s != null) {
+          storeSet.add(s);
         }
       }
-      return new RegionScanner(scan);
-      
+      return new HScanner(columns, firstRow, timestamp,
+        storeSet.toArray(new Store [storeSet.size()]), filter);
     } finally {
       newScannerLock.readLock().unlock();
     }
@@ -1077,136 +1256,44 @@
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
-  /**
-   * @param delete
-   * @param lockid
-   * @param writeToWAL
-   * @throws IOException
-   */
-  public void delete(Delete delete, Integer lockid, boolean writeToWAL)
-  throws IOException {
-    checkReadOnly();
-    checkResources();
-    splitsAndClosesLock.readLock().lock();
-    Integer lid = null;
-    try {
-      byte [] row = delete.getRow();
-      // If we did not pass an existing row lock, obtain a new one
-      lid = getLock(lockid, row);
-
-      //Check to see if this is a deleteRow insert
-      if(delete.getFamilyMap().isEmpty()){
-        for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
-          delete.deleteFamily(family);
-        }
-      } else {
-        for(byte [] family : delete.getFamilyMap().keySet()) {
-          if(family == null) {
-            throw new NoSuchColumnFamilyException("Empty family is invalid");
-          }
-          checkFamily(family);
-        }
-      }
-      
-      for(Map.Entry<byte[], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
-        byte [] family = e.getKey();
-        delete(family, e.getValue(), writeToWAL);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-      splitsAndClosesLock.readLock().unlock();
-    }
-  }
-  
-  
-  /**
-   * @param family
-   * @param kvs
-   * @param writeToWAL
-   * @throws IOException
-   */
-  public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
-  throws IOException {
-    long now = System.currentTimeMillis();
-    boolean flush = false;
-    this.updatesLock.readLock().lock();
-    try {
-      if (writeToWAL) {
-        this.log.append(regionInfo.getRegionName(),
-          regionInfo.getTableDesc().getName(), kvs,
-          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
-      }
-      long size = 0;
-      Store store = getStore(family);
-      for (KeyValue kv: kvs) {
-        // Check if time is LATEST, change to time of most recent addition if so
-        // This is expensive.
-        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
-          List<KeyValue> result = new ArrayList<KeyValue>(1);
-          Get g = new Get(kv.getRow());
-          NavigableSet<byte []> qualifiers =
-            new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-          qualifiers.add(kv.getQualifier());
-          get(store, g, qualifiers, result);
-          if (result.isEmpty()) {
-            // Nothing to delete
-            continue;
-          }
-          if (result.size() > 1) {
-            throw new RuntimeException("Unexpected size: " + result.size());
-          }
-          KeyValue getkv = result.get(0);
-          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-            getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
-        }
-        size = this.memcacheSize.addAndGet(store.delete(kv));
-      }
-      flush = isFlushSize(size);
-    } finally {
-      this.updatesLock.readLock().unlock();
-    }
-    if (flush) {
-      // Request a cache flush.  Do it outside update lock.
-      requestFlush();
-    }
-  }
   
   /**
-   * @param put
+   * @param b
    * @throws IOException
    */
-  public void put(Put put) throws IOException {
-    this.put(put, null, true);
+  public void batchUpdate(BatchUpdate b) throws IOException {
+    this.batchUpdate(b, null, true);
   }
   
   /**
-   * @param put
+   * @param b
    * @param writeToWAL
    * @throws IOException
    */
-  public void put(Put put, boolean writeToWAL) throws IOException {
-    this.put(put, null, writeToWAL);
+  public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException {
+    this.batchUpdate(b, null, writeToWAL);
   }
 
+  
   /**
-   * @param put
+   * @param b
    * @param lockid
    * @throws IOException
    */
-  public void put(Put put, Integer lockid) throws IOException {
-    this.put(put, lockid, true);
+  public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException {
+    this.batchUpdate(b, lockid, true);
   }
-
+  
   /**
-   * @param put
+   * @param b
    * @param lockid
-   * @param writeToWAL
+   * @param writeToWAL if true, then we write this update to the log
    * @throws IOException
    */
-  public void put(Put put, Integer lockid, boolean writeToWAL)
+  public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
-//    validateValuesLength(put);
+    validateValuesLength(b);
 
     // Do a rough check that we have resources to accept a write.  The check is
     // 'rough' in that between the resource check and the call to obtain a 
@@ -1220,18 +1307,49 @@
       // #commit or #abort or if the HRegionServer lease on the lock expires.
       // See HRegionServer#RegionListener for how the expire on HRegionServer
       // invokes a HRegion#abort.
-      byte [] row = put.getRow();
+      byte [] row = b.getRow();
       // If we did not pass an existing row lock, obtain a new one
       Integer lid = getLock(lockid, row);
-      byte [] now = Bytes.toBytes(System.currentTimeMillis());
+      long now = System.currentTimeMillis();
+      long commitTime = b.getTimestamp() == LATEST_TIMESTAMP?
+        now: b.getTimestamp();
+      Set<byte []> latestTimestampDeletes = null;
+      List<KeyValue> edits = new ArrayList<KeyValue>();
       try {
-        for(Map.Entry<byte[], List<KeyValue>> entry : 
-          put.getFamilyMap().entrySet()) {
-          byte [] family = entry.getKey();
-          checkFamily(family);
-          List<KeyValue> puts = entry.getValue();
-          if(updateKeys(puts, now)) {
-            put(family, puts, writeToWAL);
+        for (BatchOperation op: b) {
+          byte [] column = op.getColumn();
+          checkColumn(column);
+          KeyValue kv = null;
+          if (op.isPut()) {
+            kv = new KeyValue(row, column, commitTime, op.getValue());
+          } else {
+            // It's a delete.
+            if (b.getTimestamp() == LATEST_TIMESTAMP) {
+              // Save off these deletes of the most recent thing added on the
+              // family.
+              if (latestTimestampDeletes == null) {
+                latestTimestampDeletes =
+                  new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
+              }
+              latestTimestampDeletes.add(op.getColumn());
+              continue;
+            }
+            // It's an explicit timestamp delete
+            kv = new KeyValue(row, column, commitTime, KeyValue.Type.Delete,
+              HConstants.EMPTY_BYTE_ARRAY);
+          }
+          edits.add(kv);
+        }
+        if (!edits.isEmpty()) {
+          update(edits, writeToWAL, now);
+        }
+        if (latestTimestampDeletes != null &&
+            !latestTimestampDeletes.isEmpty()) {
+          // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
+          // as edits.  Need to do individually after figuring which is latest
+          // timestamp to delete.
+          for (byte [] column: latestTimestampDeletes) {
+            deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
           }
         }
       } finally {
@@ -1242,119 +1360,127 @@
     }
   }
 
-  
-  //TODO, Think that gets/puts and deletes should be refactored a bit so that 
-  //the getting of the lock happens before, so that you would just pass it into
-  //the methods. So in the case of checkAndPut you could just do lockRow, 
-  //get, put, unlockRow or something
   /**
+   * Performs an atomic check and save operation. Checks if
+   * the specified expected values have changed, and if not
+   * applies the update.
    * 
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param expectedValue
-   * @param put
-   * @param lockId
-   * @param writeToWAL
+   * @param b the update to apply
+   * @param expectedValues the expected values to check
+   * @param lockid
+   * @param writeToWAL whether or not to write to the write ahead log
+   * @return true if update was applied
    * @throws IOException
-   * @return true if the new put was execute, false otherwise
    */
-  public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier,
-      byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL) 
-  throws IOException{
+  public boolean checkAndSave(BatchUpdate b,
+    HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
+    boolean writeToWAL)
+  throws IOException {
+    // This is basically a copy of batchUpdate with the atomic check-and-save
+    // added in, so read this method alongside batchUpdate. I have commented
+    // the areas that I changed; where nothing changed, refer to the comments
+    // in the batchUpdate method.
+    boolean success = true;
     checkReadOnly();
-    //TODO, add check for value length or maybe even better move this to the 
-    //client if this becomes a global setting
+    validateValuesLength(b);
     checkResources();
     splitsAndClosesLock.readLock().lock();
     try {
-      Get get = new Get(row, put.getRowLock());
-      checkFamily(family);
-      get.addColumn(family, qualifier);
-
-      byte [] now = Bytes.toBytes(System.currentTimeMillis());
-
-      // Lock row
-      Integer lid = getLock(lockId, get.getRow()); 
-      List<KeyValue> result = new ArrayList<KeyValue>();
+      byte[] row = b.getRow();
+      long now = System.currentTimeMillis();
+      Integer lid = getLock(lockid,row);
       try {
-        //Getting data
-        for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
-          get.getFamilyMap().entrySet()) {
-          get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
-        }
-        boolean matches = false;
-        if (result.size() == 0 && expectedValue.length == 0) {
-          matches = true;
-        } else if(result.size() == 1) {
-          //Compare the expected value with the actual value
-          byte [] actualValue = result.get(0).getValue();
-          matches = Bytes.equals(expectedValue, actualValue);
-        }
-        //If matches put the new put
-        if(matches) {
-          for(Map.Entry<byte[], List<KeyValue>> entry :
-            put.getFamilyMap().entrySet()) {
-            byte [] fam = entry.getKey();
-            checkFamily(fam);
-            List<KeyValue> puts = entry.getValue();
-            if(updateKeys(puts, now)) {
-              put(fam, puts, writeToWAL);
+        NavigableSet<byte []> keySet =
+          new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+        keySet.addAll(expectedValues.keySet());
+        Map<byte[],Cell> actualValues = getFull(row, keySet,
+          HConstants.LATEST_TIMESTAMP, 1,lid);
+        for (byte[] key : keySet) {
+	  // If test fails exit
+	  Cell cell = actualValues.get(key);
+	  byte[] actualValue = new byte[] {};
+	  if (cell != null) 
+	    actualValue = cell.getValue();
+	  if(!Bytes.equals(actualValue,
+			   expectedValues.get(key))) {
+	    success = false;
+	    break;
+	  }
+	}
+        if (success) {
+          long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
+            now: b.getTimestamp();
+          Set<byte []> latestTimestampDeletes = null;
+          List<KeyValue> edits = new ArrayList<KeyValue>();
+          for (BatchOperation op: b) {
+            byte [] column = op.getColumn();
+            KeyValue kv = null;
+            if (op.isPut()) {
+              kv = new KeyValue(row, column, commitTime, op.getValue());
+            } else {
+              // Its a delete.
+              if (b.getTimestamp() == LATEST_TIMESTAMP) {
+                // Save off these deletes of the most recent thing added on
+                // the family.
+                if (latestTimestampDeletes == null) {
+                  latestTimestampDeletes =
+                    new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
+                }
+                latestTimestampDeletes.add(op.getColumn());
+              } else {
+                // Its an explicit timestamp delete
+                kv = new KeyValue(row, column, commitTime,
+                  KeyValue.Type.Delete, HConstants.EMPTY_BYTE_ARRAY);
+              }
+            }
+            edits.add(kv);
+          }
+          if (!edits.isEmpty()) {
+            update(edits, writeToWAL, now);
+          }
+          if (latestTimestampDeletes != null &&
+              !latestTimestampDeletes.isEmpty()) {
+            // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
+            // as edits.  Need to do individually after figuring which is latest
+            // timestamp to delete.
+            for (byte [] column: latestTimestampDeletes) {
+              deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
             }
           }
-          return true;  
         }
-        return false;
       } finally {
-        if(lockId == null) releaseRowLock(lid);
+        if(lockid == null) releaseRowLock(lid);
       }
     } finally {
       splitsAndClosesLock.readLock().unlock();
-    }    
-  }
-      
-  
-  /**
-   * Checks if any stamps are > now.  If so, sets them to now.
-   * <p>
-   * This acts to be prevent users from inserting future stamps as well as
-   * to replace LATEST_TIMESTAMP with now.
-   * @param keys
-   * @param now
-   * @return <code>true</code> when updating the time stamp completed.
-   */
-  private boolean updateKeys(List<KeyValue> keys, byte [] now) {
-    if(keys == null || keys.isEmpty()) {
-      return false;
-    }
-    for(KeyValue key : keys) {
-      key.updateLatestStamp(now);
     }
-    return true;
+    return success;
   }
-  
 
-//  /*
-//   * Utility method to verify values length. 
-//   * @param batchUpdate The update to verify
-//   * @throws IOException Thrown if a value is too long
-//   */
-//  private void validateValuesLength(Put put)
-//  throws IOException {
-//    Map<byte[], List<KeyValue>> families = put.getFamilyMap();
-//    for(Map.Entry<byte[], List<KeyValue>> entry : families.entrySet()) {
-//      HColumnDescriptor hcd = 
-//        this.regionInfo.getTableDesc().getFamily(entry.getKey());
-//      int maxLen = hcd.getMaxValueLength();
-//      for(KeyValue kv : entry.getValue()) {
-//        if(kv.getValueLength() > maxLen) {
-//          throw new ValueOverMaxLengthException("Value in column "
-//            + Bytes.toString(kv.getColumn()) + " is too long. "
-//            + kv.getValueLength() + " > " + maxLen);
-//        }
-//      }
-//    }
-//  }
+  /*
+   * Utility method to verify values length
+   * @param batchUpdate The update to verify
+   * @throws IOException Thrown if a value is too long
+   */
+  private void validateValuesLength(BatchUpdate batchUpdate)
+  throws IOException {
+    for (Iterator<BatchOperation> iter = 
+      batchUpdate.iterator(); iter.hasNext();) {
+      BatchOperation operation = iter.next();
+      if (operation.getValue() != null) {
+        HColumnDescriptor fam = this.regionInfo.getTableDesc().
+          getFamily(operation.getColumn());
+        if (fam != null) {
+          int maxLength = fam.getMaxValueLength();
+          if (operation.getValue().length > maxLength) {
+            throw new ValueOverMaxLengthException("Value in column "
+                + Bytes.toString(operation.getColumn()) + " is too long. "
+                + operation.getValue().length + " instead of " + maxLength);
+          }
+        }
+      }
+    }
+  }
 
   /*
    * Check if resources to support an update.
@@ -1391,6 +1517,230 @@
           + Thread.currentThread().getName() + "'");
     }
   }
+  
+  /**
+   * Delete all cells of the same age as the passed timestamp or older.
+   * @param row
+   * @param column
+   * @param ts Delete all entries that have this timestamp or older
+   * @param lockid Row lock
+   * @throws IOException
+   */
+  public void deleteAll(final byte [] row, final byte [] column, final long ts,
+      final Integer lockid)
+  throws IOException {
+    checkColumn(column);
+    checkReadOnly();
+    Integer lid = getLock(lockid,row);
+    try {
+      // Delete ALL versions rather than column family VERSIONS.  If we just did
+      // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
+      deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+  }
+
+  /**
+   * Delete all cells of the same age as the passed timestamp or older.
+   * @param row
+   * @param ts Delete all entries that have this timestamp or older
+   * @param lockid Row lock
+   * @throws IOException
+   */
+  public void deleteAll(final byte [] row, final long ts, final Integer lockid)
+  throws IOException {
+    checkReadOnly();
+    Integer lid = getLock(lockid, row);
+    long now = System.currentTimeMillis();
+    long time = ts;
+    if (ts == HConstants.LATEST_TIMESTAMP) {
+      time = now;
+    }
+    KeyValue kv = KeyValue.createFirstOnRow(row, time);
+    try {
+      for (Store store : stores.values()) {
+        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, time);
+        List<KeyValue> edits = new ArrayList<KeyValue>();
+        for (KeyValue key: keyvalues) {
+          // This is UGLY. COPY OF KEY PART OF KeyValue.
+          edits.add(key.cloneDelete());
+        }
+        update(edits, now);
+      }
+    } finally {
+      if (lockid == null) releaseRowLock(lid);
+    }
+  }
+  
+  /**
+   * Delete all cells for a row with matching columns with timestamps
+   * less than or equal to <i>timestamp</i>. 
+   * 
+   * @param row The row to operate on
+   * @param columnRegex The column regex 
+   * @param timestamp Timestamp to match
+   * @param lockid Row lock
+   * @throws IOException
+   */
+  public void deleteAllByRegex(final byte [] row, final String columnRegex, 
+      final long timestamp, final Integer lockid) throws IOException {
+    checkReadOnly();
+    Pattern columnPattern = Pattern.compile(columnRegex);
+    Integer lid = getLock(lockid, row);
+    long now = System.currentTimeMillis();
+    KeyValue kv = new KeyValue(row, timestamp);
+    try {
+      for (Store store : stores.values()) {
+        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+        store.getFull(kv, null, columnPattern, ALL_VERSIONS, null, keyvalues,
+          now);
+        List<KeyValue> edits = new ArrayList<KeyValue>();
+        for (KeyValue key: keyvalues) {
+          edits.add(key.cloneDelete());
+        }
+        update(edits, now);
+      }
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+  }
+
+  /**
+   * Delete all cells for a row with matching column family with timestamps
+   * less than or equal to <i>timestamp</i>.
+   *
+   * @param row The row to operate on
+   * @param family The column family to match
+   * @param timestamp Timestamp to match
+   * @param lockid Row lock
+   * @throws IOException
+   */
+  public void deleteFamily(byte [] row, byte [] family, long timestamp,
+      final Integer lockid)
+  throws IOException{
+    checkReadOnly();
+    Integer lid = getLock(lockid, row);
+    long now = System.currentTimeMillis();
+    try {
+      // find the HStore for the column family
+      Store store = getStore(family);
+      // find all the keys that match our criteria
+      List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+      store.getFull(new KeyValue(row, timestamp), null, null, ALL_VERSIONS,
+        null, keyvalues, now);
+      // delete all the cells
+      List<KeyValue> edits = new ArrayList<KeyValue>();
+      for (KeyValue kv: keyvalues) {
+        edits.add(kv.cloneDelete());
+      }
+      update(edits, now);
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+  }
+
+  /**
+   * Delete all cells for a row with all the matching column families by
+   * familyRegex with timestamps less than or equal to <i>timestamp</i>.
+   * 
+   * @param row The row to operate on
+   * @param familyRegex The column family regex for matching. This regex
+   * expression only matches the family name; it does not include <code>:</code>
+   * @param timestamp Timestamp to match
+   * @param lockid Row lock
+   * @throws IOException
+   */
+  public void deleteFamilyByRegex(byte [] row, String familyRegex,
+      final long timestamp, final Integer lockid)
+  throws IOException {
+    checkReadOnly();
+    // construct the family regex pattern
+    Pattern familyPattern = Pattern.compile(familyRegex);
+    Integer lid = getLock(lockid, row);
+    long now = System.currentTimeMillis();
+    KeyValue kv = new KeyValue(row, timestamp);
+    try {
+      for(Store store: stores.values()) {
+        String familyName = Bytes.toString(store.getFamily().getName());
+        // check the family name match the family pattern.
+        if(!(familyPattern.matcher(familyName).matches())) 
+          continue;
+        
+        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
+        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, now);
+        List<KeyValue> edits = new ArrayList<KeyValue>();
+        for (KeyValue k: keyvalues) {
+          edits.add(k.cloneDelete());
+        }
+        update(edits, now);
+      }
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+  }
+  
+  /*
+   * Delete one or many cells.
+   * Used to support {@link #deleteAll(byte [], byte [], long)} and deletion of
+   * latest cell.
+   * @param row
+   * @param column
+   * @param ts Timestamp to start search on.
+   * @param versions How many versions to delete. Pass
+   * {@link HConstants#ALL_VERSIONS} to delete all.
+   * @param now
+   * @throws IOException
+   */
+  private void deleteMultiple(final byte [] row, final byte [] column,
+      final long ts, final int versions, final long now)
+  throws IOException {
+    checkReadOnly();
+    // We used to have a getKeys method that purportedly only got the keys and
+    // not the keys and values.  We now just do getFull.  For memcache values,
+    // shouldn't matter if we get key and value since it'll be the entry that
+    // is in memcache.  For the keyvalues from storefile, could be saving if
+    // we only returned key component. TODO.
+    List<KeyValue> keys = get(row, column, ts, versions);
+    if (keys != null && keys.size() > 0) {
+      // I think the below edits don't have to be sorted.  They are deletes.
+      // They don't have to go in in exact sorted order (we don't have to worry
+      // about the meta or root sort comparator here).
+      List<KeyValue> edits = new ArrayList<KeyValue>();
+      for (KeyValue key: keys) {
+        edits.add(key.cloneDelete());
+      }
+      update(edits, now);
+    }
+  }
+
+  /**
+   * Tests for the existence of any cells for a given coordinate.
+   * 
+   * @param row the row
+   * @param column the column, or null
+   * @param timestamp the timestamp, or HConstants.LATEST_TIMESTAMP for any
+   * @param lockid the existing lock, or null 
+   * @return true if cells exist for the row, false otherwise
+   * @throws IOException
+   */
+  public boolean exists(final byte[] row, final byte[] column, 
+    final long timestamp, final Integer lockid) 
+  throws IOException {
+    checkRow(row);
+    Integer lid = getLock(lockid, row);
+    try {
+      NavigableSet<byte []> columns = null;
+      if (column != null) {
+        columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+        columns.add(column);
+      }
+      return !getFull(row, columns, timestamp, 1, lid).isEmpty();
+    } finally {
+      if (lockid == null) releaseRowLock(lid);
+    }
+  }
 
   /**
    * @throws IOException Throws exception if region is in read-only mode.
@@ -1408,21 +1758,22 @@
   * @param now
    * @throws IOException
    */
-  private void put(final byte [] family, final List<KeyValue> edits)
+  private void update(final List<KeyValue> edits, final long now)
   throws IOException {
-    this.put(family, edits, true);
+    this.update(edits, true, now);
   }
 
   /** 
    * Add updates first to the hlog (if writeToWal) and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
-   * @param family
-   * @param edits
    * @param writeToWAL if true, then we should write to the log
+   * @param edits the KeyValue edits to apply
+   * @param now
    * @throws IOException
    */
-  private void put(final byte [] family, final List<KeyValue> edits, 
-      boolean writeToWAL) throws IOException {
+  private void update(final List<KeyValue> edits, boolean writeToWAL,
+    final long now)
+  throws IOException {
     if (edits == null || edits.isEmpty()) {
       return;
     }
@@ -1430,15 +1781,14 @@
     this.updatesLock.readLock().lock();
     try {
       if (writeToWAL) {
-        long now = System.currentTimeMillis();
         this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), edits,
           (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
       }
       long size = 0;
-      Store store = getStore(family);
       for (KeyValue kv: edits) {
-        size = this.memcacheSize.addAndGet(store.add(kv));
+        // TODO: Fix -- do I have to do a getColumn here?
+        size = this.memcacheSize.addAndGet(getStore(kv.getColumn()).add(kv));
       }
       flush = isFlushSize(size);
     } finally {
@@ -1476,6 +1826,7 @@
   }
   
   // Do any reconstruction needed from the log
+  @SuppressWarnings("unused")
   protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
     Progressable reporter)
   throws UnsupportedEncodingException, IOException {
@@ -1514,6 +1865,23 @@
           Bytes.toString(row) + "'");
     }
   }
+  
+  /*
+   * Make sure this is a valid column for the current table
+   * @param columnName
+   * @throws NoSuchColumnFamilyException
+   */
+  private void checkColumn(final byte [] column)
+  throws NoSuchColumnFamilyException {
+    if (column == null) {
+      return;
+    }
+    if (!regionInfo.getTableDesc().hasFamily(column)) {
+      throw new NoSuchColumnFamilyException("Column family on " +
+        Bytes.toString(column) + " does not exist in region " + this
+          + " in table " + regionInfo.getTableDesc());
+    }
+  }
 
   /**
    * Obtain a lock on the given row.  Blocks until success.
@@ -1538,7 +1906,7 @@
    * @throws IOException
    * @return The id of the held lock.
    */
-  public Integer obtainRowLock(final byte [] row) throws IOException {
+  Integer obtainRowLock(final byte [] row) throws IOException {
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
@@ -1574,7 +1942,7 @@
   
   /** 
    * Release the row lock!
-   * @param lockid  The lock ID to release.
+   * @param lockid The lock ID to release.
    */
   void releaseRowLock(final Integer lockid) {
     synchronized (locksToRows) {
@@ -1650,86 +2018,161 @@
     return this.basedir;
   }
 
-  
-  //TODO
   /**
-   * RegionScanner is an iterator through a bunch of rows in an HRegion.
-   * <p>
-   * It is used to combine scanners from multiple Stores (aka column families).
+   * HScanner is an iterator through a bunch of rows in an HRegion.
    */
-  class RegionScanner implements InternalScanner {
-    
-    private KeyValueHeap storeHeap;
-    private byte [] stopRow;
-    
-    RegionScanner(Scan scan) throws IOException {
-      if(Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
-        this.stopRow = null;
-      } else {
-        this.stopRow = scan.getStopRow();
-      }
-      
-      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+  private class HScanner implements InternalScanner {
+    private InternalScanner[] scanners;
+    private List<KeyValue> [] resultSets;
+    private RowFilterInterface filter;
+
+    /** Create an HScanner with a handle on many HStores. */
+    @SuppressWarnings("unchecked")
+    HScanner(final NavigableSet<byte []> columns, byte [] firstRow,
+      long timestamp, final Store [] stores, final RowFilterInterface filter)
+    throws IOException {
+      this.filter = filter;
+      this.scanners = new InternalScanner[stores.length];
       try {
-        for(Map.Entry<byte[], NavigableSet<byte[]>> entry : 
-          scan.getFamilyMap().entrySet()) {
-          Store store = stores.get(entry.getKey());
-          scanners.add(store.getScanner(scan, entry.getValue()));
+        for (int i = 0; i < stores.length; i++) {
+          // Only pass relevant columns to each store
+          NavigableSet<byte[]> columnSubset =
+            new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+          for (byte [] c: columns) {
+            if (KeyValue.FAMILY_COMPARATOR.compare(stores[i].storeName, c) == 0) {
+              columnSubset.add(c);
+            }
+          }
+          RowFilterInterface f = filter;
+          if (f != null) {
+            // Need to replicate filters.
+            // At least WhileMatchRowFilter will mess up the scan if only
+            // one shared across many rows. See HADOOP-2467.
+            f = (RowFilterInterface) WritableUtils.clone(filter, conf);
+          }
+          scanners[i] = stores[i].getScanner(timestamp, columnSubset, firstRow, f);
         }
       } catch (IOException e) {
-        for(KeyValueScanner scanner : scanners) {
-          if(scanner != null) {
-            close(scanner);
+        for (int i = 0; i < this.scanners.length; i++) {
+          if (scanners[i] != null) {
+            closeScanner(i);
           }
         }
         throw e;
       }
-      
-      this.storeHeap = 
-        new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
-      
+
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
+      this.resultSets = new List[scanners.length];
+      for (int i = 0; i < scanners.length; i++) {
+        resultSets[i] = new ArrayList<KeyValue>();
+        if(scanners[i] != null && !scanners[i].next(resultSets[i])) {
+          closeScanner(i);
+        }
+      }
+
       // As we have now successfully completed initialization, increment the
       // activeScanner count.
       activeScannerCount.incrementAndGet();
     }
 
-    /**
-     * Get the next row of results from this region.
-     * @param results list to append results to
-     * @return true if there are more rows, false if scanner is done
-     */
     public boolean next(List<KeyValue> results)
     throws IOException {
-      // This method should probably be reorganized a bit... has gotten messy
-      KeyValue kv = this.storeHeap.peek();
-      if(kv == null) {
-        return false;
-      }
-      byte [] currentRow = kv.getRow();
-      // See if we passed stopRow
-      if(stopRow != null &&
-          comparator.compareRows(stopRow, 0, stopRow.length, 
-              currentRow, 0, currentRow.length)
-          <= 0){
-        return false;
+      boolean moreToFollow = false;
+      boolean filtered = false;
+      do {
+        // Find the lowest key across all stores.
+        KeyValue chosen = null;
+        long chosenTimestamp = -1;
+        for (int i = 0; i < this.scanners.length; i++) {
+          if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
+            continue;
+          }
+          KeyValue kv = this.resultSets[i].get(0);
+          if (chosen == null ||
+               (comparator.compareRows(kv, chosen) < 0) ||
+               ((comparator.compareRows(kv, chosen) == 0) &&
+                 (kv.getTimestamp() > chosenTimestamp))) {
+            chosen = kv;
+            chosenTimestamp = chosen.getTimestamp();
+          }
+        }
+
+        // Store results from each sub-scanner.
+        if (chosenTimestamp >= 0) {
+          for (int i = 0; i < scanners.length; i++) {
+            if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
+              continue;
+            }
+            KeyValue kv = this.resultSets[i].get(0);
+            if (comparator.compareRows(kv, chosen) == 0) {
+              results.addAll(this.resultSets[i]);
+              resultSets[i].clear();
+              if (!scanners[i].next(resultSets[i])) {
+                closeScanner(i);
+              }
+            }
+          }
+        }
+
+        moreToFollow = chosenTimestamp >= 0;
+        if (results == null || results.size() <= 0) {
+          // If we got no results, then there is no more to follow.
+          moreToFollow = false;
+        }
+
+        filtered = filter == null ? false : filter.filterRow(results);
+        if (filter != null && filter.filterAllRemaining()) {
+          moreToFollow = false;
+        }
+        
+        if (moreToFollow) {
+          if (filter != null) {
+            filter.rowProcessed(filtered, chosen.getBuffer(), chosen.getRowOffset(),
+              chosen.getRowLength());
+          }
+          if (filtered) {
+            results.clear();
+          }
+        }
+      } while(filtered && moreToFollow);
+
+      // Make sure scanners closed if no more results
+      if (!moreToFollow) {
+        for (int i = 0; i < scanners.length; i++) {
+          if (null != scanners[i]) {
+            closeScanner(i);
+          }
+        }
       }
-      this.storeHeap.next(results);
-      while(true) {
-        kv = this.storeHeap.peek();
-        if(kv == null) {
-          return false;
-        }
-        byte [] row = kv.getRow();
-        if(!Bytes.equals(currentRow, row)) {
-          return true;
+      
+      return moreToFollow;
+    }
+
+    /** Shut down a single scanner */
+    void closeScanner(int i) {
+      try {
+        try {
+          scanners[i].close();
+        } catch (IOException e) {
+          LOG.warn("Failed closing scanner " + i, e);
+        }
+      } finally {
+        scanners[i] = null;
+        // These data members can be null if exception in constructor
+        if (resultSets != null) {
+          resultSets[i] = null;
         }
-        this.storeHeap.next(results);
       }
     }
 
     public void close() {
       try {
-        storeHeap.close();
+        for(int i = 0; i < scanners.length; i++) {
+          if(scanners[i] != null) {
+            closeScanner(i);
+          }
+        }
       } finally {
         synchronized (activeScannerCount) {
           int count = activeScannerCount.decrementAndGet();
@@ -1745,22 +2188,14 @@
         }
       }
     }
-    /**
-     * 
-     * @param scanner to be closed
-     */
-    public void close(KeyValueScanner scanner) {
-      try {
-        scanner.close();
-      } catch(NullPointerException npe) {}
-    }
-    
-    /**
-     * @return the current storeHeap
-     */
-    public KeyValueHeap getStoreHeap() {
-      return this.storeHeap;
+
+    public boolean isWildcardScanner() {
+      throw new UnsupportedOperationException("Unimplemented on HScanner");
     }
+
+    public boolean isMultipleMatchScanner() {
+      throw new UnsupportedOperationException("Unimplemented on HScanner");
+    }  
   }
   
   // Utility methods
@@ -1847,9 +2282,9 @@
     Integer lid = meta.obtainRowLock(row);
     try {
       List<KeyValue> edits = new ArrayList<KeyValue>();
-      edits.add(new KeyValue(row, CATALOG_FAMILY, REGIONINFO_QUALIFIER,
-          System.currentTimeMillis(), Writables.getBytes(r.getRegionInfo())));
-      meta.put(HConstants.CATALOG_FAMILY, edits);
+      edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
+        Writables.getBytes(r.getRegionInfo())));
+      meta.update(edits, System.currentTimeMillis());
     } finally {
       meta.releaseRowLock(lid);
     }
@@ -1869,9 +2304,8 @@
   public static void removeRegionFromMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    Delete delete = new Delete(regionName);
-    delete.deleteFamily(HConstants.CATALOG_FAMILY);
-    srvr.delete(metaRegionName, delete);
+    srvr.deleteFamily(metaRegionName, regionName, HConstants.COLUMN_FAMILY,
+      HConstants.LATEST_TIMESTAMP, -1L);
   }
 
   /**
@@ -1885,18 +2319,14 @@
   public static void offlineRegionInMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
-    // Puts and Deletes used to be "atomic" here.  We can use row locks if
-    // we need to keep that property, or we can expand Puts and Deletes to
-    // allow them to be committed at once.
-    byte [] row = info.getRegionName();
-    Put put = new Put(row);
+    BatchUpdate b = new BatchUpdate(info.getRegionName());
     info.setOffline(true);
-    put.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER, Writables.getBytes(info));
-    srvr.put(metaRegionName, put);
-    Delete del = new Delete(row);
-    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
-    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
-    srvr.delete(metaRegionName, del);
+    b.put(COL_REGIONINFO, Writables.getBytes(info));
+    b.delete(COL_SERVER);
+    b.delete(COL_STARTCODE);
+    // If carrying splits, they'll be in place when we show up on new
+    // server.
+    srvr.batchUpdate(metaRegionName, b, -1L);
   }
   
   /**
@@ -1910,10 +2340,12 @@
   public static void cleanRegionInMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
-    Delete del = new Delete(info.getRegionName());
-    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
-    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
-    srvr.delete(metaRegionName, del);
+    BatchUpdate b = new BatchUpdate(info.getRegionName());
+    b.delete(COL_SERVER);
+    b.delete(COL_STARTCODE);
+    // If carrying splits, they'll be in place when we show up on new
+    // server.
+    srvr.batchUpdate(metaRegionName, b, LATEST_TIMESTAMP);
   }
 
   /**
@@ -2206,127 +2638,67 @@
     }
   }
 
-  
-  //
-  // HBASE-880
-  //
-  /**
-   * @param get
-   * @param lockid
-   * @return result
-   * @throws IOException
-   */
-  public Result get(final Get get, final Integer lockid) throws IOException {
-    // Verify families are all valid
-    if(get.hasFamilies()) {
-      for(byte [] family : get.familySet()) {
-        checkFamily(family);
-      }
-    } else { // Adding all families to scanner
-      for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
-        get.addFamily(family);
-      }
-    }
-    // Lock row
-    Integer lid = getLock(lockid, get.getRow()); 
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    try {
-      for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
-          get.getFamilyMap().entrySet()) {
-        get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-    return new Result(result);
-  }
-
-  private void get(final Store store, final Get get,
-    final NavigableSet<byte []> qualifiers, List<KeyValue> result)
-  throws IOException {
-    store.get(get, qualifiers, result);
-  }
-
-  /**
-   * 
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param amount
-   * @return The new value.
-   * @throws IOException
-   */
-  public long incrementColumnValue(byte [] row, byte [] family,
-      byte [] qualifier, long amount)
+  public long incrementColumnValue(byte[] row, byte[] column, long amount)
   throws IOException {
     checkRow(row);
+    checkColumn(column);
     
-    // Lock row
     Integer lid = obtainRowLock(row);
-    long result = 0L;
+    splitsAndClosesLock.readLock().lock();
     try {
-      Store store = stores.get(family);
-      result = store.incrementColumnValue(row, family, qualifier, amount);
-    } finally {
-      if(lid == null) {
-        releaseRowLock(lid);
+      KeyValue kv = new KeyValue(row, column);
+      long ts = System.currentTimeMillis();
+      byte [] value = null;
+
+      Store store = getStore(column);
+
+      List<KeyValue> c;
+      // Try the memcache first.
+      store.lock.readLock().lock();
+      try {
+        c = store.memcache.get(kv, 1);
+      } finally {
+        store.lock.readLock().unlock();
       }
-    }
-    return result;
-  }
-    
-  
-  //
-  // New HBASE-880 Helpers
-  //
-  
-  private void checkFamily(final byte [] family) 
-  throws NoSuchColumnFamilyException {
-    if(!regionInfo.getTableDesc().hasFamily(family)) {
-      throw new NoSuchColumnFamilyException("Column family " +
-          Bytes.toString(family) + " does not exist in region " + this
-            + " in table " + regionInfo.getTableDesc());
+      // Pick the latest value out of List<KeyValue> c:
+      if (c.size() >= 1) {
+        // Use the memcache timestamp value.
+        LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) +
+          "/" + Bytes.toString(column));
+        ts = c.get(0).getTimestamp();
+        value = c.get(0).getValue();
+      }
+
+      if (value == null) {
+        // Check the store (including disk) for the previous value.
+        c = store.get(kv, 1);
+        if (c != null && c.size() == 1) {
+          LOG.debug("Using HFile previous value for " + Bytes.toString(row) +
+            "/" + Bytes.toString(column));
+          value = c.get(0).getValue();
+        } else if (c != null && c.size() > 1) {
+          throw new DoNotRetryIOException("more than 1 value returned in " +
+            "incrementColumnValue from Store");
+        }
+      }
+      
+      if (value == null) {
+        // Doesn't exist
+        LOG.debug("Creating new counter value for " + Bytes.toString(row) +
+          "/"+ Bytes.toString(column));
+        value = Bytes.toBytes(amount);
+      } else {
+        if (amount == 0) return Bytes.toLong(value);
+        value = Bytes.incrementBytes(value, amount);
+      }
+
+      BatchUpdate b = new BatchUpdate(row, ts);
+      b.put(column, value);
+      batchUpdate(b, lid, true);
+      return Bytes.toLong(value);
+    } finally {
+      splitsAndClosesLock.readLock().unlock();
+      releaseRowLock(lid);
     }
   }
-  
-  
-//  //HBaseAdmin Debugging 
-//  /**
-//   * @return number of stores in the region
-//   */
-//  public int getNumStores() {
-//    return this.numStores;
-//  }
-//  /**
-//   * @return the name of the region
-//   */
-//  public byte [] getRegionsName() {
-//    return this.name;
-//  }
-//  /**
-//   * @return the number of files in every store
-//   */
-//  public int [] getStoresSize() {
-//    return this.storeSize;
-//  }
-//  
-//  //Writable, used for debugging purposes only
-//  public void readFields(final DataInput in)
-//  throws IOException {
-//    this.name = Bytes.readByteArray(in);
-//    this.numStores = in.readInt();
-//    this.storeSize = new int [numStores];
-//    for(int i=0; i<this.numStores; i++) {
-//      this.storeSize[i] = in.readInt();
-//    }
-//  }
-//
-//  public void write(final DataOutput out)
-//  throws IOException {
-//    Bytes.writeByteArray(out, this.regionInfo.getRegionName());
-//    out.writeInt(this.stores.size());
-//    for(Store store : this.stores.values()) {
-//      out.writeInt(store.getNumberOfstorefiles());
-//    }
-//  }
 }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Jun 14 21:34:13 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2007 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -29,6 +29,7 @@
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,6 +38,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
@@ -77,13 +79,13 @@
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -222,6 +224,8 @@
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
 
+  private final long rpcTimeout;
+
   // Address passed in to constructor.
   private final HServerAddress address;
 
@@ -273,6 +277,8 @@
     this.numRegionsToReport =                                        
       conf.getInt("hbase.regionserver.numregionstoreport", 10);      
 
+    this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
+
     reinitialize();
   }
 
@@ -665,8 +671,6 @@
     }
     join();
 
-    zooKeeperWrapper.close();
-
     if (shutdownHDFS.get()) {
       runThread(this.hdfsShutdownThread,
           this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
@@ -754,7 +758,8 @@
    * @return RegionLoad instance.
    * @throws IOException
    */
-  private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
+  private HServerLoad.RegionLoad createRegionLoad(final HRegion r)
+  throws IOException {
     byte[] name = r.getRegionName();
     int stores = 0;
     int storefiles = 0;
@@ -777,7 +782,8 @@
    * @return An instance of RegionLoad.
    * @throws IOException
    */
-  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName) {
+  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName)
+  throws IOException {
     return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
   }
 
@@ -1074,7 +1080,12 @@
           for(Map.Entry<byte [], Store> ee: r.stores.entrySet()) {
             Store store = ee.getValue(); 
             storefiles += store.getStorefilesCount();
-            storefileIndexSize += store.getStorefilesIndexSize();
+            try {
+              storefileIndexSize += store.getStorefilesIndexSize();
+            } catch (IOException ex) {
+              LOG.warn("error getting store file index size for " + store +
+                ": " + StringUtils.stringifyException(ex));  
+            }
           }
         }
       }
@@ -1619,7 +1630,7 @@
       super(Thread.currentThread().getName() + ".regionCloser." + r.toString());
       this.r = r;
     }
-    
+
     @Override
     public void run() {
       try {
@@ -1690,51 +1701,96 @@
     return getRegion(regionName).getRegionInfo();
   }
 
-
-  public Result getClosestRowBefore(final byte [] regionName, 
-    final byte [] row, final byte [] family)
+  public Cell [] get(final byte [] regionName, final byte [] row,
+    final byte [] column, final long timestamp, final int numVersions) 
   throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
-      // locate the region we're operating on
-      HRegion region = getRegion(regionName);
-      // ask the region for all the data 
-      
-      Result r = region.getClosestRowBefore(row, family);
-      return r;
+      List<KeyValue> results =
+        getRegion(regionName).get(row, column, timestamp, numVersions);
+      return Cell.createSingleCellArray(results);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  /** {@inheritDoc} */
-  public Result get(byte [] regionName, Get get) throws IOException {
+  public RowResult getRow(final byte [] regionName, final byte [] row, 
+    final byte [][] columns, final long ts,
+    final int numVersions, final long lockId)
+  throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
+      // convert the columns array into a set so it's easy to check later.
+      NavigableSet<byte []> columnSet = null;
+      if (columns != null) {
+        columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+        columnSet.addAll(Arrays.asList(columns));
+      }
       HRegion region = getRegion(regionName);
-      return region.get(get, getLockFromId(get.getLockId()));
-    } catch(Throwable t) {
+      HbaseMapWritable<byte [], Cell> result =
+        region.getFull(row, columnSet, ts, numVersions, getLockFromId(lockId));
+      if (result == null || result.isEmpty())
+        return null;
+      return new RowResult(row, result);
+    } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public boolean exists(byte [] regionName, Get get) throws IOException {
+  public RowResult getClosestRowBefore(final byte [] regionName, 
+    final byte [] row, final byte [] columnFamily)
+  throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
+      // locate the region we're operating on
       HRegion region = getRegion(regionName);
-      Result r = region.get(get, getLockFromId(get.getLockId()));
-      return r != null && !r.isEmpty();
-    } catch(Throwable t) {
+      // ask the region for all the data 
+      RowResult rr = region.getClosestRowBefore(row, columnFamily);
+      return rr;
+    } catch (Throwable t) {
+      throw convertThrowableToIOE(cleanup(t));
+    }
+  }
+  
+  public RowResult next(final long scannerId) throws IOException {
+    RowResult[] rrs = next(scannerId, 1);
+    return rrs.length == 0 ? null : rrs[0];
+  }
+
+  public RowResult [] next(final long scannerId, int nbRows) throws IOException {
+    checkOpen();
+    List<List<KeyValue>> results = new ArrayList<List<KeyValue>>();
+    try {
+      String scannerName = String.valueOf(scannerId);
+      InternalScanner s = scanners.get(scannerName);
+      if (s == null) {
+        throw new UnknownScannerException("Name: " + scannerName);
+      }
+      this.leases.renewLease(scannerName);
+      for (int i = 0; i < nbRows; i++) {
+        requestCount.incrementAndGet();
+        // Collect values to be returned here
+        List<KeyValue> values = new ArrayList<KeyValue>();
+        while (s.next(values)) {
+          if (!values.isEmpty()) {
+            // Row has something in it. Return the value.
+            results.add(values);
+            break;
+          }
+        }
+      }
+      return RowResult.createRowResultArray(results);
+    } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public void put(final byte [] regionName, final Put put)
+  public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
   throws IOException {
-    if (put.getRow() == null)
+    if (b.getRow() == null)
       throw new IllegalArgumentException("update has null row");
     
     checkOpen();
@@ -1742,24 +1798,24 @@
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      region.put(put, getLockFromId(put.getLockId()));
+      region.batchUpdate(b, getLockFromId(b.getRowLock()));
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
   
-  public int put(final byte[] regionName, final Put [] puts)
+  public int batchUpdates(final byte[] regionName, final BatchUpdate [] b)
   throws IOException {
     int i = 0;
     checkOpen();
     try {
       HRegion region = getRegion(regionName);
       this.cacheFlusher.reclaimMemcacheMemory();
-      Integer[] locks = new Integer[puts.length];
-      for (i = 0; i < puts.length; i++) {
+      Integer[] locks = new Integer[b.length];
+      for (i = 0; i < b.length; i++) {
         this.requestCount.incrementAndGet();
-        locks[i] = getLockFromId(puts[i].getLockId());
-        region.put(puts[i], locks[i]);
+        locks[i] = getLockFromId(b[i].getRowLock());
+        region.batchUpdate(b[i], locks[i]);
       }
     } catch(WrongRegionException ex) {
       return i;
@@ -1771,49 +1827,38 @@
     return -1;
   }
   
-
-  /**
-   * 
-   * @param regionName
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param value the expected value
-   * @param put
-   * @throws IOException
-   * @return true if the new put was execute, false otherwise
-   */
-  public boolean checkAndPut(final byte[] regionName, final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value, 
-      final Put put) throws IOException{
-    //Getting actual value
-    Get get = new Get(row);
-    get.addColumn(family, qualifier);
-    
+  public boolean checkAndSave(final byte [] regionName, final BatchUpdate b,
+      final HbaseMapWritable<byte[],byte[]> expectedValues)
+  throws IOException {
+    if (b.getRow() == null)
+      throw new IllegalArgumentException("update has null row");
     checkOpen();
     this.requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
     try {
       cacheFlusher.reclaimMemcacheMemory();
-      return region.checkAndPut(row, family, qualifier, value, put,
-          getLockFromId(put.getLockId()), true);
+      return region.checkAndSave(b,
+        expectedValues,getLockFromId(b.getRowLock()), true);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
-  
+
   //
   // remote scanner interface
   //
 
-  public long openScanner(byte [] regionName, Scan scan)
+  public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow,
+    final long timestamp, final RowFilterInterface filter)
   throws IOException {
     checkOpen();
     NullPointerException npe = null;
     if (regionName == null) {
       npe = new NullPointerException("regionName is null");
-    } else if (scan == null) {
-      npe = new NullPointerException("scan is null");
+    } else if (cols == null) {
+      npe = new NullPointerException("columns to scan is null");
+    } else if (firstRow == null) {
+      npe = new NullPointerException("firstRow for scanner is null");
     }
     if (npe != null) {
       throw new IOException("Invalid arguments to openScanner", npe);
@@ -1821,7 +1866,8 @@
     requestCount.incrementAndGet();
     try {
       HRegion r = getRegion(regionName);
-      InternalScanner s = r.getScanner(scan);
+      InternalScanner s =
+        r.getScanner(cols, firstRow, timestamp, filter);
       long scannerId = addScanner(s);
       return scannerId;
     } catch (Throwable t) {
@@ -1840,46 +1886,6 @@
       createLease(scannerName, new ScannerListener(scannerName));
     return scannerId;
   }
-
-  public Result next(final long scannerId) throws IOException {
-    Result [] res = next(scannerId, 1);
-    if(res == null || res.length == 0) {
-      return null;
-    }
-    return res[0];
-  }
-
-  public Result [] next(final long scannerId, int nbRows) throws IOException {
-    checkOpen();
-    List<Result> results = new ArrayList<Result>();
-    try {
-	long start = System.currentTimeMillis();
-      String scannerName = String.valueOf(scannerId);
-      InternalScanner s = scanners.get(scannerName);
-      if (s == null) {
-        throw new UnknownScannerException("Name: " + scannerName);
-      }
-      this.leases.renewLease(scannerName);
-      for (int i = 0; i < nbRows; i++) {
-        requestCount.incrementAndGet();
-        // Collect values to be returned here
-        List<KeyValue> values = new ArrayList<KeyValue>();
-        boolean moreRows = s.next(values);
-        if(!values.isEmpty()) {
-          results.add(new Result(values));
-        }
-        if(!moreRows) {
-          break;
-        }
-      }
-      return results.toArray(new Result[0]);
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
-  } 
-  
-  
-  
   
   public void close(final long scannerId) throws IOException {
     try {
@@ -1931,23 +1937,45 @@
   // Methods that do the actual work for the remote API
   //
   
-  public void delete(final byte [] regionName, final Delete delete)
+  public void deleteAll(final byte [] regionName, final byte [] row,
+      final byte [] column, final long timestamp, final long lockId) 
   throws IOException {
-    checkOpen();
-    try {
-      boolean writeToWAL = true;
-      this.cacheFlusher.reclaimMemcacheMemory();
-      this.requestCount.incrementAndGet();
-      Integer lock = getLockFromId(delete.getLockId());
-      HRegion region = getRegion(regionName);
-      region.delete(delete, lock, writeToWAL);
-    } catch(WrongRegionException ex) {
-    } catch (NotServingRegionException ex) {
-    } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t));
-    }
+    HRegion region = getRegion(regionName);
+    region.deleteAll(row, column, timestamp, getLockFromId(lockId));
+  }
+
+  public void deleteAll(final byte [] regionName, final byte [] row,
+      final long timestamp, final long lockId) 
+  throws IOException {
+    HRegion region = getRegion(regionName);
+    region.deleteAll(row, timestamp, getLockFromId(lockId));
+  }
+
+  public void deleteAllByRegex(byte[] regionName, byte[] row, String colRegex,
+      long timestamp, long lockId) throws IOException {
+    getRegion(regionName).deleteAllByRegex(row, colRegex, timestamp, 
+        getLockFromId(lockId));
+  }
+
+  public void deleteFamily(byte [] regionName, byte [] row, byte [] family, 
+    long timestamp, final long lockId)
+  throws IOException{
+    getRegion(regionName).deleteFamily(row, family, timestamp,
+        getLockFromId(lockId));
+  }
+
+  public void deleteFamilyByRegex(byte[] regionName, byte[] row, String familyRegex,
+      long timestamp, long lockId) throws IOException {
+    getRegion(regionName).deleteFamilyByRegex(row, familyRegex, timestamp, 
+        getLockFromId(lockId));
+  }
+
+  public boolean exists(byte[] regionName, byte[] row, byte[] column,
+      long timestamp, long lockId)
+  throws IOException {
+    return getRegion(regionName).exists(row, column, timestamp, 
+      getLockFromId(lockId));
   }
-  
 
   public long lockRow(byte [] regionName, byte [] row)
   throws IOException {
@@ -1995,7 +2023,7 @@
    * @return intId Integer row lock used internally in HRegion
    * @throws IOException Thrown if this is not a valid client lock id.
    */
-  Integer getLockFromId(long lockId)
+  private Integer getLockFromId(long lockId)
   throws IOException {
     if(lockId == -1L) {
       return null;
@@ -2119,10 +2147,6 @@
     return Collections.unmodifiableCollection(onlineRegions.values());
   }
 
-  public HRegion [] getOnlineRegionsAsArray() {
-    return getOnlineRegions().toArray(new HRegion[0]);
-  }
-  
   /**
    * @return The HRegionInfos from online regions sorted
    */
@@ -2386,6 +2410,7 @@
         } catch (Throwable t) {
           LOG.error( "Can not start region server because "+
               StringUtils.stringifyException(t) );
+          System.exit(-1);
         }
         break;
       }
@@ -2401,20 +2426,39 @@
     }
   }
   
+  /**
+   * @param args
+   */
+  public static void main(String [] args) {
+    Configuration conf = new HBaseConfiguration();
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+    doMain(args, regionServerClass);
+  }
+
   /** {@inheritDoc} */
-  public long incrementColumnValue(byte [] regionName, byte [] row, 
-      byte [] family, byte [] qualifier, long amount)
-  throws IOException {
+  public long incrementColumnValue(byte[] regionName, byte[] row,
+      byte[] column, long amount) throws IOException {
     checkOpen();
-
+    
+    NullPointerException npe = null;
     if (regionName == null) {
-      throw new IOException("Invalid arguments to incrementColumnValue " + 
-      "regionName is null");
+      npe = new NullPointerException("regionName is null");
+    } else if (row == null) {
+      npe = new NullPointerException("row is null");
+    } else if (column == null) {
+      npe = new NullPointerException("column is null");
+    }
+    if (npe != null) {
+      IOException io = new IOException(
+          "Invalid arguments to incrementColumnValue", npe);
+      throw io;
     }
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      return region.incrementColumnValue(row, family, qualifier, amount);
+      return region.incrementColumnValue(row, column, amount);
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -2435,17 +2479,4 @@
   public HServerInfo getHServerInfo() throws IOException {
     return serverInfo;
   }
-  
-  
-  /**
-   * @param args
-   */
-  public static void main(String [] args) {
-    Configuration conf = new HBaseConfiguration();
-    @SuppressWarnings("unchecked")
-    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
-        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
-    doMain(args, regionServerClass);
-  }
-
 }

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java?rev=784618&r1=784617&r2=784618&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java Sun Jun 14 21:34:13 2009
@@ -1,5 +1,5 @@
 /**
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2007 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -41,9 +41,11 @@
  */
 public interface InternalScanner extends Closeable {
   /**
-   * Grab the next row's worth of values.
+   * Grab the next row's worth of values. The scanner will return the most
+   * recent data value for each row that is not newer than the target time
+   * passed when the scanner was created.
    * @param results
-   * @return true if more rows exist after this one, false if scanner is done
+   * @return true if data was returned
    * @throws IOException
    */
   public boolean next(List<KeyValue> results)
@@ -53,5 +55,11 @@
    * Closes the scanner and releases any resources it has allocated
    * @throws IOException
    */
-  public void close() throws IOException;
+  public void close() throws IOException;  
+  
+  /** @return true if the scanner is matching a column family or regex */
+  public boolean isWildcardScanner();
+  
+  /** @return true if the scanner is matching multiple column family members */
+  public boolean isMultipleMatchScanner();
 }
\ No newline at end of file



Mime
View raw message