hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1352764 - in /hbase/trunk: hbase-server/pom.xml hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java pom.xml
Date Fri, 22 Jun 2012 03:09:38 GMT
Author: tedyu
Date: Fri Jun 22 03:09:37 2012
New Revision: 1352764

URL: http://svn.apache.org/viewvc?rev=1352764&view=rev
Log:
HBASE-5539 asynchbase PerformanceEvaluation (Benoit)


Modified:
    hbase/trunk/hbase-server/pom.xml
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
    hbase/trunk/pom.xml

Modified: hbase/trunk/hbase-server/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/pom.xml?rev=1352764&r1=1352763&r2=1352764&view=diff
==============================================================================
--- hbase/trunk/hbase-server/pom.xml (original)
+++ hbase/trunk/hbase-server/pom.xml Fri Jun 22 03:09:37 2012
@@ -290,6 +290,11 @@
     </dependency>
     <!-- General dependencies -->
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+      <version>3.5.0.Final-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>com.yammer.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
@@ -450,6 +455,34 @@
       <groupId>stax</groupId>
       <artifactId>stax-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.hbase</groupId>
+      <artifactId>asynchbase</artifactId>
+      <version>[1.3.1,)</version>
+      <!--
+        This is needed otherwise Maven complains because asynchbase depends on SLF4J 1.6:
+        "The requested version 1.5.8 by your slf4j binding is not compatible with [1.6]"
+        See http://stackoverflow.com/questions/5477942/slf4j-version-problem-while-building-through-the-maven
+        Note that we can't do what Ceki suggests here, because v1.6 removed some interface
+        that the Hadoop jar calls into, so we have to stick to the 1.5 version they pull.
+      -->
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <!-- We also have to exclude the other slf4j libraries pulled by asynchbase. -->
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>jcl-over-slf4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <profiles>
     <!-- Skip the tests in this module -->

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1352764&r1=1352763&r2=1352764&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Fri Jun 22 03:09:37 2012
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.lang.reflect.Constructor;
@@ -79,6 +81,14 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.LineReader;
 
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.hbase.async.GetRequest;
+import org.hbase.async.HBaseClient;
+import org.hbase.async.PleaseThrottleException;
+import org.hbase.async.PutRequest;
+import org.hbase.async.Scanner;
+
 /**
  * Script used evaluating HBase performance and scalability.  Runs a HBase
  * client that steps through one of a set of hardcoded tests or 'experiments'
@@ -155,6 +165,8 @@ public class PerformanceEvaluation {
 
     addCommandDescriptor(RandomReadTest.class, "randomRead",
         "Run random read test");
+    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
+        "Run random read test with asynchbase");
     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
         "Run random seek and scan 100 test");
     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
@@ -167,12 +179,20 @@ public class PerformanceEvaluation {
         "Run random seek scan with both start and stop row (max 10000 rows)");
     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
         "Run random write test");
+    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
+        "Run random write test with asynchbase");
     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
         "Run sequential read test");
+    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
+        "Run sequential read test with asynchbase");
     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
         "Run sequential write test");
+    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
+        "Run sequential write test with asynchbase");
     addCommandDescriptor(ScanTest.class, "scan",
         "Run scan test (read every row)");
+    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
+        "Run scan test with asynchbase (read every row)");
     addCommandDescriptor(FilteredScanTest.class, "filterScan",
         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
   }
@@ -536,14 +556,16 @@ public class PerformanceEvaluation {
    */
   private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
     final List<Thread> threads = new ArrayList<Thread>(this.N);
