From: enis@apache.org
To: commits@hbase.apache.org
Date: Sat, 28 Jun 2014 00:31:11 -0000
Subject: [25/49] git commit: HBASE-10572 Create an IntegrationTest for region replicas

HBASE-10572 Create an IntegrationTest for region replicas git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1576465 13f79535-47bb-0310-9956-ffa450edef68 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/759cfb83 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/759cfb83 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/759cfb83 Branch: refs/heads/master Commit: 759cfb83af84e97a6ad30aad811c03b42f99e233 Parents: 55a7c87 Author: Enis Soztutar Authored: Tue Mar 11 18:37:14 2014 +0000 Committer: Enis Soztutar Committed: Fri Jun 27 16:39:38 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/IntegrationTestBase.java | 15 +- .../hadoop/hbase/IntegrationTestIngest.java | 71 +++- .../RestartRandomRsExceptMetaAction.java | 42 +++ .../actions/RollingBatchRestartRsAction.java | 50 ++- .../RollingBatchRestartRsExceptMetaAction.java | 43 +++ .../hbase/chaos/factories/MonkeyFactory.java | 6 +- .../factories/ServerKillingMonkeyFactory.java | 61 ++++ ...stTimeBoundedRequestsWithRegionReplicas.java | 345 +++++++++++++++++++ .../hadoop/hbase/HBaseTestingUtility.java | 35 +- .../apache/hadoop/hbase/util/LoadTestTool.java | 76 +++- .../hadoop/hbase/util/MultiThreadedReader.java | 54 ++- .../hbase/util/MultiThreadedReaderWithACL.java | 5 +- 12 files changed, 730 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java index 77bdd68..85bc5db 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBase.java @@ -133,14 +133,22 @@ public abstract class 
IntegrationTestBase extends AbstractHBaseTool { util = getTestingUtil(getConf()); MonkeyFactory fact = MonkeyFactory.getFactory(monkeyToUse); if (fact == null) { - // Run with no monkey in distributed context, with real monkey in local test context. - fact = MonkeyFactory.getFactory( - util.isDistributedCluster() ? MonkeyFactory.CALM : MonkeyFactory.SLOW_DETERMINISTIC); + fact = getDefaultMonkeyFactory(); } monkey = fact.setUtil(util) .setTableName(getTablename()) .setProperties(monkeyProps) .setColumnFamilies(getColumnFamilies()).build(); + startMonkey(); + } + + protected MonkeyFactory getDefaultMonkeyFactory() { + // Run with no monkey in distributed context, with real monkey in local test context. + return MonkeyFactory.getFactory( + util.isDistributedCluster() ? MonkeyFactory.CALM : MonkeyFactory.SLOW_DETERMINISTIC); + } + + protected void startMonkey() throws Exception { monkey.start(); } @@ -159,6 +167,7 @@ public abstract class IntegrationTestBase extends AbstractHBaseTool { if (this.util == null) { if (conf == null) { this.util = new IntegrationTestingUtility(); + this.setConf(util.getConfiguration()); } else { this.util = new IntegrationTestingUtility(conf); } http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java index 920a659..f71ff20 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngest.java @@ -42,21 +42,39 @@ import com.google.common.collect.Sets; @Category(IntegrationTests.class) public class IntegrationTestIngest extends IntegrationTestBase { public static final char HIPHEN = '-'; - private static final int SERVER_COUNT = 4; // number of slaves for the smallest cluster + private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster private static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; private static final long JUNIT_RUN_TIME = 10 * 60 * 1000; /** A soft limit on how long we should run */ - private static final String RUN_TIME_KEY = "hbase.%s.runtime"; + protected static final String RUN_TIME_KEY = "hbase.%s.runtime"; + + protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server"; + protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500; + + protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads"; + protected static final int DEFAULT_NUM_WRITE_THREADS = 20; + + protected static final String NUM_READ_THREADS_KEY = "num_read_threads"; + protected static final int DEFAULT_NUM_READ_THREADS = 20; protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class); protected IntegrationTestingUtility util; protected HBaseCluster cluster; protected LoadTestTool loadTool; + protected String[] LOAD_TEST_TOOL_INIT_ARGS = { + LoadTestTool.OPT_COMPRESSION, + LoadTestTool.OPT_DATA_BLOCK_ENCODING, + LoadTestTool.OPT_INMEMORY, + LoadTestTool.OPT_ENCRYPTION, + LoadTestTool.OPT_NUM_REGIONS_PER_SERVER, + LoadTestTool.OPT_REGION_REPLICATION, + }; + @Override public void setUpCluster() throws Exception { - util = getTestingUtil(null); + util = getTestingUtil(getConf()); LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers"); util.initializeCluster(SERVER_COUNT); 
LOG.debug("Done initializing/checking cluster"); @@ -70,7 +88,7 @@ public class IntegrationTestIngest extends IntegrationTestBase { } protected void initTable() throws IOException { - int ret = loadTool.run(new String[] { "-tn", getTablename(), "-init_only" }); + int ret = loadTool.run(getArgsForLoadTestToolInitTable()); Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret); } @@ -82,16 +100,24 @@ public class IntegrationTestIngest extends IntegrationTestBase { @Test public void testIngest() throws Exception { - runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10); + runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20); } - private void internalRunIngestTest(long runTime) throws Exception { - runIngestTest(runTime, 2500, 10, 1024, 10); + protected void internalRunIngestTest(long runTime) throws Exception { + String clazz = this.getClass().getSimpleName(); + long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY), + DEFAULT_NUM_KEYS_PER_SERVER); + int numWriteThreads = conf.getInt( + String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS); + int numReadThreads = conf.getInt( + String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS); + runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads); } @Override public String getTablename() { - return this.getClass().getSimpleName(); + String clazz = this.getClass().getSimpleName(); + return conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz); } @Override @@ -104,8 +130,10 @@ public class IntegrationTestIngest extends IntegrationTestBase { util.deleteTable(Bytes.toBytes(getTablename())); } } - protected void runIngestTest(long defaultRunTime, int keysPerServerPerIter, int colsPerKey, - int recordSize, int writeThreads) throws Exception { + + protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey, + int recordSize, int writeThreads, int readThreads) throws Exception { + LOG.info("Running ingest"); LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize()); @@ -136,7 +164,8 @@ public class IntegrationTestIngest extends IntegrationTestBase { Assert.fail(errorMsg); } - ret = loadTool.run(getArgsForLoadTestTool("-read", "100:20", startKey, numKeys)); + ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads) + , startKey, numKeys)); if (0 != ret) { String errorMsg = "Verification failed with error code " + ret; LOG.error(errorMsg); @@ -146,6 +175,23 @@ public class IntegrationTestIngest extends IntegrationTestBase { } } + protected String[] getArgsForLoadTestToolInitTable() { + List args = new ArrayList(); + args.add("-tn"); + args.add(getTablename()); + // pass all remaining args from conf with keys . 
+ String clazz = this.getClass().getSimpleName(); + for (String arg : LOAD_TEST_TOOL_INIT_ARGS) { + String val = conf.get(String.format("%s.%s", clazz, arg)); + if (val != null) { + args.add("-" + arg); + args.add(val); + } + } + args.add("-init_only"); + return args.toArray(new String[args.size()]); + } + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, long numKeys) { List<String> args = new ArrayList<String>(); @@ -158,11 +204,12 @@ public class IntegrationTestIngest extends IntegrationTestBase { args.add("-num_keys"); args.add(String.valueOf(numKeys)); args.add("-skip_init"); + return args.toArray(new String[args.size()]); } /** Estimates a data size based on the cluster size */ - private long getNumKeys(int keysPerServer) + protected long getNumKeys(long keysPerServer) throws IOException { int numRegionServers = cluster.getClusterStatus().getServersSize(); return keysPerServer * numRegionServers; http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java new file mode 100644 index 0000000..b78144a --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RestartRandomRsExceptMetaAction.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.chaos.actions; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; + +public class RestartRandomRsExceptMetaAction extends RestartRandomRsAction { + public RestartRandomRsExceptMetaAction(long sleepTime) { + super(sleepTime); + } + + @Override + public void perform() throws Exception { + int tries = 10; + + while (tries-- > 0 && getCurrentServers().length > 1) { + ServerName server = PolicyBasedChaosMonkey.selectRandomItem(getCurrentServers()); + ServerName metaServer = cluster.getServerHoldingMeta(); + if (server != null && !server.equals(metaServer)) { + restartRs(server, sleepTime); + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java index 6df10cb..7530383 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsAction.java @@ -32,37 +32,57 @@ import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; /** * Restarts a ratio of the regionservers in a rolling fashion. At each step, either kills a - * server, or starts one, sleeping randomly (0-sleepTime) in between steps. + * server, or starts one, sleeping randomly (0-sleepTime) in between steps. The parameter maxDeadServers + * limits the maximum number of servers that can be down at the same time during rolling restarts. */ public class RollingBatchRestartRsAction extends BatchRestartRsAction { private static Log LOG = LogFactory.getLog(RollingBatchRestartRsAction.class); + protected int maxDeadServers; // number of maximum dead servers at any given time. Defaults to 5 public RollingBatchRestartRsAction(long sleepTime, float ratio) { + this(sleepTime, ratio, 5); + } + + public RollingBatchRestartRsAction(long sleepTime, float ratio, int maxDeadServers) { super(sleepTime, ratio); + this.maxDeadServers = maxDeadServers; + } + + enum KillOrStart { + KILL, + START } @Override public void perform() throws Exception { LOG.info(String.format("Performing action: Rolling batch restarting %d%% of region servers", (int)(ratio * 100))); - List selectedServers = PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), - ratio); + List selectedServers = selectServers(); Queue serversToBeKilled = new LinkedList(selectedServers); Queue deadServers = new LinkedList(); - // + // loop while there are servers to be killed or dead servers to be restarted while (!serversToBeKilled.isEmpty() || !deadServers.isEmpty()) { - boolean action = true; //action true = kill server, false = start server + KillOrStart action = KillOrStart.KILL; - if (serversToBeKilled.isEmpty() || deadServers.isEmpty()) { - action = deadServers.isEmpty(); + if (serversToBeKilled.isEmpty()) { // no more servers to kill + action = KillOrStart.START; + } else if (deadServers.isEmpty()) { + action = KillOrStart.KILL; // no more servers to start + } else if (deadServers.size() >= maxDeadServers) { + // we have too many dead servers. 
Don't kill any more + action = KillOrStart.START; } else { - action = RandomUtils.nextBoolean(); + // do a coin toss + action = RandomUtils.nextBoolean() ? KillOrStart.KILL : KillOrStart.START; } - if (action) { - ServerName server = serversToBeKilled.remove(); + ServerName server; + + switch (action) { + case KILL: + server = serversToBeKilled.remove(); try { killRs(server); } catch (org.apache.hadoop.util.Shell.ExitCodeException e) { @@ -71,21 +91,27 @@ public class RollingBatchRestartRsAction extends BatchRestartRsAction { LOG.info("Problem killing but presume successful; code=" + e.getExitCode(), e); } deadServers.add(server); - } else { + break; + case START: try { - ServerName server = deadServers.remove(); + server = deadServers.remove(); startRs(server); } catch (org.apache.hadoop.util.Shell.ExitCodeException e) { // The start may fail but better to just keep going though we may lose server. // LOG.info("Problem starting, will retry; code=" + e.getExitCode(), e); } + break; } sleep(RandomUtils.nextInt((int)sleepTime)); } } + protected List selectServers() throws IOException { + return PolicyBasedChaosMonkey.selectRandomItems(getCurrentServers(), ratio); + } + /** * Small test to ensure the class basically works. * @param args http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java new file mode 100644 index 0000000..f03b8ec --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/actions/RollingBatchRestartRsExceptMetaAction.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.chaos.actions; + +import java.util.List; + +import org.apache.hadoop.hbase.ServerName; + +/** + * Same as in {@link RollingBatchRestartRsAction} except that this action + * does not restart the region server holding the META table. 
+ */ +public class RollingBatchRestartRsExceptMetaAction extends RollingBatchRestartRsAction { + + public RollingBatchRestartRsExceptMetaAction(long sleepTime, float ratio, int maxDeadServers) { + super(sleepTime, ratio, maxDeadServers); + } + + @Override + protected List selectServers() throws java.io.IOException { + ServerName metaServer = cluster.getServerHoldingMeta(); + List servers = super.selectServers(); + servers.remove(metaServer); + return servers; + }; + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java index 4f3824b..8fb1859 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java @@ -22,10 +22,11 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import com.google.common.collect.ImmutableMap; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey; +import com.google.common.collect.ImmutableMap; + /** * Base class of the factory that will create a ChaosMonkey. */ @@ -60,16 +61,17 @@ public abstract class MonkeyFactory { public abstract ChaosMonkey build(); - public static final String CALM = "calm"; // TODO: the name has become a misnomer since the default (not-slow) monkey has been removed public static final String SLOW_DETERMINISTIC = "slowDeterministic"; public static final String UNBALANCE = "unbalance"; + public static final String SERVER_KILLING = "serverKilling"; public static Map FACTORIES = ImmutableMap.builder() .put(CALM, new CalmMonkeyFactory()) .put(SLOW_DETERMINISTIC, new SlowDeterministicMonkeyFactory()) .put(UNBALANCE, new UnbalanceMonkeyFactory()) + .put(SERVER_KILLING, new ServerKillingMonkeyFactory()) .build(); public static MonkeyFactory getFactory(String factoryName) { http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java new file mode 100644 index 0000000..02b5914 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/ServerKillingMonkeyFactory.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.chaos.factories; + +import org.apache.hadoop.hbase.chaos.actions.Action; +import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction; +import org.apache.hadoop.hbase.chaos.actions.ForceBalancerAction; +import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction; +import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; +import org.apache.hadoop.hbase.chaos.actions.RollingBatchRestartRsExceptMetaAction; +import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey; +import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; +import org.apache.hadoop.hbase.chaos.policies.CompositeSequentialPolicy; +import org.apache.hadoop.hbase.chaos.policies.DoActionsOncePolicy; +import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; + +/** + * Creates ChaosMonkeys for doing server restart actions, but not + * flush / compact / snapshot kind of actions. + */ +public class ServerKillingMonkeyFactory extends MonkeyFactory { + + @Override + public ChaosMonkey build() { + + // Destructive actions to mess things around. Cannot run batch restart + Action[] actions1 = new Action[] { + new RestartRandomRsExceptMetaAction(60000), + new RestartActiveMasterAction(5000), + new RollingBatchRestartRsExceptMetaAction(5000, 1.0f, 2), //only allow 2 servers to be dead + new ForceBalancerAction() + }; + + // Action to log more info for debugging + Action[] actions2 = new Action[] { + new DumpClusterStatusAction() + }; + + return new PolicyBasedChaosMonkey(util, + new CompositeSequentialPolicy( + new DoActionsOncePolicy(60 * 1000, actions1), + new PeriodicRandomActionPolicy(60 * 1000, actions1)), + new PeriodicRandomActionPolicy(60 * 1000, actions2)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java new file mode 100644 index 0000000..9825ea7 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestIngest; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.IntegrationTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; +import org.apache.hadoop.hbase.client.Consistency; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; +import org.apache.hadoop.hbase.util.LoadTestTool; +import org.apache.hadoop.hbase.util.MultiThreadedReader; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Assert; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Lists; + +/** + * An IntegrationTest for doing reads with a timeout, against a read-only table with region + * replicas. A ChaosMonkey is run which kills region servers and the master, but ensures + * that the meta region server is not killed, and that at most 2 region servers are dead at + * any point in time. The expected behavior is that all reads with stale mode true will return + * before the timeout (5 sec by default). The test fails if the read requests do not finish + * in time. + * + *

This test uses LoadTestTool to read and write the data from a single client but + * with multiple threads. The data is written first, then we allow the region replicas to catch + * up. Then we start the reader threads doing get requests with stale mode true. Chaos Monkey is + * started after some delay (20 sec by default) once the reader threads are running, so that + * there is enough time to fully cache meta. + * + * These parameters (and some other parameters from LoadTestTool) can be used to + * control behavior; the given values are the defaults (see the note on key resolution + * after the list): + *

+ * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
+ * 
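 + * These keys are prefixed with the test class's simple name; as a sketch of the lookup
 + * (mirroring the conf.getLong calls in the code below), the get timeout is resolved as:
 + * long getTimeout = conf.getLong(
 + * "IntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms", 5000);
 + *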
+ * Use this test with "serverKilling" ChaosMonkey. Sample usage: + *
+ * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
+ * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
+ * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
+ * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
+ * 
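 + * Reads with stale mode true are issued as timeline-consistent gets; the reader bundled
 + * with this test creates them essentially as follows:
 + * Get get = new Get(row);
 + * get.setConsistency(Consistency.TIMELINE); // replicas may serve the response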
+ */ +@Category(IntegrationTests.class) +public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest { + + private static final Log LOG = LogFactory.getLog( + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class); + + private static final String TEST_NAME + = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName(); + + protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec + protected static final String GET_TIMEOUT_KEY = "get_timeout_ms"; + + protected static final long DEFAUL_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec + protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay"; + + protected static final int DEFAULT_REGION_REPLICATION = 3; + + @Override + protected void startMonkey() throws Exception { + // we do not want to start the monkey at the start of the test. + } + + @Override + protected MonkeyFactory getDefaultMonkeyFactory() { + return MonkeyFactory.getFactory(MonkeyFactory.CALM); + } + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + // default replication for this test is 3 + String clazz = this.getClass().getSimpleName(); + conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION), + Integer.toString(DEFAULT_REGION_REPLICATION)); + } + + protected void writeData(int colsPerKey, int recordSize, int writeThreads, + long startKey, long numKeys) throws IOException { + int ret = loadTool.run(getArgsForLoadTestTool("-write", + String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys)); + if (0 != ret) { + String errorMsg = "Load failed with error code " + ret; + LOG.error(errorMsg); + Assert.fail(errorMsg); + } + } + + @Override + protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey, + int recordSize, int writeThreads, int readThreads) throws Exception { + LOG.info("Cluster size:"+ + util.getHBaseClusterInterface().getClusterStatus().getServersSize()); + + long start = System.currentTimeMillis(); + String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName()); + long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime); + long startKey = 0; + + long numKeys = getNumKeys(keysPerServerPerIter); + + + // write data once + LOG.info("Writing some data to the table"); + writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys); + + // flush the table + LOG.info("Flushing the table"); + HBaseAdmin admin = util.getHBaseAdmin(); + admin.flush(getTablename()); + + // re-open the regions to make sure that the replicas are up to date + long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0); + if (refreshTime > 0 && refreshTime <= 10000) { + LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated"); + Threads.sleep(refreshTime); + } else { + LOG.info("Reopening the table"); + admin.disableTable(getTablename()); + admin.enableTable(getTablename()); + } + + // We should only start the ChaosMonkey after the readers are started and have cached + // all of the region locations. Because the meta is not replicated, the timebounded reads + // will timeout if meta server is killed. + // We will start the chaos monkey after 1 minute, and since the readers are reading random + // keys, it should be enough to cache every region entry. 
+ long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY) + , DEFAUL_CHAOS_MONKEY_DELAY); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + LOG.info(String.format("ChaosMonkey delay is : %d seconds. Will start %s " + + "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse)); + ScheduledFuture result = executorService.schedule(new Runnable() { + @Override + public void run() { + try { + LOG.info("Starting ChaosMonkey"); + monkey.start(); + monkey.waitForStop(); + } catch (Exception e) { + LOG.warn(StringUtils.stringifyException(e)); + } + + } + }, chaosMonkeyDelay, TimeUnit.MILLISECONDS); + + // set the intended run time for the reader. The reader will do read requests + // to random keys for this amount of time. + long remainingTime = runtime - (System.currentTimeMillis() - start); + LOG.info("Reading random keys from the table for " + remainingTime/60000 + " min"); + this.conf.setLong( + String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()) + , remainingTime); // load tool shares the same conf + + // now start the readers which will run for configured run time + try { + int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads) + , startKey, numKeys)); + if (0 != ret) { + String errorMsg = "Verification failed with error code " + ret; + LOG.error(errorMsg); + Assert.fail(errorMsg); + } + } finally { + if (result != null) result.cancel(false); + monkey.stop("Stopping the test"); + monkey.waitForStop(); + executorService.shutdown(); + } + } + + @Override + protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey, + long numKeys) { + List args = Lists.newArrayList(super.getArgsForLoadTestTool( + mode, modeSpecificArg, startKey, numKeys)); + args.add("-reader"); + args.add(TimeBoundedMultiThreadedReader.class.getName()); + return args.toArray(new String[args.size()]); + } + + public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader { + protected long timeoutNano; + protected AtomicLong timedOutReads = new AtomicLong(); + protected long runTime; + protected Thread timeoutThread; + + public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, + TableName tableName, double verifyPercent) { + super(dataGen, conf, tableName, verifyPercent); + long timeoutMs = conf.getLong( + String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT); + timeoutNano = timeoutMs * 1000000; + LOG.info("Timeout for gets: " + timeoutMs); + String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName()); + this.runTime = conf.getLong(runTimeKey, -1); + if (this.runTime <= 0) { + throw new IllegalArgumentException("Please configure " + runTimeKey); + } + } + + @Override + public void waitForFinish() { + try { + this.timeoutThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + this.aborted = true; + super.waitForFinish(); + } + + @Override + protected String progressInfo() { + StringBuilder builder = new StringBuilder(super.progressInfo()); + appendToStatus(builder, "get_timeouts", timedOutReads.get()); + return builder.toString(); + } + + @Override + public void start(long startKey, long endKey, int numThreads) throws IOException { + super.start(startKey, endKey, numThreads); + this.timeoutThread = new TimeoutThread(this.runTime); + this.timeoutThread.start(); + } + + @Override + protected HBaseReaderThread createReaderThread(int 
readerId) throws IOException { + return new TimeBoundedMultiThreadedReaderThread(readerId); + } + + private class TimeoutThread extends Thread { + long timeout; + long reportInterval = 60000; + public TimeoutThread(long timeout) { + this.timeout = timeout; + } + + @Override + public void run() { + while (true) { + long rem = Math.min(timeout, reportInterval); + if (rem <= 0) { + break; + } + LOG.info("Remaining execution time:" + timeout / 60000 + " min"); + Threads.sleep(rem); + timeout -= rem; + } + } + } + + public class TimeBoundedMultiThreadedReaderThread + extends MultiThreadedReader.HBaseReaderThread { + + public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException { + super(readerId); + } + + @Override + protected Get createGet(long keyToRead) throws IOException { + Get get = super.createGet(keyToRead); + get.setConsistency(Consistency.TIMELINE); + return get; + } + + @Override + protected long getNextKeyToRead() { + // always read a random key, assuming that the writer has finished writing all keys + long key = startKey + Math.abs(RandomUtils.nextLong()) + % (endKey - startKey); + return key; + } + + @Override + protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano, + Result result, HTable table, boolean isNullExpected) throws IOException { + super.verifyResultsAndUpdateMetrics(verify, rowKey, elapsedNano, result, table, isNullExpected); + // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC + // to complete, but if the request took longer than timeout, we treat that as error. + if (elapsedNano > timeoutNano) { + timedOutReads.incrementAndGet(); + numReadFailures.addAndGet(1); // fail the test + } + } + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args); + System.exit(ret); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e56b8d0..79bda27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -957,7 +957,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { miniClusterRunning = false; LOG.info("Minicluster is down"); } - + /** * @return True if we removed the test dirs * @throws IOException @@ -3191,11 +3191,27 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public static int createPreSplitLoadTestTable(Configuration conf, TableName tableName, byte[] columnFamily, Algorithm compression, DataBlockEncoding dataBlockEncoding) throws IOException { + return createPreSplitLoadTestTable(conf, tableName, + columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1, + Durability.USE_DEFAULT); + } + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. 
+ * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + TableName tableName, byte[] columnFamily, Algorithm compression, + DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication, + Durability durability) + throws IOException { HTableDescriptor desc = new HTableDescriptor(tableName); + desc.setDurability(durability); + desc.setRegionReplication(regionReplication); HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); hcd.setDataBlockEncoding(dataBlockEncoding); hcd.setCompressionType(compression); - return createPreSplitLoadTestTable(conf, desc, hcd); + return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer); } /** @@ -3205,6 +3221,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public static int createPreSplitLoadTestTable(Configuration conf, HTableDescriptor desc, HColumnDescriptor hcd) throws IOException { + return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER); + } + + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. + * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException { if (!desc.hasFamily(hcd.getName())) { desc.addFamily(hcd); } @@ -3220,11 +3246,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { throw new IllegalStateException("No live regionservers"); } - int regionsPerServer = conf.getInt(REGIONS_PER_SERVER_KEY, DEFAULT_REGIONS_PER_SERVER); - totalNumberOfRegions = numberOfServers * regionsPerServer; + totalNumberOfRegions = numberOfServers * numRegionsPerServer; LOG.info("Number of live regionservers: " + numberOfServers + ", " + "pre-splitting table into " + totalNumberOfRegions + " regions " + - "(default regions per server: " + regionsPerServer + ")"); + "(regions per server: " + numRegionsPerServer + ")"); byte[][] splits = new RegionSplitter.HexStringSplit().split( totalNumberOfRegions); http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java index a893317..34980c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java @@ -110,10 +110,11 @@ public class LoadTestTool extends AbstractHBaseTool { + "compression) to use for data blocks in the test column family, " + "one of " + Arrays.toString(DataBlockEncoding.values()) + "."; - private static final String OPT_BLOOM = "bloom"; - private static final String OPT_COMPRESSION = "compression"; - private static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; + public static final String OPT_BLOOM = "bloom"; + public static final String OPT_COMPRESSION = "compression"; + public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush"; public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush."; + public static final String OPT_DATA_BLOCK_ENCODING = HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase(); @@ -125,6 +126,9 @@ public class 
LoadTestTool extends AbstractHBaseTool { public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool." + " Any args for this class can be passed as colon separated after class name"; + public static final String OPT_READER = "reader"; + public static final String OPT_READER_USAGE = "The class for executing the read requests"; + protected static final String OPT_KEY_WINDOW = "key_window"; protected static final String OPT_WRITE = "write"; protected static final String OPT_MAX_READ_ERRORS = "max_read_errors"; @@ -132,7 +136,7 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_NUM_KEYS = "num_keys"; protected static final String OPT_READ = "read"; protected static final String OPT_START_KEY = "start_key"; - protected static final String OPT_TABLE_NAME = "tn"; + public static final String OPT_TABLE_NAME = "tn"; protected static final String OPT_ZK_QUORUM = "zk"; protected static final String OPT_ZK_PARENT_NODE = "zk_root"; protected static final String OPT_SKIP_INIT = "skip_init"; @@ -142,11 +146,20 @@ public class LoadTestTool extends AbstractHBaseTool { protected static final String OPT_BATCHUPDATE = "batchupdate"; protected static final String OPT_UPDATE = "update"; - protected static final String OPT_ENCRYPTION = "encryption"; + public static final String OPT_ENCRYPTION = "encryption"; protected static final String OPT_ENCRYPTION_USAGE = "Enables transparent encryption on the test table, one of " + Arrays.toString(Encryption.getSupportedCiphers()); + public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server"; + protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE + = "Desired number of regions per region server. Defaults to 5."; + protected static int DEFAULT_NUM_REGIONS_PER_SERVER = 5; + + public static final String OPT_REGION_REPLICATION = "region_replication"; + protected static final String OPT_REGION_REPLICATION_USAGE = + "Desired number of replicas per region"; + protected static final long DEFAULT_START_KEY = 0; /** This will be removed as we factor out the dependency on command line */ @@ -195,6 +208,9 @@ public class LoadTestTool extends AbstractHBaseTool { //This file is used to read authentication information in secure clusters. private String authnFileName; + private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; + private int regionReplication = -1; // not set + // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad, // console tool itself should only be used from console. 
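 + // A sketch of how the new options compose on the command line; the table name here is
 + // illustrative, not part of this commit:
 + // hbase org.apache.hadoop.hbase.util.LoadTestTool -tn usertable -init_only
 + // -num_regions_per_server 5 -region_replication 3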
protected boolean isSkipInit = false; @@ -292,6 +308,7 @@ public class LoadTestTool extends AbstractHBaseTool { "separate updates for every column in a row"); addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY); addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE); + addOptWithArg(OPT_READER, OPT_READER_USAGE); addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write"); addOptWithArg(OPT_START_KEY, "The first key to read/write " + @@ -311,6 +328,8 @@ public class LoadTestTool extends AbstractHBaseTool { addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE); addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE); + addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE); + addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE); } @Override @@ -421,13 +440,16 @@ public class LoadTestTool extends AbstractHBaseTool { if (cmd.hasOption(NUM_TABLES)) { numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE); } - regionsPerServer = HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER; - if (cmd.hasOption(OPT_REGIONS_PER_SERVER)) { - regionsPerServer = parseInt(cmd.getOptionValue(OPT_REGIONS_PER_SERVER), 1, - Integer.MAX_VALUE); - conf.setInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY, regionsPerServer); + + numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER; + if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) { + numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER)); + } + + regionReplication = 1; + if (cmd.hasOption(OPT_REGION_REPLICATION)) { + regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION)); } - System.out.println("Regions per server: " + regionsPerServer); } private void parseColumnFamilyOptions(CommandLine cmd) { @@ -451,14 +473,14 @@ public class LoadTestTool extends AbstractHBaseTool { } public void initTestTable() throws IOException { - HTableDescriptor desc = new HTableDescriptor(tableName); + Durability durability = Durability.USE_DEFAULT; if (deferredLogFlush) { - desc.setDurability(Durability.ASYNC_WAL); + durability = Durability.ASYNC_WAL; } - HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY); - hcd.setDataBlockEncoding(dataBlockEncodingAlgo); - hcd.setCompressionType(compressAlgo); - HBaseTestingUtility.createPreSplitLoadTestTable(conf, desc, hcd); + + HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName, + COLUMN_FAMILY, compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, + regionReplication, durability); applyColumnFamilyOptions(tableName, COLUMN_FAMILIES); } @@ -588,7 +610,13 @@ public class LoadTestTool extends AbstractHBaseTool { readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames); } else { - readerThreads = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent); + String readerClass = null; + if (cmd.hasOption(OPT_READER)) { + readerClass = cmd.getOptionValue(OPT_READER); + } else { + readerClass = MultiThreadedReader.class.getCanonicalName(); + } + readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen); } readerThreads.setMaxErrors(maxReadErrors); readerThreads.setKeyWindow(keyWindow); @@ -662,6 +690,18 @@ public class LoadTestTool extends AbstractHBaseTool { } } + private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName + , LoadTestDataGenerator dataGen) throws IOException { + try { + Class clazz = Class.forName(clazzName); + Constructor constructor = clazz.getConstructor( + LoadTestDataGenerator.class, Configuration.class, TableName.class, 
double.class); + return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent); + } catch (Exception e) { + throw new IOException(e); + } + } + public static byte[] generateData(final Random r, int length) { byte [] b = new byte [length]; int i = 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java index 0edeea7..b0d44fd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java @@ -39,7 +39,7 @@ public class MultiThreadedReader extends MultiThreadedAction protected Set readers = new HashSet(); private final double verifyPercent; - private volatile boolean aborted; + protected volatile boolean aborted; protected MultiThreadedWriterBase writer = null; @@ -104,11 +104,15 @@ public class MultiThreadedReader extends MultiThreadedAction protected void addReaderThreads(int numThreads) throws IOException { for (int i = 0; i < numThreads; ++i) { - HBaseReaderThread reader = new HBaseReaderThread(i); + HBaseReaderThread reader = createReaderThread(i); readers.add(reader); } } + protected HBaseReaderThread createReaderThread(int readerId) throws IOException { + return new HBaseReaderThread(readerId); + } + public class HBaseReaderThread extends Thread { protected final int readerId; protected final HTable table; @@ -122,6 +126,8 @@ public class MultiThreadedReader extends MultiThreadedAction /** If we are ahead of the writer and reading a random key. 
*/ private boolean readingRandomKey; + private boolean printExceptionTrace = true; + /** * @param readerId only the keys with this remainder from division by * {@link #numThreads} will be read by this thread @@ -204,7 +210,7 @@ public class MultiThreadedReader extends MultiThreadedAction return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow); } - private long getNextKeyToRead() { + protected long getNextKeyToRead() { readingRandomKey = false; if (writer == null || curKey <= maxKeyWeCanRead()) { return curKey++; @@ -235,6 +241,24 @@ public class MultiThreadedReader extends MultiThreadedAction } private Get readKey(long keyToRead) { + Get get = null; + try { + get = createGet(keyToRead); + queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead); + } catch (IOException e) { + numReadFailures.addAndGet(1); + LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + + ", time from start: " + + (System.currentTimeMillis() - startTimeMs) + " ms"); + if (printExceptionTrace) { + LOG.warn(e); + printExceptionTrace = false; + } + } + return get; + } + + protected Get createGet(long keyToRead) throws IOException { Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead)); String cfsString = ""; byte[][] columnFamilies = dataGenerator.getColumnFamilies(); @@ -247,18 +271,9 @@ public class MultiThreadedReader extends MultiThreadedAction cfsString += "[" + Bytes.toStringBinary(cf) + "]"; } } - - try { - get = dataGenerator.beforeGet(keyToRead, get); - if (verbose) { - LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); - } - queryKey(get, RandomUtils.nextInt(100) < verifyPercent, keyToRead); - } catch (IOException e) { - numReadFailures.addAndGet(1); - LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") - + ", time from start: " - + (System.currentTimeMillis() - startTimeMs) + " ms"); + get = dataGenerator.beforeGet(keyToRead, get); + if (verbose) { + LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString); } return get; } @@ -267,15 +282,16 @@ public class MultiThreadedReader extends MultiThreadedAction String rowKey = Bytes.toString(get.getRow()); // read the data - long start = System.currentTimeMillis(); + long start = System.nanoTime(); Result result = table.get(get); - getResultMetricUpdation(verify, rowKey, start, result, table, false); + long end = System.nanoTime(); + verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, table, false); } - protected void getResultMetricUpdation(boolean verify, String rowKey, long start, + protected void verifyResultsAndUpdateMetrics(boolean verify, String rowKey, long elapsedNano, Result result, HTable table, boolean isNullExpected) throws IOException { - totalOpTimeMs.addAndGet(System.currentTimeMillis() - start); + totalOpTimeMs.addAndGet(elapsedNano / 1000000); numKeys.addAndGet(1); if (!result.isEmpty()) { if (verify) { http://git-wip-us.apache.org/repos/asf/hbase/blob/759cfb83/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java index 99b4f1d..df59547 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReaderWithACL.java @@ -89,7 +89,7 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader { final String rowKey = Bytes.toString(get.getRow()); // read the data - final long start = System.currentTimeMillis(); + final long start = System.nanoTime(); PrivilegedExceptionAction action = new PrivilegedExceptionAction() { @Override public Object run() throws Exception { @@ -109,7 +109,8 @@ public class MultiThreadedReaderWithACL extends MultiThreadedReader { } boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0); LOG.info("Read happening from ACL " + isNullExpected); - getResultMetricUpdation(verify, rowKey, start, result, localTable, isNullExpected); + long end = System.nanoTime(); + verifyResultsAndUpdateMetrics(verify, rowKey, end - start, result, localTable, isNullExpected); } catch (IOException e) { recordFailure(keyToRead); }
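A note on the new -reader plug-in point: LoadTestTool instantiates the configured class reflectively through the (LoadTestDataGenerator, Configuration, TableName, double) constructor, as getMultiThreadedReaderInstance above shows, so a custom reader only needs to keep that signature. A minimal sketch follows; the package and class names are illustrative, not part of this commit:

package com.example; // hypothetical package

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;

public class ExampleReader extends MultiThreadedReader {
  // This exact signature is what the reflective lookup in
  // LoadTestTool#getMultiThreadedReaderInstance expects.
  public ExampleReader(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double verifyPercent) {
    super(dataGen, conf, tableName, verifyPercent);
  }

  @Override
  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
    // createReaderThread is the hook this patch adds; return a custom
    // HBaseReaderThread subclass here to change per-thread read behavior.
    return new HBaseReaderThread(readerId);
  }
}

Such a reader would be selected with, e.g., -reader com.example.ExampleReader (a hypothetical fully qualified name) alongside the usual -read arguments, the same way this commit wires in TimeBoundedMultiThreadedReader.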