hbase-commits mailing list archives

From st...@apache.org
Subject svn commit: r1414890 - in /hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase: IngestIntegrationTestBase.java IntegrationTestDataIngestWithChaosMonkey.java IntegrationTestRebalanceAndKillServers.java util/ChaosMonkey.java util/LoadTestTool.java
Date Wed, 28 Nov 2012 20:08:40 GMT
Author: stack
Date: Wed Nov 28 20:08:39 2012
New Revision: 1414890

URL: http://svn.apache.org/viewvc?rev=1414890&view=rev
Log:
HBASE-7231 port HBASE-7200 create integration test for balancing regions and killing region servers to 0.94

Added:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServers.java
Modified:
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java?rev=1414890&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IngestIntegrationTestBase.java Wed Nov 28 20:08:39 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+
+/**
+ * A base class for tests that do something with the cluster while running
+ * {@link LoadTestTool} to write and verify some data.
+ */
+public abstract class IngestIntegrationTestBase {
+  private static String tableName = null;
+
+  /** A soft limit on how long we should run */
+  private static final String RUN_TIME_KEY = "hbase.%s.runtime";
+
+  protected static final Log LOG = LogFactory.getLog(IngestIntegrationTestBase.class);
+  protected IntegrationTestingUtility util;
+  protected HBaseCluster cluster;
+  private LoadTestTool loadTool;
+
+  protected void setUp(int numSlavesBase) throws Exception {
+    tableName = this.getClass().getSimpleName();
+    util = new IntegrationTestingUtility();
+    LOG.info("Initializing cluster with " + numSlavesBase + " servers");
+    util.initializeCluster(numSlavesBase);
+    LOG.info("Done initializing cluster");
+    cluster = util.getHBaseClusterInterface();
+    deleteTableIfNecessary();
+    loadTool = new LoadTestTool();
+    loadTool.setConf(util.getConfiguration());
+    // Initialize load test tool before we start breaking things;
+    // LoadTestTool init, even when it is a no-op, is very fragile.
+    int ret = loadTool.run(new String[] { "-tn", tableName, "-init_only" });
+    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
+  }
+
+  protected void tearDown() throws Exception {
+    LOG.info("Restoring the cluster");
+    util.restoreCluster();
+    LOG.info("Done restoring the cluster");
+  }
+
+  private void deleteTableIfNecessary() throws IOException {
+    if (util.getHBaseAdmin().tableExists(tableName)) {
+      util.deleteTable(Bytes.toBytes(tableName));
+    }
+  }
+
+  protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter,
+      int colsPerKey, int recordSize, int writeThreads) throws Exception {
+    LOG.info("Running ingest");
+    LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
+
+    long start = System.currentTimeMillis();
+    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
+    long startKey = 0;
+
+    long numKeys = getNumKeys(keysPerServerPerIter);
+    while (System.currentTimeMillis() - start < 0.9 * runtime) {
+      LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
+          ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
+
+      int ret = loadTool.run(new String[] {
+          "-tn", tableName,
+          "-write", String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads),
+          "-start_key", String.valueOf(startKey),
+          "-num_keys", String.valueOf(numKeys),
+          "-skip_init"
+      });
+      if (0 != ret) {
+        String errorMsg = "Load failed with error code " + ret;
+        LOG.error(errorMsg);
+        Assert.fail(errorMsg);
+      }
+
+      ret = loadTool.run(new String[] {
+          "-tn", tableName,
+          "-read", "100:20",
+          "-start_key", String.valueOf(startKey),
+          "-num_keys", String.valueOf(numKeys),
+          "-skip_init"
+      });
+      if (0 != ret) {
+        String errorMsg = "Verification failed with error code " + ret;
+        LOG.error(errorMsg);
+        Assert.fail(errorMsg);
+      }
+      startKey += numKeys;
+    }
+  }
+
+  /** Estimates the number of keys to load based on the cluster size */
+  private long getNumKeys(int keysPerServer)
+      throws IOException {
+    int numRegionServers = cluster.getClusterStatus().getServersSize();
+    return keysPerServer * numRegionServers;
+  }
+}
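
A note on the RUN_TIME_KEY pattern above: the %s placeholder is filled with the concrete test's simple class name, so every subclass gets its own soft time limit. A minimal sketch (ours, not part of this commit) of raising that limit through the configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class RuntimeOverrideExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // "hbase.<SimpleClassName>.runtime" is the expanded RUN_TIME_KEY;
        // 10 minutes here instead of a subclass's 5-minute default.
        conf.setLong("hbase.IntegrationTestDataIngestWithChaosMonkey.runtime",
            10 * 60 * 1000L);
      }
    }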

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java?rev=1414890&r1=1414889&r2=1414890&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestDataIngestWithChaosMonkey.java Wed Nov 28 20:08:39 2012
@@ -22,11 +22,8 @@ import java.io.IOException;
 
 import junit.framework.Assert;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChaosMonkey;
-import org.apache.hadoop.hbase.util.LoadTestTool;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,95 +36,33 @@ import org.junit.experimental.categories
  * configuration parameter.
  */
 @Category(IntegrationTests.class)
-public class IntegrationTestDataIngestWithChaosMonkey {
+public class IntegrationTestDataIngestWithChaosMonkey extends IngestIntegrationTestBase {
 
-  private static final String TABLE_NAME = "TestDataIngestWithChaosMonkey";
   private static final int NUM_SLAVES_BASE = 4; //number of slaves for the smallest cluster
 
-  /** A soft limit on how long we should run */
-  private static final String RUN_TIME_KEY = "hbase.IntegrationTestDataIngestWithChaosMonkey.runtime";
-
-  //run for 5 min by default
+  // run for 5 min by default
   private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000;
 
-  private static final Log LOG = LogFactory.getLog(IntegrationTestDataIngestWithChaosMonkey.class);
-  private IntegrationTestingUtility util;
-  private HBaseCluster cluster;
   private ChaosMonkey monkey;
 
   @Before
   public void setUp() throws Exception {
-    util = new IntegrationTestingUtility();
-
-    util.initializeCluster(NUM_SLAVES_BASE);
-
-    cluster = util.getHBaseClusterInterface();
-    deleteTableIfNecessary();
-
+    super.setUp(NUM_SLAVES_BASE);
     monkey = new ChaosMonkey(util, ChaosMonkey.EVERY_MINUTE_RANDOM_ACTION_POLICY);
     monkey.start();
   }
 
   @After
   public void tearDown() throws Exception {
-    monkey.stop("test has finished, that's why");
-    monkey.waitForStop();
-    util.restoreCluster();
-  }
-
-  private void deleteTableIfNecessary() throws IOException {
-    if (util.getHBaseAdmin().tableExists(TABLE_NAME)) {
-      util.deleteTable(Bytes.toBytes(TABLE_NAME));
+    if (monkey != null) {
+      monkey.stop("test has finished, that's why");
+      monkey.waitForStop();
     }
+    super.tearDown();
   }
 
   @Test
   public void testDataIngest() throws Exception {
-    LOG.info("Running testDataIngest");
-    LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
-
-    LoadTestTool loadTool = new LoadTestTool();
-    loadTool.setConf(util.getConfiguration());
-
-    long start = System.currentTimeMillis();
-    long runtime = util.getConfiguration().getLong(RUN_TIME_KEY, DEFAULT_RUN_TIME);
-    long startKey = 0;
-
-    long numKeys = estimateDataSize();
-    while (System.currentTimeMillis() - start < 0.9 * runtime) {
-      LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
-          ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
-
-      int ret = loadTool.run(new String[] {
-          "-tn", TABLE_NAME,
-          "-write", "10:100:20",
-          "-start_key", String.valueOf(startKey),
-          "-num_keys", String.valueOf(numKeys)
-      });
-
-      //assert that load was successful
-      Assert.assertEquals(0, ret);
-
-      ret = loadTool.run(new String[] {
-          "-tn", TABLE_NAME,
-          "-read", "100:20",
-          "-start_key", String.valueOf(startKey),
-          "-num_keys", String.valueOf(numKeys)
-      });
-
-      //assert that verify was successful
-      Assert.assertEquals(0, ret);
-      startKey += numKeys;
-    }
-  }
-
-  /** Estimates a data size based on the cluster size */
-  protected long estimateDataSize() throws IOException {
-    //base is a 4 slave node cluster.
-    ClusterStatus status = cluster.getClusterStatus();
-    int numRegionServers = status.getServersSize();
-    int multiplication = Math.max(1, numRegionServers / NUM_SLAVES_BASE);
-
-    return 10000 * multiplication;
+    runIngestTest(DEFAULT_RUN_TIME, 2500, 10, 100, 20);
   }
 }
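
For reference, runIngestTest(DEFAULT_RUN_TIME, 2500, 10, 100, 20) above expands into LoadTestTool invocations of roughly the following shape on the first iteration, assuming all four region servers are live when the key count is computed (numKeys = 2500 * 4). This is an illustration, not part of the commit; -read's "100:20" is LoadTestTool's verification-percentage:reader-threads pair.

    import java.util.Arrays;

    public class IngestArgsExample {
      public static void main(String[] args) {
        String[] writeArgs = {
            "-tn", "IntegrationTestDataIngestWithChaosMonkey",
            "-write", "10:100:20",  // colsPerKey:recordSize:writeThreads
            "-start_key", "0",
            "-num_keys", "10000",
            "-skip_init"
        };
        String[] readArgs = {
            "-tn", "IntegrationTestDataIngestWithChaosMonkey",
            "-read", "100:20",      // verify 100% of keys with 20 reader threads
            "-start_key", "0",
            "-num_keys", "10000",
            "-skip_init"
        };
        System.out.println(Arrays.toString(writeArgs));
        System.out.println(Arrays.toString(readArgs));
      }
    }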

Added: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServers.java?rev=1414890&view=auto
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServers.java (added)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/IntegrationTestRebalanceAndKillServers.java Wed Nov 28 20:08:39 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChaosMonkey;
+import org.apache.hadoop.hbase.util.LoadTestTool;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ChaosMonkey.Action;
+import org.apache.hadoop.hbase.util.ChaosMonkey.RestartActiveMaster;
+import org.apache.hadoop.hbase.util.ChaosMonkey.RestartRandomRs;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+
+/**
+ * A system test which does large data ingestion and verifies it using {@link LoadTestTool},
+ * while randomly killing region servers and master(s). You can configure how long the load
+ * test should run by using the "hbase.IntegrationTestRebalanceAndKillServers.runtime"
+ * configuration parameter.
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestRebalanceAndKillServers extends IngestIntegrationTestBase {
+  private static final int NUM_SLAVES_BASE = 4; // number of slaves for the smallest cluster
+  private static final long DEFAULT_RUN_TIME = 5 * 60 * 1000; // run for 5 min by default
+
+  private static final long KILL_SERVICE_EVERY_MS = 45 * 1000;
+  private static final int SERVER_PER_MASTER_KILL = 3;
+  private static final long KILL_SERVER_FOR_MS = 5 * 1000;
+  private static final long KILL_MASTER_FOR_MS = 100;
+
+  private static final long UNBALANCE_REGIONS_EVERY_MS = 30 * 1000;
+  /** @see ChaosMonkey.UnbalanceRegionsAction#UnbalanceRegionsAction(double, double) */
+  private static final double UNBALANCE_TO_FRC_OF_SERVERS = 0.5;
+  /** @see ChaosMonkey.UnbalanceRegionsAction#UnbalanceRegionsAction(double, double) */
+  private static final double UNBALANCE_FRC_OF_REGIONS = 0.5;
+
+  private static final long BALANCE_REGIONS_EVERY_MS = 10 * 1000;
+
+  private ChaosMonkey monkey;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws Exception {
+    super.setUp(NUM_SLAVES_BASE);
+
+    ChaosMonkey.Policy killPolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
+      KILL_SERVICE_EVERY_MS,
+      new Pair<Action,Integer>(new ChaosMonkey.RestartActiveMaster(KILL_MASTER_FOR_MS), 1),
+      new Pair<Action,Integer>(new ChaosMonkey.RestartRandomRs(KILL_SERVER_FOR_MS), SERVER_PER_MASTER_KILL));
+
+    ChaosMonkey.Policy unbalancePolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
+      UNBALANCE_REGIONS_EVERY_MS,
+      new ChaosMonkey.UnbalanceRegionsAction(UNBALANCE_FRC_OF_REGIONS, UNBALANCE_TO_FRC_OF_SERVERS));
+
+    ChaosMonkey.Policy balancePolicy = new ChaosMonkey.PeriodicRandomActionPolicy(
+      BALANCE_REGIONS_EVERY_MS, new ChaosMonkey.ForceBalancerAction());
+
+    monkey = new ChaosMonkey(util, killPolicy, unbalancePolicy, balancePolicy);
+    monkey.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (monkey != null) {
+      monkey.stop("tearDown");
+      monkey.waitForStop();
+    }
+    super.tearDown();
+  }
+
+  @Test
+  public void testDataIngest() throws Exception {
+    runIngestTest(DEFAULT_RUN_TIME, 2500, 10, 100, 20);
+  }
+}
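
With the 1:3 weights above, each 45-second tick of the kill policy picks a region-server restart three times as often as a master restart, so a master restart lands roughly once every three minutes on average. A hypothetical stripped-down variant (names ours, not part of the commit) that drives only the region churn, using the new ChaosMonkey(util, Policy...) constructor:

    // Assumes an initialized IntegrationTestingUtility 'util', as in setUp() above.
    ChaosMonkey.Policy unbalance = new ChaosMonkey.PeriodicRandomActionPolicy(
        30 * 1000, new ChaosMonkey.UnbalanceRegionsAction(0.5, 0.5));
    ChaosMonkey.Policy balance = new ChaosMonkey.PeriodicRandomActionPolicy(
        10 * 1000, new ChaosMonkey.ForceBalancerAction());
    ChaosMonkey monkey = new ChaosMonkey(util, unbalance, balance);
    monkey.start();
    // ... run the ingest workload ...
    monkey.stop("done");
    monkey.waitForStop();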

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java?rev=1414890&r1=1414889&r2=1414890&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/ChaosMonkey.java Wed Nov 28 20:08:39 2012
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.logging.Log;
@@ -34,15 +37,19 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseCluster;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTestDataIngestWithChaosMonkey;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.protobuf.ServiceException;
 
 /**
  * A utility to inject faults into a running cluster.
@@ -86,6 +93,16 @@ public class ChaosMonkey extends Abstrac
     setPoliciesByName(policies);
   }
 
+  /**
+   * Construct a new ChaosMonkey.
+   * @param util the already-configured IntegrationTestingUtility
+   * @param policies custom policies to use
+   */
+  public ChaosMonkey(IntegrationTestingUtility util, Policy... policies) {
+    this.util = util;
+    this.policies = policies;
+  }
+
   private void setPoliciesByName(String... policies) {
     this.policies = new Policy[policies.length];
     for (int i=0; i < policies.length; i++) {
@@ -115,16 +132,16 @@ public class ChaosMonkey extends Abstrac
   /**
    * A (possibly mischievous) action that the ChaosMonkey can perform.
    */
-  private static class Action {
-    long sleepTime; //how long should we sleep
-    ActionContext context;
-    HBaseCluster cluster;
-    ClusterStatus initialStatus;
-    ServerName[] initialServers;
-
-    public Action(long sleepTime) {
-      this.sleepTime = sleepTime;
-    }
+  public static class Action {
+    // TODO: interesting question - should actions be implemented inside
+    //       ChaosMonkey, or outside? If they are inside (initial), the class becomes
+    //       huge and all-encompassing; if they are outside ChaosMonkey becomes just
+    //       a random task scheduler. For now, keep inside.
+
+    protected ActionContext context;
+    protected HBaseCluster cluster;
+    protected ClusterStatus initialStatus;
+    protected ServerName[] initialServers;
 
     void init(ActionContext context) throws Exception {
       this.context = context;
@@ -136,33 +153,28 @@ public class ChaosMonkey extends Abstrac
 
     void perform() throws Exception { };
 
+    // TODO: perhaps these methods should be elsewhere?
     /** Returns current region servers */
-    ServerName[] getCurrentServers() throws IOException {
+    protected ServerName[] getCurrentServers() throws IOException {
       Collection<ServerName> regionServers = cluster.getClusterStatus().getServers();
       return regionServers.toArray(new ServerName[regionServers.size()]);
     }
 
-    void killMaster(ServerName server) throws IOException {
+    protected void killMaster(ServerName server) throws IOException {
       LOG.info("Killing master:" + server);
       cluster.killMaster(server);
       cluster.waitForMasterToStop(server, TIMEOUT);
       LOG.info("Killed master server:" + server);
     }
 
-    void startMaster(ServerName server) throws IOException {
+    protected void startMaster(ServerName server) throws IOException {
       LOG.info("Starting master:" + server.getHostname());
       cluster.startMaster(server.getHostname());
       cluster.waitForActiveAndReadyMaster(TIMEOUT);
       LOG.info("Started master: " + server);
     }
 
-    void restartMaster(ServerName server, long sleepTime) throws IOException {
-      killMaster(server);
-      sleep(sleepTime);
-      startMaster(server);
-    }
-
-    void killRs(ServerName server) throws IOException {
+    protected void killRs(ServerName server) throws IOException {
       LOG.info("Killing region server:" + server);
       cluster.killRegionServer(server);
       cluster.waitForRegionServerToStop(server, TIMEOUT);
@@ -170,19 +182,33 @@ public class ChaosMonkey extends Abstrac
           + cluster.getClusterStatus().getServersSize());
     }
 
-    void startRs(ServerName server) throws IOException {
+    protected void startRs(ServerName server) throws IOException {
       LOG.info("Starting region server:" + server.getHostname());
       cluster.startRegionServer(server.getHostname());
       cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT);
       LOG.info("Started region server:" + server + ". Reported num of rs:"
           + cluster.getClusterStatus().getServersSize());
     }
+  }
+
+  private static class RestartActionBase extends Action {
+    long sleepTime; // how long should we sleep
+
+    public RestartActionBase(long sleepTime) {
+      this.sleepTime = sleepTime;
+    }
 
     void sleep(long sleepTime) {
       LOG.info("Sleeping for:" + sleepTime);
       Threads.sleep(sleepTime);
     }
 
+    void restartMaster(ServerName server, long sleepTime) throws IOException {
+      killMaster(server);
+      sleep(sleepTime);
+      startMaster(server);
+    }
+
     void restartRs(ServerName server, long sleepTime) throws IOException {
       killRs(server);
       sleep(sleepTime);
@@ -190,7 +216,7 @@ public class ChaosMonkey extends Abstrac
     }
   }
 
-  private static class RestartActiveMaster extends Action {
+  public static class RestartActiveMaster extends RestartActionBase {
     public RestartActiveMaster(long sleepTime) {
       super(sleepTime);
     }
@@ -203,17 +229,12 @@ public class ChaosMonkey extends Abstrac
     }
   }
 
-  private static class RestartRandomRs extends Action {
+  public static class RestartRandomRs extends RestartActionBase {
     public RestartRandomRs(long sleepTime) {
       super(sleepTime);
     }
 
     @Override
-    void init(ActionContext context) throws Exception {
-      super.init(context);
-    }
-
-    @Override
     void perform() throws Exception {
       LOG.info("Performing action: Restart random region server");
       ServerName server = selectRandomItem(getCurrentServers());
@@ -222,7 +243,7 @@ public class ChaosMonkey extends Abstrac
     }
   }
 
-  private static class RestartRsHoldingMeta extends RestartRandomRs {
+  public static class RestartRsHoldingMeta extends RestartRandomRs {
     public RestartRsHoldingMeta(long sleepTime) {
       super(sleepTime);
     }
@@ -238,7 +259,7 @@ public class ChaosMonkey extends Abstrac
     }
   }
 
-  private static class RestartRsHoldingRoot extends RestartRandomRs {
+  public static class RestartRsHoldingRoot extends RestartRandomRs {
     public RestartRsHoldingRoot(long sleepTime) {
       super(sleepTime);
     }
@@ -257,7 +278,7 @@ public class ChaosMonkey extends Abstrac
   /**
    * Restarts a ratio of the running regionservers at the same time
    */
-  private static class BatchRestartRs extends Action {
+  public static class BatchRestartRs extends RestartActionBase {
     float ratio; //ratio of regionservers to restart
 
     public BatchRestartRs(long sleepTime, float ratio) {
@@ -266,11 +287,6 @@ public class ChaosMonkey extends Abstrac
     }
 
     @Override
-    void init(ActionContext context) throws Exception {
-      super.init(context);
-    }
-
-    @Override
     void perform() throws Exception {
       LOG.info(String.format("Performing action: Batch restarting %d%% of region servers",
           (int)(ratio * 100)));
@@ -307,7 +323,7 @@ public class ChaosMonkey extends Abstrac
   * Restarts a ratio of the regionservers in a rolling fashion. At each step, either kills a
    * server, or starts one, sleeping randomly (0-sleepTime) in between steps.
    */
-  private static class RollingBatchRestartRs extends BatchRestartRs {
+  public static class RollingBatchRestartRs extends BatchRestartRs {
     public RollingBatchRestartRs(long sleepTime, float ratio) {
       super(sleepTime, ratio);
     }
@@ -346,6 +362,71 @@ public class ChaosMonkey extends Abstrac
     }
   }
 
+  public static class UnbalanceRegionsAction extends Action {
+    private double fractionOfRegions;
+    private double fractionOfServers;
+    private Random random = new Random();
+
+    /**
+     * Unbalances the regions on the cluster by choosing "target" servers, and moving
+     * some regions from each of the non-target servers to random target servers.
+     * @param fractionOfRegions Fraction of regions to move from each server.
+     * @param fractionOfServers Fraction of servers to be chosen as targets.
+     */
+    public UnbalanceRegionsAction(double fractionOfRegions, double fractionOfServers) {
+      this.fractionOfRegions = fractionOfRegions;
+      this.fractionOfServers = fractionOfServers;
+    }
+
+    @Override
+    void perform() throws Exception {
+      LOG.info("Unbalancing regions");
+      ClusterStatus status = this.cluster.getClusterStatus();
+      List<ServerName> victimServers = new LinkedList<ServerName>(status.getServers());
+      int targetServerCount = (int)Math.ceil(fractionOfServers * victimServers.size());
+      List<byte[]> targetServers = new ArrayList<byte[]>(targetServerCount);
+      for (int i = 0; i < targetServerCount; ++i) {
+        int victimIx = random.nextInt(victimServers.size());
+        String serverName = victimServers.remove(victimIx).getServerName();
+        targetServers.add(Bytes.toBytes(serverName));
+      }
+
+      List<byte[]> victimRegions = new LinkedList<byte[]>();
+      for (ServerName server : victimServers) {
+        HServerLoad serverLoad = status.getLoad(server);
+        // Ugh.
+        List<byte[]> regions = new LinkedList<byte[]>(serverLoad.getRegionsLoad().keySet());
+        int victimRegionCount = (int)Math.ceil(fractionOfRegions * regions.size());
+        LOG.debug("Removing " + victimRegionCount + " regions from " + server.getServerName());
+        for (int i = 0; i < victimRegionCount; ++i) {
+          int victimIx = random.nextInt(regions.size());
+          String regionId = HRegionInfo.encodeRegionName(regions.remove(victimIx));
+          victimRegions.add(Bytes.toBytes(regionId));
+        }
+      }
+
+      LOG.info("Moving " + victimRegions.size() + " regions from " + victimServers.size()
+          + " servers to " + targetServers.size() + " different servers");
+      HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
+      for (byte[] victimRegion : victimRegions) {
+        int targetIx = random.nextInt(targetServers.size());
+        admin.move(victimRegion, targetServers.get(targetIx));
+      }
+    }
+  }
+
+  public static class ForceBalancerAction extends Action {
+    @Override
+    void perform() throws Exception {
+      LOG.info("Balancing regions");
+      HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
+      boolean result = admin.balancer();
+      if (!result) {
+        LOG.error("Balancer didn't succeed");
+      }
+    }
+  }
+
   /**
    * A context for a Policy
    */
@@ -358,7 +439,7 @@ public class ChaosMonkey extends Abstrac
   /**
    * A policy to introduce chaos to the cluster
    */
-  private static abstract class Policy extends StoppableImplementation implements Runnable {
+  public static abstract class Policy extends StoppableImplementation implements Runnable {
     PolicyContext context;
     public void init(PolicyContext context) throws Exception {
       this.context = context;
@@ -369,19 +450,32 @@ public class ChaosMonkey extends Abstrac
    * A policy, which picks a random action according to the given weights,
    * and performs it every configurable period.
    */
-  private static class PeriodicRandomActionPolicy extends Policy {
-    private long period;
+  public static class PeriodicRandomActionPolicy extends Policy {
+    private long periodMs;
     private List<Pair<Action, Integer>> actions;
 
-    PeriodicRandomActionPolicy(long period, List<Pair<Action, Integer>> actions) {
-      this.period = period;
+    public PeriodicRandomActionPolicy(long periodMs, List<Pair<Action, Integer>> actions) {
+      this.periodMs = periodMs;
       this.actions = actions;
     }
 
+    public PeriodicRandomActionPolicy(long periodMs, Pair<Action, Integer>... actions) {
+      // We don't expect it to be modified.
+      this(periodMs, Arrays.asList(actions));
+    }
+
+    public PeriodicRandomActionPolicy(long periodMs, Action... actions) {
+      this.periodMs = periodMs;
+      this.actions = new ArrayList<Pair<Action, Integer>>(actions.length);
+      for (Action action : actions) {
+        this.actions.add(new Pair<Action, Integer>(action, 1));
+      }
+    }
+
     @Override
     public void run() {
       //add some jitter
-      int jitter = new Random().nextInt((int)period);
+      int jitter = new Random().nextInt((int)periodMs);
       LOG.info("Sleeping for " + jitter + " to add jitter");
       Threads.sleep(jitter);
 
@@ -396,7 +490,7 @@ public class ChaosMonkey extends Abstrac
               + StringUtils.stringifyException(ex));
         }
 
-        long sleepTime = period - (System.currentTimeMillis() - start);
+        long sleepTime = periodMs - (System.currentTimeMillis() - start);
         if (sleepTime > 0) {
           LOG.info("Sleeping for:" + sleepTime);
           Threads.sleep(sleepTime);
@@ -407,7 +501,7 @@ public class ChaosMonkey extends Abstrac
     @Override
     public void init(PolicyContext context) throws Exception {
       super.init(context);
-      LOG.info("Using ChaosMonkey Policy: " + this.getClass() + ", period:" + period);
+      LOG.info("Using ChaosMonkey Policy: " + this.getClass() + ", period:" + periodMs);
       for (Pair<Action, Integer> action : actions) {
         action.getFirst().init(this.context);
       }
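
With Action now public and its helpers protected, tests can plug custom behaviors into any Policy. A minimal sketch of a custom action (class name and log text are ours, not part of the commit); note that perform() keeps package visibility, so such a subclass must live in org.apache.hadoop.hbase.util:

    package org.apache.hadoop.hbase.util;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    // Hypothetical action: report the live region-server count when fired.
    public class ReportClusterSizeAction extends ChaosMonkey.Action {
      private static final Log LOG = LogFactory.getLog(ReportClusterSizeAction.class);

      @Override
      void perform() throws Exception {
        // 'cluster' is populated by Action.init() before any perform() call.
        LOG.info("Live region servers: "
            + cluster.getClusterStatus().getServersSize());
      }
    }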

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1414890&r1=1414889&r2=1414890&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Wed Nov 28 20:08:39 2012
@@ -95,6 +95,8 @@ public class LoadTestTool extends Abstra
   private static final String OPT_START_KEY = "start_key";
   private static final String OPT_TABLE_NAME = "tn";
   private static final String OPT_ZK_QUORUM = "zk";
+  private static final String OPT_SKIP_INIT = "skip_init";
+  private static final String OPT_INIT_ONLY = "init_only";
 
   private static final long DEFAULT_START_KEY = 0;
 
@@ -126,6 +128,11 @@ public class LoadTestTool extends Abstra
   private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
   private int verifyPercent;
 
+  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
+  //       console tool itself should only be used from console.
+  private boolean isSkipInit = false;
+  private boolean isInitOnly = false;
+
   private String[] splitColonSeparated(String option,
       int minNumCols, int maxNumCols) {
     String optVal = cmd.getOptionValue(option);
@@ -186,6 +193,7 @@ public class LoadTestTool extends Abstra
     addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
     addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
     addOptWithArg(OPT_READ, OPT_USAGE_READ);
+    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
     addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
     addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
     addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
@@ -200,10 +208,12 @@ public class LoadTestTool extends Abstra
         "separate puts for every column in a row");
     addOptNoArg(OPT_ENCODE_IN_CACHE_ONLY, OPT_ENCODE_IN_CACHE_ONLY_USAGE);
 
-    addRequiredOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
+    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
     addOptWithArg(OPT_START_KEY, "The first key to read/write " +
         "(a 0-based index). The default value is " +
         DEFAULT_START_KEY + ".");
+    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
+        + "already exists");
   }
 
   @Override
@@ -212,20 +222,35 @@ public class LoadTestTool extends Abstra
 
     tableName = Bytes.toBytes(cmd.getOptionValue(OPT_TABLE_NAME,
         DEFAULT_TABLE_NAME));
-    startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
-        String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
-    long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
-        Long.MAX_VALUE - startKey);
-    endKey = startKey + numKeys;
 
     isWrite = cmd.hasOption(OPT_WRITE);
     isRead = cmd.hasOption(OPT_READ);
+    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
 
-    if (!isWrite && !isRead) {
+    if (!isWrite && !isRead && !isInitOnly) {
       throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
           "-" + OPT_READ + " has to be specified");
     }
 
+    if (isInitOnly && (isRead || isWrite)) {
+      throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
+          + " either -" + OPT_WRITE + " or -" + OPT_READ);
+    }
+
+    if (!isInitOnly) {
+      if (!cmd.hasOption(OPT_NUM_KEYS)) {
+        throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
+            + "read or write mode");
+      }
+      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
+          String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
+      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
+          Long.MAX_VALUE - startKey);
+      endKey = startKey + numKeys;
+      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
+      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
+    }
+
     encodeInCacheOnly = cmd.hasOption(OPT_ENCODE_IN_CACHE_ONLY);
     parseColumnFamilyOptions(cmd);
 
@@ -274,8 +299,6 @@ public class LoadTestTool extends Abstra
       System.out.println("Percent of keys to verify: " + verifyPercent);
       System.out.println("Reader threads: " + numReaderThreads);
     }
-
-    System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
   }
 
   private void parseColumnFamilyOptions(CommandLine cmd) {
@@ -296,15 +319,27 @@ public class LoadTestTool extends Abstra
         StoreFile.BloomType.valueOf(bloomStr);
   }
 
+  public void initTestTable() throws IOException {
+    HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
+        COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo);
+    applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+  }
+
   @Override
   protected int doWork() throws IOException {
     if (cmd.hasOption(OPT_ZK_QUORUM)) {
       conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
     }
 
-    HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
-        COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo);
-    applyColumnFamilyOptions(tableName, COLUMN_FAMILIES);
+    if (isInitOnly) {
+      LOG.info("Initializing only; no reads or writes");
+      initTestTable();
+      return 0;
+    }
+
+    if (!isSkipInit) {
+      initTestTable();
+    }
 
     if (isWrite) {
       writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY);
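
Taken together, the new flags split table setup from loading: -init_only creates and pre-splits the test table and exits, -skip_init loads against an already-existing table, and -num_keys is now required only in read/write mode (while combining -init_only with -read/-write is rejected). An illustrative driver, ours, with arbitrary table name and counts:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.LoadTestTool;

    public class LoadTestToolExample {
      public static void main(String[] args) throws Exception {
        LoadTestTool tool = new LoadTestTool();
        tool.setConf(HBaseConfiguration.create());

        // 1. Create and pre-split the table; no reads or writes happen here.
        int rc = tool.run(new String[] { "-tn", "demo_table", "-init_only" });
        if (rc != 0) throw new RuntimeException("init failed: " + rc);

        // 2. Load the existing table; -num_keys is mandatory in write mode,
        //    and -skip_init avoids re-creating the table.
        rc = tool.run(new String[] { "-tn", "demo_table",
            "-write", "10:100:20", "-start_key", "0",
            "-num_keys", "10000", "-skip_init" });
        if (rc != 0) throw new RuntimeException("load failed: " + rc);
      }
    }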


