hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1406787 - in /hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks: Benchmark.java GetBenchmark.java
Date Wed, 07 Nov 2012 20:15:56 GMT
Author: liyin
Date: Wed Nov  7 20:15:56 2012
New Revision: 1406787

URL: http://svn.apache.org/viewvc?rev=1406787&view=rev
Log:
[HBASE-7068] Create a Get benchmark

Author: kranganathan

Summary: Initial Get benchmark with a variable number of client threads. Also added support
for bulk loading files so that the reads will all come from the block cache.

Test Plan: Tested by running benchmark.

Reviewers: kannan

Reviewed By: kannan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D614229

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
Modified:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java?rev=1406787&r1=1406786&r2=1406787&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/Benchmark.java
Wed Nov  7 20:15:56 2012
@@ -1,22 +1,49 @@
 package org.apache.hadoop.hbase.benchmarks;
 
 import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.loadtest.ColumnFamilyProperties;
 import org.apache.hadoop.hbase.loadtest.HBaseUtils;
+import org.apache.hadoop.hbase.loadtest.RegionSplitter;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.LoadTestTool;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
+/**
+ * Abstract class to run a benchmark. Extend this class to implement a 
+ * benchmark. See GetBenchmark as an example.
+ */
 public abstract class Benchmark {
   public static final Log LOG = LogFactory.getLog(Benchmark.class);
+  // the zk to run against, defaults to localhost
   public static final String ARG_ZOOKEEPER = "--zookeeper";
+  // print help
+  public static final String ARG_HELP = "--help";
   // use local zookeeper by default
   public String zkNodeName = null;
   // cached config object
@@ -45,22 +72,50 @@ public abstract class Benchmark {
         zkNodeName = args[i+1];
         i++;
       }
+      else if (args[i].equals(ARG_HELP)) {
+        System.out.println("Usage: \n" +
+            "bin/hbase org.apache.hadoop.hbase.benchmarks.<CLASSNAME>" 
+            + " [--zookeeper zknode]\n");
+        System.exit(0);
+      }
     }
   }
   
