hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject svn commit: r782445 [15/17] - in /hadoop/hbase/trunk_on_hadoop-0.18.3: ./ bin/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/client/tableindexed/ src/java/org/apache/hadoop/hbase/client/tran...
Date Sun, 07 Jun 2009 19:57:43 GMT
Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sun Jun  7 19:57:37 2009
@@ -21,20 +21,25 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -46,156 +51,390 @@
 public class TestHRegion extends HBaseTestCase {
   static final Log LOG = LogFactory.getLog(TestHRegion.class);
 
-  private static final int FIRST_ROW = 1;
-  private static final int NUM_VALS = 1000;
-  private static final String CONTENTS_BASIC_STR = "contents:basic";
-  private static final byte [] CONTENTS_BASIC = Bytes.toBytes(CONTENTS_BASIC_STR);
-  private static final String CONTENTSTR = "contentstr";
-  private static final String ANCHORNUM = "anchor:anchornum-";
-  private static final String ANCHORSTR = "anchorstr";
-  private static final byte [] CONTENTS_FIRSTCOL = Bytes.toBytes("contents:firstcol");
-  private static final byte [] ANCHOR_SECONDCOL = Bytes.toBytes("anchor:secondcol");
-  
-  private MiniDFSCluster cluster = null;
-  private HTableDescriptor desc = null;
-  HRegion r = null;
-  HRegionIncommon region = null;
+  HRegion region = null;
+  private final String DIR = "test/build/data/TestHRegion/";
+  
+  private final int MAX_VERSIONS = 2;
   
-  private static int numInserted = 0;
-
   /**
    * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
    */
   @Override
   protected void setUp() throws Exception {
-    this.conf.set("hbase.hstore.compactionThreshold", "2");
-
-    conf.setLong("hbase.hregion.max.filesize", 65536);
-
-    cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
-    fs = cluster.getFileSystem();
-    
-    // Set the hbase.rootdir to be the home directory in mini dfs.
-    this.conf.set(HConstants.HBASE_DIR,
-      this.cluster.getFileSystem().getHomeDirectory().toString());
-    
     super.setUp();
   }
 
-  /**
-   * Since all the "tests" depend on the results of the previous test, they are
-   * not Junit tests that can stand alone. Consequently we have a single Junit
-   * test that runs the "sub-tests" as private methods.
-   * @throws IOException 
-   */
-  public void testHRegion() throws IOException {
-    try {
-      init();
-      locks();
-      badPuts();
-      basic();
-      scan();
-      splitAndMerge();
-      read();
-    } finally {
-      shutdownDfs(cluster);
-    }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that don't spin up a mini cluster but rather just test the 
+  // individual code pieces in the HRegion. Putting files locally in
+  // /tmp/testtable
+  //////////////////////////////////////////////////////////////////////////////
+  
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // checkAndPut tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testCheckAndPut_WithEmptyRowValue() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] qf1  = Bytes.toBytes("qualifier");
+    byte [] emptyVal  = new byte[] {};
+    byte [] val1  = Bytes.toBytes("value1");
+    byte [] val2  = Bytes.toBytes("value2");
+    Integer lockId = null;
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+    //Building the put handed to checkAndPut; no region.put is issued here
+    Put put = new Put(row1);
+    put.add(fam1, qf1, val1);
+
+    //checkAndPut expecting an empty value; nothing was put in this test yet
+    boolean res = region.checkAndPut(row1, fam1, qf1, emptyVal, put, lockId,
+        true);
+    assertTrue(res);
+
+    // same check again: the put above carried val1, so empty must not match
+    res = region.checkAndPut(row1, fam1, qf1, emptyVal, put, lockId, true);
+    assertFalse(res);
+
+    put = new Put(row1);
+    put.add(fam1, qf1, val2);
+    //checkAndPut expecting val1, the value carried by the first put
+    res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
+    assertTrue(res);
   }
-
-  // Create directories, start mini cluster, etc.
   
-  private void init() throws IOException {
-    desc = new HTableDescriptor("test");
-    desc.addFamily(new HColumnDescriptor("contents:"));
-    desc.addFamily(new HColumnDescriptor("anchor:"));
-    r = createNewHRegion(desc, null, null);
-    region = new HRegionIncommon(r);
-    LOG.info("setup completed.");
+  public void testCheckAndPut_WithWrongValue() throws IOException{
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] qf1  = Bytes.toBytes("qualifier");
+    byte [] val1  = Bytes.toBytes("value1");
+    byte [] val2  = Bytes.toBytes("value2");
+    Integer lockId = null;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+
+    //Storing val1 under row1/fam1:qualifier so there is a value to check
+    Put put = new Put(row1);
+    put.add(fam1, qf1, val1);
+    region.put(put);
+    
+    //checkAndPut expecting val2 while val1 is stored: must be rejected
+    boolean res = region.checkAndPut(row1, fam1, qf1, val2, put, lockId, true);
+    assertFalse("checkAndPut succeeded although the value differs", res);
   }
 
-  // Test basic functionality. Writes to contents:basic and anchor:anchornum-*
-
-  private void basic() throws IOException {
-    long startTime = System.currentTimeMillis();
+  public void testCheckAndPut_WithCorrectValue() throws IOException{
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] qf1  = Bytes.toBytes("qualifier");
+    byte [] val1  = Bytes.toBytes("value1");
+    Integer lockId = null;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+
+    //Storing val1 under row1/fam1:qualifier so there is a value to check
+    Put put = new Put(row1);
+    put.add(fam1, qf1, val1);
+    region.put(put);
+    
+    //checkAndPut expecting val1, which is what was just stored
+    boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
+    assertTrue("checkAndPut failed although the value matches", res);
+  }
+  
+  public void testCheckAndPut_ThatPutWasWritten() throws IOException{
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("fam2");
+    byte [] qf1  = Bytes.toBytes("qualifier");
+    byte [] val1  = Bytes.toBytes("value1");
+    byte [] val2  = Bytes.toBytes("value2");
+    Integer lockId = null;
 
-    // Write out a bunch of values
-    for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
-      BatchUpdate batchUpdate = 
-        new BatchUpdate(Bytes.toBytes("row_" + k), System.currentTimeMillis());
-      batchUpdate.put(CONTENTS_BASIC,
-          (CONTENTSTR + k).getBytes(HConstants.UTF8_ENCODING));
-      batchUpdate.put(Bytes.toBytes(ANCHORNUM + k),
-          (ANCHORSTR + k).getBytes(HConstants.UTF8_ENCODING));
-      region.commit(batchUpdate);
+    byte [][] families = {fam1, fam2};
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    //Putting data in the key to check
+    Put put = new Put(row1);
+    put.add(fam1, qf1, val1);
+    region.put(put);
+    
+    //Creating put to add; it targets fam2, not the family being checked
+    long ts = System.currentTimeMillis();
+    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
+    put = new Put(row1);
+    put.add(kv);
+    
+    //checkAndPut with correct value; the memcache sizes read are never asserted
+    Store store = region.getStore(fam1);
+    int size = store.memcache.memcache.size();
+    
+    boolean res = region.checkAndPut(row1, fam1, qf1, val1, put, lockId, true);
+    assertEquals(true, res);
+    size = store.memcache.memcache.size();
+    
+    Get get = new Get(row1);
+    get.addColumn(fam2, qf1);
+    KeyValue [] actual = region.get(get, null).raw();
+    
+    KeyValue [] expected = {kv};
+    
+    assertEquals(expected.length, actual.length);
+    for(int i=0; i<actual.length; i++) {
+      assertEquals(expected[i], actual[i]);
     }
-    LOG.info("Write " + NUM_VALS + " rows. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    // Flush cache
-
-    startTime = System.currentTimeMillis();
-
-    region.flushcache();
-
-    LOG.info("Cache flush elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
+    
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Delete tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testDelete_CheckFamily() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("fam2");
+    byte [] fam3 = Bytes.toBytes("fam3");
+    byte [] fam4 = Bytes.toBytes("fam4");
+
+    byte[][] families  = {fam1, fam2, fam3};
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
 
-    // Read them back in
+    List<KeyValue> kvs  = new ArrayList<KeyValue>();
+    kvs.add(new KeyValue(row1, fam4, null, null));
 
-    startTime = System.currentTimeMillis();
+    //testing existing family
+    byte [] family = fam2;
+    try {
+      region.delete(family, kvs, true);
+    } catch (Exception e) {
+      assertTrue("Family " +new String(family)+ " does not exist", false);
+    }
 
-    byte [] collabel = null;
-    for (int k = FIRST_ROW; k <= NUM_VALS; k++) {
-      String rowlabelStr = "row_" + k;
-      byte [] rowlabel = Bytes.toBytes(rowlabelStr);
-      if (k % 100 == 0) LOG.info(Bytes.toString(rowlabel));
-      Cell c = region.get(rowlabel, CONTENTS_BASIC);
-      assertNotNull("K is " + k, c);
-      byte [] bodydata = c.getValue();
-      assertNotNull(bodydata);
-      String bodystr = new String(bodydata, HConstants.UTF8_ENCODING).trim();
-      String teststr = CONTENTSTR + k;
-      assertEquals("Incorrect value for key: (" + rowlabelStr + "," + CONTENTS_BASIC_STR
-          + "), expected: '" + teststr + "' got: '" + bodystr + "'",
-          bodystr, teststr);
-      String collabelStr = ANCHORNUM + k;
-      collabel = Bytes.toBytes(collabelStr);
-      bodydata = region.get(rowlabel, collabel).getValue();
-      bodystr = new String(bodydata, HConstants.UTF8_ENCODING).trim();
-      teststr = ANCHORSTR + k;
-      assertEquals("Incorrect value for key: (" + rowlabelStr + "," + collabelStr
-          + "), expected: '" + teststr + "' got: '" + bodystr + "'",
-          bodystr, teststr);
-    }
-
-    LOG.info("Read " + NUM_VALS + " rows. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    LOG.info("basic completed.");
-  }
-  
-  private void badPuts() {
-    // Try column name not registered in the table.    
-    boolean exceptionThrown = false;
-    exceptionThrown = false;
+    //testing non existing family 
+    boolean ok = false; 
+    family = fam4;
     try {
-      BatchUpdate batchUpdate = new BatchUpdate(Bytes.toBytes("Some old key"));
-      String unregisteredColName = "FamilyGroup:FamilyLabel";
-      batchUpdate.put(Bytes.toBytes(unregisteredColName),
-        unregisteredColName.getBytes(HConstants.UTF8_ENCODING));
-      region.commit(batchUpdate);
-    } catch (IOException e) {
-      exceptionThrown = true;
-    } finally {
+      region.delete(family, kvs, true);
+    } catch (Exception e) {
+      ok = true;
     }
-    assertTrue("Bad family", exceptionThrown);
-    LOG.info("badPuts completed.");
+    assertEquals("Family " +new String(family)+ " does exist", true, ok);
   }
   
-  /**
-   * Test getting and releasing locks.
-   */
-  private void locks() {
+  //Visual test, since the method doesn't return anything
+  public void testDelete_CheckTimestampUpdated()
+  throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] col1 = Bytes.toBytes("col1");
+    byte [] col2 = Bytes.toBytes("col2");
+    byte [] col3 = Bytes.toBytes("col3");
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+    
+    //Building checkerList 
+    List<KeyValue> kvs  = new ArrayList<KeyValue>();
+    kvs.add(new KeyValue(row1, fam1, col1, null));
+    kvs.add(new KeyValue(row1, fam1, col2, null));
+    kvs.add(new KeyValue(row1, fam1, col3, null));
+    
+    region.delete(fam1, kvs, true);
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Get tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testGet_FamilyChecker() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("False");
+    byte [] col1 = Bytes.toBytes("col1");
+    
+    //Setting up region with fam1 only
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+    
+    //Get against fam2 ("False"), a family the region was not created with
+    Get get = new Get(row1);
+    get.addColumn(fam2, col1);
+    
+    //Test: the get must be rejected with NoSuchColumnFamilyException
+    try {
+      region.get(get, null);
+      fail("Expected NoSuchColumnFamilyException for an unknown family");
+    } catch (NoSuchColumnFamilyException e){
+      //Expected; any other exception propagates and errors the test
+    }
+  }
+  
+  public void testGet_Basic() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] col1 = Bytes.toBytes("col1");
+    byte [] col2 = Bytes.toBytes("col2");
+    byte [] col3 = Bytes.toBytes("col3");
+    byte [] col4 = Bytes.toBytes("col4");
+    byte [] col5 = Bytes.toBytes("col5");
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, fam1);
+    
+    //Add to memcache
+    Put put = new Put(row1);
+    put.add(fam1, col1, null);
+    put.add(fam1, col2, null);
+    put.add(fam1, col3, null);
+    put.add(fam1, col4, null);
+    put.add(fam1, col5, null);
+    region.put(put);
+    
+    Get get = new Get(row1);
+    get.addColumn(fam1, col2);
+    get.addColumn(fam1, col4);
+    //Expected result
+    KeyValue kv1 = new KeyValue(row1, fam1, col2);
+    KeyValue kv2 = new KeyValue(row1, fam1, col4);
+    KeyValue [] expected = {kv1, kv2};
+    
+    //Test
+    Result res = region.get(get, null);
+    
+    assertEquals(expected.length, res.size());
+    for(int i=0; i<res.size(); i++){
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getRow(), res.raw()[i].getRow()));
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getFamily(), res.raw()[i].getFamily()));
+      assertEquals(0,
+          Bytes.compareTo(
+              expected[i].getQualifier(), res.raw()[i].getQualifier()));
+    }
+  }
+  
+  public void testGet_Empty() throws IOException {
+    byte [] tableName = Bytes.toBytes("emptytable");
+    byte [] row = Bytes.toBytes("row");
+    byte [] fam = Bytes.toBytes("fam");
+    
+    String method = this.getName();
+    initHRegion(tableName, method, fam);
+    
+    Get get = new Get(row);
+    get.addFamily(fam);
+    Result r = region.get(get, null);
+    
+    assertTrue(r.isEmpty());
+  }
+  
+  //Test that checks if there is anything special when reading from the ROOT
+  //table. To be able to use this test you need to comment the part in
+  //HTableDescriptor that checks for '-' and '.'. You also need to remove the
+  //s in the beginning of the name.
+  public void stestGet_Root() throws IOException {
+    //Setting up region
+    String method = this.getName();
+    initHRegion(HConstants.ROOT_TABLE_NAME, method, HConstants.CATALOG_FAMILY);
+
+    //Add to memcache
+    Put put = new Put(HConstants.EMPTY_START_ROW);
+    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, null);
+    region.put(put);
+    
+    Get get = new Get(HConstants.EMPTY_START_ROW);
+    get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+
+    //Expected result
+    KeyValue kv1 = new KeyValue(HConstants.EMPTY_START_ROW,
+        HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+    KeyValue [] expected = {kv1};
+    
+    //Test from memcache
+    Result res = region.get(get, null);
+    
+    assertEquals(expected.length, res.size());
+    for(int i=0; i<res.size(); i++){
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getRow(), res.raw()[i].getRow()));
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getFamily(), res.raw()[i].getFamily()));
+      assertEquals(0,
+          Bytes.compareTo(
+              expected[i].getQualifier(), res.raw()[i].getQualifier()));
+    }
+    
+    //flush
+    region.flushcache();
+    
+    //test2: same get after the flush
+    res = region.get(get, null);
+    
+    assertEquals(expected.length, res.size());
+    for(int i=0; i<res.size(); i++){
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getRow(), res.raw()[i].getRow()));
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getFamily(), res.raw()[i].getFamily()));
+      assertEquals(0,
+          Bytes.compareTo(
+              expected[i].getQualifier(), res.raw()[i].getQualifier()));
+    }
+    
+    //Scan; the loop is bounded by the scan result itself, not the Get result
+    Scan scan = new Scan();
+    scan.addFamily(HConstants.CATALOG_FAMILY);
+    InternalScanner s = region.getScanner(scan);
+    List<KeyValue> result = new ArrayList<KeyValue>();
+    s.next(result);
+    
+    assertEquals(expected.length, result.size());
+    for(int i=0; i<result.size(); i++){
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getRow(), result.get(i).getRow()));
+      assertEquals(0,
+          Bytes.compareTo(expected[i].getFamily(), result.get(i).getFamily()));
+      assertEquals(0,
+          Bytes.compareTo(
+              expected[i].getQualifier(), result.get(i).getQualifier()));
+    }
+  }  
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Lock test
+  ////////////////////////////////////////////////////////////////////////////// 
+  public void testLocks() throws IOException{
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [][] families = {fam1, fam2, fam3};
+    
+    HBaseConfiguration hc = initSplit();
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, hc, families);
+    
     final int threadCount = 10;
     final int lockCount = 10;
     