+    final long[] timings = new long[this.N];
     final int perClientRows = R/N;
     for (int i = 0; i < this.N; i++) {
-      Thread t = new Thread (Integer.toString(i)) {
+      final int index = i;
+      Thread t = new Thread ("TestClient-" + i) {
         @Override
         public void run() {
           super.run();
           PerformanceEvaluation pe = new PerformanceEvaluation(conf);
-          int index = Integer.parseInt(getName());
+          pe.N = N;
           try {
             long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
                perClientRows, R,
@@ -552,6 +574,7 @@ public class PerformanceEvaluation {
                     LOG.info("client-" + getName() + " " + msg);
                   }
                 });
+            timings[index] = elapsedTime;
             LOG.info("Finished " + getName() + " in " + elapsedTime +
               "ms writing " + perClientRows + " rows");
           } catch (IOException e) {
@@ -573,6 +596,18 @@ public class PerformanceEvaluation {
         }
       }
     }
+    final String test = cmd.getSimpleName();
+    LOG.info("[" + test + "] Summary of timings (ms): "
+             + Arrays.toString(timings));
+    Arrays.sort(timings);
+    long total = 0;
+    for (int i = 0; i < this.N; i++) {
+      total += timings[i];
+    }
+    LOG.info("[" + test + "]"
+             + "\tMin: " + timings[0] + "ms"
+             + "\tMax: " + timings[this.N - 1] + "ms"
+             + "\tAvg: " + (total / this.N) + "ms");
   }
 
   /*
@@ -692,6 +727,7 @@ public class PerformanceEvaluation {
     private int startRow;
     private int perClientRunRows;
     private int totalRows;
+    private int numClientThreads;
     private byte[] tableName;
     private boolean flushCommits;
     private boolean writeToWAL = true;
@@ -699,10 +735,13 @@ public class PerformanceEvaluation {
     TestOptions() {
     }
 
-    TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits, boolean writeToWAL) {
+    TestOptions(int startRow, int perClientRunRows, int totalRows,
+                int numClientThreads, byte[] tableName,
+                boolean flushCommits, boolean writeToWAL) {
       this.startRow = startRow;
       this.perClientRunRows = perClientRunRows;
       this.totalRows = totalRows;
+      this.numClientThreads = numClientThreads;
       this.tableName = tableName;
       this.flushCommits = flushCommits;
       this.writeToWAL = writeToWAL;
@@ -720,6 +759,10 @@ public class PerformanceEvaluation {
       return totalRows;
     }
 
+    public int getNumClientThreads() {
+      return numClientThreads;
+    }
+
     public byte[] getTableName() {
       return tableName;
     }
@@ -752,7 +795,6 @@ public class PerformanceEvaluation {
     protected final int totalRows;
     private final Status status;
     protected byte[] tableName;
-    protected HBaseAdmin admin;
     protected HTable table;
     protected volatile Configuration conf;
     protected boolean flushCommits;
@@ -785,13 +827,12 @@ public class PerformanceEvaluation {
     }
 
     void testSetup() throws IOException {
-      this.admin = new HBaseAdmin(conf);
       this.table = new HTable(conf, tableName);
       this.table.setAutoFlush(false);
       this.table.setScannerCaching(30);
     }
 
-    void testTakedown()  throws IOException {
+    void testTakedown() throws IOException {
       if (flushCommits) {
         this.table.flushCommits();
       }
@@ -804,16 +845,15 @@ public class PerformanceEvaluation {
      * @throws IOException
      */
     long test() throws IOException {
-      long elapsedTime;
       testSetup();
-      long startTime = System.currentTimeMillis();
+      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
+      final long startTime = System.nanoTime();
       try {
         testTimed();
-        elapsedTime = System.currentTimeMillis() - startTime;
       } finally {
         testTakedown();
       }
-      return elapsedTime;
+      return (System.nanoTime() - startTime) / 1000000;
     }
 
     /**
@@ -834,7 +874,178 @@ public class PerformanceEvaluation {
     * Test for individual row.
     * @param i Row index.
     */
