hbase-commits mailing list archives

From nspiegelb...@apache.org
Subject svn commit: r1176146 - in /hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual: ./ utils/
Date Tue, 27 Sep 2011 01:07:21 GMT
Author: nspiegelberg
Date: Tue Sep 27 01:07:20 2011
New Revision: 1176146

URL: http://svn.apache.org/viewvc?rev=1176146&view=rev
Log:
copying over tests in hbase/manual from our internal 0.20 branch

Summary:

copying over tests in hbase/manual from our internal 0.20 branch
(VENDOR.hbase/hbase-0.20).

Reviewed By: kranganathan

Test Plan: Ran HBaseTest against a local HBase instance.
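
An illustrative invocation, following the usage string that HBaseTest builds in initAndParseArgs below (the ZooKeeper host, key counts, and data sizes here are hypothetical values, not part of the commit):

    bin/hbase org.apache.hadoop.hbase.manual.HBaseTest -zk localhost -load 1000000:10:512 -read 0:1000000:10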

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/HBaseTest.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/RestartMetaTest.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DataGenerator.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/ProcessBasedLocalHBaseCluster.java

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/HBaseTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/HBaseTest.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/HBaseTest.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/HBaseTest.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,227 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.manual.utils.HBaseUtils;
+import org.apache.hadoop.hbase.manual.utils.KillProcessesAndVerify;
+import org.apache.hadoop.hbase.manual.utils.MultiThreadedReader;
+import org.apache.hadoop.hbase.manual.utils.MultiThreadedWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class HBaseTest
+{
+  static {
+      // make the root logger display only errors
+      Logger.getRootLogger().setLevel(Level.ERROR);
+      // enable debugging for our package
+      Logger.getLogger("org.apache.hadoop.hbase.manual").setLevel(Level.DEBUG);
+      Logger.getLogger("org.apache.hadoop.hbase.client").setLevel(Level.INFO);
+  }
+
+  // global HBase configuration for the JVM - referenced by all classes.
+  public static List<HBaseConfiguration> configList_ = new ArrayList<HBaseConfiguration>();
+  // startup options
+  public static Options options_ = new Options();
+  // command line options object
+  public static CommandLine cmd_;
+
+  // table name for the test
+  public static byte[] tableName_ = Bytes.toBytes("test1");
+  // column families used by the test
+  public static byte[][] columnFamilies_ = { Bytes.toBytes("actions") };
+  private static final Log LOG = LogFactory.getLog(HBaseTest.class);
+  static Random random_ = new Random();
+
+  // usage string for loading data
+  static final String OPT_USAGE_LOAD = " <num keys>:<average cols per key>:<avg data size>[:<num threads = 20>]";
+  /**
+   * Reads the following params from the command line:
+   *  <Number of keys to load>:<Average columns per key>:<Average data size per column>[:<num threads = 20>]
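+   *  Illustrative example (hypothetical values): -load 1000000:10:512:30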
+   */
+  public void loadData() {
+    // parse command line data
+    String[] cols = cmd_.getOptionValue(OPT_LOAD).split(":");
+    long startKey = 0;
+    long endKey = Long.parseLong(cols[0]);
+    long minColsPerKey = 1;
+    long maxColsPerKey = 2 * Long.parseLong(cols[1]);
+    int minColDataSize = Integer.parseInt(cols[2])/2;
+    int maxColDataSize = Integer.parseInt(cols[2]) * 3 / 2;
+    int numThreads = (endKey - startKey > 1000)? 20 : 1;
+    if (cols.length > 3) {
+      numThreads = Integer.parseInt(cols[3]);
+    }
+
+    // print out the args
+    System.out.printf("Key range %d .. %d\n", startKey, endKey);
+    System.out.printf("Number of Columns/Key: %d..%d\n", minColsPerKey, maxColsPerKey);
+    System.out.printf("Data Size/Column: %d..%d bytes\n", minColDataSize, maxColDataSize);
+    System.out.printf("Client Threads: %d\n", numThreads);
+
+    // start the writers
+    MultiThreadedWriter writer = new MultiThreadedWriter(configList_.get(0), tableName_, columnFamilies_[0]);
+    writer.setBulkLoad(true);
+    writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
+    writer.setDataSize(minColDataSize, maxColDataSize);
+    writer.start(startKey, endKey, numThreads);
+    System.out.printf("Started loading data...");
+//    writer.waitForFinish();
+  }
+
+  static final String OPT_USAGE_READ = " <start key>:<end key>[:<verify percent = 0>:<num threads = 20>]";
+  /**
+   * Reads the following params from the command line:
+   *  <Starting key to read>:<End key to read>:<Percent of keys to verify (data is stamped with key)>[:<num threads = 20>]
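+   *  Illustrative example (hypothetical values): -read 0:1000000:10:20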
+   */
+  public void readData() {
+    // parse command line data
+    String[] cols = cmd_.getOptionValue(OPT_READ).split(":");
+    long startKey = Long.parseLong(cols[0]);
+    long endKey = Long.parseLong(cols[1]);
+    int verifyPercent = Integer.parseInt(cols[2]);
+    int numThreads = (endKey - startKey > 1000)? 20 : 1;
+    if (cols.length > 3) {
+      numThreads = Integer.parseInt(cols[3]);
+    }
+
+    // if the read test is running at the same time as the load,
+    // ignore the start/endKey options
+    if (cmd_.hasOption(OPT_LOAD)) {
+      startKey = -1;
+      endKey = -1;
+    } else {
+      System.out.printf("Key range %d .. %d\n", startKey, endKey);
+    }
+
+    System.out.printf("Verify percent of keys: %d\n", verifyPercent);
+    System.out.printf("Client Threads: %d\n", numThreads);
+
+    // start the readers
+    MultiThreadedReader reader = new MultiThreadedReader(configList_.get(0), tableName_, columnFamilies_[0]);
+    reader.setVerficationPercent(verifyPercent);
+    reader.start(startKey, endKey, numThreads);
+  }
+
+  static final String OPT_USAGE_KILL = " <HBase root path>:<HDFS root path>:<minutes between kills>:<RS kill %>:<#keys to verify>";
+
+  /**
+   * Reads the following params from the command line:
+   *  <Path to HBase root on deployed cluster>:<Path to HDFS root on deployed cluster>:
+   *  <Minutes between server kills>:<RS kill % (versus DN)>:<# keys to verify after kill (# keys loaded in 4 minutes is a good number)>
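+   *  Illustrative example (hypothetical values): -kill /usr/local/hbase:/usr/local/hadoop:60:80:25000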
+   */
+  public void killAndVerify() {
+    // parse command line data
+    String[] cols = cmd_.getOptionValue(OPT_KILL).split(":");
+    String clusterHBasePath = cols[0];
+    String clusterHDFSPath = cols[1];
+    int timeIntervalBetweenKills = Integer.parseInt(cols[2]);
+    int rSKillPercent = Integer.parseInt(cols[3]);
+    int numKeysToVerify = Integer.parseInt(cols[4]);
+
+    System.out.printf("Time between kills:  %d minutes\n", timeIntervalBetweenKills);
+    System.out.printf("RegionServer (rest is DataNode) kill percent: %d\n", rSKillPercent);
+    System.out.printf("Num keys to verify after killing: %d\n", numKeysToVerify);
+
+    (new KillProcessesAndVerify(clusterHBasePath, clusterHDFSPath, timeIntervalBetweenKills, rSKillPercent, numKeysToVerify)).start();
+    System.out.printf("Started kill test...");
+  }
+
+  public static void main(String[] args) {
+    try {
+      // parse the command line args
+      initAndParseArgs(args);
+
+      HBaseTest hBaseTest = new HBaseTest();
+      // create the HBase configuration from ZooKeeper node
+      String[] zkNodes = cmd_.getOptionValue(OPT_ZKNODE).split(":");
+      for(String zkNode : zkNodes) {
+        HBaseConfiguration conf = HBaseUtils.getHBaseConfFromZkNode(zkNode);
+        LOG.info("Adding hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
+        configList_.add(conf);
+      }
+
+      // create tables if needed
+      for(HBaseConfiguration conf : configList_) {
+        HBaseUtils.createTableIfNotExists(conf, tableName_, columnFamilies_);
+      }
+
+      // write some test data in an infinite loop if needed
+      if(cmd_.hasOption(OPT_LOAD)) {
+        hBaseTest.loadData();
+      }
+      // kill servers and verify the data integrity
+      if(cmd_.hasOption(OPT_KILL)) {
+        hBaseTest.killAndVerify();
+      }
+      // read the test data in an infinite loop
+      if(cmd_.hasOption(OPT_READ)) {
+        hBaseTest.readData();
+      }
+    } catch(Exception e) {
+      e.printStackTrace();
+      printUsage();
+    }
+  }
+
+  private static String USAGE;
+  private static final String HEADER = "HBaseTest";
+  private static final String FOOTER = "";
+  private static final String OPT_ZKNODE = "zk";
+  private static final String OPT_LOAD = "load";
+  private static final String OPT_READ = "read";
+  private static final String OPT_KILL = "kill";
+  static void initAndParseArgs(String[] args) throws ParseException {
+    // set the usage object
+    USAGE =  "bin/hbase org.apache.hadoop.hbase.manual.HBaseTest "
+            + "  -" + OPT_ZKNODE   + " <Zookeeper node>"
+            + "  -" + OPT_LOAD     + OPT_USAGE_LOAD
+            + "  -" + OPT_READ     + OPT_USAGE_READ
+            + "  -" + OPT_KILL     + OPT_USAGE_KILL;
+    // add options
+    options_.addOption(OPT_ZKNODE    , true, "Zookeeper node in the HBase cluster");
+    options_.addOption(OPT_LOAD      , true, OPT_USAGE_LOAD);
+    options_.addOption(OPT_READ      , true, OPT_USAGE_READ);
+    options_.addOption(OPT_KILL      , true, OPT_USAGE_KILL);
+    // parse the passed in options
+    CommandLineParser parser = new BasicParser();
+    cmd_ = parser.parse(options_, args);
+  }
+
+  private static void printUsage() {
+    HelpFormatter helpFormatter = new HelpFormatter( );
+    helpFormatter.setWidth( 80 );
+    helpFormatter.printHelp( USAGE, HEADER, options_, FOOTER );
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/RestartMetaTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/RestartMetaTest.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/RestartMetaTest.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/RestartMetaTest.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,147 @@
+package org.apache.hadoop.hbase.manual;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.manual.utils.HBaseUtils;
+import org.apache.hadoop.hbase.manual.utils.KillProcessesAndVerify;
+import org.apache.hadoop.hbase.manual.utils.MultiThreadedReader;
+import org.apache.hadoop.hbase.manual.utils.MultiThreadedWriter;
+import org.apache.hadoop.hbase.manual.utils.ProcessBasedLocalHBaseCluster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class RestartMetaTest {
+  static {
+    // make the root logger display only errors
+    Logger.getRootLogger().setLevel(Level.ERROR);
+    // enable debugging for our package
+    Logger.getLogger("org.apache.hadoop.hbase.manual").setLevel(Level.DEBUG);
+    Logger.getLogger("org.apache.hadoop.hbase.manual").setLevel(Level.DEBUG);
+  }
+
+  // startup options
+  public static Options options_ = new Options();
+
+  private static HBaseConfiguration conf_;
+
+  // command line options object
+  public static CommandLine cmd_;
+
+  // table name for the test
+  public static byte[] tableName_ = Bytes.toBytes("test1");
+
+  // column families used by the test
+  public static byte[][] columnFamilies_ = { Bytes.toBytes("actions") };
+
+  private static final Log LOG = LogFactory.getLog(RestartMetaTest.class);
+
+  static Random random_ = new Random();
+
+
+  public static void loadData() {
+    // parse command line data
+    long startKey = 0;
+    long endKey = 100000;
+    long minColsPerKey = 5;
+    long maxColsPerKey = 15;
+    int minColDataSize = 256;
+    int maxColDataSize = 256 * 3;
+    int numThreads = 10;
+
+    // print out the args
+    System.out.printf("Key range %d .. %d\n", startKey, endKey);
+    System.out.printf("Number of Columns/Key: %d..%d\n", minColsPerKey, maxColsPerKey);
+    System.out.printf("Data Size/Column: %d..%d bytes\n", minColDataSize, maxColDataSize);
+    System.out.printf("Client Threads: %d\n", numThreads);
+
+    // start the writers
+    MultiThreadedWriter writer = new MultiThreadedWriter(conf_, tableName_, columnFamilies_[0]);
+    writer.setBulkLoad(true);
+    writer.setColumnsPerKey(minColsPerKey, maxColsPerKey);
+    writer.setDataSize(minColDataSize, maxColDataSize);
+    writer.start(startKey, endKey, numThreads);
+    System.out.printf("Started loading data...");
+    writer.waitForFinish();
+    System.out.printf("Finished loading data...");
+  }
+
+  public static void main(String[] args) {
+    try {
+      // parse the command line args
+      initAndParseArgs(args);
+
+      String hbaseHome = cmd_.getOptionValue(OPT_HBASE_HOME);
+      int numRegionServers = Integer.parseInt(cmd_.getOptionValue(OPT_NUM_RS));
+
+      ProcessBasedLocalHBaseCluster hbaseCluster =
+          new ProcessBasedLocalHBaseCluster(hbaseHome, numRegionServers);
+
+      // start the process based HBase cluster
+      conf_ = hbaseCluster.start();
+
+      // create tables if needed
+      HBaseUtils.createTableIfNotExists(conf_, tableName_, columnFamilies_);
+
+      LOG.debug("Loading data....\n\n");
+      loadData();
+
+      LOG.debug("Sleeping 30 seconds....\n\n");
+      HBaseUtils.sleep(30000);
+
+      HServerAddress address = HBaseUtils.getMetaRS(conf_);
+      int metaRSPort = address.getPort();
+
+      LOG.debug("Killing META region server running on port " + metaRSPort);
+      hbaseCluster.killRegionServer(metaRSPort);
+      HBaseUtils.sleep(2000);
+
+      LOG.debug("Restarting region server running on port metaRSPort");
+      hbaseCluster.startRegionServer(metaRSPort);
+      HBaseUtils.sleep(2000);
+
+    } catch(Exception e) {
+      e.printStackTrace();
+      printUsage();
+    }
+  }
+
+  private static String USAGE;
+  private static final String HEADER = "HBase Tests";
+  private static final String FOOTER = "";
+  private static final String OPT_HBASE_HOME = "hbase_home";
+  private static final String OPT_NUM_RS     = "num_rs";
+
+  static void initAndParseArgs(String[] args) throws ParseException {
+    // set the usage object
+    USAGE =  "bin/hbase org.apache.hadoop.hbase.manual.RestartMetaTest "
+            + "  -" + OPT_HBASE_HOME   + " <HBase home directory> "
+            + "  -" + OPT_NUM_RS       + " <number of region servers> ";
+
+    // add options
+    options_.addOption(OPT_HBASE_HOME, true, "HBase home directory");
+    options_.addOption(OPT_NUM_RS, true, "Number of Region Servers");
+
+    // parse the passed in options
+    CommandLineParser parser = new BasicParser();
+    cmd_ = parser.parse(options_, args);
+  }
+
+  private static void printUsage() {
+    HelpFormatter helpFormatter = new HelpFormatter( );
+    helpFormatter.setWidth( 80 );
+    helpFormatter.printHelp( USAGE, HEADER, options_, FOOTER );
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DataGenerator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DataGenerator.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DataGenerator.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DataGenerator.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.util.Random;
+
+public class DataGenerator {
+  static Random random_ = new Random();
+  /* one byte fill pattern */
+  public static final String fill1B_     = "-";
+  /* 64 byte fill pattern */
+  public static final String fill64B_    = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789  ";
+  /* alternate 64 byte fill pattern */
+  public static final String fill64BAlt_ = "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789+-";
+  /* 1K fill pattern */
+  public static final String fill1K_     = fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_+
+                                           fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_+
+                                           fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_+
+                                           fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_;
+
+  int minDataSize_ = 0;
+  int maxDataSize_ = 0;
+
+  static public String paddedKey(long key) {
+      // left-pad key with zeroes to 10 decimal places.
+      String paddedKey = String.format("%010d", key);
+
+      // flip the key to randomize
+      return (new StringBuffer(paddedKey)).reverse().toString();
+  }
+
+  public DataGenerator(int minDataSize, int maxDataSize) {
+    minDataSize_ = minDataSize;
+    maxDataSize_ = maxDataSize;
+  }
+
+  public byte[] getDataInSize(long key) {
+    int dataSize = minDataSize_ + random_.nextInt(Math.abs(maxDataSize_ - minDataSize_));
+    StringBuilder sb = new StringBuilder();
+
+    // write the key first
+    int sizeLeft = dataSize;
+    String keyAsString = DataGenerator.paddedKey(key);
+    sb.append(keyAsString);
+    sizeLeft -= keyAsString.length();
+
+    for(int i = 0; i < sizeLeft/1024; ++i)
+    {
+      sb.append(fill1K_);
+    }
+    sizeLeft = sizeLeft % 1024;
+    for(int i = 0; i < sizeLeft/64; ++i)
+    {
+      sb.append(fill64B_);
+    }
+    sizeLeft = sizeLeft % 64;
+    for(int i = 0; i < sizeLeft; ++i)
+    {
+      sb.append(fill1B_);
+    }
+
+    return sb.toString().getBytes();
+  }
+
+  public static boolean verify(String rowKey, String actionId, String data) {
+    // data written by the test is stamped with its row key, so a valid cell must start with it
+    return data.startsWith(rowKey);
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+public class DisplayFormatUtils
+{
+  public static String formatTime(long elapsedTime) {
+    String format = String.format("%%0%dd", 2);
+    elapsedTime = elapsedTime / 1000;
+    String seconds = String.format(format, elapsedTime % 60);
+    String minutes = String.format(format, (elapsedTime % 3600) / 60);
+    String hours = String.format(format, elapsedTime / 3600);
+    String time =  hours + ":" + minutes + ":" + seconds;
+    return time;
+  }
+
+  public static String formatNumber(long number) {
+    if(number >= 1000000000) {
+      return ((number/1000000000) + "B");
+    }
+    else if(number >= 1000000) {
+      return ((number/1000000) + "M");
+    }
+    else if(number >= 1000) {
+      return ((number/1000) + "K");
+    }
+    else {
+      return (number + "");
+    }
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+public class HBaseUtils
+{
+  private static final Log LOG = LogFactory.getLog(HBaseUtils.class);
+
+  public static void sleep(int millisecs) {
+    try {
+      Thread.sleep(millisecs);
+    } catch (InterruptedException e) {
+    }
+  }
+
+  public static HTable getHTable(HBaseConfiguration conf, byte[] tableName) {
+    HTable table = null;
+    try {
+      table = new HTable(conf, tableName);
+    }
+    catch (IOException e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return table;
+  }
+
+  public static void createTableIfNotExists(HBaseConfiguration conf, byte[] tableName, byte[][] columnFamilies) {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    for(byte[] cfName : columnFamilies) {
+      desc.addFamily(new HColumnDescriptor(cfName));
+    }
+    try
+    {
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      admin.createTable(desc);
+    }
+    catch(MasterNotRunningException e) {
+      LOG.error("Master not running.");
+      e.printStackTrace();
+    }
+    catch(TableExistsException e) {
+      LOG.info("Table already exists.");
+    }
+    catch (IOException e)
+    {
+      LOG.error("IO Exception.");
+      e.printStackTrace();
+    }
+  }
+
+  public static HServerAddress getMetaRS(HBaseConfiguration conf) throws IOException {
+    HTable table = new HTable(conf, HConstants.META_TABLE_NAME);
+    HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(""));
+    return hloc.getServerAddress();
+  }
+
+  public static HBaseConfiguration getHBaseConfFromZkNode(String zkNodeName) {
+    Configuration c = new Configuration();
+    c.set("hbase.zookeeper.quorum", zkNodeName);
+    return new HBaseConfiguration(c);
+  }
+
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,194 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.manual.HBaseTest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+public class KillProcessesAndVerify extends Thread
+{
+  private static final Log LOG = LogFactory.getLog(KillProcessesAndVerify.class);
+  /* One minute in millis */
+  public static final int TIME_INTERVAL_ONE_MIN = 1*(60*1000);
+  /* wait between killing an RS/DN - set to 1 hour */
+  public int TIME_INTERVAL_BETWEEN_KILLS = 60 * TIME_INTERVAL_ONE_MIN;
+  /* percent of time we want the RS killed (as opposed to the DN) */
+  public int RS_KILL_PERCENT = 80;
+  /* how many keys to verify when the server is killed - 4 minute window */
+  public int NUM_KEYS_TO_VERIFY = 25000;
+  /* the HTable */
+  HTable hTable_ = null;
+  /* the cluster name */
+  public static String clusterHBasePath_ = null;
+  public static String clusterHDFSPath_ = null;
+
+  public static Random random_ = new Random();
+  private Runtime runtime_ = null;
+
+  public KillProcessesAndVerify(String clusterHBasePath, String clusterHDFSPath, int timeIntervalBetweenKills, int rSKillPercent, int numKeysToVerify) {
+    runtime_ = Runtime.getRuntime();
+    hTable_ = HBaseUtils.getHTable(HBaseTest.configList_.get(0), MultiThreadedWriter.tableName_);
+    clusterHBasePath_ = clusterHBasePath;
+    clusterHDFSPath_ = clusterHDFSPath;
+    TIME_INTERVAL_BETWEEN_KILLS = timeIntervalBetweenKills * TIME_INTERVAL_ONE_MIN;
+    RS_KILL_PERCENT = rSKillPercent;
+    NUM_KEYS_TO_VERIFY = numKeysToVerify;
+  }
+
+  public void run() {
+    while(true) {
+      try
+      {
+        // wait for the next iteration of kills
+        Thread.sleep(TIME_INTERVAL_BETWEEN_KILLS);
+
+        // choose if we are killing an RS or a DN
+        boolean operateOnRS = (random_.nextInt(100) < RS_KILL_PERCENT);
+        // choose the node we want to kill
+        long lastWrittenKey = MultiThreadedWriter.currentKey_.get();
+        HRegionLocation hloc = hTable_.getRegionLocation(Bytes.toBytes(lastWrittenKey));
+        String nodeName = hloc.getServerAddress().getHostname();
+        LOG.debug("Picked type = " + (operateOnRS?"REGIONSERVER":"DATANODE") + ", node = " + nodeName);
+
+        // kill the server
+        String killCommand = getKillCommand(nodeName, operateOnRS);
+        executeCommand(killCommand);
+        LOG.debug("Killed " + (operateOnRS?"REGIONSERVER":"DATANODE") + " on node " + nodeName + ", waiting...");
+
+        // NOTE: this unconditional break exits after the first kill; the
+        // restart and verification logic below is currently unreachable.
+        if(true) {
+          break;
+        }
+
+        // wait for a while
+        Thread.sleep(TIME_INTERVAL_ONE_MIN/2);
+
+        // start the server
+        String startCommand = getStartCommand(nodeName, operateOnRS);
+        executeCommand(startCommand);
+        LOG.debug("Started " + (operateOnRS?"REGIONSERVER":"DATANODE") + " on node " + nodeName + ", waiting...");
+
+        // wait for a while
+        Thread.sleep(TIME_INTERVAL_ONE_MIN);
+
+        // verify the reads that happened in the last 4 minutes - last 25000 keys
+        int endIdx = MultiThreadedWriter.insertedKeySet_.size() - 1;
+        int startIdx = (endIdx < NUM_KEYS_TO_VERIFY) ? 0 : (endIdx - NUM_KEYS_TO_VERIFY + 1);
+        verifyKeys(MultiThreadedWriter.insertedKeySet_, startIdx, endIdx, MultiThreadedWriter.failedKeySet_);
+        LOG.debug("Done verifying keys, sleep till next interval");
+      }
+      catch (IOException e1)
+      {
+        e1.printStackTrace();
+      }
+      catch (InterruptedException e)
+      {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public String getKillCommand(String nodeName, boolean rsCommand) {
+    // construct the remote kill command
+    String processName = rsCommand?"HRegionServer":"DataNode";
+    String killCmd = "/usr/bin/pgrep -f " + processName + " | /usr/bin/xargs /bin/kill -9";
+
+    // put the ssh call
+    String remoteKillCmd = "ssh " + nodeName + " " + killCmd;
+
+    return remoteKillCmd;
+  }
+
+  public String getStartCommand(String nodeName, boolean rsCommand) {
+    // construct the remote start up command
+    String startCmd =
+      rsCommand?
+      clusterHBasePath_ +"/bin/hbase-daemon.sh start regionserver":
+      clusterHDFSPath_ + "/bin/hadoop-daemons.sh --config /usr/local/hadoop/HDFS-" + clusterHDFSPath_ + "/conf start datanode";
+
+    // put the ssh call
+    String remoteStartCmd = "ssh " + nodeName + " " + startCmd;
+
+    return remoteStartCmd;
+  }
+
+  public void verifyKeys(List<Long> insertedKeys, int verifyStartIdx, int verifyEndIdx, List<Long> failedKeys) {
+    Get get = null;
+
+    for (int idx = verifyStartIdx; idx <= verifyEndIdx; idx++) {
+
+      long rowKey = insertedKeys.get(idx);
+
+      // skip any key that has not been inserted
+      if(failedKeys.contains(rowKey)) {
+        continue;
+      }
+      // query hbase for the key
+      get = new Get(MultiThreadedWriter.HBaseWriter.longToByteArrayKey(rowKey));
+      get.addFamily(MultiThreadedWriter.columnFamily_);
+      try
+      {
+        Result result = hTable_.get(get);
+        // a missing key comes back as an empty Result rather than an exception
+        if (result.isEmpty()) {
+          LOG.error("KEY " + rowKey + " was NOT FOUND, it was claimed to be inserted.");
+        }
+      }
+      catch (IOException e)
+      {
+        LOG.error("Read failed for key " + rowKey);
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public void executeCommand(String command) throws InterruptedException, IOException {
+    LOG.debug("Command : " + command);
+    Process p = runtime_.exec(command);
+//    p.waitFor();
+    BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
+
+    BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+
+    // read the output from the command
+    System.out.println("Here is the standard output of the command:\n");
+    String s = null;
+
+    while ((s = stdInput.readLine()) != null) {
+      System.out.println(s);
+    }
+
+    // read any errors from the attempted command
+    System.out.println("Here is the standard error of the command (if any):\n");
+    while ((s = stdError.readLine()) != null) {
+      System.out.println(s);
+    }
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+
+public abstract class MultiThreadedAction
+{
+  private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class);
+  public static int numThreads_ = 1;
+  public static byte[] tableName_;
+  public static byte[] columnFamily_;
+  public float verifyPercent_ = 0;
+  public long startKey_ = 0;
+  public long endKey_ = 1;
+  public AtomicInteger numThreadsWorking_ = new AtomicInteger(0);
+  public AtomicLong numKeys_ = new AtomicLong(0);
+  public AtomicLong numKeysVerified_ = new AtomicLong(0);
+  public AtomicLong numCols_ = new AtomicLong(0);
+  public AtomicLong numErrors_ = new AtomicLong(0);
+  public AtomicLong numOpFailures_ = new AtomicLong(0);
+  public AtomicLong cumulativeOpTime_ = new AtomicLong(0);
+  public boolean verbose_ = false;
+  public Random random_ = new Random();
+  public HBaseConfiguration conf_;
+
+  public void startReporter(String id) {
+    (new ProgressReporter(id)).start();
+  }
+
+  public class ProgressReporter extends Thread {
+
+     private String id_ = "";
+
+     public ProgressReporter(String id) {
+       id_ = id;
+     }
+
+    public void run() {
+      long startTime = System.currentTimeMillis();
+      long reportingInterval = 5000;
+
+      long priorNumKeys = 0;
+      long priorCumulativeOpTime = 0;
+
+      while(numThreadsWorking_.get() != 0) {
+        String threadsLeft = "[" + id_ + ":" + numThreadsWorking_.get() + "] ";
+        if(numKeys_.get() == 0) {
+          LOG.info(threadsLeft + "Number of keys = 0");
+        }
+        else {
+          long numKeys = numKeys_.get();
+          long time = System.currentTimeMillis() - startTime;
+          long cumulativeOpTime = cumulativeOpTime_.get();
+
+          long numKeysDelta = numKeys - priorNumKeys;
+          long cumulativeOpTimeDelta = cumulativeOpTime - priorCumulativeOpTime;
+
+          LOG.info(threadsLeft + "Keys = " + numKeys +
+                   ", cols = " + DisplayFormatUtils.formatNumber(numCols_.get()) +
+                   ", time = " + DisplayFormatUtils.formatTime(time) +
+                   ((numKeys > 0 && time > 0)? (" Overall: [" +
+                                               "keys/s = " + numKeys*1000/time +
+                                               ", latency = " + cumulativeOpTime/numKeys + " ms]")
+                                             : "") +
+                   ((numKeysDelta > 0) ? (" Current: [" +
+                                         "keys/s = " + numKeysDelta*1000/reportingInterval +
+                                         ", latency = " + cumulativeOpTimeDelta/numKeysDelta + " ms]")
+                                      : "") +
+                   ((numKeysVerified_.get()>0)?(", verified = " + numKeysVerified_.get()):"") +
+                   ((numOpFailures_.get()>0)?(", FAILURES = " + numOpFailures_.get()):"") +
+                   ((numErrors_.get()>0)?(", ERRORS = " + numErrors_.get()):"")
+                   );
+
+          priorNumKeys = numKeys;
+          priorCumulativeOpTime = cumulativeOpTime;
+        }
+        try {
+          Thread.sleep(reportingInterval);
+        }
+        catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  public abstract void start(long startKey, long endKey, int numThreads);
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,194 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.manual.HBaseTest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class MultiThreadedReader extends MultiThreadedAction
+{
+  private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class);
+  Set<HBaseReader> readers_ = new HashSet<HBaseReader>();
+
+  public MultiThreadedReader(HBaseConfiguration conf, byte[] tableName, byte[] columnFamily) {
+    tableName_ = tableName;
+    columnFamily_ = columnFamily;
+    conf_ = conf;
+  }
+
+  public void setVerficationPercent(float verifyPercent) {
+    verifyPercent_ = verifyPercent;
+  }
+
+  public void start(long startKey, long endKey, int numThreads) {
+    if(verbose_) {
+      LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
+    }
+    startKey_ = startKey;
+    endKey_ = endKey;
+    numThreads_ = numThreads;
+
+    long threadStartKey = startKey;
+    long threadEndKey = startKey;
+    for(int i = 0; i < numThreads_; ++i) {
+      threadStartKey = (startKey == -1) ? -1 : threadEndKey ;
+      threadEndKey = startKey + (i+1) * (endKey - startKey) / numThreads_;
+      HBaseReader reader = new HBaseReader(this, i, threadStartKey, threadEndKey);
+      readers_.add(reader);
+    }
+    numThreadsWorking_.addAndGet(readers_.size());
+    for(HBaseReader reader : readers_) {
+      reader.start();
+    }
+
+    startReporter("R");
+  }
+
+  public static class HBaseReader extends Thread {
+    int id_;
+    MultiThreadedReader reader_;
+    List<HTable> tables_ = new ArrayList<HTable>();
+    long startKey_;
+    long endKey_;
+    static int minDataSize_ = 256;
+    static int maxDataSize_ = 1024;
+    static DataGenerator dataGenerator_ = new DataGenerator(minDataSize_, maxDataSize_);
+
+    public HBaseReader(MultiThreadedReader reader, int id, long startKey, long endKey) {
+      id_ = id;
+      reader_ = reader;
+      HTable table = HBaseUtils.getHTable(reader_.conf_, tableName_);
+      tables_.add(table);
+      startKey_ = startKey;
+      endKey_ = endKey;
+    }
+
+    public void run() {
+      if (reader_.verbose_) {
+        LOG.info("Started thread #" + id_ + " for reads...");
+      }
+      boolean repeatQuery = false;
+      Get get = null;
+      long start = 0;
+      long curKey = 0;
+
+      for(;;) {
+        if(!repeatQuery) {
+
+          if (startKey_ == -1) {
+            // load test is running at the same time.
+            if (MultiThreadedWriter.insertedKeySet_.size() > 0) {
+              int idx = reader_.random_.nextInt(MultiThreadedWriter.insertedKeySet_.size());
+              curKey = MultiThreadedWriter.insertedKeySet_.get(idx);
+            } else {
+              try {
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+              }
+            }
+          } else {
+            curKey = startKey_ + Math.abs(reader_.random_.nextLong())%(endKey_ - startKey_);
+          }
+          get = new Get(DataGenerator.paddedKey(curKey).getBytes());
+          get.addFamily(columnFamily_);
+          // get.addColumn(columnFamily_, Bytes.toBytes("0"));
+        }
+        try {
+          if(reader_.verbose_) {
+            LOG.info("[" + id_ + "] " + (repeatQuery ? "RE-Querying" : "Querying") + " key = " + curKey + ", cf = " + new String(columnFamily_));
+          }
+          repeatQuery = false;
+          start = System.currentTimeMillis();
+          queryKey(get, (reader_.random_.nextInt(100) < reader_.verifyPercent_));
+        }
+        catch (IOException e) {
+          reader_.numOpFailures_.addAndGet(1);
+          LOG.debug("[" + id_ + "] FAILED read, key = " + (curKey + "") + ", time = " + (System.currentTimeMillis() - start) + " ms");
+          repeatQuery = true;
+        }
+      }
+    }
+
+    public void queryKey(Get get, boolean verify) throws IOException {
+      String rowKey = new String(get.getRow());
+      for(HTable table : tables_) {
+//        if(verbose_) {
+//          HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(rowKey));
+//          LOG.info("Key = " + rowKey + ", RegoinServer: " + hloc.getServerAddress().getHostname());
+//        }
+        // read the data
+        long start = System.currentTimeMillis();
+        Result result = table.get(get);
+        reader_.cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start);
+        reader_.numKeys_.addAndGet(1);
+
+        // if we got no data report error
+        if(result.isEmpty()) {
+           HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(rowKey));
+           LOG.info("Key = " + rowKey + ", RegionServer: " + hloc.getServerAddress().getHostname());
+           reader_.numErrors_.addAndGet(1);
+           LOG.error("No data returned, tried to get actions for key = " + rowKey);
+
+           if (reader_.numErrors_.get() > 3) {
+             LOG.error("Aborting run -- found more than three errors\n");
+             System.exit(-1);
+           }
+        }
+
+        if(result.getFamilyMap(columnFamily_) != null) {
+
+          // increment number of columns read
+          reader_.numCols_.addAndGet(result.getFamilyMap(columnFamily_).size());
+
+          if (verify) {
+            // verify the result
+            List<KeyValue> keyValues = result.list();
+            for(KeyValue kv : keyValues) {
+              String actionId = new String(kv.getQualifier());
+              String data = new String(kv.getValue());
+
+              // if something does not look right report it
+              if(!DataGenerator.verify(rowKey, actionId, data)) {
+                reader_.numErrors_.addAndGet(1);
+                LOG.error("Error checking data for key = " + rowKey + ", actionId = " + actionId);
+              }
+            }
+
+            reader_.numKeysVerified_.addAndGet(1);
+          }
+        }
+      }
+    }
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,211 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.manual.HBaseTest;
+
+public class MultiThreadedWriter extends MultiThreadedAction
+{
+  private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
+  static long minColumnsPerKey_ = 1;
+  static long maxColumnsPerKey_ = 10;
+  static int minDataSize_ = 256;
+  static int maxDataSize_ = 1024;
+  Set<HBaseWriter> writers_ = new HashSet<HBaseWriter>();
+  static boolean bulkLoad_ = false;
+  /* This is the current key to be inserted by any thread. Each thread does an
+     atomic get and increment operation and inserts the current value. */
+  public static AtomicLong currentKey_ = null;
+  /* The keys successfully inserted by the writers, in insertion order */
+  public static List<Long> insertedKeySet_ = Collections.synchronizedList(new ArrayList<Long>());
+  /* The keys the writers failed to insert */
+  public static List<Long> failedKeySet_ = Collections.synchronizedList(new ArrayList<Long>());
+
+  public MultiThreadedWriter(HBaseConfiguration conf, byte[] tableName, byte[] columnFamily) {
+    tableName_ = tableName;
+    columnFamily_ = columnFamily;
+    conf_ = conf;
+  }
+
+  public void setBulkLoad(boolean bulkLoad) {
+    bulkLoad_ = bulkLoad;
+  }
+
+  public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) {
+    minColumnsPerKey_ = minColumnsPerKey;
+    maxColumnsPerKey_ = maxColumnsPerKey;
+  }
+
+  public void setDataSize(int minDataSize, int maxDataSize) {
+    minDataSize_ = minDataSize;
+    maxDataSize_ = maxDataSize;
+  }
+
+  public void start(long startKey, long endKey, int numThreads) {
+    if(verbose_) {
+      LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
+    }
+    startKey_ = startKey;
+    endKey_ = endKey;
+    numThreads_ = numThreads;
+    currentKey_ = new AtomicLong(startKey_);
+
+    for(int i = 0; i < numThreads_; ++i) {
+      HBaseWriter writer = new HBaseWriter(this, i);
+      writers_.add(writer);
+    }
+    numThreadsWorking_.addAndGet(writers_.size());
+    for(HBaseWriter writer : writers_) {
+      writer.start();
+    }
+
+    startReporter("W");
+  }
+
+  public void waitForFinish() {
+    while(numThreadsWorking_.get() != 0) {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    System.out.println("Failed Key Count: " + failedKeySet_.size());
+    for (Long key : failedKeySet_) {
+       System.out.println("Failure for key: " + key);
+    }
+
+  }
+
+  public static class HBaseWriter extends Thread {
+    int id_;
+    MultiThreadedWriter writer_;
+    Random random_ = new Random();
+    List<HTable> tables_ = new ArrayList<HTable>();
+    static DataGenerator dataGenerator_ = new DataGenerator(minDataSize_, maxDataSize_);
+
+    public HBaseWriter(MultiThreadedWriter writer, int id) {
+      id_ = id;
+      writer_ = writer;
+      HTable table = HBaseUtils.getHTable(writer_.conf_, tableName_);
+      tables_.add(table);
+    }
+
+    public void run() {
+      if(MultiThreadedWriter.bulkLoad_) {
+        long rowKey = currentKey_.getAndIncrement();
+        do {
+          long numColumns = minColumnsPerKey_ + Math.abs(random_.nextLong())%(maxColumnsPerKey_-minColumnsPerKey_);
+          bulkInsertKey(rowKey, 0, numColumns);
+          rowKey = currentKey_.getAndIncrement();
+        } while(rowKey < writer_.endKey_);
+      }
+      else {
+        long rowKey = currentKey_.getAndIncrement();
+        do {
+          long numColumns = minColumnsPerKey_ + Math.abs(random_.nextLong())%(maxColumnsPerKey_-minColumnsPerKey_);
+          for(long col = 0; col < numColumns; ++col) {
+            insert(rowKey, col);
+          }
+          rowKey = currentKey_.getAndIncrement();
+        } while(rowKey < writer_.endKey_);
+      }
+      writer_.numThreadsWorking_.decrementAndGet();
+    }
+
+    public static byte[] longToByteArrayKey(long rowKey) {
+      return DataGenerator.paddedKey(rowKey).getBytes();
+    }
+
+    public void insert(long rowKey, long col) {
+      Put put = new Put(longToByteArrayKey(rowKey));
+      put.add(columnFamily_, ("" + col).getBytes(), dataGenerator_.getDataInSize(rowKey));
+      try {
+        long start = System.currentTimeMillis();
+        putIntoTables(put);
+        insertedKeySet_.add(rowKey);
+        writer_.numKeys_.addAndGet(1);
+        writer_.numCols_.addAndGet(1);
+        writer_.cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start);
+      }
+      catch (IOException e) {
+        failedKeySet_.add(rowKey);
+        LOG.error("Failed to insert: " + rowKey);
+        e.printStackTrace();
+      }
+    }
+
+    public void bulkInsertKey(long rowKey, long startCol, long endCol) {
+      if (writer_.verbose_) {
+         LOG.debug("Preparing put for key = " + rowKey + ", cols = [" + startCol + ", " + endCol + ")");
+      }
+
+      if(startCol >= endCol) {
+        return;
+      }
+      Put put = new Put(DataGenerator.paddedKey(rowKey).getBytes());
+      byte[] columnQualifier;
+      byte[] value;
+      for(long i = startCol; i < endCol; ++i) {
+        value = dataGenerator_.getDataInSize(rowKey);
+        columnQualifier = ("" + i).getBytes();
+        put.add(columnFamily_, columnQualifier, value);
+      }
+      try {
+        long start = System.currentTimeMillis();
+        putIntoTables(put);
+        insertedKeySet_.add(rowKey);
+        writer_.numKeys_.addAndGet(1);
+        writer_.numCols_.addAndGet(endCol - startCol);
+        writer_.cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start);
+      }
+      catch (IOException e) {
+        failedKeySet_.add(rowKey);
+        e.printStackTrace();
+      }
+    }
+
+    // error handling correct only for ONE table
+    public void putIntoTables(Put put) throws IOException {
+      for(HTable table : tables_) {
+        table.put(put);
+      }
+    }
+  }
+}

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/ProcessBasedLocalHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/ProcessBasedLocalHBaseCluster.java?rev=1176146&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/ProcessBasedLocalHBaseCluster.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/manual/utils/ProcessBasedLocalHBaseCluster.java Tue Sep 27 01:07:20 2011
@@ -0,0 +1,272 @@
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+
+public class ProcessBasedLocalHBaseCluster {
+
+  private static final int rsBasePort_ = 60020; // first region server port
+  private static final String workDir_ = "/tmp/hbase-" + System.getenv("USER");
+  private String hbaseHome_;
+  private int    numRegionServers_;
+
+  private static final Log LOG = LogFactory.getLog(ProcessBasedLocalHBaseCluster.class);
+
+
+  public ProcessBasedLocalHBaseCluster(String hbaseHome,
+                                       int    numRegionServers) {
+    hbaseHome_ = hbaseHome;
+    numRegionServers_ = numRegionServers;
+  }
+
+  public HBaseConfiguration start() {
+    cleanupOldState();
+
+    // start ZK
+    startZK();
+
+    // start master
+    startMaster();
+
+    // start region servers...
+    for (int idx = 0; idx < numRegionServers_; idx++) {
+      startRegionServer(rsBasePort_ + idx);
+    }
+
+    HBaseConfiguration conf = HBaseUtils.getHBaseConfFromZkNode("localhost");
+
+    // Keep retrying until the above servers have really started. We verify this
+    // by looking up the META table, and stop as soon as the lookup succeeds.
+    int maxTries = 10;
+    while (maxTries-- > 0) {
+      try {
+        new HTable(conf, HConstants.META_TABLE_NAME);
+        break;
+      } catch (Exception e) {
+        try {
+          LOG.info("Waiting for HBase to start up. Retries left: " + maxTries, e);
+          Thread.sleep(1000);
+        } catch (InterruptedException e1) {
+        }
+      }
+    }
+    LOG.info("Process-based HBase cluster with " + numRegionServers_ + " region servers is up and running...\n\n");
+    return conf;
+  }
+
+  public void startRegionServer(int port) {
+    startServer("regionserver", port);
+  }
+
+  public void startMaster() {
+    startServer("master", 0);
+  }
+
+  public void killRegionServer(int port) {
+    killServer("regionserver", port);
+  }
+
+  public void killMaster() {
+    killServer("master", 0);
+  }
+
+  public void startZK() {
+    executeAsyncCommand(hbaseHome_ + "/bin/hbase-daemon.sh " +
+                        "--config " + hbaseHome_ + "/conf " +
+                        " start zookeeper");
+  }
+
+  private static void executeCommand(String command) {
+    executeCommand(command, null);
+  }
+
+  private static void executeCommand(String command, Map<String, String> envOverrides) {
+    LOG.debug("Command : " + command);
+
+    try {
+      String [] envp = null;
+      if (envOverrides != null) {
+        Map<String, String> map = new HashMap<String, String>(System.getenv());
+        map.putAll(envOverrides);
+        envp = new String[map.size()];
+        int idx = 0;
+        for (Map.Entry<String, String> e: map.entrySet()) {
+          envp[idx++] = e.getKey() + "=" + e.getValue();
+        }
+      }
+
+      Process p = Runtime.getRuntime().exec(command, envp);
+
+      BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream()));
+      BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+
+      // read the output from the command
+      String s = null;
+      while ((s = stdInput.readLine()) != null) {
+        System.out.println(s);
+      }
+
+      // read any errors from the attempted command
+      while ((s = stdError.readLine()) != null) {
+        System.out.println(s);
+      }
+    } catch (IOException e) {
+      LOG.error("Error running: " + command, e);
+    }
+  }
+
+
+  private static class CommandThread extends Thread {
+    private String command_;
+    private Map<String, String> envp_;
+
+    CommandThread(String command, Map<String, String> envp) {
+      command_ = command;
+      envp_ = envp;
+    }
+
+    public void run() {
+      executeCommand(command_, envp_);
+    }
+  }
+
+  private void executeAsyncCommand(String command) {
+    new CommandThread(command, null).start();
+  }
+
+  private void executeAsyncCommand(String command, Map<String, String> envOverrides) {
+    new CommandThread(command, envOverrides).start();
+  }
+
+  private String generateConfig(int regionServerPortNumber) {
+    // Substitute the region server port into the hbase-site.xml template below.
+    return String.format(confTemplate_,
+                         regionServerPortNumber);
+  }
+
+  private void cleanupOldState() {
+    executeCommand("rm -rf " + workDir_);
+  }
+
+  private void writeStringToFile(String s, String fileName) {
+    try {
+      BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
+      out.write(s);
+      out.close();
+    } catch (IOException e) {
+      LOG.error("Error writing to: " + fileName, e);
+    }
+  }
+
+  private String serverWorkingDir(String serverName, int port) {
+    String dir;
+    if (serverName.equals("regionserver")) {
+      dir = workDir_ + "/" + serverName + "-" + port;
+    } else {
+      dir = workDir_ + "/" + serverName;
+    }
+    return dir;
+  }
+
+  private int getServerPID(String serverName, int port) {
+    String dir = serverWorkingDir(serverName, port);
+    String user = System.getenv("USER");
+    String pidFile = String.format("%s/hbase-%s-%s.pid",
+                                    dir, user, serverName);
+    Scanner scanner;
+    try {
+      scanner = new Scanner(new File(pidFile));
+      int pid = scanner.nextInt();
+      return pid;
+    } catch (FileNotFoundException e) {
+      LOG.error("Error looking up pid file: " + pidFile, e);
+      return 0;
+    }
+  }
+
+  private void killServer(String serverName, int port) {
+    int pid = getServerPID(serverName, port);
+    LOG.info("Killing " + serverName + "; pid= " + pid);
+    killProcess(pid);
+  }
+
+  private void killProcess(int pid) {
+    // Send SIGABRT rather than SIGKILL so the JVM aborts and can leave a dump behind.
+    String cmd = "kill -s ABRT " + pid;
+    executeCommand(cmd);
+  }
+
+  // The port argument is meaningful only for region servers; the master is started with port 0.
+  private void startServer(String serverName, int port) {
+
+    String conf = generateConfig(port);
+
+    // create working directory for this region server.
+    String dir = serverWorkingDir(serverName, port);
+    executeCommand("mkdir -p " + dir);
+
+    writeStringToFile(conf, dir + "/" + "hbase-site.xml");
+    executeCommand("cp " + hbaseHome_ + "/conf/hbase-default.xml " + dir);
+
+    Map<String, String> envOverrides = new HashMap<String, String>();
+    envOverrides.put("HBASE_LOG_DIR", dir);
+    envOverrides.put("HBASE_PID_DIR", dir);
+
+    executeAsyncCommand(hbaseHome_ + "/bin/hbase-daemon.sh " +
+                        "--config " + dir +
+                        " start " + serverName,
+                        envOverrides);
+  }
+
+  public static String confTemplate_ =
+    "<configuration> " +
+    "  <property> " +
+    "    <name>hbase.cluster.distributed</name> " +
+    "    <value>true</value> " +
+    "    <description>The mode the cluster will be in. Possible values are " +
+    "      false: standalone and pseudo-distributed setups with managed Zookeeper " +
+    "      true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh) " +
+    "    </description> " +
+    "  </property> " +
+    "  <property> " +
+    "    <name>hbase.regionserver.port</name> " +
+    "    <value>%d</value> " +
+    "    <description>The port an HBase region server binds to. " +
+    "    </description> " +
+    "  </property> " +
+    "  <property> " +
+    "    <name>hbase.regionserver.info.port.auto</name> " +
+    "    <value>true</value> " +
+    "    <description>Info server auto port bind. Enables automatic port " +
+    "    search if hbase.regionserver.info.port is already in use. " +
+    "    Useful for testing, turned off by default. " +
+    "    </description> " +
+    "  </property> " +
+    "  <property> " +
+    "    <name>hbase.hregion.max.filesize</name> " +
+    "    <value>10000000</value> " +
+    "    <description> " +
+    "    Maximum HStoreFile size. If any one of a column families' HStoreFiles has " +
+    "    grown to exceed this value, the hosting HRegion is split in two. " +
+    "    Default: 256M. " +
+    "    </description> " +
+    "  </property> " +
+    "</configuration> ";
+
+}
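As a quick orientation (again, not part of the commit itself), the cluster helper above is meant to be driven roughly along these lines; the hbaseHome path and the region server count are placeholders:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.manual.utils.ProcessBasedLocalHBaseCluster;

    public class LocalClusterSketch {
      public static void main(String[] args) {
        String hbaseHome = "/home/user/hbase-0.89";   // placeholder checkout path

        // Brings up ZooKeeper, a master and three region server processes under
        // /tmp/hbase-$USER, then retries until the META table can be looked up.
        ProcessBasedLocalHBaseCluster cluster =
            new ProcessBasedLocalHBaseCluster(hbaseHome, 3);
        HBaseConfiguration conf = cluster.start();

        // Region servers listen on consecutive ports starting at 60020, so this
        // kills the first one (via SIGABRT) and starts a replacement on the same port.
        cluster.killRegionServer(60020);
        cluster.startRegionServer(60020);

        // conf can now be handed to HTable or to the load-test utilities above.
      }
    }

Because every daemon runs as a separate OS process, killRegionServer()/killMaster() exercise real process death rather than an in-JVM shutdown, which is presumably what the kill-and-verify utilities added in this commit are built around.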


