From: stack@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Wed, 24 Feb 2016 15:14:09 -0000
Subject: [2/3] hbase git commit: HBASE-15302 Reenable the other tests disabled by HBASE-14678

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
new file mode 100644
index 0000000..c5728cf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -0,0 +1,1799 @@
+/**
+ *
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
+import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogCounters;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.NonceGenerator;
+import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
+import org.apache.hadoop.hbase.exceptions.OperationConflictException;
+import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MasterTests.class, LargeTests.class})
+@SuppressWarnings("deprecation")
+public class TestDistributedLogSplitting {
+  private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+  static {
+    // Uncomment the following line if more verbosity is needed for
+    // debugging (see HBASE-12285 for details).
+    //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+
+    // test ThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. this
+    // turns it off for this test. TODO: Figure out why scr breaks recovery.
+ System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); + + } + + // Start a cluster with 2 masters and 6 regionservers + static final int NUM_MASTERS = 2; + static final int NUM_RS = 5; + + MiniHBaseCluster cluster; + HMaster master; + Configuration conf; + static Configuration originalConf; + static HBaseTestingUtility TEST_UTIL; + static MiniDFSCluster dfsCluster; + static MiniZooKeeperCluster zkCluster; + + @BeforeClass + public static void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create()); + dfsCluster = TEST_UTIL.startMiniDFSCluster(1); + zkCluster = TEST_UTIL.startMiniZKCluster(); + originalConf = TEST_UTIL.getConfiguration(); + } + + @AfterClass + public static void tearDown() throws IOException { + TEST_UTIL.shutdownMiniZKCluster(); + TEST_UTIL.shutdownMiniDFSCluster(); + TEST_UTIL.shutdownMiniHBaseCluster(); + } + + private void startCluster(int num_rs) throws Exception { + SplitLogCounters.resetCounters(); + LOG.info("Starting cluster"); + conf.getLong("hbase.splitlog.max.resubmit", 0); + // Make the failure test faster + conf.setInt("zookeeper.recovery.retry", 0); + conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); + conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing + conf.setInt("hbase.regionserver.wal.max.splitters", 3); + conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); + TEST_UTIL.shutdownMiniHBaseCluster(); + TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.setDFSCluster(dfsCluster); + TEST_UTIL.setZkCluster(zkCluster); + TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs); + cluster = TEST_UTIL.getHBaseCluster(); + LOG.info("Waiting for active/ready master"); + cluster.waitForActiveAndReadyMaster(); + master = cluster.getMaster(); + while (cluster.getLiveRegionServerThreads().size() < num_rs) { + Threads.sleep(10); + } + } + + @Before + public void before() throws Exception { + // refresh configuration + conf = HBaseConfiguration.create(originalConf); + } + + @After + public void after() throws Exception { + try { + if (TEST_UTIL.getHBaseCluster() != null) { + for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) { + mt.getMaster().abort("closing...", null); + } + } + TEST_UTIL.shutdownMiniHBaseCluster(); + } finally { + TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); + ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test (timeout=300000) + public void testRecoveredEdits() throws Exception { + LOG.info("testRecoveredEdits"); + conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + startCluster(NUM_RS); + + final int NUM_LOG_LINES = 1000; + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + + List rsts = cluster.getLiveRegionServerThreads(); + + Path rootdir = FSUtils.getRootDir(conf); + + Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null), + "table", "family", 40); + try { + TableName table = t.getName(); + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean foundRs = false; + hrs = 
rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo region : regions) { + if (region.getTable().getNameAsString().equalsIgnoreCase("table")) { + foundRs = true; + break; + } + } + if (foundRs) break; + } + final Path logDir = new Path(rootdir, DefaultWALProvider.getWALDirectoryName(hrs + .getServerName().toString())); + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.getTable().getNamespaceAsString() + .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { + it.remove(); + } + } + + makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); + + slm.splitLogDistributed(logDir); + + int count = 0; + for (HRegionInfo hri : regions) { + + Path tdir = FSUtils.getTableDir(rootdir, table); + Path editsdir = + WALSplitter.getRegionDirRecoveredEditsDir( + HRegion.getRegionDir(tdir, hri.getEncodedName())); + LOG.debug("checking edits dir " + editsdir); + FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + if (WALSplitter.isSequenceIdFile(p)) { + return false; + } + return true; + } + }); + assertTrue( + "edits dir should have more than a single file in it. instead has " + files.length, + files.length > 1); + for (int i = 0; i < files.length; i++) { + int c = countWAL(files[i].getPath(), fs, conf); + count += c; + } + LOG.info(count + " edits in " + files.length + " recovered edits files."); + } + + // check that the log file is moved + assertFalse(fs.exists(logDir)); + + assertEquals(NUM_LOG_LINES, count); + } finally { + if (t != null) t.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testLogReplayWithNonMetaRSDown() throws Exception { + LOG.info("testLogReplayWithNonMetaRSDown"); + conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + HRegionServer hrs = findRSToKill(false, "table"); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); + + // wait for abort completes + this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator { + private boolean isDups = false; + private LinkedList nonces = new LinkedList(); + + public void startDups() { + isDups = true; + } + + @Override + public long newNonce() { + long nonce = isDups ? 
nonces.removeFirst() : super.newNonce(); + if (!isDups) { + nonces.add(nonce); + } + return nonce; + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testNonceRecovery() throws Exception { + LOG.info("testNonceRecovery"); + final String TABLE_NAME = "table"; + final String FAMILY_NAME = "family"; + final int NUM_REGIONS_TO_CREATE = 40; + + conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + master.balanceSwitch(false); + + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE); + NonceGeneratorWithDups ng = new NonceGeneratorWithDups(); + NonceGenerator oldNg = + ConnectionUtils.injectNonceGeneratorForTesting( + (ClusterConnection)TEST_UTIL.getConnection(), ng); + + try { + List reqs = new ArrayList(); + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + HRegionServer hrs = rst.getRegionServer(); + List hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo hri : hris) { + if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) { + byte[] key = hri.getStartKey(); + if (key == null || key.length == 0) { + key = Bytes.copy(hri.getEndKey()); + --(key[key.length - 1]); + } + Increment incr = new Increment(key); + incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1); + ht.increment(incr); + reqs.add(incr); + } + } + } + + HRegionServer hrs = findRSToKill(false, "table"); + abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); + ng.startDups(); + for (Increment incr : reqs) { + try { + ht.increment(incr); + fail("should have thrown"); + } catch (OperationConflictException ope) { + LOG.debug("Caught as expected: " + ope.getMessage()); + } + } + } finally { + ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) + TEST_UTIL.getConnection(), oldNg); + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testLogReplayWithMetaRSDown() throws Exception { + LOG.info("testRecoveredEditsReplayWithMetaRSDown"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + HRegionServer hrs = findRSToKill(true, "table"); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); + + this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + private void abortRSAndVerifyRecovery(HRegionServer hrs, Table ht, final ZooKeeperWatcher zkw, + final int numRegions, final int numofLines) throws Exception { + + abortRSAndWaitForRecovery(hrs, zkw, numRegions); + assertEquals(numofLines, TEST_UTIL.countRows(ht)); + } + + private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw, + final int numRegions) throws Exception { + final MiniHBaseCluster tmpCluster = this.cluster; + + 
// abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + // wait for regions come online + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (HBaseTestingUtility.getAllOnlineRegions(tmpCluster).size() + >= (numRegions + 1)); + } + }); + + // wait for all regions are fully recovered + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + List recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( + zkw.recoveringRegionsZNode, false); + return (recoveringRegions != null && recoveringRegions.size() == 0); + } + }); + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testMasterStartsUpWithLogSplittingWork() throws Exception { + LOG.info("testMasterStartsUpWithLogSplittingWork"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false); + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); + startCluster(NUM_RS); + + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + HRegionServer hrs = findRSToKill(false, "table"); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); + + // abort master + abortMaster(cluster); + + // abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + Thread.sleep(2000); + LOG.info("Current Open Regions:" + + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() + >= (NUM_REGIONS_TO_CREATE + 1)); + } + }); + + LOG.info("Current Open Regions After Master Node Starts Up:" + + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); + + assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testMasterStartsUpWithLogReplayWork() throws Exception { + LOG.info("testMasterStartsUpWithLogReplayWork"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); + startCluster(NUM_RS); + + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, 
"table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + HRegionServer hrs = findRSToKill(false, "table"); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); + + // abort master + abortMaster(cluster); + + // abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for the RS dies + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + Thread.sleep(2000); + LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); + + // wait for all regions are fully recovered + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + List recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( + zkw.recoveringRegionsZNode, false); + boolean done = recoveringRegions != null && recoveringRegions.size() == 0; + if (!done) { + LOG.info("Recovering regions: " + recoveringRegions); + } + return done; + } + }); + + LOG.info("Current Open Regions After Master Node Starts Up:" + + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); + + assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testLogReplayTwoSequentialRSDown() throws Exception { + LOG.info("testRecoveredEditsReplayTwoSequentialRSDown"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + List regions = null; + HRegionServer hrs1 = findRSToKill(false, "table"); + regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices()); + + makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100); + + // abort RS1 + LOG.info("Aborting region server: " + hrs1.getServerName()); + hrs1.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + // wait for regions come online + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() + >= (NUM_REGIONS_TO_CREATE + 1)); + } + }); + + // sleep a little bit in order to interrupt recovering in the middle + Thread.sleep(300); + // abort second region server + rsts = cluster.getLiveRegionServerThreads(); + HRegionServer hrs2 = rsts.get(0).getRegionServer(); + LOG.info("Aborting one more region server: " + hrs2.getServerName()); + hrs2.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return 
(cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2)); + } + }); + + // wait for regions come online + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() + >= (NUM_REGIONS_TO_CREATE + 1)); + } + }); + + // wait for all regions are fully recovered + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + List recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( + zkw.recoveringRegionsZNode, false); + return (recoveringRegions != null && recoveringRegions.size() == 0); + } + }); + + assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testMarkRegionsRecoveringInZK() throws Exception { + LOG.info("testMarkRegionsRecoveringInZK"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + master.balanceSwitch(false); + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = master.getZooKeeper(); + Table ht = installTable(zkw, "table", "family", 40); + try { + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + + Set regionSet = new HashSet(); + HRegionInfo region = null; + HRegionServer hrs = null; + ServerName firstFailedServer = null; + ServerName secondFailedServer = null; + for (int i = 0; i < NUM_RS; i++) { + hrs = rsts.get(i).getRegionServer(); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + if (regions.isEmpty()) continue; + region = regions.get(0); + regionSet.add(region); + firstFailedServer = hrs.getServerName(); + secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName(); + break; + } + + slm.markRegionsRecovering(firstFailedServer, regionSet); + slm.markRegionsRecovering(secondFailedServer, regionSet); + + List recoveringRegions = ZKUtil.listChildrenNoWatch(zkw, + ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName())); + + assertEquals(recoveringRegions.size(), 2); + + // wait for splitLogWorker to mark them up because there is no WAL files recorded in ZK + final HRegionServer tmphrs = hrs; + TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (tmphrs.getRecoveringRegions().size() == 0); + } + }); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testReplayCmd() throws Exception { + LOG.info("testReplayCmd"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + final int NUM_REGIONS_TO_CREATE = 40; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) 
{ + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + if (regions.size() > 0) break; + } + + this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1")); + String originalCheckSum = TEST_UTIL.checksumRows(ht); + + // abort RA and trigger replay + abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); + + assertEquals("Data should remain after reopening of regions", originalCheckSum, + TEST_UTIL.checksumRows(ht)); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testLogReplayForDisablingTable() throws Exception { + LOG.info("testLogReplayForDisablingTable"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE); + try { + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List regions = null; + HRegionServer hrs = null; + boolean hasRegionsForBothTables = false; + String tableName = null; + for (int i = 0; i < NUM_RS; i++) { + tableName = null; + hasRegionsForBothTables = false; + boolean isCarryingSystem = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo region : regions) { + if (region.getTable().isSystemTable()) { + isCarryingSystem = true; + break; + } + if (tableName != null && + !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) { + // make sure that we find a RS has online regions for both "table" and "disableTable" + hasRegionsForBothTables = true; + break; + } else if (tableName == null) { + tableName = region.getTable().getNameAsString(); + } + } + if (isCarryingSystem) { + continue; + } + if (hasRegionsForBothTables) { + break; + } + } + + // make sure we found a good RS + Assert.assertTrue(hasRegionsForBothTables); + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable()) { + it.remove(); + } + } + makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false); + makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100); + + LOG.info("Disabling table\n"); + TEST_UTIL.getHBaseAdmin().disableTable(TableName.valueOf("disableTable")); + TEST_UTIL.waitTableDisabled(TableName.valueOf("disableTable").getName()); + + // abort RS + LOG.info("Aborting region server: " + hrs.getServerName()); + hrs.abort("testing"); + + // wait for abort completes + TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1)); + } + }); + + // wait for regions come online + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (HBaseTestingUtility.getAllOnlineRegions(cluster).size() + >= (NUM_REGIONS_TO_CREATE + 1)); + } + }); + + // wait for all regions are fully recovered + TEST_UTIL.waitFor(180000, 200, new 
Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + List recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( + zkw.recoveringRegionsZNode, false); + ServerManager serverManager = master.getServerManager(); + return (!serverManager.areDeadServersInProgress() && + recoveringRegions != null && recoveringRegions.size() == 0); + } + }); + + int count = 0; + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + Path rootdir = FSUtils.getRootDir(conf); + Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable")); + for (HRegionInfo hri : regions) { + Path editsdir = + WALSplitter.getRegionDirRecoveredEditsDir( + HRegion.getRegionDir(tdir, hri.getEncodedName())); + LOG.debug("checking edits dir " + editsdir); + if(!fs.exists(editsdir)) continue; + FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + if (WALSplitter.isSequenceIdFile(p)) { + return false; + } + return true; + } + }); + if(files != null) { + for(FileStatus file : files) { + int c = countWAL(file.getPath(), fs, conf); + count += c; + LOG.info(c + " edits in " + file.getPath()); + } + } + } + + LOG.info("Verify edits in recovered.edits files"); + assertEquals(NUM_LOG_LINES, count); + LOG.info("Verify replayed edits"); + assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht)); + + // clean up + for (HRegionInfo hri : regions) { + Path editsdir = + WALSplitter.getRegionDirRecoveredEditsDir( + HRegion.getRegionDir(tdir, hri.getEncodedName())); + fs.delete(editsdir, true); + } + disablingHT.close(); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testDisallowWritesInRecovering() throws Exception { + LOG.info("testDisallowWritesInRecovering"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true); + startCluster(NUM_RS); + final int NUM_REGIONS_TO_CREATE = 40; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + + Set regionSet = new HashSet(); + HRegionInfo region = null; + HRegionServer hrs = null; + HRegionServer dstRS = null; + for (int i = 0; i < NUM_RS; i++) { + hrs = rsts.get(i).getRegionServer(); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + if (regions.isEmpty()) continue; + region = regions.get(0); + regionSet.add(region); + dstRS = rsts.get((i+1) % NUM_RS).getRegionServer(); + break; + } + + slm.markRegionsRecovering(hrs.getServerName(), regionSet); + // move region in order for the region opened in recovering state + final HRegionInfo hri = region; + final HRegionServer tmpRS = dstRS; + TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(dstRS.getServerName().getServerName())); + // wait for region move completes + final RegionStates regionStates = + TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates(); + TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate() { + @Override + public boolean 
evaluate() throws Exception { + ServerName sn = regionStates.getRegionServerOfRegion(hri); + return (sn != null && sn.equals(tmpRS.getServerName())); + } + }); + + try { + byte[] key = region.getStartKey(); + if (key == null || key.length == 0) { + key = new byte[] { 0, 0, 0, 0, 1 }; + } + Put put = new Put(key); + put.addColumn(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); + ht.put(put); + } catch (IOException ioe) { + Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException); + RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe; + boolean foundRegionInRecoveryException = false; + for (Throwable t : re.getCauses()) { + if (t instanceof RegionInRecoveryException) { + foundRegionInRecoveryException = true; + break; + } + } + Assert.assertTrue( + "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(), + foundRegionInRecoveryException); + } + } finally { + if (ht != null) ht.close(); + if (ht != null) zkw.close(); + } + } + + /** + * The original intention of this test was to force an abort of a region + * server and to make sure that the failure path in the region servers is + * properly evaluated. But it is difficult to ensure that the region server + * doesn't finish the log splitting before it aborts. Also now, there is + * this code path where the master will preempt the region server when master + * detects that the region server has aborted. + * @throws Exception + */ + @Ignore ("Disabled because flakey") @Test (timeout=300000) + public void testWorkerAbort() throws Exception { + LOG.info("testWorkerAbort"); + startCluster(3); + final int NUM_LOG_LINES = 10000; + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + + final List rsts = cluster.getLiveRegionServerThreads(); + HRegionServer hrs = findRSToKill(false, "table"); + Path rootdir = FSUtils.getRootDir(conf); + final Path logDir = new Path(rootdir, + DefaultWALProvider.getWALDirectoryName(hrs.getServerName().toString())); + + Table t = installTable(new ZooKeeperWatcher(conf, "table-creation", null), + "table", "family", 40); + try { + makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), + "table", "family", NUM_LOG_LINES, 100); + + new Thread() { + @Override + public void run() { + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + for (RegionServerThread rst : rsts) { + rst.getRegionServer().abort("testing"); + break; + } + } + }.start(); + // slm.splitLogDistributed(logDir); + FileStatus[] logfiles = fs.listStatus(logDir); + TaskBatch batch = new TaskBatch(); + slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); + //waitForCounter but for one of the 2 counters + long curt = System.currentTimeMillis(); + long waitTime = 80000; + long endt = curt + waitTime; + while (curt < endt) { + if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + + tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + + tot_wkr_preempt_task.get()) == 0) { + Thread.yield(); + curt = System.currentTimeMillis(); + } else { + assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + + tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + + tot_wkr_preempt_task.get())); + return; + } + } + fail("none of the following counters went up in " + waitTime + + " milliseconds - " + + "tot_wkr_task_resigned, tot_wkr_task_err, " + + "tot_wkr_final_transition_failed, tot_wkr_task_done, " + + "tot_wkr_preempt_task"); + } finally 
{ + if (t != null) t.close(); + } + } + + @Test (timeout=300000) + public void testThreeRSAbort() throws Exception { + LOG.info("testThreeRSAbort"); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_ROWS_PER_REGION = 100; + + startCluster(NUM_RS); // NUM_RS=6. + + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, + "distributed log splitting test", null); + + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + populateDataInTable(NUM_ROWS_PER_REGION, "family"); + + + List rsts = cluster.getLiveRegionServerThreads(); + assertEquals(NUM_RS, rsts.size()); + rsts.get(0).getRegionServer().abort("testing"); + rsts.get(1).getRegionServer().abort("testing"); + rsts.get(2).getRegionServer().abort("testing"); + + long start = EnvironmentEdgeManager.currentTime(); + while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) { + if (EnvironmentEdgeManager.currentTime() - start > 60000) { + assertTrue(false); + } + Thread.sleep(200); + } + + start = EnvironmentEdgeManager.currentTime(); + while (HBaseTestingUtility.getAllOnlineRegions(cluster).size() + < (NUM_REGIONS_TO_CREATE + 1)) { + if (EnvironmentEdgeManager.currentTime() - start > 60000) { + assertTrue("Timedout", false); + } + Thread.sleep(200); + } + + // wait for all regions are fully recovered + TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + List recoveringRegions = zkw.getRecoverableZooKeeper().getChildren( + zkw.recoveringRegionsZNode, false); + return (recoveringRegions != null && recoveringRegions.size() == 0); + } + }); + + assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION, + TEST_UTIL.countRows(ht)); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + + + @Test(timeout=30000) + public void testDelayedDeleteOnFailure() throws Exception { + LOG.info("testDelayedDeleteOnFailure"); + startCluster(1); + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + final FileSystem fs = master.getMasterFileSystem().getFileSystem(); + final Path logDir = new Path(FSUtils.getRootDir(conf), "x"); + fs.mkdirs(logDir); + ExecutorService executor = null; + try { + final Path corruptedLogFile = new Path(logDir, "x"); + FSDataOutputStream out; + out = fs.create(corruptedLogFile); + out.write(0); + out.write(Bytes.toBytes("corrupted bytes")); + out.close(); + ZKSplitLogManagerCoordination coordination = + (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master + .getCoordinatedStateManager()).getSplitLogManagerCoordination(); + coordination.setIgnoreDeleteForTesting(true); + executor = Executors.newSingleThreadExecutor(); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + // since the logDir is a fake, corrupted one, so the split log worker + // will finish it quickly with error, and this call will fail and throw + // an IOException. + slm.splitLogDistributed(logDir); + } catch (IOException ioe) { + try { + assertTrue(fs.exists(corruptedLogFile)); + // this call will block waiting for the task to be removed from the + // tasks map which is not going to happen since ignoreZKDeleteForTesting + // is set to true, until it is interrupted. 
+ slm.splitLogDistributed(logDir); + } catch (IOException e) { + assertTrue(Thread.currentThread().isInterrupted()); + return; + } + fail("did not get the expected IOException from the 2nd call"); + } + fail("did not get the expected IOException from the 1st call"); + } + }; + Future result = executor.submit(runnable); + try { + result.get(2000, TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + // it is ok, expected. + } + waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); + executor.shutdownNow(); + executor = null; + + // make sure the runnable is finished with no exception thrown. + result.get(); + } finally { + if (executor != null) { + // interrupt the thread in case the test fails in the middle. + // it has no effect if the thread is already terminated. + executor.shutdownNow(); + } + fs.delete(logDir, true); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testMetaRecoveryInZK() throws Exception { + LOG.info("testMetaRecoveryInZK"); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + + // only testing meta recovery in ZK operation + HRegionServer hrs = findRSToKill(true, null); + List regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + + LOG.info("#regions = " + regions.size()); + Set tmpRegions = new HashSet(); + tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO); + master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions); + Set userRegionSet = new HashSet(); + userRegionSet.addAll(regions); + master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet); + boolean isMetaRegionInRecovery = false; + List recoveringRegions = + zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); + for (String curEncodedRegionName : recoveringRegions) { + if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { + isMetaRegionInRecovery = true; + break; + } + } + assertTrue(isMetaRegionInRecovery); + + master.getMasterFileSystem().splitMetaLog(hrs.getServerName()); + + isMetaRegionInRecovery = false; + recoveringRegions = + zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false); + for (String curEncodedRegionName : recoveringRegions) { + if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { + isMetaRegionInRecovery = true; + break; + } + } + // meta region should be recovered + assertFalse(isMetaRegionInRecovery); + zkw.close(); + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testSameVersionUpdatesRecovery() throws Exception { + LOG.info("testSameVersionUpdatesRecovery"); + conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + startCluster(NUM_RS); + final AtomicLong sequenceId = new AtomicLong(100); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 1000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", 
NUM_REGIONS_TO_CREATE); + try { + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable() + || region.getEncodedName().equals( + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { + it.remove(); + } + } + if (regions.size() == 0) return; + HRegionInfo curRegionInfo = regions.get(0); + byte[] startRow = curRegionInfo.getStartKey(); + if (startRow == null || startRow.length == 0) { + startRow = new byte[] { 0, 0, 0, 0, 1 }; + } + byte[] row = Bytes.incrementBytes(startRow, 1); + // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key + row = Arrays.copyOfRange(row, 3, 8); + long value = 0; + TableName tableName = TableName.valueOf("table"); + byte[] family = Bytes.toBytes("family"); + byte[] qualifier = Bytes.toBytes("c1"); + long timeStamp = System.currentTimeMillis(); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + final WAL wal = hrs.getWAL(curRegionInfo); + for (int i = 0; i < NUM_LOG_LINES; i += 1) { + WALEdit e = new WALEdit(); + value++; + e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); + wal.append(htd, curRegionInfo, + new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()), + e, true); + } + wal.sync(); + wal.shutdown(); + + // wait for abort completes + this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); + + // verify we got the last value + LOG.info("Verification Starts..."); + Get g = new Get(row); + Result r = ht.get(g); + long theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + + // after flush + LOG.info("Verification after flush..."); + TEST_UTIL.getHBaseAdmin().flush(tableName); + r = ht.get(g); + theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Ignore("DLR is broken by HBASE-12751") @Test(timeout = 300000) + public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception { + LOG.info("testSameVersionUpdatesRecoveryWithWrites"); + conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024); + conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024); + conf.setInt("hbase.hstore.compactionThreshold", 3); + startCluster(NUM_RS); + final AtomicLong sequenceId = new AtomicLong(100); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_LOG_LINES = 2000; + // turn off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + + List rsts = cluster.getLiveRegionServerThreads(); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + try { + List regions = null; + HRegionServer hrs = null; + for (int i = 0; i < NUM_RS; i++) { + boolean isCarryingMeta = false; + hrs = 
rsts.get(i).getRegionServer(); + regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo region : regions) { + if (region.isMetaRegion()) { + isCarryingMeta = true; + break; + } + } + if (isCarryingMeta) { + continue; + } + break; + } + + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaTable() + || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) { + it.remove(); + } + } + if (regions.size() == 0) return; + HRegionInfo curRegionInfo = regions.get(0); + byte[] startRow = curRegionInfo.getStartKey(); + if (startRow == null || startRow.length == 0) { + startRow = new byte[] { 0, 0, 0, 0, 1 }; + } + byte[] row = Bytes.incrementBytes(startRow, 1); + // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key + row = Arrays.copyOfRange(row, 3, 8); + long value = 0; + final TableName tableName = TableName.valueOf("table"); + byte[] family = Bytes.toBytes("family"); + byte[] qualifier = Bytes.toBytes("c1"); + long timeStamp = System.currentTimeMillis(); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + final WAL wal = hrs.getWAL(curRegionInfo); + for (int i = 0; i < NUM_LOG_LINES; i += 1) { + WALEdit e = new WALEdit(); + value++; + e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value))); + wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), + tableName, System.currentTimeMillis()), e, true); + } + wal.sync(); + wal.shutdown(); + + // wait for abort completes + this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE); + + // verify we got the last value + LOG.info("Verification Starts..."); + Get g = new Get(row); + Result r = ht.get(g); + long theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + + // after flush & compaction + LOG.info("Verification after flush..."); + TEST_UTIL.getHBaseAdmin().flush(tableName); + TEST_UTIL.getHBaseAdmin().compact(tableName); + + // wait for compaction completes + TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate() { + @Override + public boolean evaluate() throws Exception { + return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE); + } + }); + + r = ht.get(g); + theStoredVal = Bytes.toLong(r.getValue(family, qualifier)); + assertEquals(value, theStoredVal); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + @Test(timeout = 300000) + public void testReadWriteSeqIdFiles() throws Exception { + LOG.info("testReadWriteSeqIdFiles"); + startCluster(2); + final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null); + Table ht = installTable(zkw, "table", "family", 10); + try { + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); + List regionDirs = FSUtils.getRegionDirs(fs, tableDir); + long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); + WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); + assertEquals(newSeqId + 2000, + WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); + + Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); + FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { + @Override + public 
boolean accept(Path p) { + return WALSplitter.isSequenceIdFile(p); + } + }); + // only one seqid file should exist + assertEquals(1, files.length); + + // verify all seqId files aren't treated as recovered.edits files + NavigableSet recoveredEdits = + WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0)); + assertEquals(0, recoveredEdits.size()); + } finally { + if (ht != null) ht.close(); + if (zkw != null) zkw.close(); + } + } + + Table installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception { + return installTable(zkw, tname, fname, nrs, 0); + } + + Table installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs, + int existingRegions) throws Exception { + // Create a table with regions + TableName table = TableName.valueOf(tname); + byte [] family = Bytes.toBytes(fname); + LOG.info("Creating table with " + nrs + " regions"); + Table ht = TEST_UTIL.createMultiRegionTable(table, family, nrs); + int numRegions = -1; + try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(table)) { + numRegions = r.getStartKeys().length; + } + assertEquals(nrs, numRegions); + LOG.info("Waiting for no more RIT\n"); + blockUntilNoRIT(zkw, master); + // disable-enable cycle to get rid of table's dead regions left behind + // by createMultiRegions + LOG.debug("Disabling table\n"); + TEST_UTIL.getHBaseAdmin().disableTable(table); + LOG.debug("Waiting for no more RIT\n"); + blockUntilNoRIT(zkw, master); + NavigableSet regions = HBaseTestingUtility.getAllOnlineRegions(cluster); + LOG.debug("Verifying only catalog and namespace regions are assigned\n"); + if (regions.size() != 2) { + for (String oregion : regions) + LOG.debug("Region still online: " + oregion); + } + assertEquals(2 + existingRegions, regions.size()); + LOG.debug("Enabling table\n"); + TEST_UTIL.getHBaseAdmin().enableTable(table); + LOG.debug("Waiting for no more RIT\n"); + blockUntilNoRIT(zkw, master); + LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); + regions = HBaseTestingUtility.getAllOnlineRegions(cluster); + assertEquals(numRegions + 2 + existingRegions, regions.size()); + return ht; + } + + void populateDataInTable(int nrows, String fname) throws Exception { + byte [] family = Bytes.toBytes(fname); + + List rsts = cluster.getLiveRegionServerThreads(); + assertEquals(NUM_RS, rsts.size()); + + for (RegionServerThread rst : rsts) { + HRegionServer hrs = rst.getRegionServer(); + List hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + for (HRegionInfo hri : hris) { + if (hri.getTable().isSystemTable()) { + continue; + } + LOG.debug("adding data to rs = " + rst.getName() + + " region = "+ hri.getRegionNameAsString()); + Region region = hrs.getOnlineRegion(hri.getRegionName()); + assertTrue(region != null); + putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family); + } + } + + for (MasterThread mt : cluster.getLiveMasterThreads()) { + HRegionServer hrs = mt.getMaster(); + List hris; + try { + hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); + } catch (ServerNotRunningYetException e) { + // It's ok: this master may be a backup. Ignored. 
+        continue;
+      }
+      for (HRegionInfo hri : hris) {
+        if (hri.getTable().isSystemTable()) {
+          continue;
+        }
+        LOG.debug("adding data to rs = " + mt.getName() +
+            " region = "+ hri.getRegionNameAsString());
+        Region region = hrs.getOnlineRegion(hri.getRegionName());
+        assertTrue(region != null);
+        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
+      }
+    }
+  }
+
+  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size) throws IOException {
+    makeWAL(hrs, regions, tname, fname, num_edits, edit_size, true);
+  }
+
+  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
+      int num_edits, int edit_size, boolean cleanShutdown) throws IOException {
+    TableName fullTName = TableName.valueOf(tname);
+    // remove root and meta region
+    regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
+    // using one sequenceId for edits across all regions is ok.
+    final AtomicLong sequenceId = new AtomicLong(10);
+
+    for (Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
+      HRegionInfo regionInfo = iter.next();
+      if (regionInfo.getTable().isSystemTable()) {
+        iter.remove();
+      }
+    }
+    HTableDescriptor htd = new HTableDescriptor(fullTName);
+    byte[] family = Bytes.toBytes(fname);
+    htd.addFamily(new HColumnDescriptor(family));
+    byte[] value = new byte[edit_size];
+
+    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
+    for (HRegionInfo region : regions) {
+      if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
+        continue;
+      }
+      hris.add(region);
+    }
+    LOG.info("Creating wal edits across " + hris.size() + " regions.");
+    for (int i = 0; i < edit_size; i++) {
+      value[i] = (byte) ('a' + (i % 26));
+    }
+    int n = hris.size();
+    int[] counts = new int[n];
+    // sync every ~30k to line up with desired wal rolls
+    final int syncEvery = 30 * 1024 / edit_size;
+    if (n > 0) {
+      for (int i = 0; i < num_edits; i += 1) {
+        WALEdit e = new WALEdit();
+        HRegionInfo curRegionInfo = hris.get(i % n);
+        final WAL log = hrs.getWAL(curRegionInfo);
+        byte[] startRow = curRegionInfo.getStartKey();
+        if (startRow == null || startRow.length == 0) {
+          startRow = new byte[] { 0, 0, 0, 0, 1 };
+        }
+        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
+        // use last 5 bytes because HBaseTestingUtility.createMultiRegions uses a 5-byte key
+        row = Arrays.copyOfRange(row, 3, 8);
+        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
+        e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
+        log.append(htd, curRegionInfo,
+            new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
+                System.currentTimeMillis()), e, true);
+        if (0 == i % syncEvery) {
+          log.sync();
+        }
+        counts[i % n] += 1;
+      }
+    }
+    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
+    // will cause errors if done after.
+    for (HRegionInfo info : hris) {
+      final WAL log = hrs.getWAL(info);
+      log.sync();
+    }
+    if (cleanShutdown) {
+      for (HRegionInfo info : hris) {
+        final WAL log = hrs.getWAL(info);
+        log.shutdown();
+      }
+    }
+    for (int i = 0; i < n; i++) {
+      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
+    }
+    return;
+  }
+
+  private int countWAL(Path log, FileSystem fs, Configuration conf)
+      throws IOException {
+    int count = 0;
+    WAL.Reader in = WALFactory.createReader(fs, log, conf);
+    try {
+      WAL.Entry e;
+      while ((e = in.next()) != null) {
+        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
+          count++;
+        }
+      }
+    } finally {
+      try {
+        in.close();
+      } catch (IOException exception) {
+        LOG.warn("Problem closing wal: " + exception.getMessage());
+        LOG.debug("exception details.", exception);
+      }
+    }
+    return count;
+  }
+
+  private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master) throws Exception {
+    TEST_UTIL.waitUntilNoRegionsInTransition(60000);
+  }
+
+  private void putData(Region region, byte[] startRow, int numRows, byte [] qf,
+      byte [] ...families)
+      throws IOException {
+    for (int i = 0; i < numRows; i++) {
+      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
+      for (byte [] family : families) {
+        put.addColumn(family, qf, null);
+      }
+      region.put(put);
+    }
+  }
+
+  /**
+   * Load the table with puts and deletes carrying expected values so that we can verify later.
+   */
+  private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
+    byte[] k = new byte[3];
+
+    // add puts
+    List<Put> puts = new ArrayList<>();
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+          k[0] = b1;
+          k[1] = b2;
+          k[2] = b3;
+          Put put = new Put(k);
+          put.addColumn(f, column, k);
+          puts.add(put);
+        }
+      }
+    }
+    t.put(puts);
+    // add deletes
+    for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+      k[0] = 'a';
+      k[1] = 'a';
+      k[2] = b3;
+      Delete del = new Delete(k);
+      t.delete(del);
+    }
+  }
+
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    long curt = System.currentTimeMillis();
+    long endt = curt + timems;
+    while (curt < endt) {
+      if (ctr.get() == oldval) {
+        Thread.yield();
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(newval, ctr.get());
+        return;
+      }
+    }
+    assertTrue(false);
+  }
+
+  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
+    for (MasterThread mt : cluster.getLiveMasterThreads()) {
+      if (mt.getMaster().isActiveMaster()) {
+        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
+        mt.join();
+        break;
+      }
+    }
+    LOG.debug("Master is aborted");
+  }
+
+  /**
+   * Find a RS that has regions of a table.
+   * @param hasMetaRegion when true, the returned RS has the hbase:meta region as well
+   * @param tableName the name of the table whose regions the returned RS should be serving
+   * @return a region server matching the criteria
+   * @throws Exception
+   */
+  private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+
+    for (RegionServerThread rst: rsts) {
+      hrs = rst.getRegionServer();
+      while (rst.isAlive() && !hrs.isOnline()) {
+        Thread.sleep(100);
+      }
+      if (!rst.isAlive()) {
+        continue;
+      }
+      boolean isCarryingMeta = false;
+      boolean foundTableRegion = false;
+      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+        }
+        if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
+          foundTableRegion = true;
+        }
+        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
+          break;
+        }
+      }
+      if (isCarryingMeta && hasMetaRegion) {
+        // clients ask for a RS with META
+        if (!foundTableRegion) {
+          final HRegionServer destRS = hrs;
+          // the RS doesn't have regions of the specified table so we need to move one to this RS
+          List<HRegionInfo> tableRegions =
+              TEST_UTIL.getHBaseAdmin().getTableRegions(TableName.valueOf(tableName));
+          final HRegionInfo hri = tableRegions.get(0);
+          TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
+              Bytes.toBytes(destRS.getServerName().getServerName()));
+          // wait for the region move to complete
+          final RegionStates regionStates =
+              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
+          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
+            @Override
+            public boolean evaluate() throws Exception {
+              ServerName sn = regionStates.getRegionServerOfRegion(hri);
+              return (sn != null && sn.equals(destRS.getServerName()));
+            }
+          });
+        }
+        return hrs;
+      } else if (hasMetaRegion || isCarryingMeta) {
+        continue;
+      }
+      if (foundTableRegion) break;
+    }
+
+    return hrs;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
new file mode 100644
index 0000000..395eef2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({FlakeyTests.class, MediumTests.class})
+public class TestStochasticLoadBalancer2 extends BalancerTestBase {
+  private static final Log LOG = LogFactory.getLog(TestStochasticLoadBalancer2.class);
+
+  @Test (timeout = 800000)
+  public void testRegionReplicasOnMidCluster() {
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    TestStochasticLoadBalancer.loadBalancer.setConf(conf);
+    int numNodes = 200;
+    int numRegions = 40 * 200;
+    int replication = 3; // 3 replicas per region
+    int numRegionsPerServer = 30; // all regions are mostly balanced
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
+  }
+
+  @Test (timeout = 800000)
+  public void testRegionReplicasOnLargeCluster() {
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    loadBalancer.setConf(conf);
+    int numNodes = 1000;
+    int numRegions = 20 * numNodes; // 20 * replication regions per RS
+    int numRegionsPerServer = 19; // all servers except one
+    int numTables = 100;
+    int replication = 3;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, true);
+  }
+
+  @Test (timeout = 800000)
+  public void testRegionReplicasOnMidClusterHighReplication() {
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    loadBalancer.setConf(conf);
+    int numNodes = 80;
+    int numRegions = 6 * numNodes;
+    int replication = 80; // 80 replicas per region, one for each server
+    int numRegionsPerServer = 5;
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
+  }
+
+  @Test (timeout = 800000)
+  public void testRegionReplicationOnMidClusterReplicationGreaterThanNumNodes() {
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    loadBalancer.setConf(conf);
+    int numNodes = 40;
+    int numRegions = 6 * 50;
+    int replication = 50; // 50 replicas per region, more than numNodes
+    int numRegionsPerServer = 6;
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, true, false);
+  }
+}
\ No newline at end of file