hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeffr...@apache.org
Subject svn commit: r1510056 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
Date Sat, 03 Aug 2013 18:39:56 GMT
Author: jeffreyz
Date: Sat Aug  3 18:39:56 2013
New Revision: 1510056

URL: http://svn.apache.org/r1510056
Log:
hbase-8816: Add support of loading multiple tables into LoadTestTool

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java?rev=1510056&r1=1510055&r2=1510056&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java
Sat Aug  3 18:39:56 2013
@@ -41,8 +41,8 @@ import org.apache.hadoop.util.ToolRunner
 @InterfaceAudience.Private
 public abstract class AbstractHBaseTool implements Tool {
 
-  private static final int EXIT_SUCCESS = 0;
-  private static final int EXIT_FAILURE = 1;
+  protected static final int EXIT_SUCCESS = 0;
+  protected static final int EXIT_FAILURE = 1;
 
   private static final String SHORT_HELP_OPTION = "h";
   private static final String LONG_HELP_OPTION = "help";
@@ -54,6 +54,8 @@ public abstract class AbstractHBaseTool 
   protected Configuration conf = null;
 
   private static final Set<String> requiredOptions = new TreeSet<String>();
+  
+  protected String[] cmdLineArgs = null;
 
   /**
    * Override this to add command-line options using {@link #addOptWithArg}
@@ -90,6 +92,7 @@ public abstract class AbstractHBaseTool 
     try {
       // parse the command line arguments
       cmd = parseArgs(args);
+      cmdLineArgs = args;
     } catch (ParseException e) {
       LOG.error("Error when parsing command-line arguemnts", e);
       printUsage();
@@ -125,14 +128,14 @@ public abstract class AbstractHBaseTool 
     return success;
   }
 
-  private CommandLine parseArgs(String[] args) throws ParseException {
+  protected CommandLine parseArgs(String[] args) throws ParseException {
     options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage");
     addOptions();
     CommandLineParser parser = new BasicParser();
     return parser.parse(options, args);
   }
 
-  private void printUsage() {
+  protected void printUsage() {
     HelpFormatter helpFormatter = new HelpFormatter();
     helpFormatter.setWidth(80);
     String usageHeader = "Options:";

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1510056&r1=1510055&r2=1510056&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
Sat Aug  3 18:39:56 2013
@@ -17,11 +17,17 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -32,6 +38,7 @@ import org.apache.hadoop.hbase.io.compre
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * A command-line utility that reads, writes, and verifies data. Unlike
@@ -102,6 +109,7 @@ public class LoadTestTool extends Abstra
   protected static final String OPT_ZK_QUORUM = "zk";
   protected static final String OPT_SKIP_INIT = "skip_init";
   protected static final String OPT_INIT_ONLY = "init_only";
+  private static final String NUM_TABLES = "num_tables";
 
   protected static final long DEFAULT_START_KEY = 0;
 
@@ -128,10 +136,12 @@ public class LoadTestTool extends Abstra
   protected boolean isMultiPut;
 
   // Reader options
-  protected int numReaderThreads = DEFAULT_NUM_THREADS;
-  protected int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
-  protected int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
-  protected int verifyPercent;
+  private int numReaderThreads = DEFAULT_NUM_THREADS;
+  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
+  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
+  private int verifyPercent;
+ 
+  private int numTables = 1;
 
   // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
   //       console tool itself should only be used from console.
@@ -223,6 +233,11 @@ public class LoadTestTool extends Abstra
         DEFAULT_START_KEY + ".");
     addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
         + "already exists");
+    
+    addOptWithArg(NUM_TABLES,
+      "A positive integer number. When a number n is speicfied, load test "
+          + "tool  will load n table parallely. -tn parameter value becomes "
+          + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
   }
 
   @Override
@@ -308,6 +323,11 @@ public class LoadTestTool extends Abstra
       System.out.println("Percent of keys to verify: " + verifyPercent);
       System.out.println("Reader threads: " + numReaderThreads);
     }
+    
+    numTables = 1;
+    if(cmd.hasOption(NUM_TABLES)) {
+      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
+    }
   }
 
   private void parseColumnFamilyOptions(CommandLine cmd) {
@@ -339,6 +359,14 @@ public class LoadTestTool extends Abstra
 
   @Override
   protected int doWork() throws IOException {
+    if (numTables > 1) {
+      return parallelLoadTables();
+    } else {
+      return loadTable();
+    }
+  }
+
+  protected int loadTable() throws IOException {
     if (cmd.hasOption(OPT_ZK_QUORUM)) {
       conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
     }
@@ -399,11 +427,112 @@ public class LoadTestTool extends Abstra
       success = success && readerThreads.getNumReadErrors() == 0
           && readerThreads.getNumReadFailures() == 0;
     }
-    return success ? 0 : 1;
+    return success ? EXIT_SUCCESS : this.EXIT_FAILURE;
   }
-
+  
   public static void main(String[] args) {
     new LoadTestTool().doStaticMain(args);
   }
 
+  /**
+   * When NUM_TABLES is specified, the function starts multiple worker threads 
+   * which individually start a LoadTestTool instance to load a table. Each 
+   * table name is in format <tn>_<index>. For example, "-tn test -num_tables
2"
+   * , table names will be "test_1", "test_2"
+   * 
+   * @throws IOException
+   */
+  private int parallelLoadTables() 
+      throws IOException {
+    // create new command args
+    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
+    String[] newArgs = null;
+    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
+      newArgs = new String[cmdLineArgs.length + 2];
+      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
+      for (int i = 0; i < cmdLineArgs.length; i++) {
+        newArgs[i + 2] = cmdLineArgs[i];
+      }
+    } else {
+      newArgs = cmdLineArgs;
+    }
+
+    int tableNameValueIndex = -1;
+    for (int j = 0; j < newArgs.length; j++) {
+      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
+        tableNameValueIndex = j + 1;
+      } else if (newArgs[j].endsWith(NUM_TABLES)) {
+        // change NUM_TABLES to 1 so that each worker loads one table
+        newArgs[j + 1] = "1"; 
+      }
+    }
+
+    // starting to load multiple tables
+    List<WorkerThread> workers = new ArrayList<WorkerThread>();
+    for (int i = 0; i < numTables; i++) {
+      String[] workerArgs = newArgs.clone();
+      workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
+      WorkerThread worker = new WorkerThread(i, workerArgs);
+      workers.add(worker);
+      LOG.info(worker + " starting");
+      worker.start();
+    }
+
+    // wait for all workers finish
+    LOG.info("Waiting for worker threads to finish");
+    for (WorkerThread t : workers) {
+      try {
+        t.join();
+      } catch (InterruptedException ie) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(ie);
+        throw iie;
+      }
+      checkForErrors();
+    }
+    
+    return EXIT_SUCCESS;
+  }
+
+  // If an exception is thrown by one of worker threads, it will be
+  // stored here.
+  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+  private void workerThreadError(Throwable t) {
+    thrown.compareAndSet(null, t);
+  }
+
+  /**
+   * Check for errors in the writer threads. If any is found, rethrow it.
+   */
+  private void checkForErrors() throws IOException {
+    Throwable thrown = this.thrown.get();
+    if (thrown == null) return;
+    if (thrown instanceof IOException) {
+      throw (IOException) thrown;
+    } else {
+      throw new RuntimeException(thrown);
+    }
+  }
+
+  class WorkerThread extends Thread {
+    private String[] workerArgs;
+
+    WorkerThread(int i, String[] args) {
+      super("WorkerThread-" + i);
+      workerArgs = args;
+    }
+
+    public void run() {
+      try {
+        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
+        if (ret != 0) {
+          throw new RuntimeException("LoadTestTool exit with non-zero return code.");
+        }
+      } catch (Exception ex) {
+        LOG.error("Error in worker thread", ex);
+        workerThreadError(ex);
+      }
+    }
+  }
 }



Mime
View raw message