-  public void initialize() {
-    conf = HBaseConfiguration.create();
+  /**
+   * Returns the zookeeper node name we are to run against, or null when
+   * the default (local) zookeeper is being used.
+   */
+  public String getZookeeperHostname() {
+    return zkNodeName;
+  }
+  
+  /**
+   * Creates a new Configuration instance. If a zookeeper node name was
+   * supplied on the command line, the conf is pointed at that quorum on
+   * client port 2181; otherwise the default conf is returned unchanged.
+   */
+  public Configuration getNewConfObject() {
+    Configuration conf = HBaseConfiguration.create();
+    if (zkNodeName != null) {
+      conf.set("hbase.zookeeper.quorum", zkNodeName);
+      conf.setInt("hbase.zookeeper.property.clientPort", 2181);
+    }
+    return conf;
+  }
+  
+  /**
+   * Initialize this benchmark object: caches a freshly created
+   * Configuration (see getNewConfObject) in the conf field.
+   */
+  public void initialize() {
+    conf = getNewConfObject();
+  }
   
+  /**
+   * Print the results of the benchmark
+   */
   public void printBenchmarkResults() {
     System.out.println("Benchmark results");
     benchmarkResults.prettyPrint();
   }
 
+
   /**
    * Helper function to create a table and load the requested number of KVs 
    * into the table - if the table does not exist. If the table exists, then 
@@ -68,7 +123,23 @@ public abstract class Benchmark {
    * @throws IOException 
    */
   public HTable createTableAndLoadData(byte[] tableName, int kvSize, 
-      long numKVs, boolean flushData) throws IOException {
+      long numKVs, boolean bulkLoad) throws IOException {
+    return createTableAndLoadData(tableName, "cf1", kvSize, numKVs, bulkLoad);
+  }
+  
+  /**
+   * Helper to create a table and load data into it.
+   * 
+   * @param tableName table name to create and load
+   * @param cfNameStr cf to create
+   * @param kvSize size of kv's to write
+   * @param numKVs number of kv's to write into the table and cf
+   * @param bulkLoad if true, create HFile and load. Else do puts.
+   * @return HTable instance to the table just created
+   * @throws IOException
+   */
+  public HTable createTableAndLoadData(byte[] tableName, String cfNameStr, 
+      int kvSize, long numKVs, boolean bulkLoad) throws IOException {
     HTable htable = null;
     try {
       htable = new HTable(conf, tableName);
@@ -78,8 +149,9 @@ public abstract class Benchmark {
     
     if (htable != null) return htable;
 
+    // setup the column family properties
     ColumnFamilyProperties familyProperty = new ColumnFamilyProperties();
-    familyProperty.familyName = "cf1";
+    familyProperty.familyName = cfNameStr;
     familyProperty.minColsPerKey = 1;
     familyProperty.maxColsPerKey = 1;    
     familyProperty.minColDataSize = kvSize;
@@ -91,37 +163,76 @@ public abstract class Benchmark {
     familyProperties[0] = familyProperty;
     
     // create the table
+    LOG.info("Creating table " + new String(tableName));
     HBaseUtils.createTableIfNotExists(conf, tableName, familyProperties, 1);
-    // write data if the table was created
-    LOG.info("Loading data for the table");
-    String[] loadTestToolArgs = {
-      "-zk", "localhost", 
-      "-tn", new String(tableName),
-      "-cf", familyProperty.familyName,
-      "-write", "1:" + kvSize, 
-      "-num_keys", "" + numKVs, 
-      "-multiput",
-      "-compression", "NONE",
-    };
-    LoadTestTool.doMain(loadTestToolArgs);
-    LOG.info("Done loading data");
-    
-    if (flushData) {
-      LOG.info("Flush of data requested");
-      HBaseAdmin admin = new HBaseAdmin(conf);
-      admin.flush(tableName);
-      try {
-        Thread.sleep(2*1000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
+
+    if (bulkLoad) {
+      LOG.info("Bulk load of " + numKVs + " KVs of " + kvSize + 
+          " bytes requested.");
+      // get the regions to RS map for this table
+      htable = new HTable(conf, tableName);
+      NavigableMap<HRegionInfo, HServerAddress> regionsToRS = 
+        htable.getRegionsInfo();
+      // get the region and rs objects
+      HRegionInfo hRegionInfo = regionsToRS.firstKey();
+      HServerAddress hServerAddress = regionsToRS.get(hRegionInfo);
+      // get the regionserver
+      HConnection connection = HConnectionManager.getConnection(conf);
+      HRegionInterface regionServer = 
+        connection.getHRegionConnection(hServerAddress);
+
+      // create an hfile
+      LOG.info("Creating data files...");
+      String tableNameStr = new String(tableName);
+      FileSystem fs = FileSystem.get(conf);
+      Path hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
+      Path basedir = new Path(hbaseRootDir, tableNameStr);
+      Path hFile =  new Path(basedir, "hfile." + System.currentTimeMillis());
+      HFile.Writer writer =
+        HFile.getWriterFactoryNoCache(conf).withPath(fs, hFile).create();
+      byte [] family = 
+        hRegionInfo.getTableDesc().getFamilies().iterator().next().getName();
+      byte [] value = new byte[kvSize];
+      (new Random()).nextBytes(value);
+      for (long rowID = 0; rowID < numKVs; rowID++) {
+        // write a 20 byte fixed length key (Long.MAX_VALUE has 19 digits)
+        byte[] row = Bytes.toBytes(String.format("%20d", rowID));
+        writer.append(new KeyValue(row, family, Bytes.toBytes(rowID), 
+            System.currentTimeMillis(), value));
       }
-      LOG.info("Done flushing data");
+      writer.close();
+      LOG.info("Done creating data file [" + hFile.getName() + 
+          "], will bulk load data...");
+
+      // bulk load the file
+      regionServer.bulkLoadHFile(hFile.toString(), hRegionInfo.getRegionName(), 
+          Bytes.toBytes(cfNameStr), true);
+    } 
+    else {
+      // write data as puts
+      LOG.info("Loading data for the table");
+      String[] loadTestToolArgs = {
+        "-zk", "localhost", 
+        "-tn", new String(tableName),
+        "-cf", familyProperty.familyName,
+        "-write", "1:" + kvSize, 
+        "-num_keys", "" + numKVs, 
+        "-multiput",
+        "-compression", "NONE",
+      };
+      LoadTestTool.doMain(loadTestToolArgs);
+      LOG.info("Done loading data");
     }
     
+    // make sure we can open the table just created and loaded
     htable = new HTable(conf, tableName);
     return htable;
   }
   
+  /**
+   * Sets up the environment for the benchmark. All extending class should call 
+   * this method from their main() method.
+   */
   public static void benchmarkRunner(
       Class<? extends Benchmark> benchmarkClass, String[] args) 
   throws Throwable {

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java?rev=1406787&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/benchmarks/GetBenchmark.java
Wed Nov  7 20:15:56 2012
@@ -0,0 +1,188 @@
+package org.apache.hadoop.hbase.benchmarks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.ipc.HBaseClient;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class GetBenchmark extends Benchmark {
+  public static final Log LOG = LogFactory.getLog(GetBenchmark.class);
+  static byte[] tableName = Bytes.toBytes("bench.GetFromMemory");
+  static String cfName = "cf1";
+  private static Integer[] CLIENT_THREADS = { 40, 50, 60, 70, 80, 90, 100 };
+  private static Integer[] NUM_CONNECTIONS = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+  public static Configuration[] connCountConfs = new Configuration[100];
+  public static int kvSize = 50;
+  public static int numKVs = 1000000;
+  
+  /**
+   * Initialize the benchmark results tracking and output.
+   */
+  public void initBenchmarkResults() {
+    List<String> header = new ArrayList<String>();
+    header.add("Threads");
+    for (int i = 0; i < NUM_CONNECTIONS.length; i++) {
+      header.add("   conn=" +  NUM_CONNECTIONS[i]);
+    }
+    benchmarkResults = new BenchmarkResults<Integer, Integer, Double>(
+        CLIENT_THREADS, NUM_CONNECTIONS, "  %5d", "   %5.2f", header);
+  }
+  
+  public void runBenchmark() throws Throwable {
+    // populate the table, bulk load it
+    createTableAndLoadData(tableName, cfName, kvSize, numKVs, true);
+    // warm block cache, force jit compilation
+    System.out.println("Warming blockcache and forcing JIT compilation...");
+    runExperiment("warmup-", false, 1, 100*1000, 1);
+    // iterate on the number of connections
+    for (int numConnections : NUM_CONNECTIONS) {
+      LOG.info("Num connection = " + numConnections);
+      // stop all connections so that we respect num-connections param
+      HBaseRPC.stopClients();
+      // vary for a number of client threads
+      for (int numThreads : CLIENT_THREADS) {
+        if (numConnections > numThreads) continue;
+        try {
+          // read enough KVs to benchmark within a reasonable time
+          long numKVsToRead = 40*1000;
+          if (numThreads >= 5) numKVsToRead = 20*1000;
+          if (numThreads >= 40) numKVsToRead = 10*1000;
+          // run the experiment
+          runExperiment("t" + numThreads + "-", true, 
+              numThreads, numKVsToRead, numConnections);
+        } catch (IOException e) { 
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  public void runExperiment(String prefix, boolean printStats, 
+      int numThreads, long numReadsPerThread, int numConnections) 
+  throws IOException {
+    // Prepare the read threads
+    GetBenchMarkThread threads[] = new GetBenchMarkThread[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      if (connCountConfs[numConnections] == null) {
+        connCountConfs[numConnections] = getNewConfObject();
+        // set the number of connections per thread
+        connCountConfs[numConnections].setInt(
+            HBaseClient.NUM_CONNECTIONS_PER_SERVER, numConnections);
+      }
+      Configuration conf = connCountConfs[numConnections];
+      threads[i] = new GetBenchMarkThread(prefix+i, tableName, 
+          Bytes.toBytes(cfName), 0, numKVs, numReadsPerThread, conf, true);
+    }
+    // start the read threads, each one times itself
+    for (int i = 0; i < numThreads; i++) {
+      threads[i].start();
+    }
+    // wait for all the threads and compute the total ops/sec
+    double totalOpsPerSec = 0;
+    int successThreads = 0;
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        threads[i].join();
+        totalOpsPerSec += threads[i].getOpsPerSecond();
+        successThreads++;
+      } catch (InterruptedException e) {
+        LOG.error("Exception in thread " + i, e);
+      }
+    }
+    System.out.println("Num threads =  " + successThreads + ", " + 
+        "performance = " + String.format("%5.2f", totalOpsPerSec) + " ops/sec");
+    // add to the benchmark results
+    benchmarkResults.addResult(numThreads, numConnections, totalOpsPerSec);
+  }
+  
+  /**
+   * Thread that performs a given number of read operations and computes the 
+   * number of read operations per second it was able to get.
+   */
+  public static class GetBenchMarkThread extends Thread {
+    public static final long PRINT_INTERVAL = 20000;
+    String name;
+    byte[] table;
+    byte[] cf;
+    int startKey;
+    int endKey;
+    long numGetsToPerform;
+    Configuration conf;
+    long timeTakenMillis = 0;
+    boolean debug = false;
+    Random random = new Random();
+    
+    public GetBenchMarkThread(String name, byte[] table, byte[] cf, 
+        int startKey, int endKey, long numGetsToPerform, Configuration conf, 
+        boolean debug) {
+      this.name = name;
+      this.table = table;
+      this.cf = cf;
+      this.startKey = startKey;
+      this.endKey = endKey;
+      this.numGetsToPerform = numGetsToPerform;
+      this.conf = conf;
+      this.debug = debug;
+    }
+    
+    /**
+     * Returns the number of ops/second at the current moment.
+     */
+    public double getOpsPerSecond() {
+      return (numGetsToPerform * 1.0 * 1000 / timeTakenMillis);
+    }
+    
+    public void run() {
+      try {
+        // create a new HTable instance
+        HTable htable = new HTable(conf, tableName);
+        long rowKey = 0;
+        // number of reads we have performed
+        long numSuccessfulGets = 0;
+        while (numSuccessfulGets < numGetsToPerform) {
+          // create a random key in the range passed in
+          rowKey = startKey + random.nextInt(endKey - startKey);
+          // keys are assumed to be 20 chars
+          Get get = 
+            new Get(Bytes.toBytes(String.format("%20d", rowKey)));
+          get.addFamily(cf);
+          // time the actual get
+          long t1 = System.currentTimeMillis();
+          htable.get(get);
+          timeTakenMillis += System.currentTimeMillis() - t1;
+          numSuccessfulGets++;
+          // print progress if needed
+          if (debug && numSuccessfulGets % PRINT_INTERVAL == 0) {
+            double opsPerSec = 
+              (numSuccessfulGets * 1.0 * 1000 / timeTakenMillis);
+            LOG.debug("[Thread-" + name + "] " + "" +
+            		"Num gets = " + numSuccessfulGets + "/" + numGetsToPerform + 
+            		", current rate = " + String.format("%.2f", opsPerSec) + 
+            		" ops/sec");
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("IOException while running read thread, will exit", e);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Throwable {
+    String className = 
+      Thread.currentThread().getStackTrace()[1].getClassName();
+    System.out.println("Running benchmark " + className);
+    @SuppressWarnings("unchecked")
+    Class<? extends Benchmark> benchmarkClass = 
+      (Class<? extends Benchmark>)Class.forName(className);
+    Benchmark.benchmarkRunner(benchmarkClass, args);
+  }
+}



Mime
View raw message