hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nkey...@apache.org
Subject svn commit: r1585653 - /hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
Date Tue, 08 Apr 2014 07:31:02 GMT
Author: nkeywal
Date: Tue Apr  8 07:31:02 2014
New Revision: 1585653

URL: http://svn.apache.org/r1585653
Log:
HBASE-10592 Refactor PerformanceEvaluation tool (Nick Dimiduk)

Modified:
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1585653&r1=1585652&r2=1585653&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Tue Apr  8 07:31:02 2014
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.reflect.Constructor;
@@ -30,20 +28,21 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Durability;
@@ -70,21 +69,17 @@ import org.apache.hadoop.hbase.util.Hash
 import org.apache.hadoop.hbase.util.MurmurHash;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
-import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import static org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
 
 /**
  * Script used evaluating HBase performance and scalability.  Runs a HBase
@@ -105,7 +100,7 @@ import org.apache.hadoop.util.ToolRunner
 public class PerformanceEvaluation extends Configured implements Tool {
   protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
 
-  public static final TableName TABLE_NAME = TableName.valueOf("TestTable");
+  public static final String TABLE_NAME = "TestTable";
   public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
   public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");
   public static final int VALUE_LENGTH = 1000;
@@ -119,44 +114,12 @@ public class PerformanceEvaluation exten
   private static final MathContext CXT = MathContext.DECIMAL64;
   private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
   private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
+  private static final TestOptions DEFAULT_OPTS = new TestOptions();
 
-  protected HTableDescriptor TABLE_DESCRIPTOR;
   protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
 
-  private boolean nomapred = false;
-  private int N = 1;
-  private int R = ROWS_PER_GB;
-  private float sampleRate = 1.0f;
-  private TableName tableName = TABLE_NAME;
-  private Compression.Algorithm compression = Compression.Algorithm.NONE;
-  private DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
-  private boolean flushCommits = true;
-  private boolean writeToWAL = true;
-  private boolean inMemoryCF = false;
-  private boolean reportLatency = false;
-  private int presplitRegions = 0;
-  private boolean useTags = false;
-  private int noOfTags = 1;
-  private int multiGet = 0;
-  private HConnection connection;
-
   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
 
-  /** Regex to parse lines in input file passed to mapreduce task. */
-  public static final Pattern LINE_PATTERN =
-    Pattern.compile("tableName=(\\w+),\\s+" +
-        "startRow=(\\d+),\\s+" +
-        "perClientRunRows=(\\d+),\\s+" +
-        "totalRows=(\\d+),\\s+" +
-        "sampleRate=([-+]?[0-9]*\\.?[0-9]+),\\s+" +
-        "clients=(\\d+),\\s+" +
-        "flushCommits=(\\w+),\\s+" +
-        "writeToWAL=(\\w+),\\s+" +
-        "useTags=(\\w+),\\s+" +
-        "noOfTags=(\\d+),\\s+" +
-        "reportLatency=(\\w+),\\s+" +
-        "multiGet=(\\d+)");
-
   /**
    * Enum for map metrics.  Keep it out here rather than inside in the Map
    * inner-class so we can find associated properties.
@@ -220,272 +183,10 @@ public class PerformanceEvaluation exten
   }
 
   /**
-   *  This class works as the InputSplit of Performance Evaluation
-   *  MapReduce InputFormat, and the Record Value of RecordReader.
-   *  Each map task will only read one record from a PeInputSplit,
-   *  the record value is the PeInputSplit itself.
-   */
-  public static class PeInputSplit extends InputSplit implements Writable {
-    private TableName tableName = TABLE_NAME;
-    private int startRow = 0;
-    private int rows = 0;
-    private int totalRows = 0;
-    private float sampleRate = 1.0f;
-    private int clients = 0;
-    private boolean flushCommits = false;
-    private boolean writeToWAL = true;
-    private boolean useTags = false;
-    private int noOfTags = 0;
-    private boolean reportLatency = false;
-    private int multiGet = 0;
-
-    public PeInputSplit() {}
-
-    public PeInputSplit(TableName tableName, int startRow, int rows, int totalRows,
-        float sampleRate, int clients, boolean flushCommits, boolean writeToWAL,
-        boolean useTags, int noOfTags, boolean reportLatency, int multiGet) {
-      this.tableName = tableName;
-      this.startRow = startRow;
-      this.rows = rows;
-      this.totalRows = totalRows;
-      this.sampleRate = sampleRate;
-      this.clients = clients;
-      this.flushCommits = flushCommits;
-      this.writeToWAL = writeToWAL;
-      this.useTags = useTags;
-      this.noOfTags = noOfTags;
-      this.reportLatency = reportLatency;
-      this.multiGet = multiGet;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int tableNameLen = in.readInt();
-      byte[] name = new byte[tableNameLen];
-      in.readFully(name);
-      this.tableName = TableName.valueOf(name);
-
-      this.startRow = in.readInt();
-      this.rows = in.readInt();
-      this.totalRows = in.readInt();
-      this.sampleRate = in.readFloat();
-      this.clients = in.readInt();
-      this.flushCommits = in.readBoolean();
-      this.writeToWAL = in.readBoolean();
-      this.useTags = in.readBoolean();
-      this.noOfTags = in.readInt();
-      this.reportLatency = in.readBoolean();
-      this.multiGet = in.readInt();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      byte[] name = this.tableName.toBytes();
-      out.writeInt(name.length);
-      out.write(name);
-      out.writeInt(startRow);
-      out.writeInt(rows);
-      out.writeInt(totalRows);
-      out.writeFloat(sampleRate);
-      out.writeInt(clients);
-      out.writeBoolean(flushCommits);
-      out.writeBoolean(writeToWAL);
-      out.writeBoolean(useTags);
-      out.writeInt(noOfTags);
-      out.writeBoolean(reportLatency);
-      out.writeInt(multiGet);
-    }
-
-    @Override
-    public long getLength() throws IOException, InterruptedException {
-      return 0;
-    }
-
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-      return new String[0];
-    }
-
-    public TableName getTableName() {
-      return tableName;
-    }
-
-    public int getStartRow() {
-      return startRow;
-    }
-
-    public int getRows() {
-      return rows;
-    }
-
-    public int getTotalRows() {
-      return totalRows;
-    }
-
-    public float getSampleRate() {
-      return sampleRate;
-    }
-
-    public int getClients() {
-      return clients;
-    }
-
-    public boolean isFlushCommits() {
-      return flushCommits;
-    }
-
-    public boolean isWriteToWAL() {
-      return writeToWAL;
-    }
-
-    public boolean isUseTags() {
-      return useTags;
-    }
-
-    public int getNoOfTags() {
-      return noOfTags;
-    }
-
-    public boolean isReportLatency() {
-      return reportLatency;
-    }
-
-    public int getMultiGet() {
-      return multiGet;
-    }
-  }
-
-  /**
-   *  InputFormat of Performance Evaluation MapReduce job.
-   *  It extends from FileInputFormat, want to use it's methods such as setInputPaths().
-   */
-  public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
-
-    @Override
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      // generate splits
-      List<InputSplit> splitList = new ArrayList<InputSplit>();
-
-      for (FileStatus file: listStatus(job)) {
-        if (file.isDir()) {
-          continue;
-        }
-        Path path = file.getPath();
-        FileSystem fs = path.getFileSystem(job.getConfiguration());
-        FSDataInputStream fileIn = fs.open(path);
-        LineReader in = new LineReader(fileIn, job.getConfiguration());
-        int lineLen = 0;
-        while(true) {
-          Text lineText = new Text();
-          lineLen = in.readLine(lineText);
-          if(lineLen <= 0) {
-          break;
-          }
-          Matcher m = LINE_PATTERN.matcher(lineText.toString());
-          if((m != null) && m.matches()) {
-            TableName tableName = TableName.valueOf(m.group(1));
-            int startRow = Integer.parseInt(m.group(2));
-            int rows = Integer.parseInt(m.group(3));
-            int totalRows = Integer.parseInt(m.group(4));
-            float sampleRate = Float.parseFloat(m.group(5));
-            int clients = Integer.parseInt(m.group(6));
-            boolean flushCommits = Boolean.parseBoolean(m.group(7));
-            boolean writeToWAL = Boolean.parseBoolean(m.group(8));
-            boolean useTags = Boolean.parseBoolean(m.group(9));
-            int noOfTags = Integer.parseInt(m.group(10));
-            boolean reportLatency = Boolean.parseBoolean(m.group(11));
-            int multiGet = Integer.parseInt(m.group(12));
-
-            LOG.debug("tableName=" + tableName +
-                      " split["+ splitList.size() + "] " +
-                      " startRow=" + startRow +
-                      " rows=" + rows +
-                      " totalRows=" + totalRows +
-                      " sampleRate=" + sampleRate +
-                      " clients=" + clients +
-                      " flushCommits=" + flushCommits +
-                      " writeToWAL=" + writeToWAL +
-                      " useTags=" + useTags +
-                      " noOfTags=" + noOfTags +
-                      " reportLatency=" + reportLatency +
-                      " multiGet=" + multiGet);
-
-            PeInputSplit newSplit =
-              new PeInputSplit(tableName, startRow, rows, totalRows, sampleRate, clients,
-                flushCommits, writeToWAL, useTags, noOfTags, reportLatency, multiGet);
-            splitList.add(newSplit);
-          }
-        }
-        in.close();
-      }
-
-      LOG.info("Total # of splits: " + splitList.size());
-      return splitList;
-    }
-
-    @Override
-    public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
-                            TaskAttemptContext context) {
-      return new PeRecordReader();
-    }
-
-    public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
-      private boolean readOver = false;
-      private PeInputSplit split = null;
-      private NullWritable key = null;
-      private PeInputSplit value = null;
-
-      @Override
-      public void initialize(InputSplit split, TaskAttemptContext context)
-                  throws IOException, InterruptedException {
-        this.readOver = false;
-        this.split = (PeInputSplit)split;
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException, InterruptedException {
-        if(readOver) {
-          return false;
-        }
-
-        key = NullWritable.get();
-        value = split;
-
-        readOver = true;
-        return true;
-      }
-
-      @Override
-      public NullWritable getCurrentKey() throws IOException, InterruptedException {
-        return key;
-      }
-
-      @Override
-      public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
-        return value;
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        if(readOver) {
-          return 1.0f;
-        } else {
-          return 0.0f;
-        }
-      }
-
-      @Override
-      public void close() throws IOException {
-        // do nothing
-      }
-    }
-  }
-
-  /**
    * MapReduce job that runs a performance evaluation client in each map task.
    */
   public static class EvaluationMapTask
