hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1584158 - /hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/
Date Wed, 02 Apr 2014 20:48:55 GMT
Author: liyin
Date: Wed Apr  2 20:48:55 2014
New Revision: 1584158

URL: http://svn.apache.org/r1584158
Log:
[HBASE-10754] Improve the micro benchmarks to perform multigets instead of gets.

Author: manukranthk

Summary: This diff intends to improve the benchmarks by adding multigets to the HBaseBenchmarkTool.

Test Plan: Testing on the dev server

Reviewers: adela, gauravm, fan

Reviewed By: fan

CC: hbase-eng@, fan

Differential Revision: https://phabricator.fb.com/D1215543

Task ID: 3904136

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java
      - copied, changed from r1577643, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java
Modified:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java?rev=1584158&r1=1584157&r2=1584158&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java
Wed Apr  2 20:48:55 2014
@@ -35,11 +35,22 @@ public interface BenchmarkClient {
 
   public Put createPut(byte[] row, byte[] family, byte[] qual, byte[] value);
 
+  public Put createRandomPut(int rowLength, byte[] family, int qualLength,
+      int valueLength);
+
   public Result executeGet(Get get);
 
+  public Result[] executeMultiGet(List<Get> gets);
+
+  public void executeMultiPut(List<Put> puts);
+
   public void executePut(Put put);
 
   public List<Result> executeScan(Scan scan);
 
   public Scan createScan(byte[] row, byte[] family, int nbRows);
+
+  public void printProfilingData();
+
+  public void setProfilingData(boolean flag);
 }

