hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r910334 [2/2] - in /hadoop/hbase/branches/0.20: ./ src/contrib/indexed/ src/contrib/indexed/src/java/org/apache/hadoop/hbase/client/idx/ src/contrib/indexed/src/java/org/apache/hadoop/hbase/client/idx/exp/ src/contrib/indexed/src/java/org/a...
Date Mon, 15 Feb 2010 21:00:05 GMT
Modified: hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/TestIdxRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/TestIdxRegion.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/TestIdxRegion.java
(original)
+++ hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/TestIdxRegion.java
Mon Feb 15 21:00:03 2010
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -40,6 +41,7 @@
 import org.apache.hadoop.hbase.client.idx.exp.And;
 import org.apache.hadoop.hbase.client.idx.exp.Comparison;
 import org.apache.hadoop.hbase.client.idx.exp.Or;
+import org.apache.hadoop.hbase.client.idx.exp.Expression;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -135,6 +137,43 @@
   }
 
   /**
+   * Tests that a {@link org.apache.hadoop.hbase.DoNotRetryIOException} is thrown
+   * when an invalid index expression is supplied.
+   *
+   * @throws IOException exception
+   */
+  public void testIndexedScanWithInvalidIndexExpression() throws Exception {
+    byte[] tableName = Bytes.toBytes("testIndexedScanWithStartRow");
+    byte[] family = Bytes.toBytes("family");
+    IdxIndexDescriptor indexDescriptor
+      = new IdxIndexDescriptor(qualLong, IdxQualifierType.LONG);
+
+    // Setting up region
+    String method = "testIndexedScanWithStartRow";
+    initIdxRegion(tableName, method, new HBaseConfiguration(),
+      Pair.of(family, new IdxIndexDescriptor[]{indexDescriptor}));
+
+    IdxScan idxScan = new IdxScan();
+    idxScan.addFamily(family);
+    idxScan.setExpression(
+        Comparison.comparison(
+            family, Bytes.toBytes("invalid"), Comparison.Operator.GTE,
+            Bytes.toBytes(50L)
+        )
+    );
+    InternalScanner scanner = null;
+    try {
+      scanner = region.getScanner(idxScan);
+      Assert.fail("Expected a DoNotRetryIOException to be thrown when an invalid " +
+          "index expression is specified.");
+    } catch (DoNotRetryIOException e) {
+      // expected
+    } catch (Exception e) {
+      Assert.fail("Expected a DoNotRetryIOException but got some other exception");
+    }
+  }
+
+  /**
    * Tests that the start row takes effect when scanning with an index.
    *
    * @throws IOException exception
@@ -588,12 +627,93 @@
     }
 
     checkScanWithTwoFamilies(family1, family2, false, numberOfRows * 2, 3);
-
   }
 
   private void checkScanWithTwoFamilies(byte[] family1, byte[] family2,
     boolean memStoreEmpty, int numRows, int numColumns) throws IOException {
 
+    IdxScan idxScan = createTwoFamiliesScan(family1, family2, memStoreEmpty);
+    InternalScanner scanner = region.getScanner(idxScan);
+    List<KeyValue> res = new ArrayList<KeyValue>();
+    int actualRows = 0;
+    //long start = System.nanoTime();
+    while (scanner.next(res)) {
+      assertEquals(numColumns, res.size());
+      actualRows++;
+      res.clear();
+    }
+    //long end = System.nanoTime();
+    //System.out.println("[top and bottom 10%] memStoreEmpty=" + memStoreEmpty + ", time="
+ (end - start)/1000000D);
+    assertEquals(numRows / 10 * 3, actualRows);
+  }
+
+  /**
+   * Verifies that a scan with more than one family where rows are sparse,
+   * is evaluated correctly.
+   *
+   * @throws Exception exception
+   */
+  public void testIndexedScanWithTwoFamiliesWhereRowsMayExistInOneButNotInBoth() throws Exception
{
+    String method = "testIndexedScanWithTwoFamiliesWhereRowsMayExistInOneButNotInBoth";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family1 = Bytes.toBytes("family1");
+    byte[] family2 = Bytes.toBytes("family2");
+    IdxIndexDescriptor indexDescriptor1 = new IdxIndexDescriptor(qualLong, IdxQualifierType.LONG);
+    IdxIndexDescriptor indexDescriptor2 = new IdxIndexDescriptor(qualDouble, IdxQualifierType.DOUBLE);
+    IdxIndexDescriptor indexDescriptor3 = new IdxIndexDescriptor(qualBytes, IdxQualifierType.BYTE_ARRAY);
+
+    initIdxRegion(tableName, method, new HBaseConfiguration(), Pair.of(family1,
+      new IdxIndexDescriptor[]{indexDescriptor1, indexDescriptor2}),
+      Pair.of(family2, new IdxIndexDescriptor[]{indexDescriptor3}));
+
+    int numberOfRows = 1000;
+
+    Random random = new Random(26011788L);
+    for (int row = 0; row < numberOfRows; row++) {
+      Put put = new Put(Bytes.toBytes(random.nextLong()));
+      int val = row % 10;
+      if (val != 9) {
+        put.add(family1, qualLong, Bytes.toBytes((long) val));
+        put.add(family1, qualDouble, Bytes.toBytes((double) val));
+      }
+      if (val != 1 && val != 4) {
+        put.add(family2, qualBytes, Bytes.toBytes(String.format("%04d", val)));
+      }
+      region.put(put);
+    }
+    checkScanWithTwoFamiliesWhereRowsMayExistInOneButNotInBoth(family1, family2, false, numberOfRows);
+
+    region.flushcache();
+
+    checkScanWithTwoFamiliesWhereRowsMayExistInOneButNotInBoth(family1, family2, true, numberOfRows);
+  }
+
+  private void checkScanWithTwoFamiliesWhereRowsMayExistInOneButNotInBoth(byte[] family1,
byte[] family2,
+    boolean memStoreEmpty, int numRows) throws IOException {
+
+    IdxScan idxScan = createTwoFamiliesScan(family1, family2, memStoreEmpty);
+    InternalScanner scanner = region.getScanner(idxScan);
+    List<KeyValue> res = new ArrayList<KeyValue>();
+    int actualRows = 0;
+    //long start = System.nanoTime();
+    while (scanner.next(res)) {
+      byte[] firstFamily = res.get(0).getFamily();
+      if (Bytes.equals(firstFamily, family1)){
+        Assert.assertEquals(2, res.size());
+        Assert.assertTrue(Bytes.equals(family1, res.get(1).getFamily()));
+      } else {
+        Assert.assertTrue(Bytes.equals(family2, res.get(0).getFamily()));
+        Assert.assertEquals(1, res.size());
+      }
+      actualRows++;
+      res.clear();
+    }
+    //long end = System.nanoTime();
+    //System.out.println("[top and bottom 10%] memStoreEmpty=" + memStoreEmpty + ", time="
+ (end - start)/1000000D);
+    assertEquals(numRows / 10 * 3, actualRows);
+  }
+
+  private IdxScan createTwoFamiliesScan(byte[] family1, byte[] family2, boolean memStoreEmpty)
{
     /**
      * Scan the index with a matching or expression on two indices
      */
@@ -616,21 +736,178 @@
             new BinaryComparator(bytesVal)))
       ));
     }
