hbase-commits mailing list archives

From raw...@apache.org
Subject svn commit: r782178 [6/16] - in /hadoop/hbase/trunk: bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/transactional/ src/java/o...
Date Sat, 06 Jun 2009 01:26:27 GMT
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is used for the tracking and enforcement of columns and numbers 
+ * of versions during the course of a Get or Scan operation, when explicit
+ * column qualifiers have been asked for in the query.
+ *
+ * With a little magic (see {@link ScanQueryMatcher}), we can use this matcher
+ * for both scans and gets.  The main difference is 'next' and 'done' collapse
+ * for the scan case (since we see all columns in order), and we only reset
+ * between rows.
+ * 
+ * <p>
+ * This class is utilized by {@link QueryMatcher} through two methods:
+ * <ul><li>{@link #checkColumn} is called when a Put satisfies all other
+ * conditions of the query.  This method returns a {@link MatchCode} to define
+ * what action should be taken.
+ * <li>{@link #update} is called at the end of every StoreFile or Memcache.
+ * </ul>
+ * <p>
+ * This class is NOT thread-safe, as queries are never multi-threaded.
+ */
+public class ExplicitColumnTracker implements ColumnTracker {
+
+  private int maxVersions;
+  private List<ColumnCount> columns;
+  private int index;
+  private ColumnCount column;
+  private NavigableSet<byte[]> origColumns;
+  
+  /**
+   * Default constructor.
+   * @param columns columns specified by the user in the query
+   * @param maxVersions maximum versions to return per column
+   */
+  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int maxVersions) {
+    this.maxVersions = maxVersions;
+    this.origColumns = columns;
+    reset();
+  }
+  
+  /**
+   * Done when there are no more columns to match against.
+   */
+  public boolean done() {
+    return this.columns.size() == 0;
+  }
+
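+  /**
+   * Get the hint for where to seek next: the column currently being tracked.
+   * @return current tracked column, or null if none remain in this storefile
+   */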
+  public ColumnCount getColumnHint() {
+    return this.column;
+  }
+  
+  /**
+   * Checks against the parameters of the query and the columns which have
+   * already been processed by this query.
+   * @param bytes KeyValue buffer
+   * @param offset offset to the start of the qualifier
+   * @param length length of the qualifier
+   * @return MatchCode telling QueryMatcher what action to take
+   */
+  public MatchCode checkColumn(byte [] bytes, int offset, int length) {
+    // No more columns left, we are done with this query
+    if(this.columns.size() == 0) {
+      return MatchCode.DONE; // done_row
+    }
+    
+    // No more columns to match against, done with storefile
+    if(this.column == null) {
+      return MatchCode.NEXT; // done_row
+    }
+    
+    // Compare the current tracked column to the KeyValue's qualifier
+    int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(), 
+        column.getLength(), bytes, offset, length);
+    
+    // Matches, decrement versions left and include
+    if(ret == 0) {
+      if(this.column.decrement() == 0) {
+        // Done with versions for this column
+        this.columns.remove(this.index);
+        if(this.columns.size() == this.index) {
+          // Will not hit any more columns in this storefile
+          this.column = null;
+        } else {
+          this.column = this.columns.get(this.index);
+        }
+      }
+      return MatchCode.INCLUDE;
+    }
+
+    // The KeyValue's column is bigger than the tracked column
+    // Advance to the next tracked column and check again
+    if(ret <= -1) {
+      if(++this.index == this.columns.size()) {
+        // No more to match, do not include, done with storefile
+        return MatchCode.NEXT; // done_row
+      }
+      this.column = this.columns.get(this.index);
+      return checkColumn(bytes, offset, length);
+    }
+
+    // The KeyValue's column is smaller than the tracked column
+    // Not there yet, skip
+    return MatchCode.SKIP; // skip to next column, with hint?
+  }
+  
+  /**
+   * Called at the end of every StoreFile or Memcache.
+   */
+  public void update() {
+    if(this.columns.size() != 0) {
+      this.index = 0;
+      this.column = this.columns.get(this.index);
+    } else {
+      this.index = -1;
+      this.column = null;
+    }
+  }
+
+  // Called between every row.
+  public void reset() {
+    buildColumnList(this.origColumns);
+    this.index = 0;
+    this.column = this.columns.get(this.index);
+  }
+
+  private void buildColumnList(NavigableSet<byte[]> columns) {
+    this.columns = new ArrayList<ColumnCount>(columns.size());
+    for(byte [] column : columns) {
+      this.columns.add(new ColumnCount(column,maxVersions));
+    }
+  }
+}
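
A minimal usage sketch of the tracker above (hypothetical ExplicitColumnTrackerSketch class, placed in the same package so that QueryMatcher.MatchCode resolves; assumes the trunk classes from this commit are on the classpath):

    package org.apache.hadoop.hbase.regionserver;

    import java.util.NavigableSet;
    import java.util.TreeSet;

    import org.apache.hadoop.hbase.regionserver.QueryMatcher.MatchCode;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ExplicitColumnTrackerSketch {
      public static void main(String[] args) {
        // Ask for two explicit qualifiers, one version each.
        NavigableSet<byte[]> qualifiers =
            new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        qualifiers.add(Bytes.toBytes("a"));
        qualifiers.add(Bytes.toBytes("c"));
        ExplicitColumnTracker tracker =
            new ExplicitColumnTracker(qualifiers, 1);

        // Qualifiers arrive in sorted order, as they would from a StoreFile.
        for (String q : new String[] {"a", "b", "c", "d"}) {
          byte[] bytes = Bytes.toBytes(q);
          MatchCode code = tracker.checkColumn(bytes, 0, bytes.length);
          System.out.println(q + " -> " + code);
        }
        // Prints: a -> INCLUDE, b -> SKIP, c -> INCLUDE, d -> DONE
      }
    }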

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/FailedLogCloseException.java Sat Jun  6 01:26:21 2009
@@ -28,10 +28,16 @@
 class FailedLogCloseException extends IOException {
   private static final long serialVersionUID = 1759152841462990925L;
 
+  /**
+   * 
+   */
   public FailedLogCloseException() {
     super();
   }
 
+  /**
+   * @param arg0
+   */
   public FailedLogCloseException(String arg0) {
     super(arg0);
   }

Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java?rev=782178&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/GetDeleteTracker.java Sat Jun  6 01:26:21 2009
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is responsible for the tracking and enforcement of Deletes
+ * during the course of a Get operation.
+ * <p>
+ * This class is utilized through three methods:
+ * <ul><li>{@link #add} when encountering a Delete
+ * <li>{@link #isDeleted} when checking if a Put KeyValue has been deleted
+ * <li>{@link #update} when reaching the end of a StoreFile
+ * </ul>
+ * <p>
+ * This class is NOT thread-safe, as queries are never multi-threaded.
+ */
+public class GetDeleteTracker implements DeleteTracker {
+
+  private long familyStamp = 0L; // matches reset() and the isEmpty() check
+  protected List<Delete> deletes = null;
+  private List<Delete> newDeletes = new ArrayList<Delete>();
+  private Iterator<Delete> iterator;
+  private Delete delete = null;
+
+  private KeyValue.KeyComparator comparator;
+
+  /**
+   * Constructor
+   * @param comparator
+   */
+  public GetDeleteTracker(KeyValue.KeyComparator comparator) {
+    this.comparator = comparator;
+  }
+
+  /**
+   * Add the specified KeyValue to the list of deletes to check against for
+   * this row operation.
+   * <p>
+   * This is called when a Delete is encountered in a StoreFile.
+   * @param buffer KeyValue buffer
+   * @param qualifierOffset column qualifier offset
+   * @param qualifierLength column qualifier length
+   * @param timestamp timestamp of the delete
+   * @param type delete type as a KeyValue.Type code
+   */
+  @Override
+  public void add(byte [] buffer, int qualifierOffset, int qualifierLength,
+      long timestamp, byte type) {
+    if(type == KeyValue.Type.DeleteFamily.getCode()) {
+      if(timestamp > familyStamp) {
+        familyStamp = timestamp;
+      }
+      return;
+    }
+    if(timestamp > familyStamp) {
+      this.newDeletes.add(new Delete(buffer, qualifierOffset, qualifierLength,
+          type, timestamp));
+    }
+  }
+
+  /** 
+   * Check if the specified KeyValue buffer has been deleted by a previously
+   * seen delete.
+   * @param buffer KeyValue buffer
+   * @param qualifierOffset column qualifier offset
+   * @param qualifierLength column qualifier length
+   * @param timestamp timestamp
+   * @return true if the specified KeyValue is deleted, false if not
+   */
+  @Override
+  public boolean isDeleted(byte [] buffer, int qualifierOffset,
+      int qualifierLength, long timestamp) {
+
+    // Check against DeleteFamily
+    if(timestamp <= familyStamp) {
+      return true;
+    }
+
+    // Check if there are other deletes
+    if(this.delete == null) {
+      return false;
+    }
+
+    // Check column
+    int ret = comparator.compareRows(buffer, qualifierOffset, qualifierLength,
+        this.delete.buffer, this.delete.qualifierOffset, 
+        this.delete.qualifierLength);
+    if(ret <= -1) {
+      // Have not reached the next delete yet
+      return false;
+    } else if(ret >= 1) {
+      // Deletes an earlier column, need to move down deletes
+      if(this.iterator.hasNext()) {
+        this.delete = this.iterator.next();
+      } else {
+        this.delete = null;
+        return false;
+      }
+      return isDeleted(buffer, qualifierOffset, qualifierLength, timestamp);
+    }
+
+    // Check Timestamp
+    if(timestamp > this.delete.timestamp) {
+      return false;
+    }
+
+    // Check Type
+    switch(KeyValue.Type.codeToType(this.delete.type)) {
+    case Delete:
+      boolean equal = timestamp == this.delete.timestamp;
+
+      if(this.iterator.hasNext()) {
+        this.delete = this.iterator.next();
+      } else {
+        this.delete = null;
+      }
+
+      if(equal){
+        return true;
+      }
+      // timestamp < this.delete.timestamp
+      // Delete of an explicit column newer than current
+      return isDeleted(buffer, qualifierOffset, qualifierLength, timestamp);
+    case DeleteColumn:
+      return true;
+    }
+
+    // should never reach this
+    return false;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    if(this.familyStamp == 0L && this.delete == null) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    this.deletes = null;
+    this.delete = null;
+    this.newDeletes = new ArrayList<Delete>();
+    this.familyStamp = 0L;
+    this.iterator = null;
+  }
+
+  /**
+   * Called at the end of every StoreFile.
+   * <p>
+   * Many optimized implementations of Trackers will require an update
+   * when the end of each StoreFile is reached.
+   */
+  @Override
+  public void update() {
+    // If no previous deletes, use new deletes and return
+    if(this.deletes == null || this.deletes.size() == 0) {
+      finalize(this.newDeletes);
+      return;
+    }
+
+    // If no new delete, retain previous deletes and return
+    if(this.newDeletes.size() == 0) {
+      return;
+    }
+
+    // Merge previous deletes with new deletes
+    List<Delete> mergeDeletes = 
+      new ArrayList<Delete>(this.newDeletes.size());
+    int oldIndex = 0;
+    int newIndex = 0;
+
+    Delete newDelete = newDeletes.get(newIndex);
+    Delete oldDelete = deletes.get(oldIndex);
+    while(true) {
+      switch(compareDeletes(oldDelete,newDelete)) {
+      case NEXT_NEW: {
+        if(++newIndex == newDeletes.size()) {
+          // Done with new, add the rest of old to merged and return
+          mergeDown(mergeDeletes, deletes, oldIndex);
+          finalize(mergeDeletes);
+          return;
+        }
+        newDelete = this.newDeletes.get(newIndex);
+        break;
+      }
+
+      case INCLUDE_NEW_NEXT_NEW: {
+        mergeDeletes.add(newDelete);
+        if(++newIndex == newDeletes.size()) {
+          // Done with new, add the rest of old to merged and return
+          mergeDown(mergeDeletes, deletes, oldIndex);
+          finalize(mergeDeletes);
+          return;
+        }
+        newDelete = this.newDeletes.get(newIndex);
+        break;
+      }
+
+      case INCLUDE_NEW_NEXT_BOTH: {
+        mergeDeletes.add(newDelete);
+        ++oldIndex;
+        ++newIndex;
+        if(oldIndex == deletes.size()) {
+          if(newIndex == newDeletes.size()) {
+            finalize(mergeDeletes);
+            return;
+          }
+          mergeDown(mergeDeletes, newDeletes, newIndex);
+          finalize(mergeDeletes);
+          return;
+        } else if(newIndex == newDeletes.size()) {
+          mergeDown(mergeDeletes, deletes, oldIndex);
+          finalize(mergeDeletes);
+          return;
+        }
+        oldDelete = this.deletes.get(oldIndex);
+        newDelete = this.newDeletes.get(newIndex);
+        break;
+      }
+
+      case INCLUDE_OLD_NEXT_BOTH: {
+        mergeDeletes.add(oldDelete);
+        ++oldIndex;
+        ++newIndex;
+        if(oldIndex == deletes.size()) {
+          if(newIndex == newDeletes.size()) {
+            finalize(mergeDeletes);
+            return;
+          }
+          mergeDown(mergeDeletes, newDeletes, newIndex);
+          finalize(mergeDeletes);
+          return;
+        } else if(newIndex == newDeletes.size()) {
+          mergeDown(mergeDeletes, deletes, oldIndex);
+          finalize(mergeDeletes);
+          return;
+        }
+        oldDelete = this.deletes.get(oldIndex);
+        newDelete = this.newDeletes.get(newIndex);
+        break;
+      }
+
+      case INCLUDE_OLD_NEXT_OLD: {
+        mergeDeletes.add(oldDelete);
+        if(++oldIndex == deletes.size()) {
+          mergeDown(mergeDeletes, newDeletes, newIndex);
+          finalize(mergeDeletes);
+          return;
+        }
+        oldDelete = this.deletes.get(oldIndex);
+        break;
+      }
+
+      case NEXT_OLD: {
+        if(++oldIndex == deletes.size()) {
+          // Done with old, add the rest of new to merged and return
+          mergeDown(mergeDeletes, newDeletes, newIndex);
+          finalize(mergeDeletes);
+          return;
+        }
+        oldDelete = this.deletes.get(oldIndex);
+      }
+      }
+    }
+  }
+
+  private void finalize(List<Delete> mergeDeletes) {
+    this.deletes = mergeDeletes;
+    this.newDeletes = new ArrayList<Delete>();
+    if(this.deletes.size() > 0){
+      this.iterator = deletes.iterator();
+      this.delete = iterator.next();
+    }
+  }
+
+  private void mergeDown(List<Delete> mergeDeletes, List<Delete> srcDeletes, 
+      int srcIndex) {
+    int index = srcIndex;
+    while(index < srcDeletes.size()) {
+      mergeDeletes.add(srcDeletes.get(index++));
+    }
+  }
+
+
+  protected DeleteCompare compareDeletes(Delete oldDelete, Delete newDelete) {
+
+    // Compare columns
+    // Just comparing the qualifier portion, so we can keep using Bytes.compareTo().
+    int ret = Bytes.compareTo(oldDelete.buffer, oldDelete.qualifierOffset,
+        oldDelete.qualifierLength, newDelete.buffer, newDelete.qualifierOffset,
+        newDelete.qualifierLength);
+
+    if(ret <= -1) {
+      return DeleteCompare.INCLUDE_OLD_NEXT_OLD;
+    } else if(ret >= 1) {
+      return DeleteCompare.INCLUDE_NEW_NEXT_NEW;
+    }
+
+    // Same column
+
+    // Branches below can be optimized.  Keeping like this until testing
+    // is complete.
+    if(oldDelete.type == newDelete.type) {
+      // the one case where we can merge 2 deletes -> 1 delete.
+      if(oldDelete.type == KeyValue.Type.Delete.getCode()){
+        if(oldDelete.timestamp > newDelete.timestamp) {
+          return DeleteCompare.INCLUDE_OLD_NEXT_OLD;
+        } else if(oldDelete.timestamp < newDelete.timestamp) {
+          return DeleteCompare.INCLUDE_NEW_NEXT_NEW;
+        } else {
+          return DeleteCompare.INCLUDE_OLD_NEXT_BOTH;
+        }
+      }
+      if(oldDelete.timestamp < newDelete.timestamp) {
+        return DeleteCompare.INCLUDE_NEW_NEXT_BOTH;
+      } 
+      return DeleteCompare.INCLUDE_OLD_NEXT_BOTH;
+    }
+
+    // The old delete is more specific than the new delete.
+    // If the old delete is newer than the new delete, we have to
+    // keep it.
+    if(oldDelete.type < newDelete.type) {
+      if(oldDelete.timestamp > newDelete.timestamp) {
+        return DeleteCompare.INCLUDE_OLD_NEXT_OLD;
+      } else if(oldDelete.timestamp < newDelete.timestamp) {
+        return DeleteCompare.NEXT_OLD;
+      } else {
+        return DeleteCompare.NEXT_OLD;
+      }
+    }
+
+    // new delete is more specific than the old delete.
+    if(oldDelete.type > newDelete.type) {
+      if(oldDelete.timestamp > newDelete.timestamp) {
+        return DeleteCompare.NEXT_NEW;
+      } else if(oldDelete.timestamp < newDelete.timestamp) {
+        return DeleteCompare.INCLUDE_NEW_NEXT_NEW;
+      } else {
+        return DeleteCompare.NEXT_NEW;
+      }
+    }
+
+    // Should never be reached; throw to assert.
+    throw new RuntimeException("GetDeleteTracker:compareDelete reached terminal state");
+  }
+
+  /**
+   * Internal class used to store the necessary information for a Delete.
+   * <p>
+   * Rather than reparsing the KeyValue, or copying fields, this class points
+   * to the underlying KeyValue buffer with pointers to the qualifier and fields
+   * for type and timestamp.  No parsing work is done in DeleteTracker now.
+   * <p>
+   * Fields are package-private because they are accessed often, directly, and
+   * only within this class.
+   */
+  protected class Delete {
+    byte [] buffer;
+    int qualifierOffset;
+    int qualifierLength;
+    byte type;
+    long timestamp;
+    /**
+     * Constructor
+     * @param buffer
+     * @param qualifierOffset
+     * @param qualifierLength
+     * @param type
+     * @param timestamp
+     */
+    public Delete(byte [] buffer, int qualifierOffset, int qualifierLength,
+        byte type, long timestamp) {
+      this.buffer = buffer;
+      this.qualifierOffset = qualifierOffset;
+      this.qualifierLength = qualifierLength;
+      this.type = type;
+      this.timestamp = timestamp;
+    }
+  }
+}
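
A minimal sketch of the add/update/isDeleted cycle (hypothetical GetDeleteTrackerSketch class in the same package; KeyValue.KEY_COMPARATOR is an assumed comparator instance, and any KeyValue.KeyComparator will do):

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GetDeleteTrackerSketch {
      public static void main(String[] args) {
        // KEY_COMPARATOR is an assumption; any KeyValue.KeyComparator works.
        GetDeleteTracker tracker = new GetDeleteTracker(KeyValue.KEY_COMPARATOR);

        byte[] q = Bytes.toBytes("qual");
        // A Delete for qualifier "qual" at timestamp 100, seen in a StoreFile.
        tracker.add(q, 0, q.length, 100L, KeyValue.Type.Delete.getCode());
        // New deletes only take effect after update(), at the StoreFile boundary.
        tracker.update();

        // A Put newer than the delete survives; one at the deleted stamp does not.
        System.out.println(tracker.isDeleted(q, 0, q.length, 200L)); // false
        System.out.println(tracker.isDeleted(q, 0, q.length, 100L)); // true
      }
    }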

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Sat Jun  6 01:26:21 2009
@@ -100,7 +100,7 @@
 public class HLog implements HConstants, Syncable {
   static final Log LOG = LogFactory.getLog(HLog.class);
   private static final String HLOG_DATFILE = "hlog.dat.";
-  static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
+  static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
   static final byte [] METAROW = Bytes.toBytes("METAROW");
   private final FileSystem fs;
   private final Path dir;
@@ -701,8 +701,8 @@
   }
 
   private KeyValue completeCacheFlushLogEdit() {
-    return new KeyValue(METAROW, METACOLUMN, System.currentTimeMillis(),
-      COMPLETE_CACHE_FLUSH);
+    return new KeyValue(METAROW, METAFAMILY, null,
+      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
   }
 
   /**
@@ -716,11 +716,11 @@
   }
 
   /**
-   * @param column
+   * @param family
    * @return true if the column is a meta column
    */
-  public static boolean isMetaColumn(byte [] column) {
-    return Bytes.equals(METACOLUMN, column);
+  public static boolean isMetaFamily(byte [] family) {
+    return Bytes.equals(METAFAMILY, family);
   }
   
   /**
@@ -870,6 +870,7 @@
           Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
         for (final byte[] key : logEntries.keySet()) {
           Thread thread = new Thread(Bytes.toString(key)) {
+            @Override
             public void run() {
               LinkedList<HLogEntry> entries = logEntries.get(key);
               LOG.debug("Thread got " + entries.size() + " to process");
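
The WAL now keys its internal edits off a family rather than the old "METACOLUMN:" column. A small sketch of the renamed check (hypothetical class; HLog.isMetaFamily is public static per the hunk above):

    package org.apache.hadoop.hbase.regionserver;

    import org.apache.hadoop.hbase.util.Bytes;

    public class MetaFamilySketch {
      public static void main(String[] args) {
        // Family carried by internal edits such as the cache-flush marker.
        byte[] metaFamily = Bytes.toBytes("METAFAMILY"); // mirrors HLog.METAFAMILY
        System.out.println(HLog.isMetaFamily(metaFamily));            // true
        System.out.println(HLog.isMetaFamily(Bytes.toBytes("info"))); // false
      }
    }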

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java Sat Jun  6 01:26:21 2009
@@ -87,6 +87,9 @@
     return logSeqNum;
   }
 
+  /**
+   * @return the write time
+   */
   public long getWriteTime() {
     return this.writeTime;
   }
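
The HRegion diff that follows replaces the BatchUpdate-era mutation API with the new client objects (Get, Put, Delete, Scan, Result) imported at the top of the hunk. A minimal sketch of the new calls (hypothetical helper; Put.add(family, qualifier, value) and the Delete(byte[]) constructor are assumed from the client classes added elsewhere in this commit):

    package org.apache.hadoop.hbase.regionserver;

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RegionMutationSketch {
      static void mutate(HRegion region) throws IOException {
        byte[] row = Bytes.toBytes("row1");
        byte[] fam = Bytes.toBytes("f");
        byte[] qual = Bytes.toBytes("q");

        // Replaces batchUpdate(BatchUpdate).
        Put put = new Put(row);
        put.add(fam, qual, Bytes.toBytes("v1")); // assumed Put.add signature
        region.put(put);

        // Replaces checkAndSave: apply put2 only if f:q currently holds "v1".
        Put put2 = new Put(row);
        put2.add(fam, qual, Bytes.toBytes("v2"));
        boolean applied = region.checkAndPut(row, fam, qual,
            Bytes.toBytes("v1"), put2, null, true);
        System.out.println("checkAndPut applied: " + applied);

        // Replaces the deleteAll variants; an empty family map deletes the
        // entire row (every family, per the delete() implementation below).
        Delete delete = new Delete(row);
        region.delete(delete, null, true);
      }
    }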

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=782178&r1=782177&r2=782178&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Jun  6 01:26:21 2009
@@ -1,5 +1,5 @@
  /**
- * Copyright 2008 The Apache Software Foundation
+ * Copyright 2009 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -22,12 +22,9 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,7 +33,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,7 +40,6 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -54,19 +49,16 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionHistorian;
-import org.apache.hadoop.hbase.ValueOverMaxLengthException;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.HbaseMapWritable;
-import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -106,14 +98,15 @@
  * regionName is a unique identifier for this HRegion. (startKey, endKey]
  * defines the keyspace for this HRegion.
  */
-public class HRegion implements HConstants {
+public class HRegion implements HConstants { // , Writable{
   static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
   final AtomicBoolean closed = new AtomicBoolean(false);
-  /* Closing can take some time; use the closing flag if there is stuff we don't want
-   * to do while in closing state; e.g. like offer this region up to the master as a region
-   * to close if the carrying regionserver is overloaded.  Once set, it is never cleared.
+  /* Closing can take some time; use the closing flag if there is stuff we don't 
+   * want to do while in closing state; e.g. like offer this region up to the 
+   * master as a region to close if the carrying regionserver is overloaded.
+   * Once set, it is never cleared.
    */
   private final AtomicBoolean closing = new AtomicBoolean(false);
   private final RegionHistorian historian;
@@ -126,6 +119,13 @@
     new ConcurrentHashMap<Integer, byte []>();
   protected final Map<byte [], Store> stores =
     new ConcurrentSkipListMap<byte [], Store>(KeyValue.FAMILY_COMPARATOR);
+  
+  //These variable are just used for getting data out of the region, to test on
+  //client side
+  // private int numStores = 0;
+  // private int [] storeSize = null;
+  // private byte [] name = null;
+  
   final AtomicLong memcacheSize = new AtomicLong(0);
 
   // This is the table subdirectory.
@@ -137,7 +137,6 @@
   final Path regiondir;
   private final Path regionCompactionDir;
   KeyValue.KVComparator comparator;
-  private KeyValue.KVComparator comparatorIgnoreTimestamp;
 
   /*
    * Set this when scheduling compaction if want the next compaction to be a
@@ -210,6 +209,24 @@
     Bytes.toBytes(REGIONINFO_FILE);
 
   /**
+   * Should only be used for testing purposes
+   */
+  public HRegion(){
+    this.basedir = null;
+    this.blockingMemcacheSize = 0;
+    this.conf = null;
+    this.flushListener = null;
+    this.fs = null;
+    this.historian = null;
+    this.memcacheFlushSize = 0;
+    this.log = null;
+    this.regionCompactionDir = null;
+    this.regiondir = null;
+    this.regionInfo = null;
+    this.threadWakeFrequency = 0L;
+  }
+  
+  /**
    * HRegion constructor.
    *
    * @param basedir qualified path of directory where region should be located,
@@ -234,8 +251,6 @@
       HRegionInfo regionInfo, FlushRequester flushListener) {
     this.basedir = basedir;
     this.comparator = regionInfo.getComparator();
-    this.comparatorIgnoreTimestamp =
-      this.comparator.getComparatorIgnoringTimestamps();
     this.log = log;
     this.fs = fs;
     this.conf = conf;
@@ -965,197 +980,6 @@
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Fetch multiple versions of a single data item, with timestamp.
-   *
-   * @param row
-   * @param column
-   * @param ts
-   * @param nv
-   * @return Results or null if none.
-   * @throws IOException
-   */
-  public List<KeyValue> get(final byte[] row, final byte[] column, final long ts,
-      final int nv) 
-  throws IOException {
-    long timestamp = ts == -1? HConstants.LATEST_TIMESTAMP : ts;
-    int numVersions = nv == -1? 1 : nv;
-    splitsAndClosesLock.readLock().lock();
-    try {
-      if (this.closed.get()) {
-        throw new IOException("Region " + this + " closed");
-      }
-      // Make sure this is a valid row and valid column
-      checkRow(row);
-      checkColumn(column);
-      // Don't need a row lock for a simple get
-      List<KeyValue> result = getStore(column).
-        get(KeyValue.createFirstOnRow(row, column, timestamp), numVersions);
-      // Guarantee that we return null instead of a zero-length array, 
-      // if there are no results to return.
-      return (result == null || result.isEmpty())? null : result;
-    } finally {
-      splitsAndClosesLock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Data structure with a counter that is accessible rather than create a
-   * new Integer every time we want to up the counter.  Initializes at count 1.
-   */
-  static class Counter {
-    int counter = 1;
-  }
-
-  /*
-   * Check to see if we've not gone over threshold for this particular
-   * column.
-   * @param kv
-   * @param versions
-   * @param versionsCount
-   * @return True if its ok to add current value.
-   */
-  static boolean okToAddResult(final KeyValue kv, final int versions,
-      final Map<KeyValue, HRegion.Counter> versionsCount) {
-    if (versionsCount == null) {
-      return true;
-    }
-    if (versionsCount.containsKey(kv)) {
-      if (versionsCount.get(kv).counter < versions) {
-        return true;
-      }
-    } else {
-      return true;
-    }
-    return false;
-  }
-
-  /*
-   * Used adding item found to list of results getting.
-   * @param kv
-   * @param versionsCount
-   * @param results
-   */
-  static void addResult(final KeyValue kv,
-      final Map<KeyValue, HRegion.Counter> versionsCount,
-      final List<KeyValue> results) {
-    // Don't add if already present; i.e. ignore second entry.
-    if (results.contains(kv)) return;
-    results.add(kv);
-    if (versionsCount == null) {
-      return;
-    }
-    if (!versionsCount.containsKey(kv)) {
-      versionsCount.put(kv, new HRegion.Counter());
-    } else {
-      versionsCount.get(kv).counter++;
-    }
-  }
-
-  /*
-   * @param versions Number of versions to get.
-   * @param versionsCount May be null.
-   * @param columns Columns we want to fetch.
-   * @return True if has enough versions.
-   */
-  static boolean hasEnoughVersions(final int versions,
-      final Map<KeyValue, HRegion.Counter> versionsCount,
-      final Set<byte []> columns) {
-    if (columns == null || versionsCount == null) {
-      // Wants all columns so just keep going
-      return false;
-    }
-    if (columns.size() > versionsCount.size()) {
-      return false;
-    }
-    if (versions == 1) {
-      return true;
-    }
-    // Need to look at each to make sure at least versions.
-    for (Map.Entry<KeyValue, HRegion.Counter> e: versionsCount.entrySet()) {
-      if (e.getValue().counter < versions) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Fetch all the columns for the indicated row at a specified timestamp.
-   * Returns a HbaseMapWritable that maps column names to values.
-   *
-   * We should eventually use Bloom filters here, to reduce running time.  If 
-   * the database has many column families and is very sparse, then we could be 
-   * checking many files needlessly.  A small Bloom for each row would help us 
-   * determine which column groups are useful for that row.  That would let us 
-   * avoid a bunch of disk activity.
-   *
-   * @param row
-   * @param columns Array of columns you'd like to retrieve. When null, get all.
-   * @param ts
-   * @param numVersions number of versions to retrieve
-   * @param lockid
-   * @return HbaseMapWritable<columnName, Cell> values
-   * @throws IOException
-   */
-  public HbaseMapWritable<byte [], Cell> getFull(final byte [] row,
-      final NavigableSet<byte []> columns, final long ts,
-      final int numVersions, final Integer lockid) 
-  throws IOException {
-    // Check columns passed
-    if (columns != null) {
-      for (byte [] column: columns) {
-        checkColumn(column);
-      }
-    }
-    List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-    Map<KeyValue, Counter> versionCounter =
-      new TreeMap<KeyValue, Counter>(this.comparatorIgnoreTimestamp);
-    Integer lid = getLock(lockid,row);
-    HashSet<Store> storeSet = new HashSet<Store>();
-    try {
-      // Get the concerned columns or all of them
-      if (columns != null) {
-        for (byte[] bs : columns) {
-          Store store = stores.get(bs);
-          if (store != null) {
-            storeSet.add(store);
-          }
-        }
-      } else {
-        storeSet.addAll(stores.values());
-      }
-      long timestamp =
-        (ts == HConstants.LATEST_TIMESTAMP)? System.currentTimeMillis(): ts;
-      KeyValue key = KeyValue.createFirstOnRow(row, timestamp);
-      // For each column name that is just a column family, open the store
-      // related to it and fetch everything for that row. HBASE-631
-      // Also remove each store from storeSet so that these stores
-      // won't be opened for no reason. HBASE-783
-      if (columns != null) {
-        for (byte [] bs : columns) {
-          // TODO: Fix so we use comparator in KeyValue that looks at
-          // column family portion only.
-          if (KeyValue.getFamilyDelimiterIndex(bs, 0, bs.length) == (bs.length - 1)) {
-            Store store = stores.get(bs);
-            store.getFull(key, null, null, numVersions, versionCounter,
-              keyvalues, timestamp);
-            storeSet.remove(store);
-          }
-        }
-      }
-      for (Store targetStore: storeSet) {
-        targetStore.getFull(key, columns, null, numVersions, versionCounter,
-          keyvalues, timestamp);
-      }
-      
-      return Cell.createCells(keyvalues);
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
   /**
    * Return all the data for the row that matches <i>row</i> exactly, 
   * or the one that immediately precedes it, at or immediately before 
@@ -1165,9 +989,9 @@
    * @return map of values
    * @throws IOException
    */
-  RowResult getClosestRowBefore(final byte [] row)
+  Result getClosestRowBefore(final byte [] row)
   throws IOException{
-    return getClosestRowBefore(row, HConstants.COLUMN_FAMILY);
+    return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
   }
 
   /**
@@ -1176,12 +1000,13 @@
    * <i>ts</i>.
    * 
    * @param row row key
+   * @param family
    * @param columnFamily Must include the column family delimiter character.
    * @return map of values
    * @throws IOException
    */
-  public RowResult getClosestRowBefore(final byte [] row,
-    final byte [] columnFamily)
+  public Result getClosestRowBefore(final byte [] row,
+    final byte [] family)
   throws IOException{
     // look across all the HStores for this region and determine what the
     // closest key is across all column families, since the data may be sparse
@@ -1189,7 +1014,7 @@
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
-      Store store = getStore(columnFamily);
+      Store store = getStore(family);
       KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
       key = store.getRowKeyAtOrBefore(kv);
@@ -1202,52 +1027,45 @@
       if (!this.comparator.matchingRows(kv, key)) {
         kv = new KeyValue(key.getRow(), HConstants.LATEST_TIMESTAMP);
       }
-      store.getFull(kv, null, null, 1, null, results, System.currentTimeMillis());
-      // Convert to RowResult.  TODO: Remove need to do this.
-      return RowResult.createRowResult(results);
+      Get get = new Get(key.getRow());
+      store.get(get, null, results);
+      
+      return new Result(results);
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
   }
 
+  //TODO
   /**
    * Return an iterator that scans over the HRegion, returning the indicated 
-   * columns for only the rows that match the data filter.  This Iterator must
-   * be closed by the caller.
+   * columns and rows specified by the {@link Scan}.
+   * <p>
+   * This Iterator must be closed by the caller.
    *
-   * @param cols columns to scan. If column name is a column family, all
-   * columns of the specified column family are returned.  Its also possible
-   * to pass a regex in the column qualifier. A column qualifier is judged to
-   * be a regex if it contains at least one of the following characters:
-   * <code>\+|^&*$[]]}{)(</code>.
-   * @param firstRow row which is the starting point of the scan
-   * @param timestamp only return rows whose timestamp is <= this value
-   * @param filter row filter
+   * @param scan configured {@link Scan}
    * @return InternalScanner
    * @throws IOException
    */
-  public InternalScanner getScanner(byte[][] cols, byte [] firstRow,
-    long timestamp, RowFilterInterface filter) 
+  public InternalScanner getScanner(Scan scan)
   throws IOException {
     newScannerLock.readLock().lock();
     try {
       if (this.closed.get()) {
         throw new IOException("Region " + this + " closed");
       }
-      HashSet<Store> storeSet = new HashSet<Store>();
-      NavigableSet<byte []> columns =
-        new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-      // Below we make up set of stores we want scanners on and we fill out the
-      // list of columns.
-      for (int i = 0; i < cols.length; i++) {
-        columns.add(cols[i]);
-        Store s = stores.get(cols[i]);
-        if (s != null) {
-          storeSet.add(s);
+      // Verify families are all valid
+      if(scan.hasFamilies()) {
+        for(byte [] family : scan.getFamilyMap().keySet()) {
+          checkFamily(family);
+        }
+      } else { // Adding all families to scanner
+        for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
+          scan.addFamily(family);
         }
       }
-      return new HScanner(columns, firstRow, timestamp,
-        storeSet.toArray(new Store [storeSet.size()]), filter);
+      return new RegionScanner(scan);
+      
     } finally {
       newScannerLock.readLock().unlock();
     }
@@ -1256,44 +1074,136 @@
   //////////////////////////////////////////////////////////////////////////////
   // set() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
+  /**
+   * @param delete
+   * @param lockid
+   * @param writeToWAL
+   * @throws IOException
+   */
+  public void delete(Delete delete, Integer lockid, boolean writeToWAL)
+  throws IOException {
+    checkReadOnly();
+    checkResources();
+    splitsAndClosesLock.readLock().lock();
+    Integer lid = null;
+    try {
+      byte [] row = delete.getRow();
+      // If we did not pass an existing row lock, obtain a new one
+      lid = getLock(lockid, row);
+
+      //Check to see if this delete targets the entire row
+      if(delete.getFamilyMap().isEmpty()){
+        for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
+          delete.deleteFamily(family);
+        }
+      } else {
+        for(byte [] family : delete.getFamilyMap().keySet()) {
+          if(family == null) {
+            throw new NoSuchColumnFamilyException("Empty family is invalid");
+          }
+          checkFamily(family);
+        }
+      }
+      
+      for(Map.Entry<byte[], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
+        byte [] family = e.getKey();
+        delete(family, e.getValue(), writeToWAL);
+      }
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+      splitsAndClosesLock.readLock().unlock();
+    }
+  }
+  
   
   /**
-   * @param b
+   * @param family
+   * @param kvs
+   * @param writeToWAL
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b) throws IOException {
-    this.batchUpdate(b, null, true);
+  public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
+  throws IOException {
+    long now = System.currentTimeMillis();
+    boolean flush = false;
+    this.updatesLock.readLock().lock();
+    try {
+      if (writeToWAL) {
+        this.log.append(regionInfo.getRegionName(),
+          regionInfo.getTableDesc().getName(), kvs,
+          (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
+      }
+      long size = 0;
+      Store store = getStore(family);
+      for (KeyValue kv: kvs) {
+        // Check if time is LATEST, change to time of most recent addition if so
+        // This is expensive.
+        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+          List<KeyValue> result = new ArrayList<KeyValue>(1);
+          Get g = new Get(kv.getRow());
+          NavigableSet<byte []> qualifiers =
+            new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+          qualifiers.add(kv.getQualifier());
+          get(store, g, qualifiers, result);
+          if (result.isEmpty()) {
+            // Nothing to delete
+            continue;
+          }
+          if (result.size() > 1) {
+            throw new RuntimeException("Unexpected size: " + result.size());
+          }
+          KeyValue getkv = result.get(0);
+          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+            getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+        }
+        size = this.memcacheSize.addAndGet(store.delete(kv));
+      }
+      flush = isFlushSize(size);
+    } finally {
+      this.updatesLock.readLock().unlock();
+    }
+    if (flush) {
+      // Request a cache flush.  Do it outside update lock.
+      requestFlush();
+    }
+  }
+  
+  /**
+   * @param put
+   * @throws IOException
+   */
+  public void put(Put put) throws IOException {
+    this.put(put, null, true);
   }
   
   /**
-   * @param b
+   * @param put
    * @param writeToWAL
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException {
-    this.batchUpdate(b, null, writeToWAL);
+  public void put(Put put, boolean writeToWAL) throws IOException {
+    this.put(put, null, writeToWAL);
   }
 
-  
   /**
-   * @param b
+   * @param put
    * @param lockid
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException {
-    this.batchUpdate(b, lockid, true);
+  public void put(Put put, Integer lockid) throws IOException {
+    this.put(put, lockid, true);
   }
-  
+
   /**
-   * @param b
+   * @param put
    * @param lockid
-   * @param writeToWAL if true, then we write this update to the log
+   * @param writeToWAL
    * @throws IOException
    */
-  public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
+  public void put(Put put, Integer lockid, boolean writeToWAL)
   throws IOException {
     checkReadOnly();
-    validateValuesLength(b);
+//    validateValuesLength(put);
 
     // Do a rough check that we have resources to accept a write.  The check is
     // 'rough' in that between the resource check and the call to obtain a 
@@ -1307,49 +1217,18 @@
       // #commit or #abort or if the HRegionServer lease on the lock expires.
       // See HRegionServer#RegionListener for how the expire on HRegionServer
       // invokes a HRegion#abort.
-      byte [] row = b.getRow();
+      byte [] row = put.getRow();
       // If we did not pass an existing row lock, obtain a new one
       Integer lid = getLock(lockid, row);
-      long now = System.currentTimeMillis();
-      long commitTime = b.getTimestamp() == LATEST_TIMESTAMP?
-        now: b.getTimestamp();
-      Set<byte []> latestTimestampDeletes = null;
-      List<KeyValue> edits = new ArrayList<KeyValue>();
+      byte [] now = Bytes.toBytes(System.currentTimeMillis());
       try {
-        for (BatchOperation op: b) {
-          byte [] column = op.getColumn();
-          checkColumn(column);
-          KeyValue kv = null;
-          if (op.isPut()) {
-            kv = new KeyValue(row, column, commitTime, op.getValue());
-          } else {
-            // Its a delete.
-            if (b.getTimestamp() == LATEST_TIMESTAMP) {
-              // Save off these deletes of the most recent thing added on the
-              // family.
-              if (latestTimestampDeletes == null) {
-                latestTimestampDeletes =
-                  new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
-              }
-              latestTimestampDeletes.add(op.getColumn());
-              continue;
-            }
-            // Its an explicit timestamp delete
-            kv = new KeyValue(row, column, commitTime, KeyValue.Type.Delete,
-              HConstants.EMPTY_BYTE_ARRAY);
-          }
-          edits.add(kv);
-        }
-        if (!edits.isEmpty()) {
-          update(edits, writeToWAL, now);
-        }
-        if (latestTimestampDeletes != null &&
-            !latestTimestampDeletes.isEmpty()) {
-          // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
-          // as edits.  Need to do individually after figuring which is latest
-          // timestamp to delete.
-          for (byte [] column: latestTimestampDeletes) {
-            deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
+        for(Map.Entry<byte[], List<KeyValue>> entry : 
+          put.getFamilyMap().entrySet()) {
+          byte [] family = entry.getKey();
+          checkFamily(family);
+          List<KeyValue> puts = entry.getValue();
+          if(updateKeys(puts, now)) {
+            put(family, puts, writeToWAL);
           }
         }
       } finally {
@@ -1360,127 +1239,119 @@
     }
   }
 
+  
+  //TODO: gets/puts/deletes should be refactored a bit so that the lock is
+  //obtained beforehand and simply passed into these methods. In the case of
+  //checkAndPut you could then just do lockRow, get, put, unlockRow.
   /**
-   * Performs an atomic check and save operation. Checks if
-   * the specified expected values have changed, and if not
-   * applies the update.
    * 
-   * @param b the update to apply
-   * @param expectedValues the expected values to check
-   * @param lockid
-   * @param writeToWAL whether or not to write to the write ahead log
-   * @return true if update was applied
+   * @param row
+   * @param family
+   * @param qualifier
+   * @param expectedValue
+   * @param put
+   * @param lockId
+   * @param writeToWAL
    * @throws IOException
+   * @return true if the new put was executed, false otherwise
    */
-  public boolean checkAndSave(BatchUpdate b,
-    HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid,
-    boolean writeToWAL)
-  throws IOException {
-    // This is basically a copy of batchUpdate with the atomic check and save
-    // added in. So you should read this method with batchUpdate. I will
-    // comment the areas that I have changed where I have not changed, you
-    // should read the comments from the batchUpdate method
-    boolean success = true;
+  public boolean checkAndPut(byte [] row, byte [] family, byte [] qualifier,
+      byte [] expectedValue, Put put, Integer lockId, boolean writeToWAL) 
+  throws IOException{
     checkReadOnly();
-    validateValuesLength(b);
+    //TODO, add check for value length or maybe even better move this to the 
+    //client if this becomes a global setting
     checkResources();
     splitsAndClosesLock.readLock().lock();
     try {
-      byte[] row = b.getRow();
-      long now = System.currentTimeMillis();
-      Integer lid = getLock(lockid,row);
+      Get get = new Get(row, put.getRowLock());
+      checkFamily(family);
+      get.addColumn(family, qualifier);
+
+      byte [] now = Bytes.toBytes(System.currentTimeMillis());
+
+      // Lock row
+      Integer lid = getLock(lockId, get.getRow()); 
+      List<KeyValue> result = new ArrayList<KeyValue>();
       try {
-        NavigableSet<byte []> keySet =
-          new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        keySet.addAll(expectedValues.keySet());
-        Map<byte[],Cell> actualValues = getFull(row, keySet,
-          HConstants.LATEST_TIMESTAMP, 1,lid);
-        for (byte[] key : keySet) {
-	  // If test fails exit
-	  Cell cell = actualValues.get(key);
-	  byte[] actualValue = new byte[] {};
-	  if (cell != null) 
-	    actualValue = cell.getValue();
-	  if(!Bytes.equals(actualValue,
-			   expectedValues.get(key))) {
-	    success = false;
-	    break;
-	  }
-	}
-        if (success) {
-          long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)?
-            now: b.getTimestamp();
-          Set<byte []> latestTimestampDeletes = null;
-          List<KeyValue> edits = new ArrayList<KeyValue>();
-          for (BatchOperation op: b) {
-            byte [] column = op.getColumn();
-            KeyValue kv = null;
-            if (op.isPut()) {
-              kv = new KeyValue(row, column, commitTime, op.getValue());
-            } else {
-              // Its a delete.
-              if (b.getTimestamp() == LATEST_TIMESTAMP) {
-                // Save off these deletes of the most recent thing added on
-                // the family.
-                if (latestTimestampDeletes == null) {
-                  latestTimestampDeletes =
-                    new TreeSet<byte []>(Bytes.BYTES_RAWCOMPARATOR);
-                }
-                latestTimestampDeletes.add(op.getColumn());
-              } else {
-                // Its an explicit timestamp delete
-                kv = new KeyValue(row, column, commitTime,
-                  KeyValue.Type.Delete, HConstants.EMPTY_BYTE_ARRAY);
-              }
-            }
-            edits.add(kv);
-          }
-          if (!edits.isEmpty()) {
-            update(edits, writeToWAL, now);
-          }
-          if (latestTimestampDeletes != null &&
-              !latestTimestampDeletes.isEmpty()) {
-            // We have some LATEST_TIMESTAMP deletes to run.  Can't do them inline
-            // as edits.  Need to do individually after figuring which is latest
-            // timestamp to delete.
-            for (byte [] column: latestTimestampDeletes) {
-              deleteMultiple(row, column, LATEST_TIMESTAMP, 1, now);
+        //Getting data
+        for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
+          get.getFamilyMap().entrySet()) {
+          get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
+        }
+        boolean matches = false;
+        if (result.size() == 0 && expectedValue.length == 0) {
+          matches = true;
+        } else if(result.size() == 1) {
+          //Compare the expected value with the actual value
+          byte [] actualValue = result.get(0).getValue();
+          matches = Bytes.equals(expectedValue, actualValue);
+        }
+        //If it matches, apply the new put
+        if(matches) {
+          for(Map.Entry<byte[], List<KeyValue>> entry :
+            put.getFamilyMap().entrySet()) {
+            byte [] fam = entry.getKey();
+            checkFamily(fam);
+            List<KeyValue> puts = entry.getValue();
+            if(updateKeys(puts, now)) {
+              put(fam, puts, writeToWAL);
             }
           }
+          return true;  
         }
+        return false;
       } finally {
-        if(lockid == null) releaseRowLock(lid);
+        if(lockId == null) releaseRowLock(lid);
       }
     } finally {
       splitsAndClosesLock.readLock().unlock();
-    }
-    return success;
+    }    
   }
-
-  /*
-   * Utility method to verify values length
-   * @param batchUpdate The update to verify
-   * @throws IOException Thrown if a value is too long
-   */
-  private void validateValuesLength(BatchUpdate batchUpdate)
-  throws IOException {
-    for (Iterator<BatchOperation> iter = 
-      batchUpdate.iterator(); iter.hasNext();) {
-      BatchOperation operation = iter.next();
-      if (operation.getValue() != null) {
-        HColumnDescriptor fam = this.regionInfo.getTableDesc().
-          getFamily(operation.getColumn());
-        if (fam != null) {
-          int maxLength = fam.getMaxValueLength();
-          if (operation.getValue().length > maxLength) {
-            throw new ValueOverMaxLengthException("Value in column "
-                + Bytes.toString(operation.getColumn()) + " is too long. "
-                + operation.getValue().length + " instead of " + maxLength);
-          }
-        }
-      }
+      
+  
+  /**
+   * Checks if any stamps are > now.  If so, sets them to now.
+   * <p>
+   * This acts to prevent users from inserting future stamps as well as
+   * to replace LATEST_TIMESTAMP with now.
+   * @param keys
+   * @param now
+   * @return true if any keys were updated, false if keys is null or empty
+   */
+  private boolean updateKeys(List<KeyValue> keys, byte [] now) {
+    if(keys == null || keys.isEmpty()) {
+      return false;
+    }
+    for(KeyValue key : keys) {
+      key.updateLatestStamp(now);
     }
+    return true;
   }
+  
+
+//  /*
+//   * Utility method to verify values length. 
+//   * @param batchUpdate The update to verify
+//   * @throws IOException Thrown if a value is too long
+//   */
+//  private void validateValuesLength(Put put)
+//  throws IOException {
+//    Map<byte[], List<KeyValue>> families = put.getFamilyMap();
+//    for(Map.Entry<byte[], List<KeyValue>> entry : families.entrySet()) {
+//      HColumnDescriptor hcd = 
+//        this.regionInfo.getTableDesc().getFamily(entry.getKey());
+//      int maxLen = hcd.getMaxValueLength();
+//      for(KeyValue kv : entry.getValue()) {
+//        if(kv.getValueLength() > maxLen) {
+//          throw new ValueOverMaxLengthException("Value in column "
+//            + Bytes.toString(kv.getColumn()) + " is too long. "
+//            + kv.getValueLength() + " > " + maxLen);
+//        }
+//      }
+//    }
+//  }
 
   /*
    * Check if resources to support an update.
@@ -1517,230 +1388,6 @@
           + Thread.currentThread().getName() + "'");
     }
   }
-  
-  /**
-   * Delete all cells of the same age as the passed timestamp or older.
-   * @param row
-   * @param column
-   * @param ts Delete all entries that have this timestamp or older
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteAll(final byte [] row, final byte [] column, final long ts,
-      final Integer lockid)
-  throws IOException {
-    checkColumn(column);
-    checkReadOnly();
-    Integer lid = getLock(lockid,row);
-    try {
-      // Delete ALL versions rather than column family VERSIONS.  If we just did
-      // VERSIONS, then if 2* VERSION cells, subsequent gets would get old stuff.
-      deleteMultiple(row, column, ts, ALL_VERSIONS, System.currentTimeMillis());
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
-  /**
-   * Delete all cells of the same age as the passed timestamp or older.
-   * @param row
-   * @param ts Delete all entries that have this timestamp or older
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteAll(final byte [] row, final long ts, final Integer lockid)
-  throws IOException {
-    checkReadOnly();
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    long time = ts;
-    if (ts == HConstants.LATEST_TIMESTAMP) {
-      time = now;
-    }
-    KeyValue kv = KeyValue.createFirstOnRow(row, time);
-    try {
-      for (Store store : stores.values()) {
-        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, time);
-        List<KeyValue> edits = new ArrayList<KeyValue>();
-        for (KeyValue key: keyvalues) {
-          // This is UGLY. COPY OF KEY PART OF KeyValue.
-          edits.add(key.cloneDelete());
-        }
-        update(edits, now);
-      }
-    } finally {
-      if (lockid == null) releaseRowLock(lid);
-    }
-  }
-  
-  /**
-   * Delete all cells for a row with matching columns with timestamps
-   * less than or equal to <i>timestamp</i>. 
-   * 
-   * @param row The row to operate on
-   * @param columnRegex The column regex 
-   * @param timestamp Timestamp to match
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteAllByRegex(final byte [] row, final String columnRegex, 
-      final long timestamp, final Integer lockid) throws IOException {
-    checkReadOnly();
-    Pattern columnPattern = Pattern.compile(columnRegex);
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    KeyValue kv = new KeyValue(row, timestamp);
-    try {
-      for (Store store : stores.values()) {
-        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-        store.getFull(kv, null, columnPattern, ALL_VERSIONS, null, keyvalues,
-          now);
-        List<KeyValue> edits = new ArrayList<KeyValue>();
-        for (KeyValue key: keyvalues) {
-          edits.add(key.cloneDelete());
-        }
-        update(edits, now);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
-  /**
-   * Delete all cells for a row with matching column family with timestamps
-   * less than or equal to <i>timestamp</i>.
-   *
-   * @param row The row to operate on
-   * @param family The column family to match
-   * @param timestamp Timestamp to match
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteFamily(byte [] row, byte [] family, long timestamp,
-      final Integer lockid)
-  throws IOException{
-    checkReadOnly();
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    try {
-      // find the HStore for the column family
-      Store store = getStore(family);
-      // find all the keys that match our criteria
-      List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-      store.getFull(new KeyValue(row, timestamp), null, null, ALL_VERSIONS,
-        null, keyvalues, now);
-      // delete all the cells
-      List<KeyValue> edits = new ArrayList<KeyValue>();
-      for (KeyValue kv: keyvalues) {
-        edits.add(kv.cloneDelete());
-      }
-      update(edits, now);
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-
-  /**
-   * Delete all cells for a row with all the matching column families by
-   * familyRegex with timestamps less than or equal to <i>timestamp</i>.
-   * 
-   * @param row The row to operate on
-   * @param familyRegex The column family regex for matching. The regex
-   * matches only the family name; it does not include <code>:</code>
-   * @param timestamp Timestamp to match
-   * @param lockid Row lock
-   * @throws IOException
-   */
-  public void deleteFamilyByRegex(byte [] row, String familyRegex,
-      final long timestamp, final Integer lockid)
-  throws IOException {
-    checkReadOnly();
-    // construct the family regex pattern
-    Pattern familyPattern = Pattern.compile(familyRegex);
-    Integer lid = getLock(lockid, row);
-    long now = System.currentTimeMillis();
-    KeyValue kv = new KeyValue(row, timestamp);
-    try {
-      for(Store store: stores.values()) {
-        String familyName = Bytes.toString(store.getFamily().getName());
-        // check that the family name matches the family pattern.
-        if(!(familyPattern.matcher(familyName).matches())) 
-          continue;
-        
-        List<KeyValue> keyvalues = new ArrayList<KeyValue>();
-        store.getFull(kv, null, null, ALL_VERSIONS, null, keyvalues, now);
-        List<KeyValue> edits = new ArrayList<KeyValue>();
-        for (KeyValue k: keyvalues) {
-          edits.add(k.cloneDelete());
-        }
-        update(edits, now);
-      }
-    } finally {
-      if(lockid == null) releaseRowLock(lid);
-    }
-  }
-  
-  /*
-   * Delete one or many cells.
-   * Used to support {@link #deleteAll(byte [], byte [], long)} and deletion of
-   * latest cell.
-   * @param row
-   * @param column
-   * @param ts Timestamp to start search on.
-   * @param versions How many versions to delete. Pass
-   * {@link HConstants#ALL_VERSIONS} to delete all.
-   * @param now
-   * @throws IOException
-   */
-  private void deleteMultiple(final byte [] row, final byte [] column,
-      final long ts, final int versions, final long now)
-  throws IOException {
-    checkReadOnly();
-    // We used to have a getKeys method that purportedly only got the keys and
-    // not the keys and values.  We now just do getFull.  For memcache values,
-    // shouldn't matter if we get key and value since it'll be the entry that
-    // is in memcache.  For the keyvalues from storefile, could be saving if
-    // we only returned key component. TODO.
-    List<KeyValue> keys = get(row, column, ts, versions);
-    if (keys != null && keys.size() > 0) {
-      // The below edits don't have to be sorted.  They're deletes, so they
-      // don't have to go in in exact sorted order (we don't have to worry
-      // about the meta or root sort comparator here).
-      List<KeyValue> edits = new ArrayList<KeyValue>();
-      for (KeyValue key: keys) {
-        edits.add(key.cloneDelete());
-      }
-      update(edits, now);
-    }
-  }
-
-  /**
-   * Tests for the existence of any cells for a given coordinate.
-   * 
-   * @param row the row
-   * @param column the column, or null
-   * @param timestamp the timestamp, or HConstants.LATEST_TIMESTAMP for any
-   * @param lockid the existing lock, or null 
-   * @return true if cells exist for the row, false otherwise
-   * @throws IOException
-   */
-  public boolean exists(final byte[] row, final byte[] column, 
-    final long timestamp, final Integer lockid) 
-  throws IOException {
-    checkRow(row);
-    Integer lid = getLock(lockid, row);
-    try {
-      NavigableSet<byte []> columns = null;
-      if (column != null) {
-        columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-        columns.add(column);
-      }
-      return !getFull(row, columns, timestamp, 1, lid).isEmpty();
-    } finally {
-      if (lockid == null) releaseRowLock(lid);
-    }
-  }
 
   /**
    * @throws IOException Throws exception if region is in read-only mode.
@@ -1758,9 +1405,9 @@
   * @param now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits, final long now)
+  private void put(final byte [] family, final List<KeyValue> edits)
   throws IOException {
-    this.update(edits, true, now);
+    this.put(family, edits, true);
   }
 
   /** 
@@ -1771,9 +1418,8 @@
    * @param now
    * @throws IOException
    */
-  private void update(final List<KeyValue> edits, boolean writeToWAL,
-    final long now)
-  throws IOException {
+  private void put(final byte [] family, final List<KeyValue> edits, 
+      boolean writeToWAL) throws IOException {
     if (edits == null || edits.isEmpty()) {
       return;
     }
@@ -1781,14 +1427,15 @@
     this.updatesLock.readLock().lock();
     try {
       if (writeToWAL) {
+        long now = System.currentTimeMillis();
         this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), edits,
           (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
       }
       long size = 0;
+      Store store = getStore(family);
       for (KeyValue kv: edits) {
-        // TODO: Fix -- do I have to do a getColumn here?
-        size = this.memcacheSize.addAndGet(getStore(kv.getColumn()).add(kv));
+        size = this.memcacheSize.addAndGet(store.add(kv));
       }
       flush = isFlushSize(size);
     } finally {
@@ -1826,7 +1473,6 @@
   }
   
   // Do any reconstruction needed from the log
-  @SuppressWarnings("unused")
   protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
     Progressable reporter)
   throws UnsupportedEncodingException, IOException {
@@ -1865,23 +1511,6 @@
           Bytes.toString(row) + "'");
     }
   }
-  
-  /*
-   * Make sure this is a valid column for the current table
-   * @param columnName
-   * @throws NoSuchColumnFamilyException
-   */
-  private void checkColumn(final byte [] column)
-  throws NoSuchColumnFamilyException {
-    if (column == null) {
-      return;
-    }
-    if (!regionInfo.getTableDesc().hasFamily(column)) {
-      throw new NoSuchColumnFamilyException("Column family on " +
-        Bytes.toString(column) + " does not exist in region " + this
-          + " in table " + regionInfo.getTableDesc());
-    }
-  }
 
   /**
    * Obtain a lock on the given row.  Blocks until success.
@@ -1906,7 +1535,7 @@
    * @throws IOException
    * @return The id of the held lock.
    */
-  Integer obtainRowLock(final byte [] row) throws IOException {
+  public Integer obtainRowLock(final byte [] row) throws IOException {
     checkRow(row);
     splitsAndClosesLock.readLock().lock();
     try {
@@ -2018,161 +1647,86 @@
     return this.basedir;
   }
 
+  
+  //TODO
   /**
-   * HScanner is an iterator through a bunch of rows in an HRegion.
+   * RegionScanner is an iterator through a bunch of rows in an HRegion.
+   * <p>
+   * It is used to combine scanners from multiple Stores (aka column families).
    */
-  private class HScanner implements InternalScanner {
-    private InternalScanner[] scanners;
-    private List<KeyValue> [] resultSets;
-    private RowFilterInterface filter;
-
-    /** Create an HScanner with a handle on many HStores. */
-    @SuppressWarnings("unchecked")
-    HScanner(final NavigableSet<byte []> columns, byte [] firstRow,
-      long timestamp, final Store [] stores, final RowFilterInterface filter)
-    throws IOException {
-      this.filter = filter;
-      this.scanners = new InternalScanner[stores.length];
+  class RegionScanner implements InternalScanner {
+    
+    private KeyValueHeap storeHeap;
+    private byte [] stopRow;
+    
+    RegionScanner(Scan scan) throws IOException {
+      if(Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+        this.stopRow = null;
+      } else {
+        this.stopRow = scan.getStopRow();
+      }
+      
+      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
       try {
-        for (int i = 0; i < stores.length; i++) {
-          // Only pass relevant columns to each store
-          NavigableSet<byte[]> columnSubset =
-            new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
-          for (byte [] c: columns) {
-            if (KeyValue.FAMILY_COMPARATOR.compare(stores[i].storeName, c) == 0) {
-              columnSubset.add(c);
-            }
-          }
-          RowFilterInterface f = filter;
-          if (f != null) {
-            // Need to replicate filters.
-            // At least WhileMatchRowFilter will mess up the scan if only
-            // one shared across many rows. See HADOOP-2467.
-            f = WritableUtils.clone(filter, conf);
-          }
-          scanners[i] = stores[i].getScanner(timestamp, columnSubset, firstRow, f);
+        for(Map.Entry<byte[], NavigableSet<byte[]>> entry : 
+          scan.getFamilyMap().entrySet()) {
+          Store store = stores.get(entry.getKey());
+          scanners.add(store.getScanner(scan, entry.getValue()));
         }
       } catch (IOException e) {
-        for (int i = 0; i < this.scanners.length; i++) {
-          if (scanners[i] != null) {
-            closeScanner(i);
+        for(KeyValueScanner scanner : scanners) {
+          if(scanner != null) {
+            close(scanner);
           }
         }
         throw e;
       }
-
-      // Advance to the first key in each store.
-      // All results will match the required column-set and scanTime.
-      this.resultSets = new List[scanners.length];
-      for (int i = 0; i < scanners.length; i++) {
-        resultSets[i] = new ArrayList<KeyValue>();
-        if(scanners[i] != null && !scanners[i].next(resultSets[i])) {
-          closeScanner(i);
-        }
-      }
-
+      
+      this.storeHeap = 
+        new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
+      
       // As we have now successfully completed initialization, increment the
       // activeScanner count.
       activeScannerCount.incrementAndGet();
     }
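Conceptually, the KeyValueHeap built above performs a k-way merge across the per-store scanners. A simplified linear-scan analogue of the selection it makes (KeyValueHeap itself uses a priority queue; this sketch is illustrative only):

    // Pick the scanner whose current KeyValue sorts lowest; emitting it and
    // re-comparing repeatedly yields all KeyValues in global sorted order.
    KeyValue lowest = null;
    KeyValueScanner lowestScanner = null;
    for (KeyValueScanner s : scanners) {
      KeyValue kv = s.peek();
      if (kv != null && (lowest == null || comparator.compare(kv, lowest) < 0)) {
        lowest = kv;
        lowestScanner = s;
      }
    }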
 
+    /**
+     * Get the next row of results from this region.
+     * @param results list to append results to
+     * @return true if there are more rows, false if scanner is done
+     */
     public boolean next(List<KeyValue> results)
     throws IOException {
-      boolean moreToFollow = false;
-      boolean filtered = false;
-      do {
-        // Find the lowest key across all stores.
-        KeyValue chosen = null;
-        long chosenTimestamp = -1;
-        for (int i = 0; i < this.scanners.length; i++) {
-          if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
-            continue;
-          }
-          KeyValue kv = this.resultSets[i].get(0);
-          if (chosen == null ||
-               (comparator.compareRows(kv, chosen) < 0) ||
-               ((comparator.compareRows(kv, chosen) == 0) &&
-                 (kv.getTimestamp() > chosenTimestamp))) {
-            chosen = kv;
-            chosenTimestamp = chosen.getTimestamp();
-          }
-        }
-
-        // Store results from each sub-scanner.
-        if (chosenTimestamp >= 0) {
-          for (int i = 0; i < scanners.length; i++) {
-            if (this.resultSets[i] == null || this.resultSets[i].isEmpty()) {
-              continue;
-            }
-            KeyValue kv = this.resultSets[i].get(0);
-            if (comparator.compareRows(kv, chosen) == 0) {
-              results.addAll(this.resultSets[i]);
-              resultSets[i].clear();
-              if (!scanners[i].next(resultSets[i])) {
-                closeScanner(i);
-              }
-            }
-          }
-        }
-
-        moreToFollow = chosenTimestamp >= 0;
-        if (results == null || results.size() <= 0) {
-          // If we got no results, then there is no more to follow.
-          moreToFollow = false;
-        }
-
-        filtered = filter == null ? false : filter.filterRow(results);
-        if (filter != null && filter.filterAllRemaining()) {
-          moreToFollow = false;
-        }
-        
-        if (moreToFollow) {
-          if (filter != null) {
-            filter.rowProcessed(filtered, chosen.getBuffer(), chosen.getRowOffset(),
-              chosen.getRowLength());
-          }
-          if (filtered) {
-            results.clear();
-          }
-        }
-      } while(filtered && moreToFollow);
-
-      // Make sure scanners closed if no more results
-      if (!moreToFollow) {
-        for (int i = 0; i < scanners.length; i++) {
-          if (null != scanners[i]) {
-            closeScanner(i);
-          }
-        }
+      // This method should probably be reorganized a bit... has gotten messy
+      KeyValue kv = this.storeHeap.peek();
+      if(kv == null) {
+        return false;
       }
-      
-      return moreToFollow;
-    }
-
-    /** Shut down a single scanner */
-    void closeScanner(int i) {
-      try {
-        try {
-          scanners[i].close();
-        } catch (IOException e) {
-          LOG.warn("Failed closing scanner " + i, e);
-        }
-      } finally {
-        scanners[i] = null;
-        // These data members can be null if exception in constructor
-        if (resultSets != null) {
-          resultSets[i] = null;
+      byte [] currentRow = kv.getRow();
+      // See if we passed stopRow
+      if(stopRow != null &&
+          comparator.compareRows(stopRow, 0, stopRow.length, 
+              currentRow, 0, currentRow.length)
+          <= 0){
+        return false;
+      }
+      this.storeHeap.next(results);
+      while(true) {
+        kv = this.storeHeap.peek();
+        if(kv == null) {
+          return false;
+        }
+        byte [] row = kv.getRow();
+        if(!Bytes.equals(currentRow, row)) {
+          return true;
         }
+        this.storeHeap.next(results);
       }
     }
 
     public void close() {
       try {
-        for(int i = 0; i < scanners.length; i++) {
-          if(scanners[i] != null) {
-            closeScanner(i);
-          }
-        }
+        storeHeap.close();
       } finally {
         synchronized (activeScannerCount) {
           int count = activeScannerCount.decrementAndGet();
@@ -2188,14 +1742,22 @@
         }
       }
     }
-
-    public boolean isWildcardScanner() {
-      throw new UnsupportedOperationException("Unimplemented on HScanner");
+    /**
+     * Closes a single store scanner, ignoring failures from scanners that
+     * were only partially constructed.
+     * @param scanner the scanner to be closed
+     */
+    public void close(KeyValueScanner scanner) {
+      try {
+        scanner.close();
+      } catch(NullPointerException npe) {
+        // The scanner may be only partially initialized if the constructor
+        // failed; in that case close() can NPE and there is nothing to do.
+      }
+    }
+    
+    /**
+     * @return the current storeHeap
+     */
+    public KeyValueHeap getStoreHeap() {
+      return this.storeHeap;
     }
-
-    public boolean isMultipleMatchScanner() {
-      throw new UnsupportedOperationException("Unimplemented on HScanner");
-    }  
   }
   
   // Utility methods
@@ -2282,9 +1844,9 @@
     Integer lid = meta.obtainRowLock(row);
     try {
       List<KeyValue> edits = new ArrayList<KeyValue>();
-      edits.add(new KeyValue(row, COL_REGIONINFO, System.currentTimeMillis(),
-        Writables.getBytes(r.getRegionInfo())));
-      meta.update(edits, System.currentTimeMillis());
+      edits.add(new KeyValue(row, CATALOG_FAMILY, REGIONINFO_QUALIFIER,
+          System.currentTimeMillis(), Writables.getBytes(r.getRegionInfo())));
+      meta.put(HConstants.CATALOG_FAMILY, edits);
     } finally {
       meta.releaseRowLock(lid);
     }
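Equivalently, a caller using the client API directly would write the same catalog cell as follows (a sketch reusing this patch's names):

    Put p = new Put(row);
    p.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER,
        Writables.getBytes(r.getRegionInfo()));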
@@ -2304,8 +1866,9 @@
   public static void removeRegionFromMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final byte [] regionName)
   throws IOException {
-    srvr.deleteFamily(metaRegionName, regionName, HConstants.COLUMN_FAMILY,
-      HConstants.LATEST_TIMESTAMP, -1L);
+    Delete delete = new Delete(regionName);
+    delete.deleteFamily(HConstants.CATALOG_FAMILY);
+    srvr.delete(metaRegionName, delete);
   }
 
   /**
@@ -2319,14 +1882,18 @@
   public static void offlineRegionInMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
-    BatchUpdate b = new BatchUpdate(info.getRegionName());
+    // Puts and Deletes used to be "atomic" here.  We can use row locks if
+    // we need to keep that property, or we can expand Puts and Deletes to
+    // allow them to be committed at once.
+    byte [] row = info.getRegionName();
+    Put put = new Put(row);
     info.setOffline(true);
-    b.put(COL_REGIONINFO, Writables.getBytes(info));
-    b.delete(COL_SERVER);
-    b.delete(COL_STARTCODE);
-    // If carrying splits, they'll be in place when we show up on new
-    // server.
-    srvr.batchUpdate(metaRegionName, b, -1L);
+    put.add(CATALOG_FAMILY, REGIONINFO_QUALIFIER, Writables.getBytes(info));
+    srvr.put(metaRegionName, put);
+    Delete del = new Delete(row);
+    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
+    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+    srvr.delete(metaRegionName, del);
   }
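If the old atomicity matters, one option is bracketing the two RPCs with an explicit row lock, as sketched below (assuming the lockRow/unlockRow calls on HRegionInterface; the Put and Delete would also need to carry the lock id so the server does not re-lock the row):

    long lockId = srvr.lockRow(metaRegionName, row);   // assumed RPC
    try {
      srvr.put(metaRegionName, put);
      srvr.delete(metaRegionName, del);
    } finally {
      srvr.unlockRow(metaRegionName, lockId);          // assumed RPC
    }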
   
   /**
@@ -2340,12 +1907,10 @@
   public static void cleanRegionInMETA(final HRegionInterface srvr,
     final byte [] metaRegionName, final HRegionInfo info)
   throws IOException {
-    BatchUpdate b = new BatchUpdate(info.getRegionName());
-    b.delete(COL_SERVER);
-    b.delete(COL_STARTCODE);
-    // If carrying splits, they'll be in place when we show up on new
-    // server.
-    srvr.batchUpdate(metaRegionName, b, LATEST_TIMESTAMP);
+    Delete del = new Delete(info.getRegionName());
+    del.deleteColumns(CATALOG_FAMILY, SERVER_QUALIFIER);
+    del.deleteColumns(CATALOG_FAMILY, STARTCODE_QUALIFIER);
+    srvr.delete(metaRegionName, del);
   }
 
   /**
@@ -2638,67 +2203,127 @@
     }
   }
 
-  public long incrementColumnValue(byte[] row, byte[] column, long amount)
+  
+  //
+  // HBASE-880
+  //
+  /**
+   * @param get the Get operation defining the row, families and qualifiers
+   * @param lockid id of an existing row lock, or null to lock internally
+   * @return the result of the Get
+   * @throws IOException
+   */
+  public Result get(final Get get, final Integer lockid) throws IOException {
+    // Verify families are all valid
+    if(get.hasFamilies()) {
+      for(byte [] family : get.familySet()) {
+        checkFamily(family);
+      }
+    } else { // Adding all families to scanner
+      for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
+        get.addFamily(family);
+      }
+    }
+    // Lock row
+    Integer lid = getLock(lockid, get.getRow()); 
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    try {
+      for(Map.Entry<byte[],NavigableSet<byte[]>> entry:
+          get.getFamilyMap().entrySet()) {
+        get(this.stores.get(entry.getKey()), get, entry.getValue(), result);
+      }
+    } finally {
+      if(lockid == null) releaseRowLock(lid);
+    }
+    return new Result(result);
+  }
+
+  private void get(final Store store, final Get get,
+    final NavigableSet<byte []> qualifiers, List<KeyValue> result)
+  throws IOException {
+    store.get(get, qualifiers, result);
+  }
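A caller-side sketch of the new Get path (region, row, family and qualifier are assumed names):

    Get get = new Get(row);
    get.addColumn(family, qualifier);     // or get.addFamily(family)
    // Passing a null lockid makes the region take and release the row lock.
    Result result = region.get(get, null);
    byte [] value = result.getValue(family, qualifier);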
+
+  /**
+   * Increments a column value under a row lock.
+   * @param row the row containing the cell to increment
+   * @param family the column family of the cell
+   * @param qualifier the column qualifier of the cell
+   * @param amount the amount to increment by
+   * @return the new value after the increment
+   * @throws IOException
+   */
+  public long incrementColumnValue(byte [] row, byte [] family,
+      byte [] qualifier, long amount)
   throws IOException {
     checkRow(row);
-    checkColumn(column);
     
+    // Lock row
     Integer lid = obtainRowLock(row);
-    splitsAndClosesLock.readLock().lock();
+    long result = 0L;
     try {
-      KeyValue kv = new KeyValue(row, column);
-      long ts = System.currentTimeMillis();
-      byte [] value = null;
-
-      Store store = getStore(column);
-
-      List<KeyValue> c;
-      // Try the memcache first.
-      store.lock.readLock().lock();
-      try {
-        c = store.memcache.get(kv, 1);
-      } finally {
-        store.lock.readLock().unlock();
-      }
-      // Pick the latest value out of List<Cell> c:
-      if (c.size() >= 1) {
-        // Use the memcache timestamp value.
-        LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) +
-          "/" + Bytes.toString(column));
-        ts = c.get(0).getTimestamp();
-        value = c.get(0).getValue();
-      }
-
-      if (value == null) {
-        // Check the store (including disk) for the previous value.
-        c = store.get(kv, 1);
-        if (c != null && c.size() == 1) {
-          LOG.debug("Using HFile previous value for " + Bytes.toString(row) +
-            "/" + Bytes.toString(column));
-          value = c.get(0).getValue();
-        } else if (c != null && c.size() > 1) {
-          throw new DoNotRetryIOException("more than 1 value returned in " +
-            "incrementColumnValue from Store");
-        }
-      }
-      
-      if (value == null) {
-        // Doesn't exist
-        LOG.debug("Creating new counter value for " + Bytes.toString(row) +
-          "/"+ Bytes.toString(column));
-        value = Bytes.toBytes(amount);
-      } else {
-        if (amount == 0) return Bytes.toLong(value);
-        value = Bytes.incrementBytes(value, amount);
-      }
-
-      BatchUpdate b = new BatchUpdate(row, ts);
-      b.put(column, value);
-      batchUpdate(b, lid, true);
-      return Bytes.toLong(value);
+      Store store = stores.get(family);
+      result = store.incrementColumnValue(row, family, qualifier, amount);
     } finally {
-      splitsAndClosesLock.readLock().unlock();
-      releaseRowLock(lid);
+      // The row lock was obtained unconditionally above, so always release.
+      releaseRowLock(lid);
+    }
+    return result;
+  }
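Usage is then a single call (illustrative; caller context assumed):

    // Atomically bumps the counter under the row lock taken above and
    // returns the post-increment value.
    long count = region.incrementColumnValue(row, family, qualifier, 1);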
+    
+  
+  //
+  // New HBASE-880 Helpers
+  //
+  
+  private void checkFamily(final byte [] family) 
+  throws NoSuchColumnFamilyException {
+    if(!regionInfo.getTableDesc().hasFamily(family)) {
+      throw new NoSuchColumnFamilyException("Column family " +
+          Bytes.toString(family) + " does not exist in region " + this
+            + " in table " + regionInfo.getTableDesc());
     }
   }
-}
+  
+  
+//  //HBaseAdmin Debugging 
+//  /**
+//   * @return number of stores in the region
+//   */
+//  public int getNumStores() {
+//    return this.numStores;
+//  }
+//  /**
+//   * @return the name of the region
+//   */
+//  public byte [] getRegionsName() {
+//    return this.name;
+//  }
+//  /**
+//   * @return the number of files in every store
+//   */
+//  public int [] getStoresSize() {
+//    return this.storeSize;
+//  }
+//  
+//  //Writable, used for debugging purposes only
+//  public void readFields(final DataInput in)
+//  throws IOException {
+//    this.name = Bytes.readByteArray(in);
+//    this.numStores = in.readInt();
+//    this.storeSize = new int [numStores];
+//    for(int i=0; i<this.numStores; i++) {
+//      this.storeSize[i] = in.readInt();
+//    }
+//  }
+//
+//  public void write(final DataOutput out)
+//  throws IOException {
+//    Bytes.writeByteArray(out, this.regionInfo.getRegionName());
+//    out.writeInt(this.stores.size());
+//    for(Store store : this.stores.values()) {
+//      out.writeInt(store.getNumberOfstorefiles());
+//    }
+//  }
+}
\ No newline at end of file