@@ -209,8 +448,8 @@
           for (int i = 0; i < lockCount; i++) {
             try {
               byte [] rowid = Bytes.toBytes(Integer.toString(i));
-              lockids[i] = r.obtainRowLock(rowid);
-              assertEquals(rowid, r.getRowFromLock(lockids[i]));
+              lockids[i] = region.obtainRowLock(rowid);
+              assertEquals(rowid, region.getRowFromLock(lockids[i]));
               LOG.debug(getName() + " locked " + Bytes.toString(rowid));
             } catch (IOException e) {
               e.printStackTrace();
@@ -221,7 +460,7 @@
           
           // Abort outstanding locks.
           for (int i = lockCount - 1; i >= 0; i--) {
-            r.releaseRowLock(lockids[i]);
+            region.releaseRowLock(lockids[i]);
             LOG.debug(getName() + " unlocked " + i);
           }
           LOG.debug(getName() + " released " +
@@ -247,409 +486,938 @@
     }
     LOG.info("locks completed.");
   }
-
-  // Test scanners. Writes contents:firstcol and anchor:secondcol
   
-  private void scan() throws IOException {
-    byte [] cols [] = {
-        CONTENTS_FIRSTCOL,
-        ANCHOR_SECONDCOL
-    };
-
-    // Test the Scanner!!!
-    String[] vals1 = new String[1000];
-    for(int k = 0; k < vals1.length; k++) {
-      vals1[k] = Integer.toString(k);
-    }
-
-    // 1.  Insert a bunch of values
-    long startTime = System.currentTimeMillis();
-    for(int k = 0; k < vals1.length / 2; k++) {
-      String kLabel = String.format("%1$03d", k);
-
-      BatchUpdate batchUpdate = 
-        new BatchUpdate(Bytes.toBytes("row_vals1_" + kLabel), 
-          System.currentTimeMillis());
-      batchUpdate.put(cols[0], vals1[k].getBytes(HConstants.UTF8_ENCODING));
-      batchUpdate.put(cols[1], vals1[k].getBytes(HConstants.UTF8_ENCODING));
-      region.commit(batchUpdate);
-      numInserted += 2;
-    }
-    LOG.info("Write " + (vals1.length / 2) + " elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    // 2.  Scan from cache
-    startTime = System.currentTimeMillis();
-    ScannerIncommon s = this.region.getScanner(cols, HConstants.EMPTY_START_ROW,
-      System.currentTimeMillis());
-    int numFetched = 0;
+  //////////////////////////////////////////////////////////////////////////////
+  // Merge test
+  ////////////////////////////////////////////////////////////////////////////// 
+  public void testMerge() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [][] families = {fam1, fam2, fam3};
+    
+    HBaseConfiguration hc = initSplit();
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, hc, families);
+    
     try {
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 0;
-      while(s.next(curVals)) {
-        for (KeyValue kv: curVals) {
-          byte [] val = kv.getValue();
-          int curval =
-            Integer.parseInt(new String(val, HConstants.UTF8_ENCODING).trim());
-          for(int j = 0; j < cols.length; j++) {
-            if (!kv.matchingColumn(cols[j])) {
-              assertEquals("Error at: " + kv  + " " + Bytes.toString(cols[j]),
-                k, curval);
-              numFetched++;
-              break;
-            }
+      LOG.info("" + addContent(region, fam3));
+      region.flushcache();
+      byte [] splitRow = region.compactStores();
+      assertNotNull(splitRow);
+      LOG.info("SplitRow: " + Bytes.toString(splitRow));
+      HRegion [] regions = split(region, splitRow);
+      try {
+        // Need to open the regions.
+        // TODO: Add an 'open' to HRegion... don't do open by constructing
+        // instance.
+        for (int i = 0; i < regions.length; i++) {
+          regions[i] = openClosedRegion(regions[i]);
+        }
+        Path oldRegionPath = region.getRegionDir();
+        long startTime = System.currentTimeMillis();
+        HRegion subregions [] = region.splitRegion(splitRow);
+        if (subregions != null) {
+          LOG.info("Split region elapsed time: "
+              + ((System.currentTimeMillis() - startTime) / 1000.0));
+          assertEquals("Number of subregions", subregions.length, 2);
+          for (int i = 0; i < subregions.length; i++) {
+            subregions[i] = openClosedRegion(subregions[i]);
+            subregions[i].compactStores();
+          }
+
+          // Now merge it back together
+          Path oldRegion1 = subregions[0].getRegionDir();
+          Path oldRegion2 = subregions[1].getRegionDir();
+          startTime = System.currentTimeMillis();
+          region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
+          LOG.info("Merge regions elapsed time: " +
+              ((System.currentTimeMillis() - startTime) / 1000.0));
+          fs.delete(oldRegion1, true);
+          fs.delete(oldRegion2, true);
+          fs.delete(oldRegionPath, true);
+        }
+        LOG.info("splitAndMerge completed.");
+      } finally {
+        for (int i = 0; i < regions.length; i++) {
+          try {
+            regions[i].close();
+          } catch (IOException e) {
+            // Ignore.
           }
         }
-        curVals.clear();
-        k++;
       }
     } finally {
-      s.close();
+      if (region != null) {
+        region.close();
+        region.getLog().closeAndDelete();
+      }
     }
-    assertEquals(numInserted, numFetched);
-
-    LOG.info("Scanned " + (vals1.length / 2)
-        + " rows from cache. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Scanner tests
+  //////////////////////////////////////////////////////////////////////////////
+  public void testGetScanner_WithOkFamilies() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("fam2");
+    
+    byte [][] families = {fam1, fam2};
+    
+    //Setting up region with both families
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+    
+    //Scanning exactly the families the region was set up with must succeed
+    Scan scan = new Scan();
+    scan.addFamily(fam1);
+    scan.addFamily(fam2);
+    try {
+      region.getScanner(scan);
+    } catch (Exception e) {
+      fail("Families could not be found in Region: " + e);
+    }
+  }
+  
+  public void testGetScanner_WithNotOkFamilies() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("fam2");
+
+    byte [][] families = {fam1};
+
+    //Setting up region with fam1 only
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    Scan scan = new Scan();
+    scan.addFamily(fam2);
+    boolean ok = false;
+    try {
+      region.getScanner(scan);
+    } catch (Exception e) {
+      ok = true;
+    }
+    assertTrue("getScanner accepted a family that is not in the Region", ok);
+  }
+  
+  public void testGetScanner_WithNoFamilies() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("fam2");
+    byte [] fam3 = Bytes.toBytes("fam3");
+    byte [] fam4 = Bytes.toBytes("fam4");
+    
+    byte [][] families = {fam1, fam2, fam3, fam4};
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+    
+    //Putting data in Region
+    Put put = new Put(row1);
+    put.add(fam1, null, null);
+    put.add(fam2, null, null);
+    put.add(fam3, null, null);
+    put.add(fam4, null, null);
+    region.put(put);
+    
+    Scan scan = null;
+    InternalScanner is = null;
+    
+    //Testing to see how many scanners that is produced by getScanner, starting 
+    //with known number, 2 - current = 1
+    scan = new Scan();
+    scan.addFamily(fam2);
+    scan.addFamily(fam4);
+    is = region.getScanner(scan);
+    assertEquals(1, ((RegionScanner)is).getStoreHeap().getHeap().size());
+    
+    scan = new Scan();
+    is = region.getScanner(scan);
+    assertEquals(families.length -1, 
+        ((RegionScanner)is).getStoreHeap().getHeap().size());
+  }
 
-    // 3.  Flush to disk
-    startTime = System.currentTimeMillis();
+  public void testRegionScanner_Next() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] row2 = Bytes.toBytes("row2");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [] fam2 = Bytes.toBytes("fam2");
+    byte [] fam3 = Bytes.toBytes("fam3");
+    byte [] fam4 = Bytes.toBytes("fam4");
+    
+    byte [][] families = {fam1, fam2, fam3, fam4};
+    long ts = System.currentTimeMillis();
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+    
+    //Putting one cell per family into row1 and row2, all at timestamp ts
+    Put put = null;
+    put = new Put(row1);
+    put.add(fam1, null, ts, null);
+    put.add(fam2, null, ts, null);
+    put.add(fam3, null, ts, null);
+    put.add(fam4, null, ts, null);
+    region.put(put);
+
+    put = new Put(row2);
+    put.add(fam1, null, ts, null);
+    put.add(fam2, null, ts, null);
+    put.add(fam3, null, ts, null);
+    put.add(fam4, null, ts, null);
+    region.put(put);
+    
+    Scan scan = new Scan();
+    scan.addFamily(fam2);
+    scan.addFamily(fam4);
+    InternalScanner is = region.getScanner(scan);
+    
+    List<KeyValue> res = null;
+    
+    //Result 1: first next() is expected to yield row1's fam2 and fam4 cells
+    List<KeyValue> expected1 = new ArrayList<KeyValue>();
+    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
+    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
+    
+    res = new ArrayList<KeyValue>();
+    is.next(res);
+    assertEquals(expected1.size(), res.size());
+    for(int i=0; i<res.size(); i++) {
+      assertEquals(expected1.get(i), res.get(i));
+    }
+    //Result 2: second next() is expected to yield row2's fam2 and fam4 cells
+    List<KeyValue> expected2 = new ArrayList<KeyValue>();
+    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
+    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
+    
+    res = new ArrayList<KeyValue>();
+    is.next(res);
+    assertEquals(expected2.size(), res.size());
+    for(int i=0; i<res.size(); i++) {
+      assertEquals(expected2.get(i), res.get(i));
+    }
+  }
+  
+  public void testScanner_ExplicitColumns_FromMemcache_EnforceVersions() 
+  throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] qf1 = Bytes.toBytes("qualifier1");
+    byte [] qf2 = Bytes.toBytes("qualifier2");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [][] families = {fam1};
+    
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+    
+    //Putting three versions (ts1..ts3) of qf1 and of qf2 into row1
+    Put put = null;
+    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+    
+    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+    
+    put = new Put(row1);
+    put.add(kv13);
+    put.add(kv12);
+    put.add(kv11);
+    put.add(kv23);
+    put.add(kv22);
+    put.add(kv21);
+    region.put(put);
+    
+    //Expected: only the two newest versions of qf1 (MAX_VERSIONS is 2)
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(kv13);
+    expected.add(kv12);
+    
+    Scan scan = new Scan(row1);
+    scan.addColumn(fam1, qf1);
+    scan.setMaxVersions(MAX_VERSIONS);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    InternalScanner scanner = region.getScanner(scan);
+    
+    boolean hasNext = scanner.next(actual);
+    assertEquals(false, hasNext);
+    assertEquals(expected.size(), actual.size());
+    //Verify result; the size check above catches extra versions or columns
+    for(int i=0; i<expected.size(); i++) {
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+  
+  /**
+   * Puts three versions each of two qualifiers, flushes the cache, then scans
+   * both columns explicitly with <code>MAX_VERSIONS</code> as the version
+   * limit. Expects exactly the two newest versions of each column, in order,
+   * and nothing else.
+   * @throws IOException
+   */
+  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions()
+  throws IOException{
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] qf1 = Bytes.toBytes("qualifier1");
+    byte [] qf2 = Bytes.toBytes("qualifier2");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [][] families = {fam1};
+
+    long ts1 = 1; //System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    //Putting data in Region
+    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+
+    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+
+    Put put = new Put(row1);
+    put.add(kv13);
+    put.add(kv12);
+    put.add(kv11);
+    put.add(kv23);
+    put.add(kv22);
+    put.add(kv21);
+    region.put(put);
     region.flushcache();
-    LOG.info("Cache flush elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    // 4.  Scan from disk
-    startTime = System.currentTimeMillis();
-    s = this.region.getScanner(cols, HConstants.EMPTY_START_ROW,
-      System.currentTimeMillis());
-    numFetched = 0;
+
+    //Expected
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(kv13);
+    expected.add(kv12);
+    expected.add(kv23);
+    expected.add(kv22);
+
+    Scan scan = new Scan(row1);
+    scan.addColumn(fam1, qf1);
+    scan.addColumn(fam1, qf2);
+    scan.setMaxVersions(MAX_VERSIONS);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    InternalScanner scanner = region.getScanner(scan);
+    try {
+      boolean hasNext = scanner.next(actual);
+      assertEquals(false, hasNext);
+    } finally {
+      scanner.close();
+    }
+
+    //Verify result. The size check comes first: without it a trailing, older
+    //version in the result would go unnoticed and a short result would
+    //surface as an IndexOutOfBoundsException instead of a test failure.
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++) {
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+  
+  /**
+   * Writes four versions each of two qualifiers, one version pair per put,
+   * flushing after the first three puts and leaving the oldest pair
+   * unflushed. Scans both columns explicitly with a version limit of 3 and
+   * expects exactly the three newest versions of each column, in order.
+   * @throws IOException
+   */
+  public void testScanner_ExplicitColumns_FromMemcacheAndFiles_EnforceVersions()
+  throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [][] families = {fam1};
+    byte [] qf1 = Bytes.toBytes("qualifier1");
+    byte [] qf2 = Bytes.toBytes("qualifier2");
+
+    long ts1 = 1;
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    long ts4 = ts1 + 3;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    //Putting data in Region
+    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
+    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+
+    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
+    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+
+    Put put = new Put(row1);
+    put.add(kv14);
+    put.add(kv24);
+    region.put(put);
+    region.flushcache();
+
+    put = new Put(row1);
+    put.add(kv23);
+    put.add(kv13);
+    region.put(put);
+    region.flushcache();
+
+    put = new Put(row1);
+    put.add(kv22);
+    put.add(kv12);
+    region.put(put);
+    region.flushcache();
+
+    //Oldest versions are left unflushed
+    put = new Put(row1);
+    put.add(kv21);
+    put.add(kv11);
+    region.put(put);
+
+    //Expected
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(kv14);
+    expected.add(kv13);
+    expected.add(kv12);
+    expected.add(kv24);
+    expected.add(kv23);
+    expected.add(kv22);
+
+    Scan scan = new Scan(row1);
+    scan.addColumn(fam1, qf1);
+    scan.addColumn(fam1, qf2);
+    int versions = 3;
+    scan.setMaxVersions(versions);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    InternalScanner scanner = region.getScanner(scan);
+    try {
+      boolean hasNext = scanner.next(actual);
+      assertEquals(false, hasNext);
+    } finally {
+      scanner.close();
+    }
+
+    //Verify result. The size check comes first: without it a trailing, older
+    //version in the result would go unnoticed and a short result would
+    //surface as an IndexOutOfBoundsException instead of a test failure.
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++) {
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+  
+  /**
+   * Puts three versions each of two qualifiers into the region without
+   * flushing, then scans the whole family with <code>MAX_VERSIONS</code> as
+   * the version limit. Expects exactly the two newest versions of each
+   * column, in order, and nothing else.
+   * @throws IOException
+   */
+  public void testScanner_Wildcard_FromMemcache_EnforceVersions()
+  throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] qf1 = Bytes.toBytes("qualifier1");
+    byte [] qf2 = Bytes.toBytes("qualifier2");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [][] families = {fam1};
+
+    long ts1 = System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    //Putting data in Region
+    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+
+    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+
+    Put put = new Put(row1);
+    put.add(kv13);
+    put.add(kv12);
+    put.add(kv11);
+    put.add(kv23);
+    put.add(kv22);
+    put.add(kv21);
+    region.put(put);
+
+    //Expected
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(kv13);
+    expected.add(kv12);
+    expected.add(kv23);
+    expected.add(kv22);
+
+    Scan scan = new Scan(row1);
+    scan.addFamily(fam1);
+    scan.setMaxVersions(MAX_VERSIONS);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    InternalScanner scanner = region.getScanner(scan);
+    try {
+      boolean hasNext = scanner.next(actual);
+      assertEquals(false, hasNext);
+    } finally {
+      scanner.close();
+    }
+
+    //Verify result. The size check comes first: without it a trailing, older
+    //version in the result would go unnoticed and a short result would
+    //surface as an IndexOutOfBoundsException instead of a test failure.
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++) {
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+  
+  /**
+   * Puts three versions each of two qualifiers, flushes the cache, then scans
+   * the whole family with <code>MAX_VERSIONS</code> as the version limit.
+   * Expects exactly the two newest versions of each column, in order, and
+   * nothing else.
+   * @throws IOException
+   */
+  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions()
+  throws IOException{
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] qf1 = Bytes.toBytes("qualifier1");
+    byte [] qf2 = Bytes.toBytes("qualifier2");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [][] families = {fam1};
+
+    long ts1 = 1; //System.currentTimeMillis();
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    //Putting data in Region
+    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+
+    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+
+    Put put = new Put(row1);
+    put.add(kv13);
+    put.add(kv12);
+    put.add(kv11);
+    put.add(kv23);
+    put.add(kv22);
+    put.add(kv21);
+    region.put(put);
+    region.flushcache();
+
+    //Expected
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(kv13);
+    expected.add(kv12);
+    expected.add(kv23);
+    expected.add(kv22);
+
+    Scan scan = new Scan(row1);
+    scan.addFamily(fam1);
+    scan.setMaxVersions(MAX_VERSIONS);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    InternalScanner scanner = region.getScanner(scan);
+    try {
+      boolean hasNext = scanner.next(actual);
+      assertEquals(false, hasNext);
+    } finally {
+      scanner.close();
+    }
+
+    //Verify result. The size check comes first: without it a trailing, older
+    //version in the result would go unnoticed and a short result would
+    //surface as an IndexOutOfBoundsException instead of a test failure.
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++) {
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+  
+  /**
+   * Writes four versions each of two qualifiers, one version pair per put,
+   * flushing after the first three puts and leaving the oldest pair
+   * unflushed. Scans the row without naming any column, with a version limit
+   * of 3, and expects exactly the three newest versions of each column, in
+   * order.
+   * @throws IOException
+   */
+  public void testScanner_Wildcard_FromMemcacheAndFiles_EnforceVersions()
+  throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] row1 = Bytes.toBytes("row1");
+    byte [] fam1 = Bytes.toBytes("fam1");
+    byte [][] families = {fam1};
+    byte [] qf1 = Bytes.toBytes("qualifier1");
+    byte [] qf2 = Bytes.toBytes("qualifier2");
+
+    long ts1 = 1;
+    long ts2 = ts1 + 1;
+    long ts3 = ts1 + 2;
+    long ts4 = ts1 + 3;
+
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    //Putting data in Region
+    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
+    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
+    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
+    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
+
+    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
+    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
+    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
+    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
+
+    Put put = new Put(row1);
+    put.add(kv14);
+    put.add(kv24);
+    region.put(put);
+    region.flushcache();
+
+    put = new Put(row1);
+    put.add(kv23);
+    put.add(kv13);
+    region.put(put);
+    region.flushcache();
+
+    put = new Put(row1);
+    put.add(kv22);
+    put.add(kv12);
+    region.put(put);
+    region.flushcache();
+
+    //Oldest versions are left unflushed
+    put = new Put(row1);
+    put.add(kv21);
+    put.add(kv11);
+    region.put(put);
+
+    //Expected
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(kv14);
+    expected.add(kv13);
+    expected.add(kv12);
+    expected.add(kv24);
+    expected.add(kv23);
+    expected.add(kv22);
+
+    Scan scan = new Scan(row1);
+    int versions = 3;
+    scan.setMaxVersions(versions);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    InternalScanner scanner = region.getScanner(scan);
+    try {
+      boolean hasNext = scanner.next(actual);
+      assertEquals(false, hasNext);
+    } finally {
+      scanner.close();
+    }
+
+    //Verify result. The size check comes first: without it a trailing, older
+    //version in the result would go unnoticed and a short result would
+    //surface as an IndexOutOfBoundsException instead of a test failure.
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++) {
+      assertEquals(expected.get(i), actual.get(i));
+    }
+  }
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Split test
+  //////////////////////////////////////////////////////////////////////////////
+  /**
+   * Splits twice and verifies getting from each of the split regions.
+   * @throws Exception
+   */
+  public void testBasicSplit() throws Exception {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [][] families = {fam1, fam2, fam3};
+    
+    HBaseConfiguration hc = initSplit();
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, hc, families);
+    
     try {
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 0;
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          KeyValue kv = it.next();
-          byte [] col = kv.getColumn();
-          byte [] val = kv.getValue();
-          int curval =
-            Integer.parseInt(new String(val, HConstants.UTF8_ENCODING).trim());
-          for(int j = 0; j < cols.length; j++) {
-            if (Bytes.compareTo(col, cols[j]) == 0) {
-              assertEquals("Error at:" + kv.getRow() + "/"
-                  + kv.getTimestamp()
-                  + ", Value for " + col + " should be: " + k
-                  + ", but was fetched as: " + curval, k, curval);
-              numFetched++;
-            }
+      LOG.info("" + addContent(region, fam3));
+      region.flushcache();
+      byte [] splitRow = region.compactStores();
+      assertNotNull(splitRow);
+      LOG.info("SplitRow: " + Bytes.toString(splitRow));
+      HRegion [] regions = split(region, splitRow);
+      try {
+        // Need to open the regions.
+        // TODO: Add an 'open' to HRegion... don't do open by constructing
+        // instance.
+        for (int i = 0; i < regions.length; i++) {
+          regions[i] = openClosedRegion(regions[i]);
+        }
+        // Assert can get rows out of new regions. Should be able to get first
+        // row from first region and the midkey from second region.
+        assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
+        assertGet(regions[1], fam3, splitRow);
+        // Test I can get scanner and that it starts at right place.
+        assertScan(regions[0], fam3,
+            Bytes.toBytes(START_KEY));
+        assertScan(regions[1], fam3, splitRow);
+        // Now prove can't split regions that have references.
+        for (int i = 0; i < regions.length; i++) {
+          // Add enough data to this region that we create a store file larger
+          // than one of our unsplittable references.
+          for (int j = 0; j < 2; j++) {
+            addContent(regions[i], fam3);
           }
+          addContent(regions[i], fam2);
+          addContent(regions[i], fam1);
+          regions[i].flushcache();
         }
-        curVals.clear();
-        k++;
-      }
-    } finally {
-      s.close();
-    }
-    assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched, numInserted, numFetched);
 
