hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject hbase git commit: HBASE-16101 Tool to microbenchmark procedure WAL performance.
Date Tue, 30 Aug 2016 20:33:34 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 e38b54dcb -> bd0466f15


HBASE-16101 Tool to microbenchmark procedure WAL performance.

Change-Id: I8ec158319395d2ec8e36641a3beab2694f7b6aef


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bd0466f1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bd0466f1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bd0466f1

Branch: refs/heads/branch-1.3
Commit: bd0466f1534a6230ebef17bf2587ba0bde3d6522
Parents: e38b54d
Author: Apekshit Sharma <appy@apache.org>
Authored: Mon Aug 29 19:23:09 2016 -0700
Committer: Apekshit Sharma <appy@apache.org>
Committed: Tue Aug 30 13:33:13 2016 -0700

----------------------------------------------------------------------
 hbase-assembly/pom.xml                          |  11 +
 .../hadoop/hbase/util/AbstractHBaseTool.java    |  78 +++---
 ...ProcedureWALLoaderPerformanceEvaluation.java | 248 +++++++++++++++++
 .../wal/ProcedureWALPerformanceEvaluation.java  | 267 +++++++++++++++++++
 4 files changed, 567 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bd0466f1/hbase-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index eca3f12..a651c75 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -165,6 +165,17 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-thrift</artifactId>
     </dependency>
+    <!-- To dump tools in hbase-procedure into cached_classpath.txt. -->
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-procedure</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-procedure</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-hadoop-compat</artifactId>

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd0466f1/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
index a876aef..6e3dec6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
@@ -17,13 +17,11 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
-import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -40,12 +38,11 @@ import org.apache.hadoop.util.ToolRunner;
  */
 @InterfaceAudience.Private
 public abstract class AbstractHBaseTool implements Tool {
-
   protected static final int EXIT_SUCCESS = 0;
   protected static final int EXIT_FAILURE = 1;
 
-  private static final String SHORT_HELP_OPTION = "h";
-  private static final String LONG_HELP_OPTION = "help";
+  private static final Option HELP_OPTION = new Option("h", "help", false,
+      "Prints help for this tool.");
 
   private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class);
 