-    void testRow(final int i) throws IOException {
+    abstract void testRow(final int i) throws IOException;
+  }
+
+  static abstract class AsyncTest extends Test {
+    /** Maximum number of RPCs we're allowed in flight at a time.  */
+    private static final int MAX_OUTSTANDING_RPCS = 200000;  // Sized for 2G heap.
+
+    private static HBaseClient client;  // Only one client regardless of number of threads.
+
+    AsyncTest(final Configuration conf, final TestOptions options, final Status status) {
+      super(null, options, status);
+      final String zkquorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+      final String znode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                                    HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+      synchronized (AsyncTest.class) {
+        if (client == null) {
+          client = new HBaseClient(zkquorum, znode);
+          // Sanity check.
+          try {
+            client.ensureTableFamilyExists(TABLE_NAME, FAMILY_NAME).joinUninterruptibly();
+          } catch (Exception e) {
+            throw new RuntimeException("Missing test table/family?", e);
+          }
+        }
+      }
+      latch = new CountDownLatch(super.perClientRunRows);
+      final int maxrpcs = MAX_OUTSTANDING_RPCS / options.getNumClientThreads();
+      sem = new Semaphore(Math.max(100, maxrpcs));
+    }
+
+    /**
+     * If true, make sure that every read returns a valid-looking KeyValue.
+     */
+    private static final boolean CHECK_READS = false;
+
+    /** Checks that the row retrieved from HBase looks valid.  */
+    protected static void check(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
+      if (!CHECK_READS) {
+        return;
+      }
+      if (row.size() != 1) {
+        throw new IOException((row.isEmpty() ? "No" : "Multiple (" + row.size() + ')')
+                              + " KeyValue found in row");
+      } else if (row.get(0).value().length != ROW_LENGTH) {
+        throw new IOException("Invalid value length (found: " + row.get(0).value().length
+                              + ", expected: " + ROW_LENGTH + ") in row \""
+                              + new String(row.get(0).key()) + '"');
+      }
+    }
+
+    private Exception error = null;  // Last exception caught asynchronously.
+    private volatile boolean failed = false;  // True if we caught an exception asynchronously.
+    /** Used by sub-classes to handle asynchronous exceptions.  */
+    protected final Callback<Exception, Exception> errback = new Callback<Exception, Exception>() {
+      public Exception call(final Exception e) throws Exception {
+        rpcCompleted();
+        if (e instanceof PleaseThrottleException) {
+          LOG.warn("Throttling thread " + Thread.currentThread().getName()
+                   + ", HBase isn't keeping up", e);
+          final int permits = sem.drainPermits();  // Prevent creation of further RPCs.
+          ((PleaseThrottleException) e).getDeferred().addBoth(new Callback<Object, Object>() {
+            public Object call(final Object arg) {
+              sem.release(permits);
+              LOG.warn("Done throttling thread " + Thread.currentThread().getName());
+              return arg;
+            }
+            public String toString() {
+              return "error recovery after " + e;
+            }
+          });
+          return null;
+        }
+        error = e;
+        failed = true;  // Volatile-write.
+        LOG.error(this + " caught an exception", e);
+        return e;
+      }
+
+      private final String toString = "errback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
+      public String toString() {
+        return toString;
+      }
+    };
+
+    /**
+     * Latch to guarantee we have gotten a response for every single RPC sent.
+     * This latch is initialized up with the number of RPCs we intend to send.
+     * Every time an RPC completes successfully, we decrement its count down
+     * by one.  This way we guarantee that all RPCs have completed and their
+     * responses have been handled within the section of the code we're
+     * timing.
+     */
+    private final CountDownLatch latch;
+
+    /**
+     * Semaphore to control the number of outstanding RPCs.
+     * Because the producer code is synchronous and asynchbase is
+     * non-blocking, the tests will try to create and send all RPCs at once,
+     * thus running the app out of memory.  In order to limit the number of
+     * RPCs in flight at the same time, we acquire a permit from this
+     * semaphore each time we access the client to send an RPC, and we release
+     * the permit each time the RPC completes.
+     */
+    private final Semaphore sem;
+
+    /** Records the completion of an RPC.  */
+    protected final void rpcCompleted() {
+      sem.release();
+      latch.countDown();
+    }
+
+    /** Callback used on successful read RPCs.  */
+    protected final Callback<Object, ArrayList<org.hbase.async.KeyValue>> readCallback = new Callback<Object, ArrayList<org.hbase.async.KeyValue>>() {
+      public Object call(final ArrayList<org.hbase.async.KeyValue> row) throws IOException {
+        rpcCompleted();
+        check(row);
+        return row;
+      }
+
+      private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
+      public String toString() {
+        return toString;
+      }
+    };
+
+    /** Callback used on other successful RPCs.  */
+    protected final Callback<Object, Object> callback = new Callback<Object, Object>() {
+      public Object call(final Object arg) {
+        rpcCompleted();
+        return arg;
+      }
+
+      private final String toString = "callback for " + AsyncTest.this + " in " + Thread.currentThread().getName();
+      public String toString() {
+        return toString;
+      }
+    };
+
+    @Override
+    final void testSetup() {
+      // Nothing.
+    }
+
+    @Override
+    final void testTakedown() throws IOException {
+      try {
+        // For tests with few writes, asking for a flush before waiting on the
+        // latch tells asynchbase to start flushing writes instead of waiting
+        // until the timer flushes them.
+        client.flush().join();
+        latch.await();  // Make sure the last RPC completed.
+        if (failed) {  // Volatile-read
+          throw error;
+        }
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (IOException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new IOException("Uncaught exception from flush()", e);
+      }
+    }
+
+    /** Returns the client to use to send an RPC.  Call once per RPC.  */
+    protected final HBaseClient client() {
+      try {
+        sem.acquire();
+      } catch (InterruptedException e) {
+        LOG.error("Shouldn't happen!", e);
+        return null;
+      }
+      return client;
     }
   }
 
@@ -970,6 +1181,27 @@ public class PerformanceEvaluation {
 
   }
 
+  static class AsyncRandomReadTest extends AsyncTest {
+    AsyncRandomReadTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      final GetRequest get = new GetRequest(TABLE_NAME, getRandomRow(this.rand, this.totalRows));
+      get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
+
+      client().get(get).addCallback(readCallback).addErrback(errback);
+    }
+
+    @Override
+    protected int getReportingPeriod() {
+      int period = this.perClientRunRows / 100;
+      return period == 0 ? this.perClientRunRows : period;
+    }
+
+  }
+
   static class RandomWriteTest extends Test {
     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
@@ -986,6 +1218,22 @@ public class PerformanceEvaluation {
     }
   }
 