-      extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
+      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
 
     /** configuration parameter name that contains the command */
     public final static String CMD_KEY = "EvaluationMapTask.command";
@@ -512,16 +213,14 @@ public class PerformanceEvaluation exten
     }
 
     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
-      Class<? extends Type> clazz = null;
       try {
-        clazz = Class.forName(className).asSubclass(type);
+        return Class.forName(className).asSubclass(type);
       } catch (ClassNotFoundException e) {
         throw new IllegalStateException("Could not find class for name: " + className, e);
       }
-      return clazz;
     }
 
-    protected void map(NullWritable key, PeInputSplit value, final Context context)
+    protected void map(LongWritable key, Text value, final Context context)
            throws IOException, InterruptedException {
 
       Status status = new Status() {
@@ -530,18 +229,17 @@ public class PerformanceEvaluation exten
         }
       };
 
+      ObjectMapper mapper = new ObjectMapper();
+      TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
+      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
+
       // Evaluation task
-      pe.tableName = value.getTableName();
-      long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
-          value.getRows(), value.getTotalRows(), value.getSampleRate(),
-          value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(),
-          value.getNoOfTags(), value.isReportLatency(), value.getMultiGet(),
-          HConnectionManager.createConnection(context.getConfiguration()), status);
+      long elapsedTime = this.pe.runOneClient(this.cmd, conf, opts, status);
       // Collect how much time the thing took. Report as map output and
       // to the ELAPSED_TIME counter.
       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