@@ -53,8 +50,6 @@ public abstract class AbstractHBaseTool implements Tool {
 
   protected Configuration conf = null;
 
-  private static final Set<String> requiredOptions = new TreeSet<String>();
-
   protected String[] cmdLineArgs = null;
 
   /**
@@ -83,6 +78,7 @@ public abstract class AbstractHBaseTool implements Tool {
 
   @Override
   public final int run(String[] args) throws IOException {
+    cmdLineArgs = args;
     if (conf == null) {
       LOG.error("Tool configuration is not initialized");
       throw new NullPointerException("conf");
@@ -90,24 +86,22 @@ public abstract class AbstractHBaseTool implements Tool {
 
     CommandLine cmd;
     try {
+      addOptions();
+      if (isHelpCommand(args)) {
+        printUsage();
+        return EXIT_SUCCESS;
+      }
       // parse the command line arguments
-      cmd = parseArgs(args);
-      cmdLineArgs = args;
+      cmd = new BasicParser().parse(options, args);
     } catch (ParseException e) {
       LOG.error("Error when parsing command-line arguments", e);
       printUsage();
       return EXIT_FAILURE;
     }
 
-    if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION) ||
-        !sanityCheckOptions(cmd)) {
-      printUsage();
-      return EXIT_FAILURE;
-    }
-
     processOptions(cmd);
 
-    int ret = EXIT_FAILURE;
+    int ret;
     try {
       ret = doWork();
     } catch (Exception e) {
@@ -117,22 +111,11 @@ public abstract class AbstractHBaseTool implements Tool {
     return ret;
   }
 
-  private boolean sanityCheckOptions(CommandLine cmd) {
-    boolean success = true;
-    for (String reqOpt : requiredOptions) {
-      if (!cmd.hasOption(reqOpt)) {
-        LOG.error("Required option -" + reqOpt + " is missing");
-        success = false;
-      }
-    }
-    return success;
-  }
-
-  protected CommandLine parseArgs(String[] args) throws ParseException {
-    options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage");
-    addOptions();
-    CommandLineParser parser = new BasicParser();
-    return parser.parse(options, args);
+  private boolean isHelpCommand(String[] args) throws ParseException {
+    Options helpOption = new Options().addOption(HELP_OPTION);
+    // this parses the command line but doesn't throw an exception on unknown options
+    CommandLine cl = new BasicParser().parse(helpOption, args, true);
+    return cl.getOptions().length != 0;
   }
 
   protected void printUsage() {
@@ -146,14 +129,20 @@ public abstract class AbstractHBaseTool implements Tool {
     helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
   }
 
+  protected void addOption(Option option) {
+    options.addOption(option);
+  }
+
   protected void addRequiredOptWithArg(String opt, String description) {
-    requiredOptions.add(opt);
-    addOptWithArg(opt, description);
+    Option option = new Option(opt, true, description);
+    option.setRequired(true);
+    options.addOption(option);
   }
 
   protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
-    requiredOptions.add(longOpt);
-    addOptWithArg(shortOpt, longOpt, description);
+    Option option = new Option(shortOpt, longOpt, true, description);
+    option.setRequired(true);
+    options.addOption(option);
   }
 
   protected void addOptNoArg(String opt, String description) {
@@ -172,6 +161,21 @@ public abstract class AbstractHBaseTool implements Tool {
     options.addOption(shortOpt, longOpt, true, description);
   }
 
+  public int getOptionAsInt(CommandLine cmd, String opt, int defaultValue) {
+    if (cmd.hasOption(opt)) {
+      return Integer.parseInt(cmd.getOptionValue(opt));
+    } else {
+      return defaultValue;
+    }
+  }
+
+  public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) {
+    if (cmd.hasOption(opt)) {
+      return Double.parseDouble(cmd.getOptionValue(opt));
+    } else {
+      return defaultValue;
+    }
+  }
   /**
    * Parse a number and enforce a range.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd0466f1/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
new file mode 100644
index 0000000..347239d
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.ProcedureInfo;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+
+import static java.lang.System.currentTimeMillis;
+
+public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
+  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  // Command line options and defaults.
+  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
+  public static Option NUM_PROCS_OPTION = new Option("procs", true,
+      "Total number of procedures. Default: " + DEFAULT_NUM_PROCS);
+  public static int DEFAULT_NUM_WALS = 0;
+  public static Option NUM_WALS_OPTION = new Option("wals", true,
+      "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
+          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
+  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
+  public static Option STATE_SIZE_OPTION = new Option("size", true,
+      "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
+          + " bytes");
+  public static int DEFAULT_UPDATES_PER_PROC = 5;
+  public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true,
+      "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC);
+  public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50;
+  public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true,
+      "Fraction of procs for which to write delete state. Distribution of procs chosen for "
+          + "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION);
+
+  public int numProcs;
+  public int updatesPerProc;
+  public double deleteProcsFraction;
+  public int numWals;
+  private WALProcedureStore store;
+  static byte[] serializedState;
+
+  private class LoadCounter implements ProcedureStore.ProcedureLoader {
+    public LoadCounter() {}
+
+    @Override
+    public void setMaxProcId(long maxProcId) {
+    }
+
+    @Override
+    public void load(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        if (procIter.isNextCompleted()) {
+          ProcedureInfo proc = procIter.nextAsProcedureInfo();
+        } else {
+          Procedure proc = procIter.nextAsProcedure();
+        }
+      }
+    }
+
+    @Override
+    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+      while (procIter.hasNext()) {
+        Procedure proc = procIter.nextAsProcedure();
+      }
+    }
+  }
+
+  @Override
+  protected void addOptions() {
+    addOption(NUM_PROCS_OPTION);
+    addOption(UPDATES_PER_PROC_OPTION);
+    addOption(DELETE_PROCS_FRACTION_OPTION);
+    addOption(NUM_WALS_OPTION);
+    addOption(STATE_SIZE_OPTION);
+  }
+
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
+    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
+    int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
+    serializedState = new byte[stateSize];
+    updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(),
+        DEFAULT_UPDATES_PER_PROC);
+    deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(),
+        DEFAULT_DELETE_PROCS_FRACTION);
+    setupConf();
+  }
+
+  private void setupConf() {
+    if (numWals > 0) {
+      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
+    }
+  }
+
+  public void setUpProcedureStore() throws IOException {
+    Path testDir = UTIL.getDataTestDir();
+    FileSystem fs = testDir.getFileSystem(conf);
+    Path logDir = new Path(testDir, "proc-logs");
+    System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
+    fs.delete(logDir, true);
+    store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+    store.start(1);
+    store.recoverLease();
+    store.load(new LoadCounter());
+  }
+
+  /**
+   * @return a list of shuffled integers which represent state of proc id. First occurrence of a
+   * number denotes insert state, consecutive occurrences denote update states, and -ve value
+   * denotes delete state.
+   */
+  private List<Integer> shuffleProcWriteSequence() {
+    Random rand = new Random();
+    List<Integer> procStatesSequence = new ArrayList<>();
+    Set<Integer> toBeDeletedProcs = new HashSet<>();
+    // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add
+    // extra entry which is marked -ve in the loop after shuffle.
+    for (int procId  = 1; procId <= numProcs; ++procId) {
+      procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId));
+      if (rand.nextFloat() < deleteProcsFraction) {
+        procStatesSequence.add(procId);
+        toBeDeletedProcs.add(procId);
+      }
+    }
+    Collections.shuffle(procStatesSequence);
+    // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state.
+    for (int i = procStatesSequence.size() - 1; i >= 0; --i) {
+      int procId = procStatesSequence.get(i);
+      if (toBeDeletedProcs.contains(procId)) {
+        procStatesSequence.set(i, -1 * procId);
+        toBeDeletedProcs.remove(procId);
+      }
+    }
+    return procStatesSequence;
+  }
+
+  private void writeWals() throws IOException {
+    List<Integer> procStates = shuffleProcWriteSequence();
+    TestProcedure[] procs = new TestProcedure[numProcs + 1];  // 0 is not used.
+    int numProcsPerWal = numWals > 0 ? (int)Math.ceil(procStates.size() / numWals)
+        : Integer.MAX_VALUE;
+    long startTime = currentTimeMillis();
+    long lastTime = startTime;
+    for (int i = 0; i < procStates.size(); ++i) {
+      int procId = procStates.get(i);
+      if (procId < 0) {
+        store.delete(procs[-procId].getProcId());
+        procs[-procId] = null;
+      } else if (procs[procId] == null) {
+        procs[procId] = new TestProcedure(procId, 0);
+        procs[procId].setData(serializedState);
+        store.insert(procs[procId], null);
+      } else {
+        store.update(procs[procId]);
+      }
+      if (i > 0 && i % numProcsPerWal == 0) {
+        long currentTime = currentTimeMillis();
+        System.out.println("Forcing wall roll. Time taken on last WAL: " +
+            (currentTime - lastTime) / 1000.0f + " sec");
+        store.rollWriterForTesting();
+        lastTime = currentTime;
+      }
+    }
+    long timeTaken = currentTimeMillis() - startTime;
+    System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : "
+        + StringUtils.humanTimeDiff(timeTaken) + "\n\n");
+  }
+
+  private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
+    System.out.println("Restarting procedure store to read back the WALs");
+    store.stop(false);
+    store.start(1);
+    store.recoverLease();
+
+    long startTime = currentTimeMillis();
+    store.load(loader);
+    long timeTaken = System.currentTimeMillis() - startTime;
+    System.out.println("******************************************");
+    System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
+    System.out.println("******************************************");
+  }
+
+  public void tearDownProcedureStore() {
+    store.stop(false);
+    try {
+      store.getFileSystem().delete(store.getLogDir(), true);
+    } catch (IOException e) {
+      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+          + "disk space. Location: " + store.getLogDir().toString());
+      System.err.println(e.toString());
+    }
+  }
+
+  @Override
+  protected int doWork() {
+    try {
+      setUpProcedureStore();
+      writeWals();
+      storeRestart(new LoadCounter());
+      return EXIT_SUCCESS;
+    } catch (IOException e) {
+      e.printStackTrace();
+      return EXIT_FAILURE;
+    } finally {
+      tearDownProcedureStore();
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation();
+    tool.setConf(UTIL.getConfiguration());
+    tool.run(args);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd0466f1/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
new file mode 100644
index 0000000..210ac43
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store.wal;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.util.*;
+
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+
+public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
+  protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  // Command line options and defaults.
+  public static int DEFAULT_NUM_THREADS = 20;
+  public static Option NUM_THREADS_OPTION = new Option("threads", true,
+      "Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
+      + DEFAULT_NUM_THREADS);
+  public static int DEFAULT_NUM_PROCS = 1000000;  // 1M
+  public static Option NUM_PROCS_OPTION = new Option("procs", true,
+      "Total number of procedures. Each procedure writes one insert and one update. Default: "
+      + DEFAULT_NUM_PROCS);
+  public static int DEFAULT_NUM_WALS = 0;
+  public static Option NUM_WALS_OPTION = new Option("wals", true,
+      "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
+          " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
+  public static int DEFAULT_STATE_SIZE = 1024;  // 1KB
+  public static Option STATE_SIZE_OPTION = new Option("size", true,
+      "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
+          + "bytes");
+  public static Option SYNC_OPTION = new Option("sync", true,
+      "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
+          + "hsync, nosync. Default: hflush");
+  public static String DEFAULT_SYNC_OPTION = "hflush";
+
+  public int numThreads;
+  public long numProcs;
+  public long numProcsPerWal = Long.MAX_VALUE;  // never roll wall based on this value.
+  public int numWals;
+  public String syncType;
+  public int stateSize;
+  static byte[] serializedState;
+  private WALProcedureStore store;
+
+  /** Used by {@link Worker}. */
+  private AtomicLong procIds = new AtomicLong(0);
+  private AtomicBoolean workersFailed = new AtomicBoolean(false);
+  // Timeout for worker threads.
+  private static final int WORKER_THREADS_TIMEOUT_SEC = 600;  // in seconds
+
+  // Non-default configurations.
+  private void setupConf() {
+    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType));
+    if (numWals > 0) {
+      conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
+      numProcsPerWal = numProcs / numWals;
+    }
+  }
+
+  private void setupProcedureStore() throws IOException {
+    Path testDir = UTIL.getDataTestDir();
+    FileSystem fs = testDir.getFileSystem(conf);
+    Path logDir = new Path(testDir, "proc-logs");
+    System.out.println("Logs directory : " + logDir.toString());
+    fs.delete(logDir, true);
+    if ("nosync".equals(syncType)) {
+      store = new NoSyncWalProcedureStore(conf, fs, logDir);
+    } else {
+      store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+    }
+    store.start(numThreads);
+    store.recoverLease();
+    store.load(new ProcedureTestingUtility.LoadCounter());
+    System.out.println("Starting new log : "
+        + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
+  }
+
+  private void tearDownProcedureStore() {
+    store.stop(false);
+    try {
+      store.getFileSystem().delete(store.getLogDir(), true);
+    } catch (IOException e) {
+      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+          + "disk space. Location: " + store.getLogDir().toString());
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Processes and validates command line options.
+   */
+  @Override
+  public void processOptions(CommandLine cmd) {
+    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
+    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
+    numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
+    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
+    assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
+        "sync argument can only accept one of these three values: hsync, hflush, nosync";
+    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
+    serializedState = new byte[stateSize];
+    setupConf();
+  }
+
+  @Override
+  public void addOptions() {
+    addOption(NUM_THREADS_OPTION);
+    addOption(NUM_PROCS_OPTION);
+    addOption(NUM_WALS_OPTION);
+    addOption(SYNC_OPTION);
+    addOption(STATE_SIZE_OPTION);
+  }
+
+  @Override
+  public int doWork() {
+    try {
+      setupProcedureStore();
+      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+      Future<Integer>[] futures = (Future<Integer>[]) new Object[numThreads];
+      // Start worker threads.
+      long start = System.currentTimeMillis();
+      for (int i = 0; i < numThreads; i++) {
+        futures[i] = executor.submit(this.new Worker(start));
+      }
+      boolean failure = false;
+      try {
+        for (Future<Integer> future : futures) {
+          long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
+          failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
+        }
+      } catch (Exception e) {
+        System.err.println("Exception in worker thread.");
+        e.printStackTrace();
+        return EXIT_FAILURE;
+      }
+      executor.shutdown();
+      if (failure) {
+        return EXIT_FAILURE;
+      }
+      long timeTaken = System.currentTimeMillis() - start;
+      System.out.println("******************************************");
+      System.out.println("Num threads    : " + numThreads);
+      System.out.println("Num procedures : " + numProcs);
+      System.out.println("Sync type      : " + syncType);
+      System.out.println("Time taken     : " + (timeTaken / 1000.0f) + "sec");
+      System.out.println("******************************************");
+      return EXIT_SUCCESS;
+    } catch (IOException e) {
+      e.printStackTrace();
+      return EXIT_FAILURE;
+    } finally {
+      tearDownProcedureStore();
+    }
+  }
+
+  ///////////////////////////////
+  // HELPER CLASSES
+  ///////////////////////////////
+
+  /**
+   * Callable to generate load for wal by inserting/deleting/updating procedures.
+   * If procedure store fails to roll log file (throws IOException), all threads quit, and at
+   * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
+   */
+  class Worker implements Callable<Integer> {
+    final long start;
+
+    public Worker(long start) {
+      this.start = start;
+    }
+
+    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
+    @Override
+    public Integer call() throws IOException {
+      while (true) {
+        if (workersFailed.get()) {
+          return EXIT_FAILURE;
+        }
+        long procId = procIds.getAndIncrement();
+        if (procId >= numProcs) {
+          break;
+        }
+        if (procId != 0 && procId % 10000 == 0) {
+          long ms = System.currentTimeMillis() - start;
+          System.out.println("Wrote " + procId + " procedures in "
+              + StringUtils.humanTimeDiff(ms));
+        }
+        try{
+          if (procId > 0 && procId % numProcsPerWal == 0) {
+            store.rollWriterForTesting();
+            System.out.println("Starting new log : "
+                + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
+          }
+        } catch (IOException ioe) {
+          // Ask other threads to quit too.
+          workersFailed.set(true);
+          System.err.println("Exception when rolling log file. Current procId = " + procId);
+          ioe.printStackTrace();
+          return EXIT_FAILURE;
+        }
+        ProcedureTestingUtility.TestProcedure proc =
+            new ProcedureTestingUtility.TestProcedure(procId);
+        proc.setData(serializedState);
+        store.insert(proc, null);
+        store.update(proc);
+      }
+      return EXIT_SUCCESS;
+    }
+  }
+
+  public class NoSyncWalProcedureStore extends WALProcedureStore {
+    public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
+        final Path logDir) {
+      super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
+        @Override
+        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+          // no-op
+        }
+      });
+    }
+
+    @Override
+    protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
+        throws IOException {
+      long totalSynced = 0;
+      for (int i = 0; i < count; ++i) {
+        totalSynced += slots[offset + i].size();
+      }
+      return totalSynced;
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
+    tool.setConf(UTIL.getConfiguration());
+    tool.run(args);
+  }
+}
\ No newline at end of file


Mime
View raw message