+  static class AsyncRandomWriteTest extends AsyncTest {
+    AsyncRandomWriteTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(final int i) {
+      final PutRequest put = new PutRequest(TABLE_NAME, getRandomRow(this.rand, this.totalRows),
+                                            FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
+      put.setDurable(writeToWAL);
+      put.setBufferable(flushCommits);
+      client().put(put).addCallbacks(callback, errback);
+    }
+
+  }
+
   static class ScanTest extends Test {
     private ResultScanner testScanner;
 
@@ -994,11 +1242,6 @@ public class PerformanceEvaluation {
     }
 
     @Override
-    void testSetup() throws IOException {
-      super.testSetup();
-    }
-
-    @Override
     void testTakedown() throws IOException {
       if (this.testScanner != null) {
         this.testScanner.close();
@@ -1019,6 +1262,50 @@ public class PerformanceEvaluation {
 
   }
 
+  static class AsyncScanTest extends AsyncTest {
+    private final Scanner scanner;
+    private final Callback continueScan = new Callback<Object, ArrayList<ArrayList<org.hbase.async.KeyValue>>>() {
+      public Object call(final ArrayList<ArrayList<org.hbase.async.KeyValue>> rows) throws Exception {
+        if (rows != null) {
+          testTimed();
+          for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
+            int n = row.size();
+            while (n-- >= 0) {
+              rpcCompleted();
+            }
+          }
+          for (final ArrayList<org.hbase.async.KeyValue> row : rows) {
+            check(row);  // Do this separate as it might throw.
+          }
+        }  // else arg is null, we're done scanning.
+        return rows;
+      }
+      public String toString() {
+        return "continueScan on " + scanner;
+      }
+    };
+
+    AsyncScanTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+      scanner = client().newScanner(TABLE_NAME);
+      scanner.setStartKey(format(this.startRow));
+      scanner.setFamily(FAMILY_NAME);
+      scanner.setQualifier(QUALIFIER_NAME);
+    }
+
+    @Override
+    void testTimed() {
+      scanner.nextRows()
+        .addCallback(continueScan)
+        .addCallbacks(callback, errback);
+    }
+
+    @Override
+    void testRow(final int i) {
+      // Unused because we completely redefined testTimed().
+    }
+  }
+
   static class SequentialReadTest extends Test {
     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
@@ -1033,6 +1320,20 @@ public class PerformanceEvaluation {
 
   }
 
+  static class AsyncSequentialReadTest extends AsyncTest {
+    AsyncSequentialReadTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      final GetRequest get = new GetRequest(TABLE_NAME, format(i));
+      get.family(FAMILY_NAME).qualifier(QUALIFIER_NAME);
+      client().get(get).addCallback(readCallback).addErrback(errback);
+    }
+
+  }
+
   static class SequentialWriteTest extends Test {
     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
@@ -1049,6 +1350,22 @@ public class PerformanceEvaluation {
 
   }
 
+  static class AsyncSequentialWriteTest extends AsyncTest {
+    AsyncSequentialWriteTest(Configuration conf, TestOptions options, Status status) {
+      super(conf, options, status);
+    }
+
+    @Override
+    void testRow(final int i) {
+      final PutRequest put = new PutRequest(TABLE_NAME, format(i),
+                                            FAMILY_NAME, QUALIFIER_NAME, generateValue(this.rand));
+      put.setDurable(writeToWAL);
+      put.setBufferable(flushCommits);
+      client().put(put).addCallbacks(callback, errback);
+    }
+
+  }
+
   static class FilteredScanTest extends Test {
     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
 
@@ -1123,9 +1440,9 @@ public class PerformanceEvaluation {
       perClientRunRows + " rows");
     long totalElapsedTime = 0;
 
-    Test t = null;
     TestOptions options = new TestOptions(startRow, perClientRunRows,
-        totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL);
+        totalRows, N, TABLE_NAME, flushCommits, writeToWAL);
+    final Test t;
     try {
       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
           Configuration.class, TestOptions.class, Status.class);

Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1352764&r1=1352763&r2=1352764&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Fri Jun 22 03:09:37 2012
@@ -256,6 +256,10 @@
   </developers>
   <repositories>
     <repository>
+      <id>cloudbees netty</id>
+      <url>http://repository-netty.forge.cloudbees.com/snapshot/</url>
+    </repository>
+    <repository>
       <id>apache release</id>
       <url>https://repository.apache.org/content/repositories/releases/</url>
     </repository>



Mime
View raw message