-    LOG.info("Scanned " + (vals1.length / 2)
-        + " rows from disk. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    // 5.  Insert more values
-    startTime = System.currentTimeMillis();
-    for(int k = vals1.length/2; k < vals1.length; k++) {
-      String kLabel = String.format("%1$03d", k);
-      BatchUpdate batchUpdate = 
-        new BatchUpdate(Bytes.toBytes("row_vals1_" + kLabel), 
-          System.currentTimeMillis());
-      batchUpdate.put(cols[0], vals1[k].getBytes(HConstants.UTF8_ENCODING));
-      batchUpdate.put(cols[1], vals1[k].getBytes(HConstants.UTF8_ENCODING));
-      region.commit(batchUpdate);
-      numInserted += 2;
-    }
-
-    LOG.info("Write " + (vals1.length / 2) + " rows (second half). Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    // 6.  Scan from cache and disk
-    startTime = System.currentTimeMillis();
-    s = this.region.getScanner(cols, HConstants.EMPTY_START_ROW,
-        System.currentTimeMillis());
-    numFetched = 0;
-    try {
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 0;
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          KeyValue kv = it.next();
-          byte [] col = kv.getColumn();
-          byte [] val = kv.getValue();
-          int curval =
-            Integer.parseInt(new String(val, HConstants.UTF8_ENCODING).trim());
-          for(int j = 0; j < cols.length; j++) {
-            if(Bytes.compareTo(col, cols[j]) == 0) {
-              assertEquals("Error at:" + kv.getRow() + "/"
-                  + kv.getTimestamp()
-                  + ", Value for " + col + " should be: " + k
-                  + ", but was fetched as: " + curval, k, curval);
-              numFetched++;
+        byte [][] midkeys = new byte [regions.length][];
+        // To make regions splitable force compaction.
+        for (int i = 0; i < regions.length; i++) {
+          midkeys[i] = regions[i].compactStores();
+        }
+
+        TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
+        // Split these two daughter regions so that we end up with 4 regions.
+        // They will split because of the data added above.
+        for (int i = 0; i < regions.length; i++) {
+          HRegion[] rs = null;
+          if (midkeys[i] != null) {
+            rs = split(regions[i], midkeys[i]);
+            for (int j = 0; j < rs.length; j++) {
+              sortedMap.put(Bytes.toString(rs[j].getRegionName()),
+                openClosedRegion(rs[j]));
             }
           }
         }
-        curVals.clear();
-        k++;
+        LOG.info("Made 4 regions");
+        // The splits should have been even. Test I can get some arbitrary row
+        // out of each.
+        int interval = (LAST_CHAR - FIRST_CHAR) / 3;
+        byte[] b = Bytes.toBytes(START_KEY);
+        for (HRegion r : sortedMap.values()) {
+          assertGet(r, fam3, b);
+          b[0] += interval;
+        }
+      } finally {
+        for (int i = 0; i < regions.length; i++) {
+          try {
+            regions[i].close();
+          } catch (IOException e) {
+            // Ignore.
+          }
+        }
       }
     } finally {
-      s.close();
+      if (region != null) {
+        region.close();
+        region.getLog().closeAndDelete();
+      }
     }
-    assertEquals("Inserted " + numInserted + " values, but fetched " +
-      numFetched, numInserted, numFetched);
-
-    LOG.info("Scanned " + vals1.length
-        + " rows from cache and disk. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
+  }
+  
+  public void testSplitRegion() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] qualifier = Bytes.toBytes("qualifier");
+    HBaseConfiguration hc = initSplit();
+    int numRows = 10;
+    byte [][] families = {fam1, fam3};
     
-    // 7.  Flush to disk
-    startTime = System.currentTimeMillis();
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, hc, families);
+
+    //Put data in region
+    int startRow = 100;
+    putData(startRow, numRows, qualifier, families);
+    int splitRow = startRow + numRows;
+    putData(splitRow, numRows, qualifier, families);
     region.flushcache();
-    LOG.info("Cache flush elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
     
-    // 8.  Scan from disk
-    startTime = System.currentTimeMillis();
-    s = this.region.getScanner(cols, HConstants.EMPTY_START_ROW,
-      System.currentTimeMillis());
-    numFetched = 0;
+    HRegion [] regions = null;
     try {
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 0;
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          KeyValue kv = it.next();
-          byte [] col = kv.getColumn();
-          byte [] val = kv.getValue();
-          int curval =
-            Integer.parseInt(new String(val, HConstants.UTF8_ENCODING).trim());
-          for (int j = 0; j < cols.length; j++) {
-            if (Bytes.compareTo(col, cols[j]) == 0) {
-              assertEquals("Value for " + Bytes.toString(col) + " should be: " + k
-                  + ", but was fetched as: " + curval, curval, k);
-              numFetched++;
-            }
-          }
-        }
-        curVals.clear();
-        k++;
+      regions = region.splitRegion(Bytes.toBytes("" + splitRow));
+      //Opening the regions returned. 
+      for (int i = 0; i < regions.length; i++) {
+        regions[i] = openClosedRegion(regions[i]);
       }
+      //Verifying that the region has been split
+      assertEquals(2, regions.length);
+      
+      //Verifying that all data is still there and that data is in the right
+      //place
+      verifyData(regions[0], startRow, numRows, qualifier, families);
+      verifyData(regions[1], splitRow, numRows, qualifier, families);
+      
     } finally {
-      s.close();
+      if (region != null) {
+        region.close();
+        region.getLog().closeAndDelete();
+      }
     }
-    assertEquals("Inserted " + numInserted + " values, but fetched " + numFetched,
-      numInserted, numFetched);
-    LOG.info("Scanned " + vals1.length
-        + " rows from disk. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    // 9. Scan with a starting point
-    startTime = System.currentTimeMillis();
-    s = this.region.getScanner(cols, Bytes.toBytes("row_vals1_500"),
-        System.currentTimeMillis());
-    numFetched = 0;
-    try {
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 500;
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          KeyValue kv = it.next();
-          byte [] col = kv.getColumn();
-          byte [] val = kv.getValue();
-          int curval =
-            Integer.parseInt(new String(val, HConstants.UTF8_ENCODING).trim());
-          for (int j = 0; j < cols.length; j++) {
-            if (Bytes.compareTo(col, cols[j]) == 0) {
-              assertEquals("Value for " + Bytes.toString(col) + " should be: " + k
-                  + ", but was fetched as: " + curval, curval, k);
-              numFetched++;
-            }
-          }
-        }
-        curVals.clear();
-        k++;
+  }
+  
+  private void putData(int startRow, int numRows, byte [] qf,
+      byte [] ...families)
+  throws IOException {
+    for(int i=startRow; i<startRow+numRows; i++) {
+      Put put = new Put(Bytes.toBytes("" + i));
+      for(byte [] family : families) {
+        put.add(family, qf, null);
       }
-    } finally {
-      s.close();
+      region.put(put);
     }
-    assertEquals("Should have fetched " + (numInserted / 2) +
-      " values, but fetched " + numFetched, (numInserted / 2), numFetched);
-    
-    LOG.info("Scanned " + (numFetched / 2)
-        + " rows from disk with specified start point. Elapsed time: "
-        + ((System.currentTimeMillis() - startTime) / 1000.0));
-
-    LOG.info("scan completed.");
-  }
-  
-  // NOTE: This test depends on testBatchWrite succeeding
-  private void splitAndMerge() throws IOException {
-    Path oldRegionPath = r.getRegionDir();
-    byte [] splitRow = r.compactStores();
-    assertNotNull(splitRow);
-    long startTime = System.currentTimeMillis();
-    HRegion subregions [] = r.splitRegion(splitRow);
-    if (subregions != null) {
-      LOG.info("Split region elapsed time: "
-          + ((System.currentTimeMillis() - startTime) / 1000.0));
-      assertEquals("Number of subregions", subregions.length, 2);
-      for (int i = 0; i < subregions.length; i++) {
-        subregions[i] = openClosedRegion(subregions[i]);
-        subregions[i].compactStores();
+  }
+
+  private void verifyData(HRegion newReg, int startRow, int numRows, byte [] qf,
+      byte [] ... families) 
+  throws IOException {
+    for(int i=startRow; i<startRow + numRows; i++) {
+      byte [] row = Bytes.toBytes("" + i);
+      Get get = new Get(row);
+      for(byte [] family : families) {
+        get.addColumn(family, qf);
       }
-      
-      // Now merge it back together
-      Path oldRegion1 = subregions[0].getRegionDir();
-      Path oldRegion2 = subregions[1].getRegionDir();
-      startTime = System.currentTimeMillis();
-      r = HRegion.mergeAdjacent(subregions[0], subregions[1]);
-      region = new HRegionIncommon(r);
-      LOG.info("Merge regions elapsed time: " +
-        ((System.currentTimeMillis() - startTime) / 1000.0));
-      fs.delete(oldRegion1, true);
-      fs.delete(oldRegion2, true);
-      fs.delete(oldRegionPath, true);
-    }
-    LOG.info("splitAndMerge completed.");
-  }
-
-  // This test verifies that everything is still there after splitting and merging
-  
-  private void read() throws IOException {
-    // First verify the data written by testBasic()
-    byte [][] cols = {
-        Bytes.toBytes(ANCHORNUM + "[0-9]+"),
-        CONTENTS_BASIC
-    };
-    long startTime = System.currentTimeMillis();
-    InternalScanner s =
-      r.getScanner(cols, HConstants.EMPTY_START_ROW,
-          System.currentTimeMillis(), null);
+      Result result = newReg.get(get, null);
+      KeyValue [] raw = result.sorted();
+      assertEquals(families.length, result.size());
+      for(int j=0; j<families.length; j++) {
+        assertEquals(0, Bytes.compareTo(row, raw[j].getRow()));
+        assertEquals(0, Bytes.compareTo(families[j], raw[j].getFamily()));
+        assertEquals(0, Bytes.compareTo(qf, raw[j].getQualifier()));
+      }
+    }
+  }
+  
+  
+  /**
+   * Test for HBASE-810
+   * @throws Exception
+   */
+  public void testScanSplitOnRegion() throws Exception {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [][] families = {fam3};
+
+    HBaseConfiguration hc = initSplit();
+    //Setting up region
+    String method = this.getName();
+    initHRegion(tableName, method, hc, families);
+
     try {
-      int contentsFetched = 0;
-      int anchorFetched = 0;
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 0;
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          KeyValue kv = it.next();
-          byte [] col = kv.getColumn();
-          byte [] val = kv.getValue();
-          String curval = Bytes.toString(val);
-          if (Bytes.compareTo(col, CONTENTS_BASIC) == 0) {
-            assertTrue("Error at:" + kv
-                + ", Value for " + col + " should start with: " + CONTENTSTR
-                + ", but was fetched as: " + curval,
-                curval.startsWith(CONTENTSTR));
-            contentsFetched++;
-            
-          } else if (Bytes.toString(col).startsWith(ANCHORNUM)) {
-            assertTrue("Error at:" + kv
-                + ", Value for " + Bytes.toString(col) +
-                " should start with: " + ANCHORSTR
-                + ", but was fetched as: " + curval,
-                curval.startsWith(ANCHORSTR));
-            anchorFetched++;
-            
-          } else {
-            LOG.info("UNEXPECTED COLUMN " + Bytes.toString(col));
-          }
+      addContent(region, fam3);
+      region.flushcache();
+      final byte [] midkey = region.compactStores();
+      assertNotNull(midkey);
+      byte [][] cols = {fam3};
+      Scan scan = new Scan();
+      scan.addColumns(cols);
+      final InternalScanner s = region.getScanner(scan);
+      final HRegion regionForThread = region;
+
+      Thread splitThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            split(regionForThread, midkey);
+          } catch (IOException e) {
+            fail("Unexpected exception " + e);
+          } 
+        }
+      };
+      splitThread.start();
+      for(int i = 0; i < 6; i++) {
+        try {
+          Put put = new Put(region.getRegionInfo().getStartKey());
+          put.add(fam3, null, Bytes.toBytes("val"));
+          region.put(put);
+          Thread.sleep(1000);
+        }
+        catch (InterruptedException e) {
+          fail("Unexpected exception " + e);
         }
-        curVals.clear();
-        k++;
       }
-      assertEquals("Expected " + NUM_VALS + " " + Bytes.toString(CONTENTS_BASIC) +
-        " values, but fetched " + contentsFetched, NUM_VALS, contentsFetched);
-      assertEquals("Expected " + NUM_VALS + " " + ANCHORNUM +
-        " values, but fetched " + anchorFetched, NUM_VALS, anchorFetched);
-
-      LOG.info("Scanned " + NUM_VALS
-          + " rows from disk. Elapsed time: "
-          + ((System.currentTimeMillis() - startTime) / 1000.0));
-      
-    } finally {
+      s.next(new ArrayList<KeyValue>());
       s.close();
+    } catch(UnknownScannerException ex) {
+      ex.printStackTrace();
+      fail("Got the " + ex);
+    } 
+  }
+  
+  
+  private void assertGet(final HRegion r, final byte [] family, final byte [] k)
+  throws IOException {
+    // Now I have k, get values out and assert they are as expected.
+    Get get = new Get(k).addFamily(family).setMaxVersions();
+    KeyValue [] results = r.get(get, null).raw();
+    for (int j = 0; j < results.length; j++) {
+      byte [] tmp = results[j].getValue();
+      // Row should be equal to value every time.
+      assertTrue(Bytes.equals(k, tmp));
     }
-    
-    // Verify testScan data
-    
-    cols = new byte [][] {CONTENTS_FIRSTCOL, ANCHOR_SECONDCOL};
-    
-    startTime = System.currentTimeMillis();
-
-    s = r.getScanner(cols, HConstants.EMPTY_START_ROW,
-      System.currentTimeMillis(), null);
+  }
+  
+  /**
+   * Assert first value in the passed region is <code>firstValue</code>.
+   * @param r region to scan
+   * @param column column added to the scan
+   * @param firstValue value expected in the first KeyValue the scanner returns
+   * @throws IOException
+   */
+  private void assertScan(final HRegion r, final byte [] column,
+      final byte [] firstValue)
+  throws IOException {
+    byte [][] cols = {column};
+    Scan scan = new Scan();
+    scan.addColumns(cols);
+    InternalScanner s = r.getScanner(scan);
     try {
-      int numFetched = 0;
       List<KeyValue> curVals = new ArrayList<KeyValue>();
-      int k = 0;
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          KeyValue kv = it.next();
-          byte [] col = kv.getColumn();
+      boolean first = true;
+      OUTER_LOOP: while(s.next(curVals)) {
+        for (KeyValue kv: curVals) {
           byte [] val = kv.getValue();
-          int curval =
-            Integer.parseInt(new String(val, HConstants.UTF8_ENCODING).trim());
-
-          for (int j = 0; j < cols.length; j++) {
-            if (Bytes.compareTo(col, cols[j]) == 0) {
-              assertEquals("Value for " + Bytes.toString(col) + " should be: " + k
-                  + ", but was fetched as: " + curval, curval, k);
-              numFetched++;
-            }
+          byte [] curval = val;
+          if (first) {
+            first = false;
+            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
+          } else {
+            // Not asserting anything.  Might as well break.
+            break OUTER_LOOP;
           }
         }
-        curVals.clear();
-        k++;
       }
-      assertEquals("Inserted " + numInserted + " values, but fetched " +
-        numFetched, numInserted, numFetched);
-
-      LOG.info("Scanned " + (numFetched / 2)
-          + " rows from disk. Elapsed time: "
-          + ((System.currentTimeMillis() - startTime) / 1000.0));
-      
     } finally {
       s.close();
     }
+  }
+  
+  /**
+   * Splits the passed region at <code>splitRow</code> and asserts that the
+   * split produced two regions. Before splitting, runs
+   * <code>assertGet</code> on <code>splitRow</code> in <code>fam3</code>.
+   * @param r region to split
+   * @param splitRow row to split the region on
+   * @return the two regions returned by <code>splitRegion</code>
+   * @throws IOException
+   */
+  protected HRegion [] split(final HRegion r, final byte [] splitRow)
+  throws IOException {
+    // Assert can get mid key from passed region.
+    assertGet(r, fam3, splitRow);
+    HRegion [] regions = r.splitRegion(splitRow);
+    // Fail with an assertion rather than a NullPointerException if no split
+    // was produced.
+    assertNotNull(regions);
+    // JUnit's assertEquals takes (expected, actual); keeping that order makes
+    // the failure message read correctly.
+    assertEquals(2, regions.length);
+    return regions;
+  }
+  
+  private HBaseConfiguration initSplit() {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    // Always compact if there is more than one store file.
+    conf.setInt("hbase.hstore.compactionThreshold", 2);
     
-    // Test a scanner which only specifies the column family name
-    
-    cols = new byte [][] {
-        Bytes.toBytes("anchor:")
-    };
+    // Make lease timeout longer, lease checks less frequent
+    conf.setInt("hbase.master.lease.period", 10 * 1000);
+    conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
     
-    startTime = System.currentTimeMillis();
+    conf.setInt("hbase.regionserver.lease.period", 10 * 1000);
     
-    s = r.getScanner(cols, HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
+    // Increase the amount of time between client retries
+    conf.setLong("hbase.client.pause", 15 * 1000);
 
-    try {
-      int fetched = 0;
-      List<KeyValue> curVals = new ArrayList<KeyValue>();
-      while(s.next(curVals)) {
-        for(Iterator<KeyValue> it = curVals.iterator(); it.hasNext(); ) {
-          it.next();
-          fetched++;
-        }
-        curVals.clear();
-      }
-      assertEquals("Inserted " + (NUM_VALS + numInserted/2) +
-        " values, but fetched " + fetched, (NUM_VALS + numInserted/2), fetched);
-      LOG.info("Scanned " + fetched
-          + " rows from disk. Elapsed time: "
-          + ((System.currentTimeMillis() - startTime) / 1000.0));
-      
-    } finally {
-      s.close();
-    }
-    LOG.info("read completed.");
+    // This size should make it so we always split using the addContent
+    // below.  After adding all data, the first region is 1.3M
+    conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
+    return conf;
+  }  
+  
+  //////////////////////////////////////////////////////////////////////////////
+  // Helpers
+  //////////////////////////////////////////////////////////////////////////////
+  private HBaseConfiguration initHRegion() {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    
+    conf.set("hbase.hstore.compactionThreshold", "2");
+    conf.setLong("hbase.hregion.max.filesize", 65536);
+    
+    return conf;
   }
   
+  /**
+   * Creates the test region with a freshly constructed
+   * <code>HBaseConfiguration</code>; delegates to the overload that takes a
+   * configuration.
+   * @param tableName name of the table the region belongs to
+   * @param callingMethod name of the calling test, passed on to the overload
+   * @param families column families to create in the table
+   * @throws IOException
+   */
+  private void initHRegion (byte [] tableName, String callingMethod,
+      byte[] ... families) throws IOException{
+    HBaseConfiguration defaultConf = new HBaseConfiguration();
+    initHRegion(tableName, callingMethod, defaultConf, families);
+  }
+  
+  /**
+   * Creates a region for <code>tableName</code> with one column family per
+   * entry in <code>families</code> and assigns it to the <code>region</code>
+   * field. The region is created under the path
+   * <code>DIR + callingMethod</code>.
+   * @param tableName name of the table the region belongs to
+   * @param callingMethod name of the calling test, appended to
+   * <code>DIR</code> to form the region's path
+   * @param conf configuration the region is created with
+   * @param families column families to create in the table
+   * @throws IOException
+   */
+  private void initHRegion (byte [] tableName, String callingMethod,
+      HBaseConfiguration conf, byte [] ... families) throws IOException{
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+
+    // NOTE(review): null start/end keys presumably mean an unbounded key
+    // range -- confirm against HRegionInfo.
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    region = HRegion.createHRegion(info, path, conf);
+  }
 }

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,207 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+//import java.util.Random;
+import java.util.Set;
+//import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+public class TestKeyValueHeap extends HBaseTestCase
+implements HConstants {
+  private final boolean PRINT = false;
+  
+  List<Scanner> scanners = new ArrayList<Scanner>();
+
+  private byte [] row1;
+  private byte [] fam1;
+  private byte [] col1;
+  private byte [] data;
+
+  private byte [] row2;
+  private byte [] fam2;
+  private byte [] col2;
+
+  private byte [] col3;
+  private byte [] col4;
+  private byte [] col5;
+
+  public void setUp(){
+    data = Bytes.toBytes("data");
+
+    row1 = Bytes.toBytes("row1");
+    fam1 = Bytes.toBytes("fam1");
+    col1 = Bytes.toBytes("col1");
+
+    row2 = Bytes.toBytes("row2");
+    fam2 = Bytes.toBytes("fam2");
+    col2 = Bytes.toBytes("col2");
+
+    col3 = Bytes.toBytes("col3");
+    col4 = Bytes.toBytes("col4");
+    col5 = Bytes.toBytes("col5");
+  }
+
+  public void testSorted(){
+    //Cases that need to be checked are:
+    //1. The "smallest" KeyValue is in the same scanners as current
+    //2. Current scanner gets empty
+
+    List<KeyValue> l1 = new ArrayList<KeyValue>();
+    l1.add(new KeyValue(row1, fam1, col5, data));
+    l1.add(new KeyValue(row2, fam1, col1, data));
+    l1.add(new KeyValue(row2, fam1, col2, data));
+    scanners.add(new Scanner(l1));
+
+    List<KeyValue> l2 = new ArrayList<KeyValue>();
+    l2.add(new KeyValue(row1, fam1, col1, data));
+    l2.add(new KeyValue(row1, fam1, col2, data));
+    scanners.add(new Scanner(l2));
+
+    List<KeyValue> l3 = new ArrayList<KeyValue>();
+    l3.add(new KeyValue(row1, fam1, col3, data));
+    l3.add(new KeyValue(row1, fam1, col4, data));
+    l3.add(new KeyValue(row1, fam2, col1, data));
+    l3.add(new KeyValue(row1, fam2, col2, data));
+    l3.add(new KeyValue(row2, fam1, col3, data));
+    scanners.add(new Scanner(l3));
+
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(new KeyValue(row1, fam1, col1, data));
+    expected.add(new KeyValue(row1, fam1, col2, data));
+    expected.add(new KeyValue(row1, fam1, col3, data));
+    expected.add(new KeyValue(row1, fam1, col4, data));
+    expected.add(new KeyValue(row1, fam1, col5, data));
+    expected.add(new KeyValue(row1, fam2, col1, data));
+    expected.add(new KeyValue(row1, fam2, col2, data));
+    expected.add(new KeyValue(row2, fam1, col1, data));
+    expected.add(new KeyValue(row2, fam1, col2, data));
+    expected.add(new KeyValue(row2, fam1, col3, data));
+
+    //Creating KeyValueHeap
+    KeyValueHeap kvh =
+      new KeyValueHeap(scanners.toArray(new Scanner[0]), KeyValue.COMPARATOR);
+    
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    while(kvh.peek() != null){
+      actual.add(kvh.next());
+    }
+
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++){
+      assertEquals(expected.get(i), actual.get(i));
+      if(PRINT){
+        System.out.println("expected " +expected.get(i)+
+            "\nactual   " +actual.get(i) +"\n");
+      }
+    }
+    
+    //Check if result is sorted according to Comparator
+    for(int i=0; i<actual.size()-1; i++){
+      int ret = KeyValue.COMPARATOR.compare(actual.get(i), actual.get(i+1));
+      assertTrue(ret < 0);
+    }
+    
+  }
+
+  public void testSeek(){
+    //Cases:
+    //1. Seek KeyValue that is not in scanner
+    //2. Check that smallest that is returned from a seek is correct
+    
+    List<KeyValue> l1 = new ArrayList<KeyValue>();
+    l1.add(new KeyValue(row1, fam1, col5, data));
+    l1.add(new KeyValue(row2, fam1, col1, data));
+    l1.add(new KeyValue(row2, fam1, col2, data));
+    scanners.add(new Scanner(l1));
+
+    List<KeyValue> l2 = new ArrayList<KeyValue>();
+    l2.add(new KeyValue(row1, fam1, col1, data));
+    l2.add(new KeyValue(row1, fam1, col2, data));
+    scanners.add(new Scanner(l2));
+
+    List<KeyValue> l3 = new ArrayList<KeyValue>();
+    l3.add(new KeyValue(row1, fam1, col3, data));
+    l3.add(new KeyValue(row1, fam1, col4, data));
+    l3.add(new KeyValue(row1, fam2, col1, data));
+    l3.add(new KeyValue(row1, fam2, col2, data));
+    l3.add(new KeyValue(row2, fam1, col3, data));
+    scanners.add(new Scanner(l3));
+
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(new KeyValue(row2, fam1, col1, data));
+    
+    //Creating KeyValueHeap
+    KeyValueHeap kvh =
+      new KeyValueHeap(scanners.toArray(new Scanner[0]), KeyValue.COMPARATOR);
+    
+    KeyValue seekKv = new KeyValue(row2, fam1, null, null);
+    kvh.seek(seekKv);
+    
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    actual.add(kvh.peek());
+    
+    assertEquals(expected.size(), actual.size());
+    for(int i=0; i<expected.size(); i++){
+      assertEquals(expected.get(i), actual.get(i));
+      if(PRINT){
+        System.out.println("expected " +expected.get(i)+
+            "\nactual   " +actual.get(i) +"\n");
+      }
+    }
+    
+  }
+
+  private class Scanner implements KeyValueScanner {
+    private Set<KeyValue> scan =
+      new TreeSet<KeyValue>((Comparator)KeyValue.COMPARATOR);
+    private Iterator<KeyValue> iter;
+    private KeyValue current;
+
+    public Scanner(List<KeyValue> list) {
+      Collections.sort(list, (Comparator)KeyValue.COMPARATOR);
+      iter = list.iterator();
+      if(iter.hasNext()){
+        current = iter.next();
+      } 
+    }
+    
+    public KeyValue peek() {
+      return current;
+    }
+
+    public KeyValue next() {
+      KeyValue oldCurrent = current;
+      if(iter.hasNext()){
+        current = iter.next();
+      } else {
+        current = null;
+      }
+      return oldCurrent;
+    }
+
+    public void close(){}
+    
+    public boolean seek(KeyValue seekKv) {
+      while(iter.hasNext()){
+        KeyValue next = iter.next();
+        int ret = KeyValue.COMPARATOR.compare(next, seekKv);
+        if(ret >= 0){
+          current = next;
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+}

Added: hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java?rev=782445&view=auto
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java (added)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestKeyValueScanFixture.java Sun Jun  7 19:57:37 2009
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Exercises a KeyValueScanFixture through the KeyValueScanner interface:
+ * seeking to the start of rows and then reading every KeyValue in order.
+ */
+public class TestKeyValueScanFixture extends TestCase {
+
+  /**
+   * Checks peek() before any seek, seeks to RowA and RowB, then seeks to
+   * RowA again and reads the fixture to its end, verifying peek() and
+   * next() at every step.
+   */
+  public void testKeyValueScanFixture() {
+    final KeyValue[] fixtureData = {
+        KeyValueTestUtil.create("RowA", "family", "qf1",
+            1, KeyValue.Type.Put, "value-1"),
+        KeyValueTestUtil.create("RowA", "family", "qf2",
+            1, KeyValue.Type.Put, "value-2"),
+        KeyValueTestUtil.create("RowB", "family", "qf1",
+            10, KeyValue.Type.Put, "value-10")
+    };
+    final KeyValueScanner scanner =
+        new KeyValueScanFixture(KeyValue.COMPARATOR, fixtureData);
+
+    // Before any seek, peek() is expected to return null.
+    assertNull(scanner.peek());
+
+    // Seeking to the first possible key on RowA should land on entry 0.
+    assertTrue(scanner.seek(KeyValue.createFirstOnRow(Bytes.toBytes("RowA"))));
+    assertEquals(fixtureData[0], scanner.peek());
+
+    // Seeking to the first possible key on RowB should land on entry 2.
+    assertTrue(scanner.seek(KeyValue.createFirstOnRow(Bytes.toBytes("RowB"))));
+    assertEquals(fixtureData[2], scanner.peek());
+
+    // Seek to RowA once more and pull every entry out in order.
+    assertTrue(scanner.seek(KeyValue.createFirstOnRow(Bytes.toBytes("RowA"))));
+    for (KeyValue expectedKv : fixtureData) {
+      assertEquals(expectedKv, scanner.peek());
+      assertEquals(expectedKv, scanner.next());
+    }
+
+    // Past the last entry both peek() and next() are expected to give null.
+    assertEquals(null, scanner.peek());
+    assertEquals(null, scanner.next());
+  }
+}

Modified: hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java?rev=782445&r1=782444&r2=782445&view=diff
==============================================================================
--- hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java (original)
+++ hadoop/hbase/trunk_on_hadoop-0.18.3/src/test/org/apache/hadoop/hbase/regionserver/TestLogRolling.java Sun Jun  7 19:57:37 2009
@@ -26,11 +26,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -106,16 +108,15 @@
     
     // Create the test table and open it
     HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
     HTable table = new HTable(conf, tableName);
 
     for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
-      BatchUpdate b =
-        new BatchUpdate("row" + String.format("%1$04d", i));
-      b.put(HConstants.COLUMN_FAMILY, value);
-      table.commit(b);
+      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
+      put.add(HConstants.CATALOG_FAMILY, null, value);
+      table.put(put);
 
       if (i % 32 == 0) {
         // After every 32 writes sleep to let the log roller run



Mime
View raw message