+    return idxScan;
+  }
+
+  /**
+   * Test that includeMissing works properly - rows missing the indexed
+   * qualifier should still be returned when includeMissing is set, and
+   * excluded when it is not.
+   *
+   * @throws Exception exception
+   */
+  public void testIndexedScanIncludeMissing() throws Exception {
+    final String method = "testIndexedScanIncludeMissing";
+    final byte[] tableName = Bytes.toBytes(method);
+    final byte[] family1 = Bytes.toBytes("family1");
+    final byte[] family2 = Bytes.toBytes("family2");
+    final byte[] qual1 = Bytes.toBytes("qual1");
+    final byte[] qual2 = Bytes.toBytes("qual2");
+    final IdxIndexDescriptor indexDescriptor1 = new IdxIndexDescriptor(qual1, IdxQualifierType.INT);
+    final IdxIndexDescriptor indexDescriptor2 = new IdxIndexDescriptor(qual2, IdxQualifierType.INT);
+
+    int numberOfRows = 1000;
+
+    initIdxRegion(tableName, method, new HBaseConfiguration(),
+      Pair.of(family1, new IdxIndexDescriptor[]{indexDescriptor1, indexDescriptor2}),
+      Pair.of(family2, new IdxIndexDescriptor[]{indexDescriptor1, indexDescriptor2}));
+
+    Random random = new Random(5032010L);
+
+    putSomeForTestIncludeMissing(family1, family2, qual1, qual2, numberOfRows, random);
+
+    checkIndexedScanIncludeMissing(family1, family2, qual1, qual2, false, numberOfRows);
+
+    region.flushcache();
+
+    checkIndexedScanIncludeMissing(family1, family2, qual1, qual2, true, numberOfRows);
+
+    // Try again with results in both memstore and store files
+    putSomeForTestIncludeMissing(family1, family2, qual1, qual2, numberOfRows, random);
+
+    checkIndexedScanIncludeMissing(family1, family2, qual1, qual2, false, numberOfRows *
2);
+
+  }
+
+  private void putSomeForTestIncludeMissing(byte[] family1, byte[] family2, byte[] qual1,
byte[] qual2, int numberOfRows, Random random) throws IOException {
+    for (int row = 0; row < numberOfRows; row++) {
+      Put put = new Put(Bytes.toBytes(random.nextLong()));
+
+      int val = row % 10;
+      byte[] bytesVal1 = Bytes.toBytes(val);
+      byte[] bytesVal2 = Bytes.toBytes(val * val);
+      switch (val) {
+        case 0:
+          put.add(family1, qual1, bytesVal1);
+          put.add(family2, qual1, bytesVal1);
+          break;
+        case 1:
+          put.add(family1, qual1, bytesVal1);
+          put.add(family1, qual2, bytesVal2);
+          break;
+        case 2:
+          put.add(family2, qual1, bytesVal1);
+          put.add(family2, qual2, bytesVal2);
+          break;
+        case 3:
+          put.add(family1, qual2, bytesVal2);
+          put.add(family2, qual2, bytesVal2);
+          break;
+        case 4:
+          put.add(family1, qual2, bytesVal2);
+          put.add(family2, qual1, bytesVal1);
+          break;
+        case 5:
+          put.add(family1, qual1, bytesVal1);
+          put.add(family2, qual2, bytesVal2);
+          break;
+        case 6:
+          put.add(family1, qual2, bytesVal2);
+          put.add(family2, qual1, bytesVal1);
+          put.add(family2, qual2, bytesVal2);
+          break;
+        case 7:
+          put.add(family1, qual1, bytesVal1);
+          put.add(family2, qual1, bytesVal1);
+          put.add(family2, qual2, bytesVal2);
+          break;
+        case 8:
+          put.add(family1, qual1, bytesVal1);
+          put.add(family1, qual2, bytesVal2);
+          put.add(family2, qual2, bytesVal2);
+          break;
+        case 9:
+          put.add(family1, qual1, bytesVal1);
+          put.add(family1, qual2, bytesVal2);
+          put.add(family2, qual1, bytesVal1);
+          break;
+      }
+      region.put(put);
+    }
+  }
+
+  private void checkIndexedScanIncludeMissing(byte[] family1, byte[] family2,
+    byte[] qual1, byte[] qual2,
+    boolean memStoreEmpty, int numRows) throws IOException {
+    IdxScan idxScan = new IdxScan();
+    final byte[] zero = Bytes.toBytes(0);
+    idxScan.setExpression(Expression.comparison(family1, qual1, Comparison.Operator.GTE,
zero));
+    if (!memStoreEmpty) {
+      idxScan.setFilter(new SingleColumnValueFilter(family1, qual1, CompareFilter.CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(zero)));
+    }
+
+    // Try the same scan but without missing rows
+
+    IdxScan idxScan1 = new IdxScan();
+    idxScan1.setExpression(Comparison.comparison(family1, qual1, Comparison.Operator.GTE,
zero, false));
+    if (!memStoreEmpty) {
+      SingleColumnValueFilter scvf = new SingleColumnValueFilter(family1, qual1, CompareFilter.CompareOp.GREATER_OR_EQUAL,
+        new BinaryComparator(zero));
+      scvf.setFilterIfMissing(true);
+      idxScan1.setFilter(scvf);
+    }
+
+    checkScan(numRows, idxScan);
+
+    checkScan(numRows / 10 * 6, idxScan1);
+
+    // A more elaborate test, with expressions from both families and an 'AND' condition
+
+    idxScan = new IdxScan();
+    final byte[] five = Bytes.toBytes(5);
+    final byte[] fifty = Bytes.toBytes(50);
+    idxScan.setExpression(Expression.and(
+      Expression.comparison(family1, qual1, Comparison.Operator.GTE, five),
+      Expression.comparison(family2, qual2, Comparison.Operator.LTE, fifty)));
+    if (!memStoreEmpty) {
+      idxScan.setFilter(new FilterList(Arrays.<Filter>asList(
+        new SingleColumnValueFilter(family1, qual1, CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(five)),
+        new SingleColumnValueFilter(family2, qual2, CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(fifty)))));
+    }
+
+    // Try the same scan but without missing rows
+
+    idxScan1 = new IdxScan();
+    idxScan1.setExpression(Expression.and(
+      Expression.comparison(family1, qual1, Comparison.Operator.GTE, five, false),
+      Expression.comparison(family2, qual2, Comparison.Operator.LTE, fifty, false)));
+    if (!memStoreEmpty) {
+      SingleColumnValueFilter fiveFilter = new SingleColumnValueFilter(family1, qual1, CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(five));
+      fiveFilter.setFilterIfMissing(true);
+      SingleColumnValueFilter fiftyFilter = new SingleColumnValueFilter(family2, qual2, CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(fifty));
+      fiftyFilter.setFilterIfMissing(true);
+      idxScan1.setFilter(new FilterList(Arrays.<Filter>asList(fiveFilter, fiftyFilter)));
+    }
+
+    checkScan(numRows / 10 * 7, idxScan);
+
+    checkScan(numRows / 10 * 2, idxScan1);
+  }
+
+  private void checkScan(int expectedNumRows, IdxScan idxScan) throws IOException {
     InternalScanner scanner = region.getScanner(idxScan);
     List<KeyValue> res = new ArrayList<KeyValue>();
     int actualRows = 0;
     //long start = System.nanoTime();
-    while (scanner.next(res)) {
-      assertEquals(numColumns, res.size());
+    while (scanner.next(res) || res.size() > 0) {
       actualRows++;
       res.clear();
     }
+
     //long end = System.nanoTime();
    //System.out.println("[top and bottom 10%] memStoreEmpty=" + memStoreEmpty + ", time="
+ (end - start)/1000000D);
-    assertEquals(numRows / 10 * 3, actualRows);
+    assertEquals(expectedNumRows, actualRows);
   }
 