-      context.getCounter(Counter.ROWS).increment(value.rows);
-      context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
+      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
+      context.write(new LongWritable(opts.startRow), new LongWritable(elapsedTime));
       context.progress();
     }
   }
@@ -552,21 +250,21 @@ public class PerformanceEvaluation exten
    * @return True if we created the table.
    * @throws IOException
    */
-  private boolean checkTable(HBaseAdmin admin) throws IOException {
-    HTableDescriptor tableDescriptor = getTableDescriptor();
-    if (this.presplitRegions > 0) {
+  private static boolean checkTable(HBaseAdmin admin, TestOptions opts) throws IOException {
+    HTableDescriptor tableDescriptor = getTableDescriptor(opts);
+    if (opts.presplitRegions > 0) {
       // presplit requested
       if (admin.tableExists(tableDescriptor.getTableName())) {
         admin.disableTable(tableDescriptor.getTableName());
         admin.deleteTable(tableDescriptor.getTableName());
       }
 
-      byte[][] splits = getSplits();
+      byte[][] splits = getSplits(opts);
       for (int i=0; i < splits.length; i++) {
         LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
       }
       admin.createTable(tableDescriptor, splits);
-      LOG.info ("Table created with " + this.presplitRegions + " splits");
+      LOG.info ("Table created with " + opts.presplitRegions + " splits");
     }
     else {
       boolean tableExists = admin.tableExists(tableDescriptor.getTableName());
@@ -578,33 +276,32 @@ public class PerformanceEvaluation exten
     return admin.tableExists(tableDescriptor.getTableName());
   }
 
-  protected HTableDescriptor getTableDescriptor() {
-    if (TABLE_DESCRIPTOR == null) {
-      TABLE_DESCRIPTOR = new HTableDescriptor(tableName);
-      HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
-      family.setDataBlockEncoding(blockEncoding);
-      family.setCompressionType(compression);
-      if (inMemoryCF) {
-        family.setInMemory(true);
-      }
-      TABLE_DESCRIPTOR.addFamily(family);
+  /**
+   * Create an HTableDescriptor from provided TestOptions.
+   */
+  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
+    HTableDescriptor desc = new HTableDescriptor(opts.tableName);
+    HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
+    family.setDataBlockEncoding(opts.blockEncoding);
+    family.setCompressionType(opts.compression);
+    if (opts.inMemoryCF) {
+      family.setInMemory(true);
     }
-    return TABLE_DESCRIPTOR;
+    desc.addFamily(family);
+    return desc;
   }
 
   /**
    * generates splits based on total number of rows and specified split regions
-   *
-   * @return splits : array of byte []
    */
-  protected  byte[][] getSplits() {
-    if (this.presplitRegions == 0)
+  protected static byte[][] getSplits(TestOptions opts) {
+    if (opts.presplitRegions == 0)
       return new byte [0][];
 
-    int numSplitPoints = presplitRegions - 1;
+    int numSplitPoints = opts.presplitRegions - 1;
     byte[][] splits = new byte[numSplitPoints][];
-    int jump = this.R  / this.presplitRegions;
-    for (int i=0; i < numSplitPoints; i++) {
+    int jump = opts.totalRows / opts.presplitRegions;
+    for (int i = 0; i < numSplitPoints; i++) {
       int rowkey = jump * (1 + i);
       splits[i] = format(rowkey);
     }
@@ -612,90 +309,40 @@ public class PerformanceEvaluation exten
   }
 
   /*
-   * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
-   * one map per client.  Then run a single reduce to sum the elapsed times.
-   * @param cmd Command to run.
-   * @throws IOException
-   */
-  private void runNIsMoreThanOne(final Class<? extends Test> cmd)
-  throws IOException, InterruptedException, ClassNotFoundException {
-    checkTable(new HBaseAdmin(getConf()));
-    if (this.nomapred) {
-      doMultipleClients(cmd);
-    } else {
-      doMapReduce(cmd);
-    }
-  }
-
-  /*
    * Run all clients in this vm each to its own thread.
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
-    final List<Thread> threads = new ArrayList<Thread>(this.N);
-    final long[] timings = new long[this.N];
-    final int perClientRows = R/N;
-    final float sampleRate = this.sampleRate;
-    final TableName tableName = this.tableName;
-    final DataBlockEncoding encoding = this.blockEncoding;
-    final boolean flushCommits = this.flushCommits;
-    final Compression.Algorithm compression = this.compression;
-    final boolean writeToWal = this.writeToWAL;
-    final boolean reportLatency = this.reportLatency;
-    final int preSplitRegions = this.presplitRegions;
-    final boolean useTags = this.useTags;
-    final int numTags = this.noOfTags;
-    final int multiGet = this.multiGet;
-    final HConnection connection = HConnectionManager.createConnection(getConf());
-    for (int i = 0; i < this.N; i++) {
+  private void doLocalClients(final Class<? extends Test> cmd, final TestOptions opts)
+      throws IOException, InterruptedException {
+    Future<Long>[] threads = new Future[opts.numClientThreads];
+    long[] timings = new long[opts.numClientThreads];
+    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
+      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
+    for (int i = 0; i < threads.length; i++) {
       final int index = i;
-      Thread t = new Thread ("TestClient-" + i) {
+      threads[i] = pool.submit(new Callable<Long>() {
         @Override
-        public void run() {
-          super.run();
-          PerformanceEvaluation pe = new PerformanceEvaluation(getConf());
-          pe.tableName = tableName;
-          pe.blockEncoding = encoding;
-          pe.flushCommits = flushCommits;
-          pe.compression = compression;
-          pe.writeToWAL = writeToWal;
-          pe.presplitRegions = preSplitRegions;
-          pe.N = N;
-          pe.sampleRate = sampleRate;
-          pe.reportLatency = reportLatency;
-          pe.connection = connection;
-          pe.useTags = useTags;
-          pe.noOfTags = numTags;
-          pe.multiGet = multiGet;
-          try {
-            long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
-               perClientRows, R, sampleRate, flushCommits, writeToWal, useTags,
-               noOfTags, reportLatency, multiGet, connection, new Status() {
-                  public void setStatus(final String msg) throws IOException {
-                    LOG.info("client-" + getName() + " " + msg);
-                  }
-                });
-            timings[index] = elapsedTime;
-            LOG.info("Finished " + getName() + " in " + elapsedTime +
-              "ms writing " + perClientRows + " rows");
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
+        public Long call() throws Exception {
+          TestOptions threadOpts = new TestOptions(opts);
+          threadOpts.startRow = index * threadOpts.perClientRunRows;
+          long elapsedTime = runOneClient(cmd, getConf(), threadOpts, new Status() {
+            public void setStatus(final String msg) throws IOException {
+              LOG.info("client-" + Thread.currentThread().getName() + " " + msg);
+            }
+          });
+          LOG.info("Finished " + Thread.currentThread().getName() + " in " + elapsedTime +
+            "ms over " + threadOpts.perClientRunRows + " rows");
+          return elapsedTime;
         }
-      };
-      threads.add(t);
+      });
     }
-    for (Thread t: threads) {
-      t.start();
-    }
-    for (Thread t: threads) {
-      while(t.isAlive()) {
-        try {
-          t.join();
-        } catch (InterruptedException e) {
-          LOG.debug("Interrupted, continuing" + e.toString());
-        }
+    pool.shutdown();
+    for (int i = 0; i < threads.length; i++) {
+      try {
+        timings[i] = threads[i].get();
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
       }
     }
     final String test = cmd.getSimpleName();
@@ -703,13 +350,13 @@ public class PerformanceEvaluation exten
              + Arrays.toString(timings));
     Arrays.sort(timings);
     long total = 0;
-    for (int i = 0; i < this.N; i++) {
+    for (int i = 0; i < timings.length; i++) {
       total += timings[i];
     }
     LOG.info("[" + test + "]"
              + "\tMin: " + timings[0] + "ms"
-             + "\tMax: " + timings[this.N - 1] + "ms"
-             + "\tAvg: " + (total / this.N) + "ms");
+             + "\tMax: " + timings[timings.length - 1] + "ms"
+             + "\tAvg: " + (total / timings.length) + "ms");
   }
 
   /*
@@ -719,18 +366,20 @@ public class PerformanceEvaluation exten
    * @param cmd Command to run.
    * @throws IOException
    */
-  private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
+  private void doMapReduce(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
         InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
-    Path inputDir = writeInputFile(conf);
+    Path inputDir = writeInputFile(conf, opts);
     conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
     conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
     Job job = new Job(conf);
     job.setJarByClass(PerformanceEvaluation.class);
     job.setJobName("HBase Performance Evaluation");
 
-    job.setInputFormatClass(PeInputFormat.class);
-    PeInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(NLineInputFormat.class);
+    NLineInputFormat.setInputPaths(job, inputDir);
+    // this is default, but be explicit about it just in case.
+    NLineInputFormat.setNumLinesPerSplit(job, 1);
 
     job.setOutputKeyClass(LongWritable.class);
     job.setOutputValueClass(LongWritable.class);
@@ -744,9 +393,9 @@ public class PerformanceEvaluation exten
     TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
 
     TableMapReduceUtil.addDependencyJars(job);
-    // Add a Class from the hbase.jar so it gets registered too.
     TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-      org.apache.hadoop.hbase.util.Bytes.class);
+      DescriptiveStatistics.class, // commons-math
+      ObjectMapper.class);         // jackson-mapper-asl
 
     TableMapReduceUtil.initCredentials(job);
 
@@ -759,7 +408,7 @@ public class PerformanceEvaluation exten
    * @return Directory that contains file written.
    * @throws IOException
    */
-  private Path writeInputFile(final Configuration c) throws IOException {
+  private Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
     Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
     Path inputDir = new Path(jobdir, "inputs");
@@ -772,22 +421,16 @@ public class PerformanceEvaluation exten
     // Make input random.
     Map<Integer, String> m = new TreeMap<Integer, String>();
     Hash h = MurmurHash.getInstance();
-    int perClientRows = (this.R / this.N);
+    int perClientRows = (opts.totalRows / opts.numClientThreads);
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
     try {
       for (int i = 0; i < 10; i++) {
-        for (int j = 0; j < N; j++) {
-          String s = "tableName=" + this.tableName +
-          ", startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
-          ", perClientRunRows=" + (perClientRows / 10) +
-          ", totalRows=" + this.R +
-          ", sampleRate=" + this.sampleRate +
-          ", clients=" + this.N +
-          ", flushCommits=" + this.flushCommits +
-          ", writeToWAL=" + this.writeToWAL +
-          ", useTags=" + this.useTags +
-          ", noOfTags=" + this.noOfTags +
-          ", reportLatency=" + this.reportLatency +
-          ", multiGet=" + this.multiGet;
+        for (int j = 0; j < opts.numClientThreads; j++) {
+          TestOptions next = new TestOptions(opts);
+          next.startRow = (j * perClientRows) + (i * (perClientRows/10));
+          next.perClientRunRows = perClientRows / 10;
+          String s = mapper.writeValueAsString(next);
           int hash = h.hash(Bytes.toBytes(s));
           m.put(hash, s);
         }
@@ -829,95 +472,50 @@ public class PerformanceEvaluation exten
   }
 
   /**
-   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test
-   * tests}.  This makes the reflection logic a little easier to understand...
+   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
+   * This makes tracking all these arguments a little easier.
    */
   static class TestOptions {
-    private int startRow;
-    private int perClientRunRows;
-    private int totalRows;
-    private float sampleRate;
-    private int numClientThreads;
-    private TableName tableName;
-    private boolean flushCommits;
-    private boolean writeToWAL = true;
-    private boolean useTags = false;
-    private int noOfTags = 0;
-    private boolean reportLatency;
-    private int multiGet = 0;
-    private HConnection connection;
-
-    TestOptions() {}
 
-    TestOptions(int startRow, int perClientRunRows, int totalRows, float sampleRate,
-        int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL,
-        boolean useTags, int noOfTags, boolean reportLatency, int multiGet,
-        HConnection connection) {
-      this.startRow = startRow;
-      this.perClientRunRows = perClientRunRows;
-      this.totalRows = totalRows;
-      this.sampleRate = sampleRate;
-      this.numClientThreads = numClientThreads;
-      this.tableName = tableName;
-      this.flushCommits = flushCommits;
-      this.writeToWAL = writeToWAL;
-      this.useTags = useTags;
-      this.noOfTags = noOfTags;
-      this.reportLatency = reportLatency;
-      this.multiGet = multiGet;
-      this.connection = connection;
-    }
-
-    public int getStartRow() {
-      return startRow;
-    }
+    public TestOptions() {}
 
-    public int getPerClientRunRows() {
-      return perClientRunRows;
-    }
-
-    public int getTotalRows() {
-      return totalRows;
-    }
-
-    public float getSampleRate() {
-      return sampleRate;
-    }
-
-    public int getNumClientThreads() {
-      return numClientThreads;
-    }
-
-    public TableName getTableName() {
-      return tableName;
-    }
-
-    public boolean isFlushCommits() {
-      return flushCommits;
-    }
-
-    public boolean isWriteToWAL() {
-      return writeToWAL;
-    }
-
-    public boolean isReportLatency() {
-      return reportLatency;
-    }
-
-    public int getMultiGet() {
-      return multiGet;
-    }
-
-    public HConnection getConnection() {
-      return connection;
-    }
-    
-    public boolean isUseTags() {
-      return this.useTags;
-    }
-    public int getNumTags() {
-      return this.noOfTags;
-    }
+    public TestOptions(TestOptions that) {
+      this.nomapred = that.nomapred;
+      this.startRow = that.startRow;
+      this.perClientRunRows = that.perClientRunRows;
+      this.numClientThreads = that.numClientThreads;
+      this.totalRows = that.totalRows;
+      this.sampleRate = that.sampleRate;
+      this.tableName = that.tableName;
+      this.flushCommits = that.flushCommits;
+      this.writeToWAL = that.writeToWAL;
+      this.useTags = that.useTags;
+      this.noOfTags = that.noOfTags;
+      this.reportLatency = that.reportLatency;
+      this.multiGet = that.multiGet;
+      this.inMemoryCF = that.inMemoryCF;
+      this.presplitRegions = that.presplitRegions;
+      this.compression = that.compression;
+      this.blockEncoding = that.blockEncoding;
+    }
+
+    public boolean nomapred = false;
+    public int startRow = 0;
+    public int perClientRunRows = ROWS_PER_GB;
+    public int numClientThreads = 1;
+    public int totalRows = ROWS_PER_GB;
+    public float sampleRate = 1.0f;
+    public String tableName = TABLE_NAME;
+    public boolean flushCommits = true;
+    public boolean writeToWAL = true;
+    public boolean useTags = false;
+    public int noOfTags = 1;
+    public boolean reportLatency = false;
+    public int multiGet = 0;
+    boolean inMemoryCF = false;
+    int presplitRegions = 0;
+    public Compression.Algorithm compression = Compression.Algorithm.NONE;
+    public DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
   }
 
   /*
@@ -927,48 +525,26 @@ public class PerformanceEvaluation exten
   static abstract class Test {
     // Below is make it so when Tests are all running in the one
     // jvm, that they each have a differently seeded Random.
-    private static final Random randomSeed =
-      new Random(System.currentTimeMillis());
+    private static final Random randomSeed = new Random(System.currentTimeMillis());
     private static long nextRandomSeed() {
       return randomSeed.nextLong();
     }
     protected final Random rand = new Random(nextRandomSeed());
+    protected final Configuration conf;
+    protected final TestOptions opts;
 
-    protected final int startRow;
-    protected final int perClientRunRows;
-    protected final int totalRows;
-    protected final float sampleRate;
     private final Status status;
-    protected TableName tableName;
-    protected HTableInterface table;
-    protected volatile Configuration conf;
-    protected boolean flushCommits;
-    protected boolean writeToWAL;
-    protected boolean useTags;
-    protected int noOfTags;
-    protected boolean reportLatency;
     protected HConnection connection;
+    protected HTableInterface table;
 
     /**
      * Note that all subclasses of this class must provide a public contructor
      * that has the exact same list of arguments.
      */
     Test(final Configuration conf, final TestOptions options, final Status status) {
-      super();
-      this.startRow = options.getStartRow();
-      this.perClientRunRows = options.getPerClientRunRows();
-      this.totalRows = options.getTotalRows();
-      this.sampleRate = options.getSampleRate();
-      this.status = status;
-      this.tableName = options.getTableName();
-      this.table = null;
       this.conf = conf;
-      this.flushCommits = options.isFlushCommits();
-      this.writeToWAL = options.isWriteToWAL();
-      this.useTags = options.isUseTags();
-      this.noOfTags = options.getNumTags();
-      this.reportLatency = options.isReportLatency();
-      this.connection = options.getConnection();
+      this.opts = options;
+      this.status = status;
     }
 
     private String generateStatus(final int sr, final int i, final int lr) {
@@ -976,20 +552,22 @@ public class PerformanceEvaluation exten
     }
 
     protected int getReportingPeriod() {
-      int period = this.perClientRunRows / 10;
-      return period == 0 ? this.perClientRunRows : period;
+      int period = opts.perClientRunRows / 10;
+      return period == 0 ? opts.perClientRunRows : period;
     }
 
     void testSetup() throws IOException {
-      this.table = connection.getTable(tableName);
+      this.connection = HConnectionManager.createConnection(conf);
+      this.table = connection.getTable(opts.tableName);
       this.table.setAutoFlush(false, true);
     }
 
     void testTakedown() throws IOException {
-      if (flushCommits) {
+      if (opts.flushCommits) {
         this.table.flushCommits();
       }
       table.close();
+      connection.close();
     }
 
     /*
@@ -1013,12 +591,12 @@ public class PerformanceEvaluation exten
      * Provides an extension point for tests that don't want a per row invocation.
      */
     void testTimed() throws IOException {
-      int lastRow = this.startRow + this.perClientRunRows;
+      int lastRow = opts.startRow + opts.perClientRunRows;
       // Report on completion of 1/10th of total.
-      for (int i = this.startRow; i < lastRow; i++) {
+      for (int i = opts.startRow; i < lastRow; i++) {
         testRow(i);
         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
-          status.setStatus(generateStatus(this.startRow, i, lastRow));
+          status.setStatus(generateStatus(opts.startRow, i, lastRow));
         }
       }
     }
@@ -1039,7 +617,7 @@ public class PerformanceEvaluation exten
 
     @Override
     void testRow(final int i) throws IOException {
-      Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
+      Scan scan = new Scan(getRandomRow(this.rand, opts.totalRows));
       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
       scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
       ResultScanner s = this.table.getScanner(scan);
@@ -1049,8 +627,8 @@ public class PerformanceEvaluation exten
 
     @Override
     protected int getReportingPeriod() {
-      int period = this.perClientRunRows / 100;
-      return period == 0 ? this.perClientRunRows : period;
+      int period = opts.perClientRunRows / 100;
+      return period == 0 ? opts.perClientRunRows : period;
     }
 
   }
@@ -1084,15 +662,15 @@ public class PerformanceEvaluation exten
     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
 
     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
-      int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
+      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
       int stop = start + maxRange;
       return new Pair<byte[],byte[]>(format(start), format(stop));
     }
 
     @Override
     protected int getReportingPeriod() {
-      int period = this.perClientRunRows / 100;
-      return period == 0? this.perClientRunRows: period;
+      int period = opts.perClientRunRows / 100;
+      return period == 0? opts.perClientRunRows: period;
     }
   }
 
@@ -1142,24 +720,20 @@ public class PerformanceEvaluation exten
 
   static class RandomReadTest extends Test {
     private final int everyN;
-    private final boolean reportLatency;
     private final double[] times;
-    private final int multiGet;
     private ArrayList<Get> gets;
     int idx = 0;
 
     RandomReadTest(Configuration conf, TestOptions options, Status status) {
       super(conf, options, status);
-      everyN = (int) (this.totalRows / (this.totalRows * this.sampleRate));
-      LOG.info("Sampling 1 every " + everyN + " out of " + perClientRunRows + " total rows.");
-      this.reportLatency = options.isReportLatency();
-      this.multiGet = options.getMultiGet();
-      if (this.multiGet > 0) {
-        LOG.info("MultiGet enabled. Sending GETs in batches of " + this.multiGet + ".");
-        this.gets = new ArrayList<Get>(this.multiGet);
+      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
+      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
+      if (opts.multiGet > 0) {
+        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
+        this.gets = new ArrayList<Get>(opts.multiGet);
       }
-      if (this.reportLatency) {
-        this.times = new double[(int) Math.ceil(this.perClientRunRows * this.sampleRate / Math.max(1, this.multiGet))];
+      if (opts.reportLatency) {
+        this.times = new double[(int) Math.ceil(opts.perClientRunRows * opts.sampleRate / Math.max(1, opts.multiGet))];
       } else {
         this.times = null;
       }
@@ -1168,14 +742,14 @@ public class PerformanceEvaluation exten
     @Override
     void testRow(final int i) throws IOException {
       if (i % everyN == 0) {
-        Get get = new Get(getRandomRow(this.rand, this.totalRows));
+        Get get = new Get(getRandomRow(this.rand, opts.totalRows));
         get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
-        if (this.multiGet > 0) {
+        if (opts.multiGet > 0) {
           this.gets.add(get);
-          if (this.gets.size() == this.multiGet) {
+          if (this.gets.size() == opts.multiGet) {
             long start = System.nanoTime();
             this.table.get(this.gets);
-            if (this.reportLatency) {
+            if (opts.reportLatency) {
               times[idx++] = (System.nanoTime() - start) / 1e6;
             }
             this.gets.clear();
@@ -1183,7 +757,7 @@ public class PerformanceEvaluation exten
         } else {
           long start = System.nanoTime();
           this.table.get(get);
-          if (this.reportLatency) {
+          if (opts.reportLatency) {
             times[idx++] = (System.nanoTime() - start) / 1e6;
           }
         }
@@ -1192,8 +766,8 @@ public class PerformanceEvaluation exten
 
     @Override
     protected int getReportingPeriod() {
-      int period = this.perClientRunRows / 100;
-      return period == 0 ? this.perClientRunRows : period;
+      int period = opts.perClientRunRows / 100;
+      return period == 0 ? opts.perClientRunRows : period;
     }
 
     @Override
@@ -1203,7 +777,7 @@ public class PerformanceEvaluation exten
         this.gets.clear();
       }
       super.testTakedown();
-      if (this.reportLatency) {
+      if (opts.reportLatency) {
         Arrays.sort(times);
         DescriptiveStatistics ds = new DescriptiveStatistics();
         for (double t : times) {
@@ -1231,13 +805,13 @@ public class PerformanceEvaluation exten
 
     @Override
     void testRow(final int i) throws IOException {
-      byte[] row = getRandomRow(this.rand, this.totalRows);
+      byte[] row = getRandomRow(this.rand, opts.totalRows);
       Put put = new Put(row);
       byte[] value = generateData(this.rand, VALUE_LENGTH);
-      if (useTags) {
+      if (opts.useTags) {
         byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[noOfTags];
-        for (int n = 0; n < noOfTags; n++) {
+        Tag[] tags = new Tag[opts.noOfTags];
+        for (int n = 0; n < opts.noOfTags; n++) {
           Tag t = new Tag((byte) n, tag);
           tags[n] = t;
         }
@@ -1247,7 +821,7 @@ public class PerformanceEvaluation exten
       } else {
         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
-      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       table.put(put);
     }
   }
@@ -1272,7 +846,7 @@ public class PerformanceEvaluation exten
     @Override
     void testRow(final int i) throws IOException {
       if (this.testScanner == null) {
-        Scan scan = new Scan(format(this.startRow));
+        Scan scan = new Scan(format(opts.startRow));
         scan.setCaching(30);
         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
         this.testScanner = table.getScanner(scan);
@@ -1305,10 +879,10 @@ public class PerformanceEvaluation exten
       byte[] row = format(i);
       Put put = new Put(row);
       byte[] value = generateData(this.rand, VALUE_LENGTH);
-      if (useTags) {
+      if (opts.useTags) {
         byte[] tag = generateData(this.rand, TAG_LENGTH);
-        Tag[] tags = new Tag[noOfTags];
-        for (int n = 0; n < noOfTags; n++) {
+        Tag[] tags = new Tag[opts.noOfTags];
+        for (int n = 0; n < opts.noOfTags; n++) {
           Tag t = new Tag((byte) n, tag);
           tags[n] = t;
         }
@@ -1318,7 +892,7 @@ public class PerformanceEvaluation exten
       } else {
         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
-      put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
       table.put(put);
     }
   }
@@ -1421,26 +995,21 @@ public class PerformanceEvaluation exten
     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
   }
 
-  long runOneClient(final Class<? extends Test> cmd, final int startRow,
-      final int perClientRunRows, final int totalRows, final float sampleRate,
-      boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags,
-      boolean reportLatency, int multiGet, HConnection connection, final Status status)
-  throws IOException {
-    status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
-      perClientRunRows + " rows");
+  static long runOneClient(final Class<? extends Test> cmd, Configuration conf, TestOptions opts,
+    final Status status)
+      throws IOException {
+    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for " +
+      opts.perClientRunRows + " rows");
     long totalElapsedTime = 0;
 
-    TestOptions options = new TestOptions(startRow, perClientRunRows,
-      totalRows, sampleRate, N, tableName, flushCommits, writeToWAL, useTags, noOfTags,
-      reportLatency, multiGet, connection);
     final Test t;
     try {
-      Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
-          Configuration.class, TestOptions.class, Status.class);
-      t = constructor.newInstance(getConf(), options, status);
+      Constructor<? extends Test> constructor =
+        cmd.getDeclaredConstructor(Configuration.class, TestOptions.class, Status.class);
+      t = constructor.newInstance(conf, opts, status);
     } catch (NoSuchMethodException e) {
       throw new IllegalArgumentException("Invalid command class: " +
-          cmd.getName() + ".  It does not provide a constructor as described by" +
+          cmd.getName() + ".  It does not provide a constructor as described by " +
           "the javadoc comment.  Available constructors are: " +
           Arrays.toString(cmd.getConstructors()));
     } catch (Exception e) {
@@ -1449,41 +1018,24 @@ public class PerformanceEvaluation exten
     totalElapsedTime = t.test();
 
     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
-      "ms at offset " + startRow + " for " + perClientRunRows + " rows" +
-      " (" + calculateMbps((int)(perClientRunRows * sampleRate), totalElapsedTime) + ")");
+      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
+      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime) + ")");
     return totalElapsedTime;
   }
 
-  private void runNIsOne(final Class<? extends Test> cmd) throws IOException {
-    Status status = new Status() {
-      public void setStatus(String msg) throws IOException {
-        LOG.info(msg);
-      }
-    };
-
+  private void runTest(final Class<? extends Test> cmd, TestOptions opts) throws IOException,
+      InterruptedException, ClassNotFoundException {
     HBaseAdmin admin = null;
     try {
       admin = new HBaseAdmin(getConf());
-      checkTable(admin);
-      runOneClient(cmd, 0, this.R, this.R, this.sampleRate, this.flushCommits,
-      this.writeToWAL, this.useTags, this.noOfTags, this.reportLatency, this.multiGet,
-        this.connection, status);
-    } catch (Exception e) {
-      LOG.error("Failed", e);
+      checkTable(admin, opts);
     } finally {
       if (admin != null) admin.close();
     }
-  }
-
-  private void runTest(final Class<? extends Test> cmd) throws IOException,
-      InterruptedException, ClassNotFoundException {
-    if (N == 1) {
-      // If there is only one client and one HRegionServer, we assume nothing
-      // has been set up at all.
-      runNIsOne(cmd);
+    if (opts.nomapred) {
+      doLocalClients(cmd, opts);
     } else {
-      // Else, run
-      runNIsMoreThanOne(cmd);
+      doMapReduce(cmd, opts);
     }
   }
 
@@ -1543,16 +1095,15 @@ public class PerformanceEvaluation exten
         + " sequentialWrite 1");
   }
 
-  private void getArgs(final int start, final String[] args) {
+  private static int getNumClients(final int start, final String[] args) {
     if(start + 1 > args.length) {
       throw new IllegalArgumentException("must supply the number of clients");
     }
-    N = Integer.parseInt(args[start]);
+    int N = Integer.parseInt(args[start]);
     if (N < 1) {
       throw new IllegalArgumentException("Number of clients must be > 1");
     }
-    // Set total number of rows to write.
-    this.R = this.R * N;
+    return N;
   }
 
   public int run(String[] args) throws Exception {
@@ -1570,7 +1121,9 @@ public class PerformanceEvaluation exten
       // input, take a look at writeInputFile().
       // Then you must adapt the LINE_PATTERN input regex,
       // and parse the argument, take a look at PEInputFormat.getSplits().
-      
+
+      TestOptions opts = new TestOptions();
+
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
         if (cmd.equals("-h") || cmd.startsWith("--h")) {
@@ -1581,94 +1134,94 @@ public class PerformanceEvaluation exten
 
         final String nmr = "--nomapred";
         if (cmd.startsWith(nmr)) {
-          this.nomapred = true;
+          opts.nomapred = true;
           continue;
         }
 
         final String rows = "--rows=";
         if (cmd.startsWith(rows)) {
-          this.R = Integer.parseInt(cmd.substring(rows.length()));
+          opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
           continue;
         }
 
         final String sampleRate = "--sampleRate=";
         if (cmd.startsWith(sampleRate)) {
-          this.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+          opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
           continue;
         }
 
         final String table = "--table=";
         if (cmd.startsWith(table)) {
-          this.tableName = TableName.valueOf(cmd.substring(table.length()));
+          opts.tableName = cmd.substring(table.length());
           continue;
         }
 
         final String compress = "--compress=";
         if (cmd.startsWith(compress)) {
-          this.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+          opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
           continue;
         }
 
         final String blockEncoding = "--blockEncoding=";
         if (cmd.startsWith(blockEncoding)) {
-          this.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+          opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
           continue;
         }
 
         final String flushCommits = "--flushCommits=";
         if (cmd.startsWith(flushCommits)) {
-          this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+          opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
           continue;
         }
 
         final String writeToWAL = "--writeToWAL=";
         if (cmd.startsWith(writeToWAL)) {
-          this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+          opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
           continue;
         }
 
         final String presplit = "--presplit=";
         if (cmd.startsWith(presplit)) {
-          this.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
+          opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
           continue;
         }
         
         final String inMemory = "--inmemory=";
         if (cmd.startsWith(inMemory)) {
-          this.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
+          opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
           continue;
         }
 
         final String latency = "--latency";
         if (cmd.startsWith(latency)) {
-          this.reportLatency = true;
+          opts.reportLatency = true;
           continue;
         }
 
         final String multiGet = "--multiGet=";
         if (cmd.startsWith(multiGet)) {
-          this.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+          opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
           continue;
         }
 
-        this.connection = HConnectionManager.createConnection(getConf());
-        
         final String useTags = "--usetags=";
         if (cmd.startsWith(useTags)) {
-          this.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+          opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
           continue;
         }
 
         final String noOfTags = "--nooftags=";
         if (cmd.startsWith(noOfTags)) {
-          this.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+          opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
           continue;
         }
 
         Class<? extends Test> cmdClass = determineCommandClass(cmd);
         if (cmdClass != null) {
-          getArgs(i + 1, args);
-          runTest(cmdClass);
+          opts.numClientThreads = getNumClients(i + 1, args);
+          // number of rows specified
+          opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
+          runTest(cmdClass, opts);
           errCode = 0;
           break;
         }



Mime
View raw message