Copied: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java
(from r1577643, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java?p2=hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java&p1=hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java&r1=1577643&r2=1584158&rev=1584158&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java
Wed Apr  2 20:48:55 2014
@@ -1,25 +1,8 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.hadoop.hbase.util.rpcbench;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -29,18 +12,24 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.ProfilingData;
 
-/**
- * This is a BenchmarkClient which performs RPC operations through HadoopRPC.
- */
-public class HadoopRPCBenchmarkClient implements BenchmarkClient {
-  private static final Log LOG = LogFactory.getLog(ThriftBenchmarkClient.class);
-  private HTable htable = null;
-  HadoopRPCBenchmarkClient(HTable htable) {
-    this.htable = htable;
-    this.htable.setAutoFlush(true);
-  }
+public abstract class BenchmarkClientImpl implements BenchmarkClient {
 
+  private static final Log LOG = LogFactory.getLog(BenchmarkClientImpl.class);
+  protected HTable htable = null;
+  @Override
+  public Put createRandomPut(int rowLength, byte[] family, int qualLength,
+      int valueLength) {
+    Random rand = new Random();
+    byte[] row = new byte[rowLength];
+    byte[] qual = new byte[qualLength];
+    byte[] value = new byte[valueLength];
+    rand.nextBytes(row);
+    rand.nextBytes(qual);
+    rand.nextBytes(value);
+    return this.createPut(row, family, qual, value);
+  }
   // Performing a get through thrift
   @Override
   public Result executeGet(Get get) {
@@ -65,6 +54,7 @@ public class HadoopRPCBenchmarkClient im
     }
   }
 
+  @SuppressWarnings("deprecation")
   public Get createGet(byte[] row, byte[] family, byte[] qual) {
     Get g = new Get(row);
     g.addColumn(family, qual);
@@ -87,4 +77,35 @@ public class HadoopRPCBenchmarkClient im
     throw new NotImplementedException();
   }
 
+  @Override
+  public Result[] executeMultiGet(List<Get> gets) {
+    try {
+      return this.htable.batchGet(gets);
+    } catch (IOException e) {
+      LOG.debug("Unable to perform put");
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  @Override
+  public void executeMultiPut(List<Put> puts) {
+    try {
+      this.htable.put(puts);
+    } catch (IOException e) {
+      LOG.debug("Unable to perform put");
+      e.printStackTrace();
+    }
+  }
+
+  public void printProfilingData() {
+    if (htable.getProfiling()) {
+      ProfilingData data = htable.getProfilingData();
+      LOG.debug(data.toPrettyString());
+    }
+  }
+
+  public void setProfilingData(boolean flag) {
+    this.htable.setProfiling(flag);
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java?rev=1584158&r1=1584157&r2=1584158&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
Wed Apr  2 20:48:55 2014
@@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.loadtest.ColumnFamilyProperties;
 import org.apache.hadoop.hbase.loadtest.HBaseUtils;
@@ -31,7 +33,12 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Histogram;
+import org.weakref.jmx.com.google.common.base.Preconditions;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+
 /**
  * Tool that runs the benchmarks. This takes the name of a benchmark factory,
  * and performs a single put and does a lot of gets to retrieve that put.
@@ -54,30 +62,32 @@ public class HBaseRPCBenchmarkTool exten
   private static final long DEFAULT_REPORT_INTERVAL_MS = 10;
   private static final int DEFAULT_NUM_OPS = 200;
   private static final int DEFAULT_NUM_THREADS = 10;
-  private static final String DEFAULT_ROW = "rowkey";
+  private static final int DEFAULT_ROW_LENGTH = 20;
   private static final String DEFAULT_CF = "cf";
-  private static final String DEFAULT_QUAL ="q";
-  private static final String DEFAULT_VALUE = "v";
+  private static final int DEFAULT_QUAL_LENGTH = 10;
+  private static final int DEFAULT_VALUE_LENGTH = 100;
   private static final String DEFAULT_TABLENAME = "RPCBenchmarkingTable";
   private static final int DEFAULT_ZK_PORT =
       HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
-  private static final boolean DEFAULT_DO_PUT = true;
+  private static final boolean DEFAULT_ASYNC_CALLS = false;
 
   /**
    * The following are the command line parameters which this tool takes.
    */
-  private static final String OPT_CF= "cf";
-  private static final String OPT_QUAL = "q";
-  private static final String OPT_ROW = "r";
+  private static final String OPT_CF = "cf";
+  private static final String OPT_QUAL_LENGTH = "q";
+  private static final String OPT_ROW_LENGTH = "r";
   private static final String OPT_TBL_NAME = "t";
-  private static final String OPT_VALUE = "v";
+  private static final String OPT_VALUE_LENGTH = "v";
   private static final String OPT_CLASS = "c";
   private static final String OPT_NUM_OPS = "ops";
   private static final String OPT_NUM_THREADS = "threads";
   private static final String OPT_REPORT_INTERVAL = "interval";
-  private static final String OPT_NO_PUT = "no_put";
   private static final String OPT_ZK_QUORUM = "zk";
   private static final String OPT_ZK_PORT = "zkPort";
+  private static final String OPT_GET_BATCH_SIZE = "gbatch";
+  private static final String OPT_PUT_BATCH_SIZE = "pbatch";
+  private static final String OPT_ASYNC_CALLS = "async";
 
   /**
    * These are values that we get from the command line and
@@ -91,17 +101,20 @@ public class HBaseRPCBenchmarkTool exten
   private byte[] tblName;
   private String zkQuorum;
   private int zkPort;
-  private byte[] row;
+  private int rowLength;
   private byte[] family;
-  private byte[] qual;
-  private byte[] value;
+  private int qualLength;
+  private int valueLength;
   private int numOps;
   private int numThreads;
   private long reportInterval;
-  private boolean doPut;
   private AtomicLong sumLatency = new AtomicLong(0);
   private AtomicLong totalOps = new AtomicLong(0);
   private long runtimeMs;
+  private int multigetBatch;
+  private int multiputBatch;
+  private boolean useAsync;
+
 
   private HBaseRPCBenchmarkTool() {
   }
@@ -111,20 +124,21 @@ public class HBaseRPCBenchmarkTool exten
   }
 
   private HBaseRPCBenchmarkTool(Class<? extends BenchmarkFactory> factoryCls,
-      byte[] tableName, Configuration conf, byte[] row, byte[] cf, byte[] qual,
-      byte[] value, int numOps, int numThreads, long reportIntervalMs,
-      boolean doPut) {
+      byte[] tableName, Configuration conf, int rowLength, byte[] cf,
+      int qualLength, int valueLength, int numOps, int numThreads,
+      long reportIntervalMs, int multiGetBatch, int multiputBatch) {
     this.factoryCls = factoryCls;
     this.conf = conf;
     this.tblName = tableName;
-    this.row = row;
+    this.rowLength = rowLength;
     this.family = cf;
-    this.qual = qual;
-    this.value = value;
+    this.qualLength = qualLength;
+    this.valueLength = valueLength;
     this.numOps = numOps;
     this.numThreads = numThreads;
     this.reportInterval = reportIntervalMs;
-    this.doPut = doPut;
+    this.multigetBatch = multiGetBatch;
+    this.multiputBatch = multiputBatch;
   }
 
   /**
@@ -133,15 +147,16 @@ public class HBaseRPCBenchmarkTool exten
   public static class Builder {
     private final Class<? extends BenchmarkFactory> factoryCls;
     private byte[] tableName = Bytes.toBytes(DEFAULT_TABLENAME);
-    private byte[] row = Bytes.toBytes(DEFAULT_ROW);
+    private int rowLength = DEFAULT_ROW_LENGTH;
     private byte[] cf = Bytes.toBytes(DEFAULT_CF);
-    private byte[] qual = Bytes.toBytes(DEFAULT_QUAL);
-    private byte[] value = Bytes.toBytes(DEFAULT_VALUE);
+    private int qualLength = DEFAULT_QUAL_LENGTH;
+    private int valueLength = DEFAULT_VALUE_LENGTH;
     private int numOps = DEFAULT_NUM_OPS;
     private int numThreads = DEFAULT_NUM_THREADS;
     private long reportIntervalMs = DEFAULT_REPORT_INTERVAL_MS;
-    private boolean doPut = DEFAULT_DO_PUT;
     private Configuration conf;
+    private int multigetBatch = 1000;
+    private int multiputBatch = 1000;
 
     public Builder(Class<? extends BenchmarkFactory> factoryCls) {
       this.factoryCls = factoryCls;
@@ -152,8 +167,8 @@ public class HBaseRPCBenchmarkTool exten
       return this;
     }
 
-    public Builder withRow(byte[] row) {
-      this.row = row;
+    public Builder withRowLength(int rowLength) {
+      this.rowLength = rowLength;
       return this;
     }
 
@@ -162,13 +177,13 @@ public class HBaseRPCBenchmarkTool exten
       return this;
     }
 
-    public Builder withQualifier(byte[] qual) {
-      this.qual = qual;
+    public Builder withQualifierLength(int qualLength) {
+      this.qualLength = qualLength;
       return this;
     }
 
-    public Builder withValue(byte[] value) {
-      this.value = value;
+    public Builder withValue(int valueLength) {
+      this.valueLength = valueLength;
       return this;
     }
 
@@ -182,20 +197,26 @@ public class HBaseRPCBenchmarkTool exten
       return this;
     }
 
-    public Builder withDoPut(boolean doPut) {
-      this.doPut = doPut;
+    public Builder withConf(Configuration conf) {
+      this.conf = conf;
       return this;
     }
 
-    public Builder withConf(Configuration conf) {
-      this.conf = conf;
+    public Builder withMultiGetBatch(int multiGetBatch) {
+      this.multigetBatch = multiGetBatch;
+      return this;
+    }
+
+    public Builder withMultiPutBatch(int multiputBatch) {
+      this.multiputBatch = multiputBatch;
       return this;
     }
 
     public HBaseRPCBenchmarkTool create() {
       return new HBaseRPCBenchmarkTool(this.factoryCls, this.tableName,
-        this.conf, this.row, this.cf, this.qual, this.value,
-        this.numOps, this.numThreads, this.reportIntervalMs, this.doPut);
+        this.conf, this.rowLength, this.cf, this.qualLength, this.valueLength,
+        this.numOps, this.numThreads, this.reportIntervalMs,
+        this.multigetBatch, this.multiputBatch);
     }
   }
 
@@ -207,16 +228,17 @@ public class HBaseRPCBenchmarkTool exten
     addOptWithArg(OPT_CLASS, "Benchmark factory class");
     addOptWithArg(OPT_NUM_THREADS, "Number of threads");
     addOptWithArg(OPT_NUM_OPS, "Number of operations to execute per thread");
+    addOptWithArg(OPT_GET_BATCH_SIZE, "Number of gets in a single batch");
+    addOptWithArg(OPT_PUT_BATCH_SIZE, "Number of puts in a single batch");
     addOptWithArg(OPT_TBL_NAME, "Table name to use");
     addOptWithArg(OPT_ZK_QUORUM, "Table name");
     addOptWithArg(OPT_ZK_PORT, "Zookeeper Port");
     addOptWithArg(OPT_REPORT_INTERVAL, "Reporting interval in milliseconds");
-    addOptWithArg(OPT_ROW, "Row key");
-    addOptWithArg(OPT_NO_PUT,
-        "DO NOT perform a single put (writing the value) before the benchmark");
+    addOptWithArg(OPT_ROW_LENGTH, "Row key length");
     addOptWithArg(OPT_CF, "Column family to use");
-    addOptWithArg(OPT_QUAL, "Column qualifier to use");
-    addOptWithArg(OPT_VALUE, "Value to use");
+    addOptWithArg(OPT_QUAL_LENGTH, "Column qualifier length");
+    addOptWithArg(OPT_VALUE_LENGTH, "Value length");
+    addOptWithArg(OPT_ASYNC_CALLS, "Use async calls underneath the sync calls");
   }
 
   /**
@@ -244,7 +266,8 @@ public class HBaseRPCBenchmarkTool exten
       LOG.debug("Adding zookeeper quorum : " + zkQuorum);
     }
     reportInterval = parseLong(cmd.getOptionValue(OPT_REPORT_INTERVAL,
-        String.valueOf(DEFAULT_REPORT_INTERVAL_MS)), reportInterval, Long.MAX_VALUE);
+        String.valueOf(DEFAULT_REPORT_INTERVAL_MS)),
+        reportInterval, Long.MAX_VALUE);
     if (cmd.hasOption(OPT_TBL_NAME)) {
       tblName = Bytes.toBytes(cmd.getOptionValue(OPT_TBL_NAME));
     } else {
@@ -256,39 +279,60 @@ public class HBaseRPCBenchmarkTool exten
       HBaseUtils.createTableIfNotExists(conf,
           tblName, familyProperties, 1);
     }
-    row = Bytes.toBytes(cmd.getOptionValue(OPT_ROW, DEFAULT_ROW));
+    rowLength = Integer.parseInt(
+        cmd.getOptionValue(OPT_ROW_LENGTH, String.valueOf(DEFAULT_ROW_LENGTH)));
     family = Bytes.toBytes(cmd.getOptionValue(OPT_CF, DEFAULT_CF));
-    qual = Bytes.toBytes(cmd.getOptionValue(OPT_QUAL, DEFAULT_QUAL));
-    value = Bytes.toBytes(cmd.getOptionValue(OPT_VALUE, DEFAULT_VALUE));
+    qualLength = Integer.parseInt(cmd.getOptionValue(OPT_QUAL_LENGTH,
+        String.valueOf(DEFAULT_QUAL_LENGTH)));
+    valueLength = Integer.parseInt(cmd.getOptionValue(OPT_VALUE_LENGTH,
+        String.valueOf(DEFAULT_VALUE_LENGTH)));
     numOps = parseInt(cmd.getOptionValue(OPT_NUM_OPS,
         String.valueOf(DEFAULT_NUM_OPS)), 1, Integer.MAX_VALUE);
     numThreads = parseInt(cmd.getOptionValue(OPT_NUM_THREADS,
         String.valueOf(DEFAULT_NUM_THREADS)), 1, Integer.MAX_VALUE);
-    doPut = !cmd.hasOption(OPT_NO_PUT);
+    this.multigetBatch = parseInt(cmd.getOptionValue(OPT_GET_BATCH_SIZE,
+        String.valueOf(1000)), 1, Integer.MAX_VALUE);
+    this.multiputBatch = parseInt(cmd.getOptionValue(OPT_PUT_BATCH_SIZE,
+        String.valueOf(1000)), 1, Integer.MAX_VALUE);
+    this.useAsync = Boolean.parseBoolean(cmd.getOptionValue(OPT_ASYNC_CALLS));
+  }
+
+  public List<Get> performRandomMultiputs(BenchmarkClient benchmark,
+      int numMultiPutOps, int multiputBatchSize) throws IOException {
+    List<Get> ret = new ArrayList<Get>();
+    for (int i = 0; i < numMultiPutOps; i++) {
+      List<Put> puts = new ArrayList<Put>(multiputBatchSize);
+      for (int j = 0; j < multiputBatchSize; j++) {
+        Put p = benchmark.createRandomPut(this.rowLength, this.family,
+            this.qualLength, this.valueLength);
+        puts.add(p);
+        ret.add(new Get(p.getRow()));
+      }
+      benchmark.executeMultiPut(puts);
+    }
+    return ret;
   }
 
   /**
    * Main function which does the benchmarks.
    * @throws IllegalAccessException
    * @throws InstantiationException
+   * @throws IOException
    */
   @Override
-  protected void doWork() throws InterruptedException, InstantiationException, IllegalAccessException
{
+  protected void doWork() throws InterruptedException,
+      InstantiationException, IllegalAccessException, IOException {
     // Initializing the required objects.
     BenchmarkFactory factory = (BenchmarkFactory) factoryCls.newInstance();
     LOG.debug("Creating an instance of the factory class : " +
         factoryCls.getCanonicalName());
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    Configuration conf = HBaseConfiguration.create(this.conf);
+    conf.setBoolean(HConstants.HTABLE_ASYNC_CALLS, this.useAsync);
     BenchmarkClient benchmark = factory.makeBenchmarkClient(tblName, conf);
 
-    // Performing a single put to the region server.
-    if (doPut) {
-      benchmark.executePut(benchmark.createPut(row, family, qual, value));
-      Result r = benchmark.executeGet(benchmark.createGet(row, family, qual));
-      if (Bytes.equals(r.getValue(family, qual), value)) {
-      }
-      doPut = false;
-    }
+    List<Get> gets = performRandomMultiputs(benchmark, this.numOps/10,
+        this.multiputBatch);
     runtimeMs = System.currentTimeMillis();
 
     // Count down latches which let me synchronize all the benchmark workers to
@@ -301,8 +345,9 @@ public class HBaseRPCBenchmarkTool exten
     // Spawning the worker threads here.
     for (int i = 0; i < numThreads; i++) {
       executor.submit(new WorkerThread(histogram, sumLatency,
-          totalOps, factory, tblName, conf, numOps, reportInterval, row,
-          family, qual, readySignal, startSignal, doneSignal, running));
+          totalOps, factory, tblName, conf, numOps, reportInterval, rowLength,
+          family, qualLength, valueLength, gets, this.multigetBatch,
+          readySignal, startSignal, doneSignal, running));
     }
 
     // Here we will wait for all the worker threads to kick off and then we let
@@ -345,7 +390,6 @@ public class HBaseRPCBenchmarkTool exten
    * it starts running the benchmark.
    */
   class WorkerThread extends Thread {
-
     private final Histogram histogram;
     private final AtomicLong totalLatency;
     private final AtomicLong totalOps;
@@ -354,14 +398,21 @@ public class HBaseRPCBenchmarkTool exten
     private final byte[] tableName;
     private final Configuration conf;
     private final int numOps;
-    private final byte[] row;
+    @SuppressWarnings("unused")
+    private final int rowLength;
+    @SuppressWarnings("unused")
     private final byte[] family;
-    private final byte[] qual;
+    @SuppressWarnings("unused")
+    private final int qualLength;
+    @SuppressWarnings("unused")
+    private final int valueLength;
+    private final List<Get> gets;
     private final CountDownLatch readySignal; // To notify the controller that this thread
has started
     private final CountDownLatch startSignal; // To notify this thread that it is free to
start
     private final CountDownLatch doneSignal; // To notify the controller thread to shutdown
the threads.
     private final AtomicBoolean running; // the state variable which tells whether the threads
should be running.
     private boolean signalledDone = false;
+    private final int multiGetBatch;
 
     WorkerThread(Histogram histogram,
         AtomicLong totalLatency,
@@ -371,9 +422,12 @@ public class HBaseRPCBenchmarkTool exten
         Configuration conf,
         int numOps,
         long reportIntervalMs,
-        byte[] row,
+        int rowLength,
         byte[] family,
-        byte[] qual,
+        int qualLength,
+        int valueLength,
+        List<Get> gets,
+        int multiGetBatch,
         CountDownLatch readySignal,
         CountDownLatch startSignal,
         CountDownLatch doneSignal,
@@ -385,15 +439,32 @@ public class HBaseRPCBenchmarkTool exten
       this.totalLatency = totalLatency;
       this.totalOps = totalOps;
       this.numOps = numOps;
-      this.row = row;
+      this.rowLength = rowLength;
       this.family = family;
-      this.qual = qual;
+      this.qualLength = qualLength;
+      this.valueLength = valueLength;
+      this.gets = gets;
+      this.multiGetBatch = multiGetBatch;
       this.readySignal = readySignal;
       this.startSignal = startSignal;
       this.doneSignal = doneSignal;
       this.running = running;
     }
 
+    public Result[] executeMultiGet(int batch) {
+      List<Get> todogets = new ArrayList<Get>();
+      Random rand = new Random();
+      if (batch == 1) {
+        return new Result[] {
+            benchmark.executeGet(gets.get(rand.nextInt(gets.size())))
+          };
+      }
+      for (int i = 0; i < batch; i++) {
+        todogets.add(new Get(this.gets.get(rand.nextInt(this.gets.size())).getRow()));
+      }
+      return benchmark.executeMultiGet(todogets);
+    }
+
     @Override
     public void run() {
       LOG.debug("Worker Thread. numOps:" + numOps);
@@ -411,7 +482,15 @@ public class HBaseRPCBenchmarkTool exten
       for (int i = 0; ; ++i) {
         long opStartNs = System.nanoTime();
         try {
-          benchmark.executeGet(benchmark.createGet(row, family, qual));
+          if (i % (this.numOps / 10) == 0) {
+            setProfilingData(true);
+          }
+          Result[] ret = executeMultiGet(this.multiGetBatch);
+          Preconditions.checkArgument(ret.length == multigetBatch);
+          if (i % (this.numOps / 10) == 0) {
+            printProfilingData();
+            setProfilingData(false);
+          }
         } catch (Exception e) {
           LOG.debug("Encountered exception while performing get");
           e.printStackTrace();
@@ -419,7 +498,7 @@ public class HBaseRPCBenchmarkTool exten
         }
         long delta = System.nanoTime() - opStartNs;
         totalLatency.addAndGet(delta);
-        totalOps.incrementAndGet();
+        totalOps.addAndGet(this.multiGetBatch);
         histogram.addValue(delta);
         if (i >= numOps) {
           if (!signalledDone) {
@@ -431,8 +510,12 @@ public class HBaseRPCBenchmarkTool exten
       }
       StringBuilder sb = new StringBuilder();
       sb.append("Printing statistics for " + factoryCls.getName());
+      sb.append(". Total Ops : ");
+      sb.append(totalOps.get());
+      sb.append(". Get Batch Size : ");
+      sb.append(this.multiGetBatch);
       sb.append(". Average latency : ");
-      sb.append(sumLatency.get()/totalOps.get());
+      sb.append(totalLatency.get()/totalOps.get());
       sb.append("ns. ");
       sb.append("p95 latency : ");
       sb.append(histogram.getPercentileEstimate(PercentileMetric.P95));
@@ -441,8 +524,17 @@ public class HBaseRPCBenchmarkTool exten
       sb.append(". Throughput : ");
       sb.append((totalOps.get() * 1000)/
           (System.currentTimeMillis() - startTime));
+      sb.append(" ops/s.");
       LOG.debug(sb);
     }
+
+    private void printProfilingData() {
+      this.benchmark.printProfilingData();
+    }
+
+    private void setProfilingData(boolean flag) {
+      this.benchmark.setProfilingData(flag);
+    }
   }
 
   public long getTotalOps() {
@@ -466,7 +558,12 @@ public class HBaseRPCBenchmarkTool exten
   }
 
   public static void main(String[] args) {
-    int ret = new HBaseRPCBenchmarkTool().doStaticMain(args);
+    HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool();
+    int ret = tool.doStaticMain(args);
+    System.out.println("Total throughput : " + tool.getThroughput());
+    System.out.println("Avg Latency : " + tool.getAverageLatency());
+    System.out.println("P99 latency : " + tool.getP99Latency());
+    System.out.println("P95 latency : " + tool.getP95Latency());
     System.exit(ret);
   }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java?rev=1584158&r1=1584157&r2=1584158&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java
Wed Apr  2 20:48:55 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.util.rpcbench;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -57,25 +58,27 @@ public class HBaseRPCProtocolComparison 
   private static final int DEFAULT_NUM_OPS = 10000;
   private static final int DEFAULT_NUM_ROUNDS = 100;
   private static final int DEFAULT_NUM_THREADS = 10;
-  private static final String DEFAULT_ROW = "rowkey";
+  private static final int DEFAULT_ROW_LENGTH = 20;
   private static final String DEFAULT_CF = "cf";
-  private static final String DEFAULT_QUAL ="q";
-  private static final String DEFAULT_VALUE = "v";
+  private static final int DEFAULT_QUAL_LENGTH =10;
+  private static final int DEFAULT_VALUE_LENGTH = 100;
   private static final String DEFAULT_TABLENAME = "RPCBenchmarkingTable";
   private static final int DEFAULT_ZK_PORT = 2181;
-  private static final boolean DEFAULT_DO_PUT = true;
+  private static final int DEFAULT_GET_BATCH_SIZE = 100;
+  private static final int DEFAULT_PUT_BATCH_SIZE = 100;
 
   private static final String OPT_CF= "cf";
-  private static final String OPT_QUAL = "q";
-  private static final String OPT_ROW = "r";
+  private static final String OPT_QUAL_LENGTH = "q";
+  private static final String OPT_ROW_LENGTH = "r";
   private static final String OPT_TBL_NAME = "t";
-  private static final String OPT_VALUE_LENGTH = "vlen";
+  private static final String OPT_VALUE_LENGTH = "v";
   private static final String OPT_CLASSES = "c";
   private static final String OPT_NUM_OPS = "ops";
+  private static final String OPT_GET_BATCH_SIZE = "gbatch";
+  private static final String OPT_PUT_BATCH_SIZE = "pbatch";
   private static final String OPT_NUM_ROUNDS = "rounds";
   private static final String OPT_NUM_THREADS = "threads";
   private static final String OPT_REPORT_INTERVAL = "interval";
-  private static final String OPT_NO_PUT = "no_put";
   private static final String OPT_ZK_QUORUM = "zk";
   private static final String OPT_ZK_PORT = "zkPort";
 
@@ -84,31 +87,31 @@ public class HBaseRPCProtocolComparison 
   private byte[] tblName;
   private String zkQuorum;
   private int zkPort;
-  private byte[] row;
+  private int rowLength;
   private byte[] family;
-  private byte[] qual;
-  private byte[] value;
+  private int qualLength;
   private int valueLength;
   private int numOps;
+  private int multigetbatch;
+  private int multiputbatch;
   private int numRounds;
   private int numThreads;
   private long reportInterval;
-  private boolean doPut;
 
   @Override
   protected void addOptions() {
     addOptWithArg(OPT_CLASSES, "Benchmark factory classes");
     addOptWithArg(OPT_NUM_THREADS, "Number of threads");
     addOptWithArg(OPT_NUM_OPS, "Number of operations to execute per thread");
+    addOptWithArg(OPT_GET_BATCH_SIZE, "multiget size to execute per thread");
+    addOptWithArg(OPT_PUT_BATCH_SIZE, "multiput size to execute");
     addOptWithArg(OPT_TBL_NAME, "Table name");
-    addOptWithArg(OPT_ZK_QUORUM, "Table name");
+    addOptWithArg(OPT_ZK_QUORUM, "Zookeeper Quorum");
     addOptWithArg(OPT_ZK_PORT, "Zookeeper Port");
     addOptWithArg(OPT_REPORT_INTERVAL, "Reporting interval in milliseconds");
-    addOptWithArg(OPT_ROW, "Row key");
-    addOptWithArg(OPT_NO_PUT,
-        "DO NOT perform a single put (writing the value) before the benchmark");
+    addOptWithArg(OPT_ROW_LENGTH, "Row key length");
     addOptWithArg(OPT_CF, "Column family to use");
-    addOptWithArg(OPT_QUAL, "Column qualifier to use");
+    addOptWithArg(OPT_QUAL_LENGTH, "Column qualifier length to use");
     addOptWithArg(OPT_VALUE_LENGTH, "Value length to use");
     addOptWithArg(OPT_NUM_ROUNDS, "Number of rounds to perform the tests");
   }
@@ -160,22 +163,23 @@ public class HBaseRPCProtocolComparison 
       HBaseUtils.createTableIfNotExists(conf,
           tblName, familyProperties, 1);
     }
-    row = Bytes.toBytes(cmd.getOptionValue(OPT_ROW, DEFAULT_ROW));
+    rowLength = Integer.parseInt(cmd.getOptionValue(OPT_ROW_LENGTH,
+        String.valueOf(DEFAULT_ROW_LENGTH)));
     family = Bytes.toBytes(cmd.getOptionValue(OPT_CF, DEFAULT_CF));
-    qual = Bytes.toBytes(cmd.getOptionValue(OPT_QUAL, DEFAULT_QUAL));
-    valueLength = parseInt(cmd.getOptionValue(OPT_VALUE_LENGTH, DEFAULT_VALUE),
-        0, Integer.MAX_VALUE);
-    Random r = new Random();
-    value = new byte[valueLength];
-    r.nextBytes(value);
+    qualLength = Integer.parseInt(cmd.getOptionValue(OPT_QUAL_LENGTH,
+        String.valueOf(DEFAULT_QUAL_LENGTH)));
+    valueLength = parseInt(cmd.getOptionValue(OPT_VALUE_LENGTH,
+        String.valueOf(DEFAULT_VALUE_LENGTH)), 0, Integer.MAX_VALUE);
     numOps = parseInt(cmd.getOptionValue(OPT_NUM_OPS,
         String.valueOf(DEFAULT_NUM_OPS)), 1, Integer.MAX_VALUE);
+    multigetbatch = parseInt(cmd.getOptionValue(OPT_GET_BATCH_SIZE,
+        String.valueOf(DEFAULT_GET_BATCH_SIZE)), 1, Integer.MAX_VALUE);
+    multiputbatch = parseInt(cmd.getOptionValue(OPT_PUT_BATCH_SIZE,
+        String.valueOf(DEFAULT_PUT_BATCH_SIZE)), 1, Integer.MAX_VALUE);
     numRounds = parseInt(cmd.getOptionValue(OPT_NUM_ROUNDS,
         String.valueOf(DEFAULT_NUM_ROUNDS)), 1, Integer.MAX_VALUE);
     numThreads = parseInt(cmd.getOptionValue(OPT_NUM_THREADS,
         String.valueOf(DEFAULT_NUM_THREADS)), 1, Integer.MAX_VALUE);
-    doPut = DEFAULT_DO_PUT;
-    doPut = !cmd.hasOption(OPT_NO_PUT);
   }
 
   /**
@@ -216,9 +220,11 @@ public class HBaseRPCProtocolComparison 
               long startTime = System.currentTimeMillis();
               HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool
                 .Builder(factoryCls).withColumnFamily(family).withNumOps(numOps)
-                .withRow(row).withNumThreads(numThreads).withConf(conf)
-                .withQualifier(qual).withTableName(tblName).withValue(value)
-                .withDoPut(doPut).create();
+                .withRowLength(rowLength).withNumThreads(numThreads)
+                .withConf(conf).withQualifierLength(qualLength)
+                .withMultiPutBatch(multiputbatch)
+                .withMultiGetBatch(multigetbatch)
+                .withTableName(tblName).withValue(valueLength).create();
               tool.doWork();
               hist.addValue(tool.getP95Latency());
               runTime.addAndGet(System.currentTimeMillis() - startTime);
@@ -229,6 +235,8 @@ public class HBaseRPCProtocolComparison 
               LOG.debug("Cannot run the tool for factory : "
                  + factoryCls.getName());
               e.printStackTrace();
+            } catch (IOException e) {
+              LOG.error("Caught unknown IOException", e);
             }
           }
         });

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java?rev=1584158&r1=1584157&r2=1584158&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java
Wed Apr  2 20:48:55 2014
@@ -18,73 +18,14 @@
 
 package org.apache.hadoop.hbase.util.rpcbench;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 
 /**
  * This is a BenchmarkClient which performs RPC operations through HadoopRPC.
  */
-public class HadoopRPCBenchmarkClient implements BenchmarkClient {
-  private static final Log LOG = LogFactory.getLog(ThriftBenchmarkClient.class);
-  private HTable htable = null;
+public class HadoopRPCBenchmarkClient extends BenchmarkClientImpl {
   HadoopRPCBenchmarkClient(HTable htable) {
     this.htable = htable;
     this.htable.setAutoFlush(true);
   }
-
-  // Performing a get through thrift
-  @Override
-  public Result executeGet(Get get) {
-    Result r = null;
-    try {
-      r = this.htable.get(get);
-    } catch (IOException e) {
-      LOG.debug("Unable to perform get");
-      e.printStackTrace();
-    }
-    return r;
-  }
-
-  // Performing a put through hadoop rpc.
-  @Override
-  public void executePut(Put put) {
-    try {
-      this.htable.put(put);
-    } catch (IOException e) {
-      LOG.debug("Unable to perform put");
-      e.printStackTrace();
-    }
-  }
-
-  public Get createGet(byte[] row, byte[] family, byte[] qual) {
-    Get g = new Get(row);
-    g.addColumn(family, qual);
-    return g;
-  }
-
-  public Put createPut(byte[] row, byte[] family, byte[] qual, byte[] value) {
-    Put p = new Put(row);
-    p.add(family, qual, value);
-    return p;
-  }
-
-  @Override
-  public List<Result> executeScan(Scan scan) {
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public Scan createScan(byte[] row, byte[] family, int nbRows) {
-    throw new NotImplementedException();
-  }
-
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java?rev=1584158&r1=1584157&r2=1584158&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java
Wed Apr  2 20:48:55 2014
@@ -17,75 +17,16 @@
  */
 package org.apache.hadoop.hbase.util.rpcbench;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
 
 /**
  * Implements the BenchmarkClient interface and provides functions to perform
  * gets and puts.
  *
  */
-public class ThriftBenchmarkClient implements BenchmarkClient {
-  private static final Log LOG = LogFactory.getLog(ThriftBenchmarkClient.class);
-  private HTable htable = null;
-
+public class ThriftBenchmarkClient extends BenchmarkClientImpl {
   ThriftBenchmarkClient(HTable htable) {
     this.htable = htable;
     this.htable.setAutoFlush(true);
   }
-
-  @Override
-  public Result executeGet(Get get) {
-    Result r = null;
-    try {
-      r = this.htable.get(get);
-    } catch (IOException e) {
-      LOG.debug("Unable to perform get");
-      e.printStackTrace();
-    }
-    return r;
-  }
-
-  @Override
-  public void executePut(Put put) {
-    try {
-      this.htable.put(put);
-    } catch (IOException e) {
-      LOG.debug("Unable to perform put");
-      e.printStackTrace();
-    }
-  }
-
-  /**
-   * TODO: make use of qual or get rid of it
-   */
-  public Get createGet(byte[] row, byte[] family, byte[] qual) {
-    return new Get.Builder(row).addFamily(family).create();
-  }
-
-  public Put createPut(byte[] row, byte[] family, byte[] qual, byte[] value) {
-    Put p = new Put(row);
-    p.add(family, qual, value);
-    return p;
-  }
-
-  @Override
-  public List<Result> executeScan(Scan scan) {
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public Scan createScan(byte[] row, byte[] family, int nbRows) {
-    throw new NotImplementedException();
-  }
-
 }



Mime
View raw message