-
   public void testIndexedScanWithMultipleVersions() throws Exception {
     byte[] tableName = Bytes.toBytes("testIndexedScanWithMultipleVersions");
     byte[] family = Bytes.toBytes("family");

Modified: hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/IntSetBaseTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/IntSetBaseTestCase.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/IntSetBaseTestCase.java
(original)
+++ hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/IntSetBaseTestCase.java
Mon Feb 15 21:00:03 2010
@@ -195,6 +195,9 @@
 
 
   public void testComplement() {
+    IntSetBase emptySet = newSet(0);
+    Assert.assertEquals(emptySet.complement().size(), emptySet.size());
+
     for (int capacity = 950; capacity < 1050; capacity++) {
       IntSetBase intSet = newSet(capacity);
       Assert.assertEquals(intSet.size(), 0);

Modified: hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestBitSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestBitSet.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestBitSet.java
(original)
+++ hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestBitSet.java
Mon Feb 15 21:00:03 2010
@@ -37,8 +37,7 @@
    * FIXED SIZE constant.
    */
   public void testHeapSize() {
-    assertEquals(ClassSize.estimateBase(BitSet.class, false),
-      BitSet.FIXED_SIZE);
+    assertEquals(ClassSize.estimateBase(BitSet.class, false), BitSet.FIXED_SIZE);
   }
 
 }

Modified: hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestSparseBitSet.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestSparseBitSet.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestSparseBitSet.java
(original)
+++ hadoop/hbase/branches/0.20/src/contrib/indexed/src/test/org/apache/hadoop/hbase/regionserver/idx/support/sets/TestSparseBitSet.java
Mon Feb 15 21:00:03 2010
@@ -36,8 +36,7 @@
    * FIXED SIZE constant.
    */
   public void testHeapSize() {
-    assertEquals(ClassSize.estimateBase(SparseBitSet.class, false),
-      SparseBitSet.FIXED_SIZE);
+    assertEquals(ClassSize.estimateBase(SparseBitSet.class, false), SparseBitSet.FIXED_SIZE);
   }
 
 }
\ No newline at end of file

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HConstants.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/HConstants.java Mon Feb 15
21:00:03 2010
@@ -259,8 +259,10 @@
    */
   public static final int WEEK_IN_SECONDS = 7 * 24 * 3600;
 
-  //TODO: HBASE_CLIENT_RETRIES_NUMBER_KEY is only used by TestMigrate. Move it
-  //      there.
+  /**
+   * The key used to store the number of times a client should retry before
+   * giving up.
+   */
   public static final String HBASE_CLIENT_RETRIES_NUMBER_KEY =
     "hbase.client.retries.number";
 

Modified: hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
(original)
+++ hadoop/hbase/branches/0.20/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
Mon Feb 15 21:00:03 2010
@@ -590,6 +590,15 @@
   }
 
   /**
+   * Get an estimate of the number of key values stored in this store.
+   *
+   * @return the number of key/values in this memstore.
+   */
+  public int numKeyValues() {
+    return kvset.size() + snapshot.size();
+  }
+
+  /**
    * Code to help figure if our approximation of object heap sizes is close
    * enough.  See hbase-900.  Fills memstores then waits so user can heap
    * dump and bring up resultant hprof in something like jprofiler which

Modified: hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=910334&r1=910333&r2=910334&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
(original)
+++ hadoop/hbase/branches/0.20/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
Mon Feb 15 21:00:03 2010
@@ -23,16 +23,18 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.Arrays;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.lang.reflect.Constructor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +52,10 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Hash;
@@ -94,31 +100,17 @@
   private static final int ONE_GB = 1024 * 1024 * 1000;
   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
   
+  public static final byte [] TABLE_NAME = Bytes.toBytes("TestTable");
   public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
   public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
-  
+
   protected static final HTableDescriptor TABLE_DESCRIPTOR;
   static {
-    TABLE_DESCRIPTOR = new HTableDescriptor("TestTable");
+    TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
     TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
   }
-  
-  private static final String RANDOM_READ = "randomRead";
-  private static final String RANDOM_SEEK_SCAN = "randomSeekScan";
-  private static final String RANDOM_READ_MEM = "randomReadMem";
-  private static final String RANDOM_WRITE = "randomWrite";
-  private static final String SEQUENTIAL_READ = "sequentialRead";
-  private static final String SEQUENTIAL_WRITE = "sequentialWrite";
-  private static final String SCAN = "scan";
-  
-  private static final List<String> COMMANDS =
-    Arrays.asList(new String [] {RANDOM_READ,
-      RANDOM_SEEK_SCAN,
-      RANDOM_READ_MEM,
-      RANDOM_WRITE,
-      SEQUENTIAL_READ,
-      SEQUENTIAL_WRITE,
-      SCAN});
+
+  protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
   
   volatile HBaseConfiguration conf;
   private boolean miniCluster = false;
@@ -133,7 +125,7 @@
   public static final Pattern LINE_PATTERN =
     Pattern.compile("startRow=(\\d+),\\s+" +
     "perClientRunRows=(\\d+),\\s+totalRows=(\\d+),\\s+clients=(\\d+)");
-  
+
   /**
    * Enum for map metrics.  Keep it out here rather than inside in the Map
    * inner-class so we can find associated properties.
@@ -151,6 +143,19 @@
    */
   public PerformanceEvaluation(final HBaseConfiguration c) {
     this.conf = c;
+
+    addCommandDescriptor(RandomReadTest.class, "randomRead", "Run random read test");
+    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan", "Run random seek and
scan 100 test");
+    addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
+    addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read
test");
+    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write
test");
+    addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
+    addCommandDescriptor(FilteredScanTest.class, "filterScan", "Run scan test using a filter
to find a specific row based on it's value");
+  }
+
+  protected void addCommandDescriptor(Class<? extends Test> cmdClass, String name,
String description) {
+    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
+    commands.put(name, cmdDescriptor);
   }
   
   /**
@@ -347,13 +352,18 @@
 
     /** configuration parameter name that contains the command */
     public final static String CMD_KEY = "EvaluationMapTask.command";
-    private String cmd;
+    private Class<? extends Test> cmd;
     private PerformanceEvaluation pe;
     
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
-    	this.cmd = context.getConfiguration().get(CMD_KEY);
-    	this.pe = new PerformanceEvaluation(new HBaseConfiguration(context.getConfiguration()));
+      String cmdClassName = context.getConfiguration().get(CMD_KEY);
+      try {
+        this.cmd = Class.forName(cmdClassName).asSubclass(Test.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("Could not find class for name: " + cmdClassName,
e);
+      }
+      this.pe = new PerformanceEvaluation(new HBaseConfiguration(context.getConfiguration()));
     }
     
     protected void map(NullWritable key, PeInputSplit value, final Context context) 
@@ -384,21 +394,26 @@
    * @throws IOException
    */
   private boolean checkTable(HBaseAdmin admin) throws IOException {
-    boolean tableExists = admin.tableExists(TABLE_DESCRIPTOR.getName());
+    HTableDescriptor tableDescriptor = getTableDescriptor();
+    boolean tableExists = admin.tableExists(tableDescriptor.getName());
     if (!tableExists) {
-      admin.createTable(TABLE_DESCRIPTOR);
-      LOG.info("Table " + TABLE_DESCRIPTOR + " created");
+      admin.createTable(tableDescriptor);
+      LOG.info("Table " + tableDescriptor + " created");
     }
     return !tableExists;
   }
- 
+
+  protected HTableDescriptor getTableDescriptor() {
+    return TABLE_DESCRIPTOR;
+  }
+
   /*
    * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
    * one map per client.  Then run a single reduce to sum the elapsed times.
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void runNIsMoreThanOne(final String cmd)
+  private void runNIsMoreThanOne(final Class<? extends Test> cmd)
   throws IOException, InterruptedException, ClassNotFoundException {
     checkTable(new HBaseAdmin(conf));
     if (this.nomapred) {
@@ -414,7 +429,7 @@
    * @throws IOException
    */
   @SuppressWarnings("unused")
-  private void doMultipleClients(final String cmd) throws IOException {
+  private void doMultipleClients(final Class<? extends Test> cmd) throws IOException
{
     final List<Thread> threads = new ArrayList<Thread>(this.N);
     final int perClientRows = R/N;
     for (int i = 0; i < this.N; i++) {
@@ -462,10 +477,10 @@
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMapReduce(final String cmd) throws IOException, 
+  private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
   			InterruptedException, ClassNotFoundException {
     Path inputDir = writeInputFile(this.conf);
-    this.conf.set(EvaluationMapTask.CMD_KEY, cmd);
+    this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
     Job job = new Job(this.conf);
     job.setJarByClass(PerformanceEvaluation.class);
     job.setJobName("HBase Performance Evaluation");
@@ -527,6 +542,33 @@
     return subdir;
   }
 
+  /**
+   * Describes a command.
+   */
+  static class CmdDescriptor {
+    private Class<? extends Test> cmdClass;
+    private String name;
+    private String description;
+
+    CmdDescriptor(Class<? extends Test> cmdClass, String name, String description)
{
+      this.cmdClass = cmdClass;
+      this.name = name;
+      this.description = description;
+    }
+
+    public Class<? extends Test> getCmdClass() {
+      return cmdClass;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+  }
+
   /*
    * A test.
    * Subclass to particularize what happens per row.
@@ -537,17 +579,23 @@
     protected final int perClientRunRows;
     protected final int totalRows;
     private final Status status;
+    protected byte[] tableName;
     protected HBaseAdmin admin;
     protected HTable table;
     protected volatile HBaseConfiguration conf;
-    
+
+    /**
+     * Note that all subclasses of this class must provide a public constructor
+     * that has the exact same list of arguments.
+     */
     Test(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
+         final int perClientRunRows, final int totalRows, final Status status, byte[] tableName)
{
       super();
       this.startRow = startRow;
       this.perClientRunRows = perClientRunRows;
       this.totalRows = totalRows;
       this.status = status;
+      this.tableName = tableName;
       this.table = null;
       this.conf = conf;
     }
@@ -563,7 +611,7 @@
     
     void testSetup() throws IOException {
       this.admin = new HBaseAdmin(conf);
-      this.table = new HTable(conf, TABLE_DESCRIPTOR.getName());
+      this.table = new HTable(conf, tableName);
       this.table.setAutoFlush(false);
       this.table.setWriteBufferSize(1024*1024*12);
       this.table.setScannerCaching(30);
@@ -583,37 +631,40 @@
       testSetup();
       long startTime = System.currentTimeMillis();
       try {
-        int lastRow = this.startRow + this.perClientRunRows;
-        // Report on completion of 1/10th of total.
-        for (int i = this.startRow; i < lastRow; i++) {
-          testRow(i);
-          if (status != null && i > 0 && (i % getReportingPeriod()) ==
0) {
-            status.setStatus(generateStatus(this.startRow, i, lastRow));
-          }
-        }
+        testTimed();
         elapsedTime = System.currentTimeMillis() - startTime;
       } finally {
         testTakedown();
       }
       return elapsedTime;
     }
-    
-    /*
-     * Test for individual row.
-     * @param i Row index.
+
+    /**
+     * Provides an extension point for tests that don't want a per row invocation.
      */
-    abstract void testRow(final int i) throws IOException;
-    
+    void testTimed() throws IOException {
+      int lastRow = this.startRow + this.perClientRunRows;
+      // Report on completion of 1/10th of total.
+      for (int i = this.startRow; i < lastRow; i++) {
+        testRow(i);
+        if (status != null && i > 0 && (i % getReportingPeriod()) == 0)
{
+          status.setStatus(generateStatus(this.startRow, i, lastRow));
+        }
+      }
+    }
+
     /*
-     * @return Test name.
-     */
-    abstract String getTestName();
+    * Test for individual row.
+    * @param i Row index.
+    */
+    void testRow(final int i) throws IOException {
+    }
   }
 
-  class RandomSeekScanTest extends Test {
+  static class RandomSeekScanTest extends Test {
     RandomSeekScanTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
-      super(conf, startRow, perClientRunRows, totalRows, status);
+                       final int perClientRunRows, final int totalRows, final Status status,
byte[] tableName) {
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
     }
     
     @Override
@@ -635,16 +686,12 @@
       return period == 0? this.perClientRunRows: period;
     }
 
-    @Override
-    String getTestName() {
-      return "randomSeekScanTest";
-    }
   }
 
-  class RandomReadTest extends Test {
-    RandomReadTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
-      super(conf, startRow, perClientRunRows, totalRows, status);
+  static class RandomReadTest extends Test {
+    public RandomReadTest(final HBaseConfiguration conf, final int startRow,
+                   final int perClientRunRows, final int totalRows, final Status status,
byte[] tableName) {
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
     }
     
     @Override
@@ -660,38 +707,30 @@
       return period == 0? this.perClientRunRows: period;
     }
 
-    @Override
-    String getTestName() {
-      return "randomRead";
-    }
   }
   
-  class RandomWriteTest extends Test {
+  static class RandomWriteTest extends Test {
     RandomWriteTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
-      super(conf, startRow, perClientRunRows, totalRows, status);
+        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName)
{
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
     }
     
     @Override
     void testRow(final int i) throws IOException {
       byte [] row = getRandomRow(this.rand, this.totalRows);
       Put put = new Put(row);
-      put.add(FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
+      byte[] value = generateValue(this.rand);
+      put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       table.put(put);
     }
-
-    @Override
-    String getTestName() {
-      return "randomWrite";
-    }
   }
   
-  class ScanTest extends Test {
+  static class ScanTest extends Test {
     private ResultScanner testScanner;
     
     ScanTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
-      super(conf, startRow, perClientRunRows, totalRows, status);
+        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName)
{
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
     }
     
     @Override
@@ -716,16 +755,12 @@
       testScanner.next();
     }
 
-    @Override
-    String getTestName() {
-      return "scan";
-    }
   }
   
-  class SequentialReadTest extends Test {
+  static class SequentialReadTest extends Test {
     SequentialReadTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
-      super(conf, startRow, perClientRunRows, totalRows, status);
+        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName)
{
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
     }
     
     @Override
@@ -735,28 +770,61 @@
       table.get(get);
     }
 
-    @Override
-    String getTestName() {
-      return "sequentialRead";
-    }
   }
   
-  class SequentialWriteTest extends Test {
+  static class SequentialWriteTest extends Test {
     SequentialWriteTest(final HBaseConfiguration conf, final int startRow,
-        final int perClientRunRows, final int totalRows, final Status status) {
-      super(conf, startRow, perClientRunRows, totalRows, status);
+        final int perClientRunRows, final int totalRows, final Status status, byte[] tableName)
{
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
     }
     
     @Override
     void testRow(final int i) throws IOException {
       Put put = new Put(format(i));
-      put.add(FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
+      byte[] value = generateValue(this.rand);
+      put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       table.put(put);
     }
 
+  }
+
+  static class FilteredScanTest extends Test {
+    protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
+
+    private static final int COUNT = 20;
+
+    FilteredScanTest(final HBaseConfiguration conf, final int startRow,
+                     final int perClientRunRows, final int totalRows,
+                     final Status status, byte[] tableName) {
+      super(conf, startRow, perClientRunRows, totalRows, status, tableName);
+    }
+
     @Override
-    String getTestName() {
-      return "sequentialWrite";
+    void testTimed() throws IOException {
+      for (int i = 0; i < COUNT; i++) {
+        byte[] value = generateValue(this.rand);
+        Scan scan = constructScan(value);
+        ResultScanner scanner = null;
+        try {
+          scanner = this.table.getScanner(scan);
+          while (scanner.next() != null) {
+          }
+        } finally {
+          LOG.info("Completed scan " + i + " of " + COUNT);
+          if (scanner != null) scanner.close();
+        }
+      }
+    }
+
+    protected Scan constructScan(byte[] valuePrefix) throws IOException {
+      Filter filter = new SingleColumnValueFilter(
+          FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
+          new BinaryComparator(valuePrefix)
+      );
+      Scan scan = new Scan();
+      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      scan.setFilter(filter);
+      return scan;
     }
   }
   
@@ -792,47 +860,40 @@
     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
   }
   
-  long runOneClient(final String cmd, final int startRow,
+  long runOneClient(final Class<? extends Test> cmd, final int startRow,
     final int perClientRunRows, final int totalRows, final Status status)
   throws IOException {
     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
       perClientRunRows + " rows");
     long totalElapsedTime = 0;
-    if (cmd.equals(RANDOM_READ)) {
-      Test t = new RandomReadTest(this.conf, startRow, perClientRunRows,
-        totalRows, status);
-      totalElapsedTime = t.test();
-    } else if (cmd.equals(RANDOM_READ_MEM)) {
-      throw new UnsupportedOperationException("Not yet implemented");
-    } else if (cmd.equals(RANDOM_WRITE)) {
-      Test t = new RandomWriteTest(this.conf, startRow, perClientRunRows,
-        totalRows, status);
-      totalElapsedTime = t.test();
-    } else if (cmd.equals(SCAN)) {
-      Test t = new ScanTest(this.conf, startRow, perClientRunRows,
-        totalRows, status);
-      totalElapsedTime = t.test();
-    } else if (cmd.equals(SEQUENTIAL_READ)) {
-      Test t = new SequentialReadTest(this.conf, startRow, perClientRunRows,
-        totalRows, status);
-      totalElapsedTime = t.test();
-    } else if (cmd.equals(SEQUENTIAL_WRITE)) {
-      Test t = new SequentialWriteTest(this.conf, startRow, perClientRunRows,
-        totalRows, status);
-      totalElapsedTime = t.test();
-    } else if (cmd.equals(RANDOM_SEEK_SCAN)) {
-      Test t = new RandomSeekScanTest(this.conf, startRow, perClientRunRows,
-          totalRows, status);
-        totalElapsedTime = t.test();
-    } else {
-      throw new IllegalArgumentException("Invalid command value: " + cmd);
+
+    Test t = null;
+    try {
+      Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
+          HBaseConfiguration.class,
+          int.class,
+          int.class,
+          int.class,
+          Status.class,
+          byte[].class
+      );
+      t = constructor.newInstance(this.conf, startRow, perClientRunRows,
+          totalRows, status, getTableDescriptor().getName());
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Invalid command class: " +
+          cmd.getName() + ".  It does not provide a constructor as described by" +
+          "the javadoc comment.  Available consctructors are: " + Arrays.toString(cmd.getConstructors()));
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to construct command class", e);
     }
+    totalElapsedTime = t.test();
+
     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
       "ms at offset " + startRow + " for " + perClientRunRows + " rows");
     return totalElapsedTime;
   }
   
-  private void runNIsOne(final String cmd) {
+  private void runNIsOne(final Class<? extends Test> cmd) {
     Status status = new Status() {
       public void setStatus(String msg) throws IOException {
         LOG.info(msg);
@@ -849,22 +910,21 @@
     } 
   }
 
-  private void runTest(final String cmd) throws IOException, 
+  private void runTest(final Class<? extends Test> cmd) throws IOException,
   				InterruptedException, ClassNotFoundException {
-    if (cmd.equals(RANDOM_READ_MEM)) {
-      // For this one test, so all fits in memory, make R smaller (See
-      // pg. 9 of BigTable paper).
-      R = (this.R / 10) * N;
-    }
-    
     MiniHBaseCluster hbaseMiniCluster = null;
     MiniDFSCluster dfsCluster = null;
+    MiniZooKeeperCluster zooKeeperCluster = null;
     if (this.miniCluster) {
       dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
+      zooKeeperCluster = new MiniZooKeeperCluster();
+      int zooKeeperPort = zooKeeperCluster.startup(new File(System.getProperty("java.io.tmpdir")));
+      
       // mangle the conf so that the fs parameter points to the minidfs we
       // just started up
       FileSystem fs = dfsCluster.getFileSystem();
-      conf.set("fs.default.name", fs.getUri().toString());      
+      conf.set("fs.default.name", fs.getUri().toString());
+      conf.set("hbase.zookeeper.property.clientPort", Integer.toString(zooKeeperPort));
       Path parentdir = fs.getHomeDirectory();
       conf.set(HConstants.HBASE_DIR, parentdir.toString());
       fs.mkdirs(parentdir);
@@ -882,24 +942,24 @@
         runNIsMoreThanOne(cmd);
       }
     } finally {
-      if(this.miniCluster && hbaseMiniCluster != null) {
-        hbaseMiniCluster.shutdown();
+      if(this.miniCluster) {
+        if (hbaseMiniCluster != null) hbaseMiniCluster.shutdown();
+        if (zooKeeperCluster != null) zooKeeperCluster.shutdown();
         HBaseTestCase.shutdownDfs(dfsCluster);
       }
     }
   }
   
-  private void printUsage() {
+  protected void printUsage() {
     printUsage(null);
   }
   
-  private void printUsage(final String message) {
+  protected void printUsage(final String message) {
     if (message != null && message.length() > 0) {
       System.err.println(message);
     }
-    System.err.println("Usage: java " + this.getClass().getName() +
-        " [--miniCluster]");
-    System.err.println("  [--nomapred] [--rows=ROWS] <command> <nclients>");
+    System.err.println("Usage: java " + this.getClass().getName() + " \\");
+    System.err.println("  [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" miniCluster     Run the test on an HBaseMiniCluster");
@@ -908,14 +968,9 @@
     System.err.println(" rows            Rows each client runs. Default: One million");
     System.err.println();
     System.err.println("Command:");
-    System.err.println(" randomRead      Run random read test");
-    System.err.println(" randomReadMem   Run random read test where table " +
-      "is in memory");
-    System.err.println(" randomSeekScan  Run random seek and scan 100 test");
-    System.err.println(" randomWrite     Run random write test");
-    System.err.println(" sequentialRead  Run sequential read test");
-    System.err.println(" sequentialWrite Run sequential write test");
-    System.err.println(" scan            Run scan test");
+    for (CmdDescriptor command : commands.values()) {
+      System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
+    }
     System.err.println();
     System.err.println("Args:");
     System.err.println(" nclients        Integer. Required. Total number of " +
@@ -923,8 +978,8 @@
     System.err.println("                 running: 1 <= value <= 500");
     System.err.println("Examples:");
     System.err.println(" To run a single evaluation client:");
-    System.err.println(" $ bin/hbase " +
-      "org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 1");
+    System.err.println(" $ bin/hbase " + this.getClass().getName()
+        + " sequentialWrite 1");
   }
 
   private void getArgs(final int start, final String[] args) {
@@ -939,7 +994,7 @@
     this.R = this.R * N;
   }
   
-  private int doCommandLine(final String[] args) {
+  public int doCommandLine(final String[] args) {
     // Process command-line args. TODO: Better cmd-line processing
     // (but hopefully something not as painful as cli options).    
     int errCode = -1;
@@ -974,10 +1029,11 @@
           this.R = Integer.parseInt(cmd.substring(rows.length()));
           continue;
         }
-       
-        if (COMMANDS.contains(cmd)) {
+
+        Class<? extends Test> cmdClass = determineCommandClass(cmd);
+        if (cmdClass != null) {
           getArgs(i + 1, args);
-          runTest(cmd);
+          runTest(cmdClass);
           errCode = 0;
           break;
         }
@@ -991,7 +1047,12 @@
     
     return errCode;
   }
-  
+
+  private Class<? extends Test> determineCommandClass(String cmd) {
+    CmdDescriptor descriptor = commands.get(cmd);
+    return descriptor != null ? descriptor.getCmdClass() : null;
+  }
+
   /**
    * @param args
    */



Mime
View raw message