From commits-return-67730-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Mon Feb 12 16:17:07 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 80CDA180652 for ; Mon, 12 Feb 2018 16:17:07 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7130F160C31; Mon, 12 Feb 2018 15:17:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E75C6160C3F for ; Mon, 12 Feb 2018 16:17:04 +0100 (CET) Received: (qmail 63757 invoked by uid 500); 12 Feb 2018 15:16:59 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 63732 invoked by uid 99); 12 Feb 2018 15:16:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Feb 2018 15:16:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ACC2EDFADA; Mon, 12 Feb 2018 15:16:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Mon, 12 Feb 2018 15:16:57 -0000 Message-Id: <7cb2bcb5025c4e18be2502237d7de3c8@git.apache.org> In-Reply-To: <0deb19e11d2b44f6a9b84304b722996a@git.apache.org> References: <0deb19e11d2b44f6a9b84304b722996a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/6] hbase-site git commit: Published site at . http://git-wip-us.apache.org/repos/asf/hbase-site/blob/2fda40d6/testdevapidocs/src-html/org/apache/hadoop/hbase/master/AbstractTestDLS.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/master/AbstractTestDLS.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/master/AbstractTestDLS.html index c02a436..cdcd37f 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/master/AbstractTestDLS.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/master/AbstractTestDLS.html @@ -50,39 +50,39 @@ 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.TimeoutException; 044import java.util.concurrent.atomic.LongAdder; -045 -046import org.apache.hadoop.conf.Configuration; -047import org.apache.hadoop.fs.FSDataOutputStream; -048import org.apache.hadoop.fs.FileStatus; -049import org.apache.hadoop.fs.FileSystem; -050import org.apache.hadoop.fs.Path; -051import org.apache.hadoop.fs.PathFilter; -052import org.apache.hadoop.hbase.HBaseTestingUtility; -053import org.apache.hadoop.hbase.HConstants; -054import org.apache.hadoop.hbase.KeyValue; -055import org.apache.hadoop.hbase.MiniHBaseCluster; -056import org.apache.hadoop.hbase.NamespaceDescriptor; -057import org.apache.hadoop.hbase.ServerName; -058import org.apache.hadoop.hbase.SplitLogCounters; -059import org.apache.hadoop.hbase.TableName; -060import org.apache.hadoop.hbase.Waiter; -061import org.apache.hadoop.hbase.client.Put; -062import org.apache.hadoop.hbase.client.RegionInfo; -063import org.apache.hadoop.hbase.client.RegionInfoBuilder; -064import org.apache.hadoop.hbase.client.RegionLocator; -065import org.apache.hadoop.hbase.client.Table; -066import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; -067import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; -068import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; -069import org.apache.hadoop.hbase.master.assignment.RegionStates; -070import org.apache.hadoop.hbase.regionserver.HRegion; -071import org.apache.hadoop.hbase.regionserver.HRegionServer; -072import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -073import org.apache.hadoop.hbase.regionserver.Region; -074import org.apache.hadoop.hbase.util.Bytes; -075import org.apache.hadoop.hbase.util.FSUtils; -076import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; -077import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +045import org.apache.hadoop.conf.Configuration; +046import org.apache.hadoop.fs.FSDataOutputStream; +047import org.apache.hadoop.fs.FileStatus; +048import org.apache.hadoop.fs.FileSystem; +049import org.apache.hadoop.fs.Path; +050import org.apache.hadoop.fs.PathFilter; +051import org.apache.hadoop.hbase.HBaseTestingUtility; +052import org.apache.hadoop.hbase.HConstants; +053import org.apache.hadoop.hbase.KeyValue; +054import org.apache.hadoop.hbase.MiniHBaseCluster; +055import org.apache.hadoop.hbase.NamespaceDescriptor; +056import org.apache.hadoop.hbase.ServerName; +057import org.apache.hadoop.hbase.SplitLogCounters; +058import org.apache.hadoop.hbase.TableName; +059import org.apache.hadoop.hbase.Waiter; +060import org.apache.hadoop.hbase.client.Put; +061import org.apache.hadoop.hbase.client.RegionInfo; +062import org.apache.hadoop.hbase.client.RegionInfoBuilder; +063import org.apache.hadoop.hbase.client.RegionLocator; +064import org.apache.hadoop.hbase.client.Table; +065import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination; +066import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +067import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; +068import org.apache.hadoop.hbase.master.assignment.RegionStates; +069import org.apache.hadoop.hbase.regionserver.HRegion; +070import org.apache.hadoop.hbase.regionserver.HRegionServer; +071import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +072import org.apache.hadoop.hbase.regionserver.Region; +073import org.apache.hadoop.hbase.util.Bytes; +074import org.apache.hadoop.hbase.util.FSUtils; +075import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; +076import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +077import org.apache.hadoop.hbase.util.Threads; 078import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 079import org.apache.hadoop.hbase.wal.WAL; 080import org.apache.hadoop.hbase.wal.WALEdit; @@ -100,693 +100,701 @@ 092import org.junit.rules.TestName; 093import org.slf4j.Logger; 094import org.slf4j.LoggerFactory; -095import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -096 -097/** -098 * Base class for testing distributed log splitting. -099 */ -100public abstract class AbstractTestDLS { -101 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); -102 -103 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); -104 -105 // Start a cluster with 2 masters and 5 regionservers -106 private static final int NUM_MASTERS = 2; -107 private static final int NUM_RS = 5; -108 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); -109 -110 @Rule -111 public TestName testName = new TestName(); -112 -113 private TableName tableName; -114 private MiniHBaseCluster cluster; -115 private HMaster master; -116 private Configuration conf; -117 -118 @Rule -119 public TestName name = new TestName(); -120 -121 @BeforeClass -122 public static void setup() throws Exception { -123 // Uncomment the following line if more verbosity is needed for -124 // debugging (see HBASE-12285 for details). -125 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); -126 TEST_UTIL.startMiniZKCluster(); -127 TEST_UTIL.startMiniDFSCluster(3); -128 } -129 -130 @AfterClass -131 public static void tearDown() throws Exception { -132 TEST_UTIL.shutdownMiniCluster(); -133 } -134 -135 protected abstract String getWalProvider(); -136 -137 private void startCluster(int numRS) throws Exception { -138 SplitLogCounters.resetCounters(); -139 LOG.info("Starting cluster"); -140 conf.setLong("hbase.splitlog.max.resubmit", 0); -141 // Make the failure test faster -142 conf.setInt("zookeeper.recovery.retry", 0); -143 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); -144 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing -145 conf.setInt("hbase.regionserver.wal.max.splitters", 3); -146 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); -147 conf.set("hbase.wal.provider", getWalProvider()); -148 TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, numRS); -149 cluster = TEST_UTIL.getHBaseCluster(); -150 LOG.info("Waiting for active/ready master"); -151 cluster.waitForActiveAndReadyMaster(); -152 master = cluster.getMaster(); -153 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { -154 @Override -155 public boolean evaluate() throws Exception { -156 return cluster.getLiveRegionServerThreads().size() >= numRS; -157 } -158 }); -159 } -160 -161 @Before -162 public void before() throws Exception { -163 conf = TEST_UTIL.getConfiguration(); -164 tableName = TableName.valueOf(testName.getMethodName()); -165 } -166 -167 @After -168 public void after() throws Exception { -169 TEST_UTIL.shutdownMiniHBaseCluster(); -170 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); -171 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); -172 } -173 -174 @Test -175 public void testRecoveredEdits() throws Exception { -176 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal -177 startCluster(NUM_RS); -178 -179 int numLogLines = 10000; -180 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); -181 // turn off load balancing to prevent regions from moving around otherwise -182 // they will consume recovered.edits -183 master.balanceSwitch(false); -184 FileSystem fs = master.getMasterFileSystem().getFileSystem(); -185 -186 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); -187 -188 Path rootdir = FSUtils.getRootDir(conf); -189 -190 int numRegions = 50; -191 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); -192 Table t = installTable(zkw, numRegions)) { -193 TableName table = t.getName(); -194 List<RegionInfo> regions = null; -195 HRegionServer hrs = null; -196 for (int i = 0; i < NUM_RS; i++) { -197 hrs = rsts.get(i).getRegionServer(); -198 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); -199 // At least one RS will have >= to average number of regions. -200 if (regions.size() >= numRegions / NUM_RS) { -201 break; -202 } -203 } -204 Path logDir = new Path(rootdir, -205 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); -206 -207 LOG.info("#regions = " + regions.size()); -208 Iterator<RegionInfo> it = regions.iterator(); -209 while (it.hasNext()) { -210 RegionInfo region = it.next(); -211 if (region.getTable().getNamespaceAsString() -212 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { -213 it.remove(); -214 } -215 } -216 -217 makeWAL(hrs, regions, numLogLines, 100); -218 -219 slm.splitLogDistributed(logDir); -220 -221 int count = 0; -222 for (RegionInfo hri : regions) { -223 Path tdir = FSUtils.getTableDir(rootdir, table); -224 @SuppressWarnings("deprecation") -225 Path editsdir = WALSplitter -226 .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); -227 LOG.debug("checking edits dir " + editsdir); -228 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { -229 @Override -230 public boolean accept(Path p) { -231 if (WALSplitter.isSequenceIdFile(p)) { -232 return false; -233 } -234 return true; -235 } -236 }); -237 assertTrue( -238 "edits dir should have more than a single file in it. instead has " + files.length, -239 files.length > 1); -240 for (int i = 0; i < files.length; i++) { -241 int c = countWAL(files[i].getPath(), fs, conf); -242 count += c; -243 } -244 LOG.info(count + " edits in " + files.length + " recovered edits files."); -245 } -246 -247 // check that the log file is moved -248 assertFalse(fs.exists(logDir)); -249 assertEquals(numLogLines, count); -250 } -251 } -252 -253 @Test -254 public void testMasterStartsUpWithLogSplittingWork() throws Exception { -255 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); -256 startCluster(NUM_RS); -257 -258 int numRegionsToCreate = 40; -259 int numLogLines = 1000; -260 // turn off load balancing to prevent regions from moving around otherwise -261 // they will consume recovered.edits -262 master.balanceSwitch(false); -263 -264 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); -265 Table ht = installTable(zkw, numRegionsToCreate)) { -266 HRegionServer hrs = findRSToKill(false); -267 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); -268 makeWAL(hrs, regions, numLogLines, 100); -269 -270 // abort master -271 abortMaster(cluster); -272 -273 // abort RS -274 LOG.info("Aborting region server: " + hrs.getServerName()); -275 hrs.abort("testing"); -276 -277 // wait for abort completes -278 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { -279 @Override -280 public boolean evaluate() throws Exception { -281 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; -282 } -283 }); -284 -285 Thread.sleep(2000); -286 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); -287 -288 // wait for abort completes -289 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { -290 @Override -291 public boolean evaluate() throws Exception { -292 return (HBaseTestingUtility.getAllOnlineRegions(cluster) -293 .size() >= (numRegionsToCreate + 1)); -294 } -295 }); -296 -297 LOG.info("Current Open Regions After Master Node Starts Up:" + -298 HBaseTestingUtility.getAllOnlineRegions(cluster).size()); -299 -300 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); -301 } -302 } -303 -304 /** -305 * The original intention of this test was to force an abort of a region server and to make sure -306 * that the failure path in the region servers is properly evaluated. But it is difficult to -307 * ensure that the region server doesn't finish the log splitting before it aborts. Also now, -308 * there is this code path where the master will preempt the region server when master detects -309 * that the region server has aborted. -310 * @throws Exception -311 */ -312 // Was marked flaky before Distributed Log Replay cleanup. -313 @Test -314 public void testWorkerAbort() throws Exception { -315 LOG.info("testWorkerAbort"); -316 startCluster(3); -317 int numLogLines = 10000; -318 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); -319 FileSystem fs = master.getMasterFileSystem().getFileSystem(); -320 -321 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); -322 HRegionServer hrs = findRSToKill(false); -323 Path rootdir = FSUtils.getRootDir(conf); -324 final Path logDir = new Path(rootdir, -325 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); -326 -327 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); -328 Table t = installTable(zkw, 40)) { -329 makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); -330 -331 new Thread() { -332 @Override -333 public void run() { -334 try { -335 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); -336 } catch (InterruptedException e) { -337 } -338 for (RegionServerThread rst : rsts) { -339 rst.getRegionServer().abort("testing"); -340 break; -341 } -342 } -343 }.start(); -344 FileStatus[] logfiles = fs.listStatus(logDir); -345 TaskBatch batch = new TaskBatch(); -346 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); -347 // waitForCounter but for one of the 2 counters -348 long curt = System.currentTimeMillis(); -349 long waitTime = 80000; -350 long endt = curt + waitTime; -351 while (curt < endt) { -352 if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + -353 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + -354 tot_wkr_preempt_task.sum()) == 0) { -355 Thread.sleep(100); -356 curt = System.currentTimeMillis(); -357 } else { -358 assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + -359 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + -360 tot_wkr_preempt_task.sum())); -361 return; -362 } -363 } -364 fail("none of the following counters went up in " + waitTime + " milliseconds - " + -365 "tot_wkr_task_resigned, tot_wkr_task_err, " + -366 "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task"); -367 } -368 } -369 -370 @Test -371 public void testThreeRSAbort() throws Exception { -372 LOG.info("testThreeRSAbort"); -373 int numRegionsToCreate = 40; -374 int numRowsPerRegion = 100; -375 -376 startCluster(NUM_RS); // NUM_RS=6. -377 -378 try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null); -379 Table table = installTable(zkw, numRegionsToCreate)) { -380 populateDataInTable(numRowsPerRegion); -381 -382 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); -383 assertEquals(NUM_RS, rsts.size()); -384 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); -385 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); -386 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); -387 -388 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { -389 -390 @Override -391 public boolean evaluate() throws Exception { -392 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; -393 } -394 -395 @Override -396 public String explainFailure() throws Exception { -397 return "Timed out waiting for server aborts."; -398 } -399 }); -400 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); -401 assertEquals(numRegionsToCreate * numRowsPerRegion, TEST_UTIL.countRows(table)); -402 } -403 } -404 -405 @Test -406 public void testDelayedDeleteOnFailure() throws Exception { -407 LOG.info("testDelayedDeleteOnFailure"); -408 startCluster(1); -409 final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); -410 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); -411 final Path logDir = new Path(new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME), -412 ServerName.valueOf("x", 1, 1).toString()); -413 fs.mkdirs(logDir); -414 ExecutorService executor = null; -415 try { -416 final Path corruptedLogFile = new Path(logDir, "x"); -417 FSDataOutputStream out; -418 out = fs.create(corruptedLogFile); -419 out.write(0); -420 out.write(Bytes.toBytes("corrupted bytes")); -421 out.close(); -422 ZKSplitLogManagerCoordination coordination = -423 (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager()) -424 .getSplitLogManagerCoordination(); -425 coordination.setIgnoreDeleteForTesting(true); -426 executor = Executors.newSingleThreadExecutor(); -427 Runnable runnable = new Runnable() { -428 @Override -429 public void run() { -430 try { -431 // since the logDir is a fake, corrupted one, so the split log worker -432 // will finish it quickly with error, and this call will fail and throw -433 // an IOException. -434 slm.splitLogDistributed(logDir); -435 } catch (IOException ioe) { -436 try { -437 assertTrue(fs.exists(corruptedLogFile)); -438 // this call will block waiting for the task to be removed from the -439 // tasks map which is not going to happen since ignoreZKDeleteForTesting -440 // is set to true, until it is interrupted. -441 slm.splitLogDistributed(logDir); -442 } catch (IOException e) { -443 assertTrue(Thread.currentThread().isInterrupted()); -444 return; -445 } -446 fail("did not get the expected IOException from the 2nd call"); -447 } -448 fail("did not get the expected IOException from the 1st call"); -449 } -450 }; -451 Future<?> result = executor.submit(runnable); -452 try { -453 result.get(2000, TimeUnit.MILLISECONDS); -454 } catch (TimeoutException te) { -455 // it is ok, expected. -456 } -457 waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000); -458 executor.shutdownNow(); -459 executor = null; -460 -461 // make sure the runnable is finished with no exception thrown. -462 result.get(); -463 } finally { -464 if (executor != null) { -465 // interrupt the thread in case the test fails in the middle. -466 // it has no effect if the thread is already terminated. -467 executor.shutdownNow(); -468 } -469 fs.delete(logDir, true); -470 } -471 } -472 -473 @Test -474 public void testReadWriteSeqIdFiles() throws Exception { -475 LOG.info("testReadWriteSeqIdFiles"); -476 startCluster(2); -477 final ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); -478 Table ht = installTable(zkw, 10); -479 try { -480 FileSystem fs = master.getMasterFileSystem().getFileSystem(); -481 Path tableDir = -482 FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf(name.getMethodName())); -483 List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir); -484 long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); -485 WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); -486 assertEquals(newSeqId + 2000, -487 WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); -488 -489 Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); -490 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { -491 @Override -492 public boolean accept(Path p) { -493 return WALSplitter.isSequenceIdFile(p); -494 } -495 }); -496 // only one seqid file should exist -497 assertEquals(1, files.length); -498 -499 // verify all seqId files aren't treated as recovered.edits files -500 NavigableSet<Path> recoveredEdits = -501 WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0)); -502 assertEquals(0, recoveredEdits.size()); -503 } finally { -504 if (ht != null) ht.close(); -505 if (zkw != null) zkw.close(); -506 } -507 } -508 -509 private Table installTable(ZKWatcher zkw, int nrs) throws Exception { -510 return installTable(zkw, nrs, 0); -511 } -512 -513 private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception { -514 // Create a table with regions -515 byte[] family = Bytes.toBytes("family"); -516 LOG.info("Creating table with " + nrs + " regions"); -517 Table table = TEST_UTIL.createMultiRegionTable(tableName, family, nrs); -518 int numRegions = -1; -519 try (RegionLocator r = TEST_UTIL.getConnection().getRegionLocator(tableName)) { -520 numRegions = r.getStartKeys().length; -521 } -522 assertEquals(nrs, numRegions); -523 LOG.info("Waiting for no more RIT\n"); -524 blockUntilNoRIT(zkw, master); -525 // disable-enable cycle to get rid of table's dead regions left behind -526 // by createMultiRegions -527 LOG.debug("Disabling table\n"); -528 TEST_UTIL.getAdmin().disableTable(tableName); -529 LOG.debug("Waiting for no more RIT\n"); -530 blockUntilNoRIT(zkw, master); -531 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); -532 LOG.debug("Verifying only catalog and namespace regions are assigned\n"); -533 if (regions.size() != 2) { -534 for (String oregion : regions) -535 LOG.debug("Region still online: " + oregion); -536 } -537 assertEquals(2 + existingRegions, regions.size()); -538 LOG.debug("Enabling table\n"); -539 TEST_UTIL.getAdmin().enableTable(tableName); -540 LOG.debug("Waiting for no more RIT\n"); -541 blockUntilNoRIT(zkw, master); -542 LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); -543 regions = HBaseTestingUtility.getAllOnlineRegions(cluster); -544 assertEquals(numRegions + 2 + existingRegions, regions.size()); -545 return table; -546 } -547 -548 void populateDataInTable(int nrows) throws Exception { -549 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); -550 assertEquals(NUM_RS, rsts.size()); -551 -552 for (RegionServerThread rst : rsts) { -553 HRegionServer hrs = rst.getRegionServer(); -554 List<RegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); -555 for (RegionInfo hri : hris) { -556 if (hri.getTable().isSystemTable()) { -557 continue; -558 } -559 LOG.debug( -560 "adding data to rs = " + rst.getName() + " region = " + hri.getRegionNameAsString()); -561 Region region = hrs.getOnlineRegion(hri.getRegionName()); -562 assertTrue(region != null); -563 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); -564 } -565 } -566 -567 for (MasterThread mt : cluster.getLiveMasterThreads()) { -568 HRegionServer hrs = mt.getMaster(); -569 List<RegionInfo> hris; -570 try { -571 hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); -572 } catch (ServerNotRunningYetException e) { -573 // It's ok: this master may be a backup. Ignored. -574 continue; -575 } -576 for (RegionInfo hri : hris) { -577 if (hri.getTable().isSystemTable()) { -578 continue; -579 } -580 LOG.debug( -581 "adding data to rs = " + mt.getName() + " region = " + hri.getRegionNameAsString()); -582 Region region = hrs.getOnlineRegion(hri.getRegionName()); -583 assertTrue(region != null); -584 putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), COLUMN_FAMILY); -585 } -586 } -587 } -588 -589 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int num_edits, int edit_size) -590 throws IOException { -591 makeWAL(hrs, regions, num_edits, edit_size, true); -592 } -593 -594 public void makeWAL(HRegionServer hrs, List<RegionInfo> regions, int numEdits, int editSize, -595 boolean cleanShutdown) throws IOException { -596 // remove root and meta region -597 regions.remove(RegionInfoBuilder.FIRST_META_REGIONINFO); -598 -599 for (Iterator<RegionInfo> iter = regions.iterator(); iter.hasNext();) { -600 RegionInfo regionInfo = iter.next(); -601 if (regionInfo.getTable().isSystemTable()) { -602 iter.remove(); -603 } -604 } -605 byte[] value = new byte[editSize]; +095 +096import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +097 +098/** +099 * Base class for testing distributed log splitting. +100 */ +101public abstract class AbstractTestDLS { +102 private static final Logger LOG = LoggerFactory.getLogger(TestSplitLogManager.class); +103 +104 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); +105 +106 // Start a cluster with 2 masters and 5 regionservers +107 private static final int NUM_MASTERS = 2; +108 private static final int NUM_RS = 5; +109 private static byte[] COLUMN_FAMILY = Bytes.toBytes("family"); +110 +111 @Rule +112 public TestName testName = new TestName(); +113 +114 private TableName tableName; +115 private MiniHBaseCluster cluster; +116 private HMaster master; +117 private Configuration conf; +118 +119 @Rule +120 public TestName name = new TestName(); +121 +122 @BeforeClass +123 public static void setup() throws Exception { +124 // Uncomment the following line if more verbosity is needed for +125 // debugging (see HBASE-12285 for details). +126 // Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); +127 TEST_UTIL.startMiniZKCluster(); +128 TEST_UTIL.startMiniDFSCluster(3); +129 } +130 +131 @AfterClass +132 public static void tearDown() throws Exception { +133 TEST_UTIL.shutdownMiniCluster(); +134 } +135 +136 protected abstract String getWalProvider(); +137 +138 private void startCluster(int numRS) throws Exception { +139 SplitLogCounters.resetCounters(); +140 LOG.info("Starting cluster"); +141 conf.setLong("hbase.splitlog.max.resubmit", 0); +142 // Make the failure test faster +143 conf.setInt("zookeeper.recovery.retry", 0); +144 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); +145 conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing +146 conf.setInt("hbase.regionserver.wal.max.splitters", 3); +147 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); +148 conf.set("hbase.wal.provider", getWalProvider()); +149 TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, numRS); +150 cluster = TEST_UTIL.getHBaseCluster(); +151 LOG.info("Waiting for active/ready master"); +152 cluster.waitForActiveAndReadyMaster(); +153 master = cluster.getMaster(); +154 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { +155 @Override +156 public boolean evaluate() throws Exception { +157 return cluster.getLiveRegionServerThreads().size() >= numRS; +158 } +159 }); +160 } +161 +162 @Before +163 public void before() throws Exception { +164 conf = TEST_UTIL.getConfiguration(); +165 tableName = TableName.valueOf(testName.getMethodName()); +166 } +167 +168 @After +169 public void after() throws Exception { +170 TEST_UTIL.shutdownMiniHBaseCluster(); +171 TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true); +172 ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase"); +173 } +174 +175 @Test +176 public void testRecoveredEdits() throws Exception { +177 conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal +178 startCluster(NUM_RS); +179 +180 int numLogLines = 10000; +181 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); +182 // turn off load balancing to prevent regions from moving around otherwise +183 // they will consume recovered.edits +184 master.balanceSwitch(false); +185 FileSystem fs = master.getMasterFileSystem().getFileSystem(); +186 +187 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); +188 +189 Path rootdir = FSUtils.getRootDir(conf); +190 +191 int numRegions = 50; +192 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); +193 Table t = installTable(zkw, numRegions)) { +194 TableName table = t.getName(); +195 List<RegionInfo> regions = null; +196 HRegionServer hrs = null; +197 for (int i = 0; i < NUM_RS; i++) { +198 hrs = rsts.get(i).getRegionServer(); +199 regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); +200 // At least one RS will have >= to average number of regions. +201 if (regions.size() >= numRegions / NUM_RS) { +202 break; +203 } +204 } +205 Path logDir = new Path(rootdir, +206 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); +207 +208 LOG.info("#regions = " + regions.size()); +209 Iterator<RegionInfo> it = regions.iterator(); +210 while (it.hasNext()) { +211 RegionInfo region = it.next(); +212 if (region.getTable().getNamespaceAsString() +213 .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) { +214 it.remove(); +215 } +216 } +217 +218 makeWAL(hrs, regions, numLogLines, 100); +219 +220 slm.splitLogDistributed(logDir); +221 +222 int count = 0; +223 for (RegionInfo hri : regions) { +224 Path tdir = FSUtils.getTableDir(rootdir, table); +225 @SuppressWarnings("deprecation") +226 Path editsdir = WALSplitter +227 .getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); +228 LOG.debug("checking edits dir " + editsdir); +229 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { +230 @Override +231 public boolean accept(Path p) { +232 if (WALSplitter.isSequenceIdFile(p)) { +233 return false; +234 } +235 return true; +236 } +237 }); +238 assertTrue( +239 "edits dir should have more than a single file in it. instead has " + files.length, +240 files.length > 1); +241 for (int i = 0; i < files.length; i++) { +242 int c = countWAL(files[i].getPath(), fs, conf); +243 count += c; +244 } +245 LOG.info(count + " edits in " + files.length + " recovered edits files."); +246 } +247 +248 // check that the log file is moved +249 assertFalse(fs.exists(logDir)); +250 assertEquals(numLogLines, count); +251 } +252 } +253 +254 @Test +255 public void testMasterStartsUpWithLogSplittingWork() throws Exception { +256 conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1); +257 startCluster(NUM_RS); +258 +259 int numRegionsToCreate = 40; +260 int numLogLines = 1000; +261 // turn off load balancing to prevent regions from moving around otherwise +262 // they will consume recovered.edits +263 master.balanceSwitch(false); +264 +265 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); +266 Table ht = installTable(zkw, numRegionsToCreate)) { +267 HRegionServer hrs = findRSToKill(false); +268 List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); +269 makeWAL(hrs, regions, numLogLines, 100); +270 +271 // abort master +272 abortMaster(cluster); +273 +274 // abort RS +275 LOG.info("Aborting region server: " + hrs.getServerName()); +276 hrs.abort("testing"); +277 +278 // wait for abort completes +279 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { +280 @Override +281 public boolean evaluate() throws Exception { +282 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 1; +283 } +284 }); +285 +286 Thread.sleep(2000); +287 LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size()); +288 +289 // wait for abort completes +290 TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() { +291 @Override +292 public boolean evaluate() throws Exception { +293 return (HBaseTestingUtility.getAllOnlineRegions(cluster) +294 .size() >= (numRegionsToCreate + 1)); +295 } +296 }); +297 +298 LOG.info("Current Open Regions After Master Node Starts Up:" + +299 HBaseTestingUtility.getAllOnlineRegions(cluster).size()); +300 +301 assertEquals(numLogLines, TEST_UTIL.countRows(ht)); +302 } +303 } +304 +305 /** +306 * The original intention of this test was to force an abort of a region server and to make sure +307 * that the failure path in the region servers is properly evaluated. But it is difficult to +308 * ensure that the region server doesn't finish the log splitting before it aborts. Also now, +309 * there is this code path where the master will preempt the region server when master detects +310 * that the region server has aborted. +311 * @throws Exception +312 */ +313 // Was marked flaky before Distributed Log Replay cleanup. +314 @Test +315 public void testWorkerAbort() throws Exception { +316 LOG.info("testWorkerAbort"); +317 startCluster(3); +318 int numLogLines = 10000; +319 SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); +320 FileSystem fs = master.getMasterFileSystem().getFileSystem(); +321 +322 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); +323 HRegionServer hrs = findRSToKill(false); +324 Path rootdir = FSUtils.getRootDir(conf); +325 final Path logDir = new Path(rootdir, +326 AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); +327 +328 try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); +329 Table t = installTable(zkw, 40)) { +330 makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); +331 +332 new Thread() { +333 @Override +334 public void run() { +335 try { +336 waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); +337 } catch (InterruptedException e) { +338 } +339 for (RegionServerThread rst : rsts) { +340 rst.getRegionServer().abort("testing"); +341 break; +342 } +343 } +344 }.start(); +345 FileStatus[] logfiles = fs.listStatus(logDir); +346 TaskBatch batch = new TaskBatch(); +347 slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); +348 // waitForCounter but for one of the 2 counters +349 long curt = System.currentTimeMillis(); +350 long waitTime = 80000; +351 long endt = curt + waitTime; +352 while (curt < endt) { +353 if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + +354 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + +355 tot_wkr_preempt_task.sum()) == 0) { +356 Thread.sleep(100); +357 curt = System.currentTimeMillis(); +358 } else { +359 assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() + +360 tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() + +361 tot_wkr_preempt_task.sum())); +362 return; +363 } +364 } +365 fail("none of the following counters went up in " + waitTime + " milliseconds - " + +366 "tot_wkr_task_resigned, tot_wkr_task_err, " + +367 "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task"); +368 } +369 } +370 +371 @Test +372 public void testThreeRSAbort() throws Exception { +373 LOG.info("testThreeRSAbort"); +374 int numRegionsToCreate = 40; +375 int numRowsPerRegion = 100; +376 +377 startCluster(NUM_RS); // NUM_RS=6. +378 +379 try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null); +380 Table table = installTable(zkw, numRegionsToCreate)) { +381 populateDataInTable(numRowsPerRegion); +382 +383 List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); +384 assertEquals(NUM_RS, rsts.size()); +385 cluster.killRegionServer(rsts.get(0).getRegionServer().getServerName()); +386 cluster.killRegionServer(rsts.get(1).getRegionServer().getServerName()); +387 cluster.killRegionServer(rsts.get(2).getRegionServer().getServerName()); +388 +389 TEST_UTIL.waitFor(60000, new Waiter.ExplainingPredicate<Exception>() { +390 +391 @Override +392 public boolean evaluate() throws Exception { +393 return cluster.getLiveRegionServerThreads().size() <= NUM_RS - 3; +394 } +395 +396 @Override