From: misty@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Wed, 03 Feb 2016 17:25:40 -0000
Subject: [07/51] [partial] hbase-site git commit: Published site at 2f5767376f42c0416e025df412e3d5944a1b2a67.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6a13df3e/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.TestStep.html
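The substantive change in the regenerated page below is in the test source it renders: testIncrementMultiThreads now runs 100 Incrementer threads (previously 25), verifies the Result returned by Region.increment() instead of issuing a follow-up Get, and assertICV gains a "fast" flag that issues its Get with READ_UNCOMMITTED isolation. A minimal sketch of that read path follows, assuming an in-process Region as the test uses; the class and method names are illustrative and not part of the commit.

// Illustrative sketch only (not from the commit): the "fast" read path added to assertICV.
import java.io.IOException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadUncommittedGetSketch {
  // Reads one counter cell with READ_UNCOMMITTED isolation, matching the
  // get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED) call in the new test code.
  // Such a read may observe writes that are not yet MVCC-complete.
  static long readCounter(Region region, byte[] row, byte[] family, byte[] qualifier)
      throws IOException {
    Get get = new Get(row);
    get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    get.addColumn(family, qualifier);
    Result result = region.get(get);
    return Bytes.toLong(result.getValue(family, qualifier));
  }
}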
----------------------------------------------------------------------
diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.TestStep.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.TestStep.html
index 9cddc0a..1240e3e 100644
--- a/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.TestStep.html
+++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.TestStep.html
@@ -27,8 +27,8 @@
 019import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
 021import static org.junit.Assert.assertEquals;
-022import static org.junit.Assert.assertTrue;
-023import static org.junit.Assert.assertNull;
+022import static org.junit.Assert.assertNull;
+023import static org.junit.Assert.assertTrue;
 024import static org.junit.Assert.fail;
 025
 026import java.io.IOException;
@@ -61,19 +61,19 @@
 053import org.apache.hadoop.hbase.client.Durability;
 054import org.apache.hadoop.hbase.client.Get;
 055import org.apache.hadoop.hbase.client.Increment;
-056import org.apache.hadoop.hbase.client.Mutation;
-057import org.apache.hadoop.hbase.client.Put;
-058import org.apache.hadoop.hbase.client.Result;
-059import org.apache.hadoop.hbase.client.RowMutations;
-060import org.apache.hadoop.hbase.client.Scan;
-061import org.apache.hadoop.hbase.filter.BinaryComparator;
-062import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-063import org.apache.hadoop.hbase.io.HeapSize;
-064import org.apache.hadoop.hbase.io.hfile.BlockCache;
-065import org.apache.hadoop.hbase.testclassification.MediumTests;
-066import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
-067import org.apache.hadoop.hbase.util.Bytes;
-068import org.apache.hadoop.hbase.util.Threads;
+056import org.apache.hadoop.hbase.client.IsolationLevel;
+057import org.apache.hadoop.hbase.client.Mutation;
+058import org.apache.hadoop.hbase.client.Put;
+059import org.apache.hadoop.hbase.client.Result;
+060import org.apache.hadoop.hbase.client.RowMutations;
+061import org.apache.hadoop.hbase.client.Scan;
+062import org.apache.hadoop.hbase.filter.BinaryComparator;
+063import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+064import org.apache.hadoop.hbase.io.HeapSize;
+065import org.apache.hadoop.hbase.io.hfile.BlockCache;
+066import org.apache.hadoop.hbase.testclassification.MediumTests;
+067import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+068import org.apache.hadoop.hbase.util.Bytes;
 069import org.apache.hadoop.hbase.wal.WAL;
 070import org.junit.After;
 071import org.junit.Before;
@@ -191,535 +191,539 @@
 183 */
 184 @Test
 185 public void testIncrementMultiThreads() throws IOException {
-186 LOG.info("Starting test testIncrementMultiThreads");
-187 // run a with mixed column families (1 and 3 versions)
-188 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
-189
-190 // create 25 threads, each will increment by its own quantity
-191 int numThreads = 25;
-192 int incrementsPerThread = 1000;
-193 Incrementer[] all = new Incrementer[numThreads];
-194 int expectedTotal = 0;
-195 // create all threads
-196 for (int i = 0; i < numThreads; i++) {
-197 all[i] = new Incrementer(region, i, i, incrementsPerThread);
-198 expectedTotal += (i * incrementsPerThread);
-199 }
-200
-201 // run all threads
-202 for (int i = 0; i < numThreads; i++) {
-203 all[i].start();
-204 }
-205
-206 // wait for all threads to finish
-207 for (int i = 0; i < numThreads; i++) {
-208 try {
-209 all[i].join();
-210 } catch (InterruptedException e) {
-211 LOG.info("Ignored", e);
-212 }
-213 }
-214 assertICV(row, fam1, qual1, expectedTotal);
-215 assertICV(row, fam1, qual2, expectedTotal*2);
-216 assertICV(row, fam2, qual3, expectedTotal*3);
-217 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
-218 }
-219
-220
-221 private void assertICV(byte [] row,
-222 byte [] familiy,
-223 byte[] qualifier,
-224 long amount) throws IOException {
-225 // run a get and see?
-226 Get get = new Get(row);
-227 get.addColumn(familiy, qualifier);
-228 Result result = region.get(get);
-229 assertEquals(1, result.size());
-230
-231 Cell kv = result.rawCells()[0];
-232 long r = Bytes.toLong(CellUtil.cloneValue(kv));
-233 assertEquals(amount, r);
-234 }
-235
-236 private void initHRegion (byte [] tableName, String callingMethod,
-237 byte[] ... families)
-238 throws IOException {
-239 initHRegion(tableName, callingMethod, null, families);
-240 }
-241
-242 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
-243 byte[] ... families)
-244 throws IOException {
-245 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
-246 int i=0;
-247 for(byte [] family : families) {
-248 HColumnDescriptor hcd = new HColumnDescriptor(family);
-249 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
-250 htd.addFamily(hcd);
-251 }
-252 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
-253 region = TEST_UTIL.createLocalHRegion(info, htd);
-254 }
-255
-256 /**
-257 * A thread that makes a few increment calls
-258 */
-259 public static class Incrementer extends Thread {
-260
-261 private final Region region;
-262 private final int numIncrements;
-263 private final int amount;
-264
+186 boolean fast = true;
+187 LOG.info("Starting test testIncrementMultiThreads");
+188 // run a with mixed column families (1 and 3 versions)
+189 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
+190
+191 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
+192 // same row over two column families.
+193 int numThreads = 100;
+194 int incrementsPerThread = 1000;
+195 Incrementer[] all = new Incrementer[numThreads];
+196 int expectedTotal = 0;
+197 // create all threads
+198 for (int i = 0; i < numThreads; i++) {
+199 all[i] = new Incrementer(region, i, i, incrementsPerThread);
+200 expectedTotal += (i * incrementsPerThread);
+201 }
+202
+203 // run all threads
+204 for (int i = 0; i < numThreads; i++) {
+205 all[i].start();
+206 }
+207
+208 // wait for all threads to finish
+209 for (int i = 0; i < numThreads; i++) {
+210 try {
+211 all[i].join();
+212 } catch (InterruptedException e) {
+213 LOG.info("Ignored", e);
+214 }
+215 }
+216 assertICV(row, fam1, qual1, expectedTotal, fast);
+217 assertICV(row, fam1, qual2, expectedTotal*2, fast);
+218 assertICV(row, fam2, qual3, expectedTotal*3, fast);
+219 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
+220 }
+221
+222
+223 private void assertICV(byte [] row,
+224 byte [] familiy,
+225 byte[] qualifier,
+226 long amount,
+227 boolean fast) throws IOException {
+228 // run a get and see?
+229 Get get = new Get(row);
+230 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+231 get.addColumn(familiy, qualifier);
+232 Result result = region.get(get);
+233 assertEquals(1, result.size());
+234
+235 Cell kv = result.rawCells()[0];
+236 long r = Bytes.toLong(CellUtil.cloneValue(kv));
+237 assertEquals(amount, r);
+238 }
+239
+240 private void initHRegion (byte [] tableName, String callingMethod,
+241 byte[] ... families)
+242 throws IOException {
+243 initHRegion(tableName, callingMethod, null, families);
+244 }
+245
+246 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
+247 byte[] ... families)
+248 throws IOException {
+249 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+250 int i=0;
+251 for(byte [] family : families) {
+252 HColumnDescriptor hcd = new HColumnDescriptor(family);
+253 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
+254 htd.addFamily(hcd);
+255 }
+256 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+257 region = TEST_UTIL.createLocalHRegion(info, htd);
+258 }
+259
+260 /**
+261 * A thread that makes increment calls always on the same row, this.row against two column
+262 * families on this row.
+263 */
+264 public static class Incrementer extends Thread {
 265
-266 public Incrementer(Region region,
-267 int threadNumber, int amount, int numIncrements) {
-268 super("incrementer." + threadNumber);
-269 this.region = region;
-270 this.numIncrements = numIncrements;
-271 this.amount = amount;
-272 setDaemon(true);
-273 }
-274
-275 @Override
-276 public void run() {
-277 for (int i = 0; i < numIncrements; i++) {
-278 try {
-279 Increment inc = new Increment(row);
-280 inc.addColumn(fam1, qual1, amount);
-281 inc.addColumn(fam1, qual2, amount*2);
-282 inc.addColumn(fam2, qual3, amount*3);
-283 inc.setDurability(Durability.ASYNC_WAL);
-284 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
-285
-286 // verify: Make sure we only see completed increments
-287 Get g = new Get(row);
-288 Result result = region.get(g);
+266 private final Region region;
+267 private final int numIncrements;
+268 private final int amount;
+269
+270
+271 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
+272 super("Incrementer." + threadNumber);
+273 this.region = region;
+274 this.numIncrements = numIncrements;
+275 this.amount = amount;
+276 setDaemon(true);
+277 }
+278
+279 @Override
+280 public void run() {
+281 for (int i = 0; i < numIncrements; i++) {
+282 try {
+283 Increment inc = new Increment(row);
+284 inc.addColumn(fam1, qual1, amount);
+285 inc.addColumn(fam1, qual2, amount*2);
+286 inc.addColumn(fam2, qual3, amount*3);
+287 inc.setDurability(Durability.ASYNC_WAL);
+288 Result result = region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
 289 if (result != null) {
-290 assertTrue(result.getValue(fam1, qual1) != null);
-291 assertTrue(result.getValue(fam1, qual2) != null);
-292 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
-293 Bytes.toLong(result.getValue(fam1, qual2)));
-294 assertTrue(result.getValue(fam2, qual3) != null);
-295 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
-296 Bytes.toLong(result.getValue(fam2, qual3)));
-297 }
-298 } catch (IOException e) {
-299 e.printStackTrace();
-300 }
-301 }
-302 }
-303 }
-304
-305 @Test
-306 public void testAppendMultiThreads() throws IOException {
-307 LOG.info("Starting test testAppendMultiThreads");
-308 // run a with mixed column families (1 and 3 versions)
-309 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
-310
-311 int numThreads = 100;
-312 int opsPerThread = 100;
-313 AtomicOperation[] all = new AtomicOperation[numThreads];
-314 final byte[] val = new byte[]{1};
-315
-316 AtomicInteger failures = new AtomicInteger(0);
-317 // create all threads
-318 for (int i = 0; i < numThreads; i++) {
-319 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
-320 @Override
-321 public void run() {
-322 for (int i=0; i<numOps; i++) {
-323 try {
-324 Append a = new Append(row);
-325 a.add(fam1, qual1, val);
-326 a.add(fam1, qual2, val);
-327 a.add(fam2, qual3, val);
-328 a.setDurability(Durability.ASYNC_WAL);
-329 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
-330
-331 Get g = new Get(row);
-332 Result result = region.get(g);
-333 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
-334 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
-335 } catch (IOException e) {
-336 e.printStackTrace();
-337 failures.incrementAndGet();
-338 fail();
-339 }
-340 }
-341 }
-342 };
-343 }
-344
-345 // run all threads
-346 for (int i = 0; i < numThreads; i++) {
-347 all[i].start();
-348 }
-349
-350 // wait for all threads to finish
-351 for (int i = 0; i < numThreads; i++) {
-352 try {
-353 all[i].join();
-354 } catch (InterruptedException e) {
-355 }
-356 }
-357 assertEquals(0, failures.get());
-358 Get g = new Get(row);
-359 Result result = region.get(g);
-360 assertEquals(result.getValue(fam1, qual1).length, 10000);
-361 assertEquals(result.getValue(fam1, qual2).length, 10000);
-362 assertEquals(result.getValue(fam2, qual3).length, 10000);
-363 }
-364 /**
-365 * Test multi-threaded row mutations.
-366 */
-367 @Test
-368 public void testRowMutationMultiThreads() throws IOException {
-369 LOG.info("Starting test testRowMutationMultiThreads");
-370 initHRegion(tableName, name.getMethodName(), fam1);
-371
-372 // create 10 threads, each will alternate between adding and
-373 // removing a column
-374 int numThreads = 10;
-375 int opsPerThread = 250;
-376 AtomicOperation[] all = new AtomicOperation[numThreads];
-377
-378 AtomicLong timeStamps = new AtomicLong(0);
-379 AtomicInteger failures = new AtomicInteger(0);
-380 // create all threads
-381 for (int i = 0; i < numThreads; i++) {
-382 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
-383 @Override
-384 public void run() {
-385 boolean op = true;
-386 for (int i=0; i<numOps; i++) {
-387 try {
-388 // throw in some flushes
-389 if (i%10==0) {
-390 synchronized(region) {
-391 LOG.debug("flushing");
-392 region.flush(true);
-393 if (i%100==0) {
-394 region.compact(false);
-395 }
-396 }
-397 }
-398 long ts = timeStamps.incrementAndGet();
-399 RowMutations rm = new RowMutations(row);
-400 if (op) {
-401 Put p = new Put(row, ts);
-402 p.addColumn(fam1, qual1, value1);
-403 p.setDurability(Durability.ASYNC_WAL);
-404 rm.add(p);
-405 Delete d = new Delete(row);
-406 d.addColumns(fam1, qual2, ts);
-407 d.setDurability(Durability.ASYNC_WAL);
-408 rm.add(d);
-409 } else {
-410 Delete d = new Delete(row);
-411 d.addColumns(fam1, qual1, ts);
-412 d.setDurability(Durability.ASYNC_WAL);
-413 rm.add(d);
-414 Put p = new Put(row, ts);
-415 p.addColumn(fam1, qual2, value2);
-416 p.setDurability(Durability.ASYNC_WAL);
-417 rm.add(p);
-418 }
-419 region.mutateRow(rm);
-420 op ^= true;
-421 // check: should always see exactly one column
-422 Get g = new Get(row);
-423 Result r = region.get(g);
-424 if (r.size() != 1) {
-425 LOG.debug(r);
-426 failures.incrementAndGet();
-427 fail();
-428 }
-429 } catch (IOException e) {
-430 e.printStackTrace();
-431 failures.incrementAndGet();
-432 fail();
-433 }
-434 }
-435 }
-436 };
-437 }
-438
-439 // run all threads
-440 for (int i = 0; i < numThreads; i++) {
-441 all[i].start();
-442 }
-443
-444 // wait for all threads to finish
-445 for (int i = 0; i < numThreads; i++) {
-446 try {
-447 all[i].join();
-448 } catch (InterruptedException e) {
-449 }
-450 }
-451 assertEquals(0, failures.get());
-452 }
-453
-454
-455 /**
-456 * Test multi-threaded region mutations.
-457 */
-458 @Test
-459 public void testMultiRowMutationMultiThreads() throws IOException {
-460
-461 LOG.info("Starting test testMultiRowMutationMultiThreads");
-462 initHRegion(tableName, name.getMethodName(), fam1);
-463
-464 // create 10 threads, each will alternate between adding and
-465 // removing a column
-466 int numThreads = 10;
-467 int opsPerThread = 250;
-468 AtomicOperation[] all = new AtomicOperation[numThreads];
-469
-470 AtomicLong timeStamps = new AtomicLong(0);
-471 AtomicInteger failures = new AtomicInteger(0);
-472 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
-473 // create all threads
-474 for (int i = 0; i < numThreads; i++) {
-475 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
-476 @Override
-477 public void run() {
-478 boolean op = true;
-479 for (int i=0; i<numOps; i++) {
-480 try {
-481 // throw in some flushes
-482 if (i%10==0) {
-483 synchronized(region) {
-484 LOG.debug("flushing");
-485 region.flush(true);
-486 if (i%100==0) {
-487 region.compact(false);
-488 }
-489 }
-490 }
-491 long ts = timeStamps.incrementAndGet();
-492 List<Mutation> mrm = new ArrayList<Mutation>();
-493 if (op) {
-494 Put p = new Put(row2, ts);
-495 p.addColumn(fam1, qual1, value1);
-496 p.setDurability(Durability.ASYNC_WAL);
-497 mrm.add(p);
-498 Delete d = new Delete(row);
-499 d.addColumns(fam1, qual1, ts);
-500 d.setDurability(Durability.ASYNC_WAL);
-501 mrm.add(d);
-502 } else {
-503 Delete d = new Delete(row2);
-504 d.addColumns(fam1, qual1, ts);
-505 d.setDurability(Durability.ASYNC_WAL);
-506 mrm.add(d);
-507 Put p = new Put(row, ts);
-508 p.setDurability(Durability.ASYNC_WAL);
-509 p.addColumn(fam1, qual1, value2);
-510 mrm.add(p);
-511 }
-512 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
-513 op ^= true;
-514 // check: should always see exactly one column
-515 Scan s = new Scan(row);
-516 RegionScanner rs = region.getScanner(s);
-517 List<Cell> r = new ArrayList<Cell>();
-518 while (rs.next(r))
-519 ;
-520 rs.close();
-521 if (r.size() != 1) {
-522 LOG.debug(r);
-523 failures.incrementAndGet();
-524 fail();
-525 }
-526 } catch (IOException e) {
-527 e.printStackTrace();
-528 failures.incrementAndGet();
-529 fail();
-530 }
-531 }
-532 }
-533 };
-534 }
-535
-536 // run all threads
-537 for (int i = 0; i < numThreads; i++) {
-538 all[i].start();
-539 }
-540
-541 // wait for all threads to finish
-542 for (int i = 0; i < numThreads; i++) {
-543 try {
-544 all[i].join();
-545 } catch (InterruptedException e) {
-546 }
-547 }
-548 assertEquals(0, failures.get());
-549 }
-550
-551 public static class AtomicOperation extends Thread {
-552 protected final Region region;
-553 protected final int numOps;
-554 protected final AtomicLong timeStamps;
-555 protected final AtomicInteger failures;
-556 protected final Random r = new Random();
-557
-558 public AtomicOperation(Region region, int numOps, AtomicLong timeStamps,
-559 AtomicInteger failures) {
-560 this.region = region;
-561 this.numOps = numOps;
-562 this.timeStamps = timeStamps;
-563 this.failures = failures;
-564 }
-565 }
-566
-567 private static CountDownLatch latch = new CountDownLatch(1);
-568 private enum TestStep {
-569 INIT, // initial put of 10 to set value of the cell
-570 PUT_STARTED, // began doing a put of 50 to cell
-571 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
-572 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
-573 CHECKANDPUT_COMPLETED // completed checkAndPut
-574 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
-575 }
-576 private static volatile TestStep testStep = TestStep.INIT;
-577 private final String family = "f1";
-578
-579 /**
-580 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
-581 * MVCC.
-582 *
-583 * Moved into TestAtomicOperation from its original location, TestHBase7051
-584 */
-585 @Test
-586 public void testPutAndCheckAndPutInParallel() throws Exception {
-587 final String tableName = "testPutAndCheckAndPut";
-588 Configuration conf = TEST_UTIL.getConfiguration();
-589 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
-590 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName))
-591 .addFamily(new HColumnDescriptor(family));
-592 this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
-593 Put[] puts = new Put[1];
-594 Put put = new Put(Bytes.toBytes("r1"));
-595 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
-596 puts[0] = put;
-597
-598 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
-599 MultithreadedTestUtil.TestContext ctx =
-600 new MultithreadedTestUtil.TestContext(conf);
-601 ctx.addThread(new PutThread(ctx, region));
-602 ctx.addThread(new CheckAndPutThread(ctx, region));
-603 ctx.startThreads();
-604 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
-605 Thread.sleep(100);
-606 }
-607 ctx.stop();
-608 Scan s = new Scan();
-609 RegionScanner scanner = region.getScanner(s);
-610 List<Cell> results = new ArrayList<Cell>();
-611 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
-612 scanner.next(results, scannerContext);
-613 for (Cell keyValue : results) {
-614 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
-615 }
-616 }
-617
-618 private class PutThread extends TestThread {
-619 private Region region;
-620 PutThread(TestContext ctx, Region region) {
-621 super(ctx);
-622 this.region = region;
-623 }
-624
-625 public void doWork() throws Exception {
-626 Put[] puts = new Put[1];
-627 Put put = new Put(Bytes.toBytes("r1"));
-628 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
-629 puts[0] = put;
-630 testStep = TestStep.PUT_STARTED;
-631 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
-632 }
-633 }
-634
-635 private class CheckAndPutThread extends TestThread {
-636 private Region region;
-637 CheckAndPutThread(TestContext ctx, Region region) {
-638 super(ctx);
-639 this.region = region;
-640 }
-641
-642 public void doWork() throws Exception {
-643 Put[] puts = new Put[1];
-644 Put put = new Put(Bytes.toBytes("r1"));
-645 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
-646 puts[0] = put;
-647 while (testStep != TestStep.PUT_COMPLETED) {
-648 Thread.sleep(100);
-649 }
-650 testStep = TestStep.CHECKANDPUT_STARTED;
-651 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
-652 CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
-653 testStep = TestStep.CHECKANDPUT_COMPLETED;
-654 }
-655 }
-656
-657 public static class MockHRegion extends HRegion {
-658
-659 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
-660 final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
-661 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
-662 }
-663
-664 @Override
-665 public RowLock getRowLock(final byte[] row, boolean readLock) throws IOException {
-666 if (testStep == TestStep.CHECKANDPUT_STARTED) {
-667 latch.countDown();
-668 }
-669 return new WrappedRowLock(super.getRowLock(row, readLock));
-670 }
-671
-672 public class WrappedRowLock implements RowLock {
-673
-674 private final RowLock rowLock;
-675
-676 private WrappedRowLock(RowLock rowLock) {
-677 this.rowLock = rowLock;
-678 }
+290 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
+291 Bytes.toLong(result.getValue(fam1, qual2)));
+292 assertTrue(result.getValue(fam2, qual3) != null);
+293 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
+294 Bytes.toLong(result.getValue(fam2, qual3)));
+295 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
+296 Bytes.toLong(result.getValue(fam1, qual2)));
+297 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
+298 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
+299 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
+300 fam1Increment, fam2Increment);
+301 }
+302 } catch (IOException e) {
+303 e.printStackTrace();
+304 }
+305 }
+306 }
+307 }
+308
+309 @Test
+310 public void testAppendMultiThreads() throws IOException {
+311 LOG.info("Starting test testAppendMultiThreads");
+312 // run a with mixed column families (1 and 3 versions)
+313 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
+314
+315 int numThreads = 100;
+316 int opsPerThread = 100;
+317 AtomicOperation[] all = new AtomicOperation[numThreads];
+318 final byte[] val = new byte[]{1};
+319
+320 AtomicInteger failures = new AtomicInteger(0);
+321 // create all threads
+322 for (int i = 0; i < numThreads; i++) {
+323 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
+324 @Override
+325 public void run() {
+326 for (int i=0; i<numOps; i++) {
+327 try {
+328 Append a = new Append(row);
+329 a.add(fam1, qual1, val);
+330 a.add(fam1, qual2, val);
+331 a.add(fam2, qual3, val);
+332 a.setDurability(Durability.ASYNC_WAL);
+333 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
+334
+335 Get g = new Get(row);
+336 Result result = region.get(g);
+337 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
+338 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
+339 } catch (IOException e) {
+340 e.printStackTrace();
+341 failures.incrementAndGet();
+342 fail();
+343 }
+344 }
+345 }
+346 };
+347 }
+348
+349 // run all threads
+350 for (int i = 0; i < numThreads; i++) {
+351 all[i].start();
+352 }
+353
+354 // wait for all threads to finish
+355 for (int i = 0; i < numThreads; i++) {
+356 try {
+357 all[i].join();
+358 } catch (InterruptedException e) {
+359 }
+360 }
+361 assertEquals(0, failures.get());
+362 Get g = new Get(row);
+363 Result result = region.get(g);
+364 assertEquals(result.getValue(fam1, qual1).length, 10000);
+365 assertEquals(result.getValue(fam1, qual2).length, 10000);
+366 assertEquals(result.getValue(fam2, qual3).length, 10000);
+367 }
+368 /**
+369 * Test multi-threaded row mutations.
+370 */
+371 @Test
+372 public void testRowMutationMultiThreads() throws IOException {
+373 LOG.info("Starting test testRowMutationMultiThreads");
+374 initHRegion(tableName, name.getMethodName(), fam1);
+375
+376 // create 10 threads, each will alternate between adding and
+377 // removing a column
+378 int numThreads = 10;
+379 int opsPerThread = 250;
+380 AtomicOperation[] all = new AtomicOperation[numThreads];
+381
+382 AtomicLong timeStamps = new AtomicLong(0);
+383 AtomicInteger failures = new AtomicInteger(0);
+384 // create all threads
+385 for (int i = 0; i < numThreads; i++) {
+386 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+387 @Override
+388 public void run() {
+389 boolean op = true;
+390 for (int i=0; i<numOps; i++) {
+391 try {
+392 // throw in some flushes
+393 if (i%10==0) {
+394 synchronized(region) {
+395 LOG.debug("flushing");
+396 region.flush(true);
+397 if (i%100==0) {
+398 region.compact(false);
+399 }
+400 }
+401 }
+402 long ts = timeStamps.incrementAndGet();
+403 RowMutations rm = new RowMutations(row);
+404 if (op) {
+405 Put p = new Put(row, ts);
+406 p.addColumn(fam1, qual1, value1);
+407 p.setDurability(Durability.ASYNC_WAL);
+408 rm.add(p);
+409 Delete d = new Delete(row);
+410 d.addColumns(fam1, qual2, ts);
+411 d.setDurability(Durability.ASYNC_WAL);
+412 rm.add(d);
+413 } else {
+414 Delete d = new Delete(row);
+415 d.addColumns(fam1, qual1, ts);
+416 d.setDurability(Durability.ASYNC_WAL);
+417 rm.add(d);
+418 Put p = new Put(row, ts);
+419 p.addColumn(fam1, qual2, value2);
+420 p.setDurability(Durability.ASYNC_WAL);
+421 rm.add(p);
+422 }
+423 region.mutateRow(rm);
+424 op ^= true;
+425 // check: should always see exactly one column
+426 Get g = new Get(row);
+427 Result r = region.get(g);
+428 if (r.size() != 1) {
+429 LOG.debug(r);
+430 failures.incrementAndGet();
+431 fail();
+432 }
+433 } catch (IOException e) {
+434 e.printStackTrace();
+435 failures.incrementAndGet();
+436 fail();
+437 }
+438 }
+439 }
+440 };
+441 }
+442
+443 // run all threads
+444 for (int i = 0; i < numThreads; i++) {
+445 all[i].start();
+446 }
+447
+448 // wait for all threads to finish
+449 for (int i = 0; i < numThreads; i++) {
+450 try {
+451 all[i].join();
+452 } catch (InterruptedException e) {
+453 }
+454 }
+455 assertEquals(0, failures.get());
+456 }
+457
+458
+459 /**
+460 * Test multi-threaded region mutations.
+461 */
+462 @Test
+463 public void testMultiRowMutationMultiThreads() throws IOException {
+464
+465 LOG.info("Starting test testMultiRowMutationMultiThreads");
+466 initHRegion(tableName, name.getMethodName(), fam1);
+467
+468 // create 10 threads, each will alternate between adding and
+469 // removing a column
+470 int numThreads = 10;
+471 int opsPerThread = 250;
+472 AtomicOperation[] all = new AtomicOperation[numThreads];
+473
+474 AtomicLong timeStamps = new AtomicLong(0);
+475 AtomicInteger failures = new AtomicInteger(0);
+476 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
+477 // create all threads
+478 for (int i = 0; i < numThreads; i++) {
+479 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+480 @Override
+481 public void run() {
+482 boolean op = true;
+483 for (int i=0; i<numOps; i++) {
+484 try {
+485 // throw in some flushes
+486 if (i%10==0) {
+487 synchronized(region) {
+488 LOG.debug("flushing");
+489 region.flush(true);
+490 if (i%100==0) {
+491 region.compact(false);
+492 }
+493 }
+494 }
+495 long ts = timeStamps.incrementAndGet();
+496 List<Mutation> mrm = new ArrayList<Mutation>();
+497 if (op) {
+498 Put p = new Put(row2, ts);
+499 p.addColumn(fam1, qual1, value1);
+500 p.setDurability(Durability.ASYNC_WAL);
+501 mrm.add(p);
+502 Delete d = new Delete(row);
+503 d.addColumns(fam1, qual1, ts);
+504 d.setDurability(Durability.ASYNC_WAL);
+505 mrm.add(d);
+506 } else {
+507 Delete d = new Delete(row2);
+508 d.addColumns(fam1, qual1, ts);
+509 d.setDurability(Durability.ASYNC_WAL);
+510 mrm.add(d);
+511 Put p = new Put(row, ts);
+512 p.setDurability(Durability.ASYNC_WAL);
+513 p.addColumn(fam1, qual1, value2);
+514 mrm.add(p);
+515 }
+516 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
+517 op ^= true;
+518 // check: should always see exactly one column
+519 Scan s = new Scan(row);
+520 RegionScanner rs = region.getScanner(s);
+521 List<Cell> r = new ArrayList<Cell>();
+522 while (rs.next(r))
+523 ;
+524 rs.close();
+525 if (r.size() != 1) {
+526 LOG.debug(r);
+527 failures.incrementAndGet();
+528 fail();
+529 }
+530 } catch (IOException e) {
+531 e.printStackTrace();
+532 failures.incrementAndGet();
+533 fail();
+534 }
+535 }
+536 }
+537 };
+538 }
+539
+540 // run all threads
+541 for (int i = 0; i < numThreads; i++) {
+542 all[i].start();
+543 }
+544
+545 // wait for all threads to finish
+546 for (int i = 0; i < numThreads; i++) {
+547 try {
+548 all[i].join();
+549 } catch (InterruptedException e) {
+550 }
+551 }
+552 assertEquals(0, failures.get());
+553 }
+554
+555 public static class AtomicOperation extends Thread {
+556 protected final Region region;
+557 protected final int numOps;
+558 protected final AtomicLong timeStamps;
+559 protected final AtomicInteger failures;
+560 protected final Random r = new Random();
+561
+562 public AtomicOperation(Region region, int numOps, AtomicLong timeStamps,
+563 AtomicInteger failures) {
+564 this.region = region;
+565 this.numOps = numOps;
+566 this.timeStamps = timeStamps;
+567 this.failures = failures;
+568 }
+569 }
+570
+571 private static CountDownLatch latch = new CountDownLatch(1);
+572 private enum TestStep {
+573 INIT, // initial put of 10 to set value of the cell
+574 PUT_STARTED, // began doing a put of 50 to cell
+575 PUT_COMPLETED, // put complete (released RowLock, but may not have advan
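The TestStep comments in the diff above encode the HBASE-7051 expectation that checkAndPut must read through MVCC: after a put of 10 and a concurrent put of 50, a checkAndPut of "if 10 -> 11" has to observe the committed 50 and fail, leaving the cell at 50 rather than 11. A condensed, client-side sketch of that expectation follows, assuming an existing Connection; the real test drives a local HRegion through MockHRegion's row-lock hook rather than a client Table, and interleaves the two writers on separate threads.

// Illustrative sketch only (not from the commit): the outcome asserted by
// testPutAndCheckAndPutInParallel, restated sequentially against the client API.
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckAndPutMvccSketch {
  static void run(Connection connection) throws Exception {
    byte[] row = Bytes.toBytes("r1");
    byte[] f = Bytes.toBytes("f1");
    byte[] q = Bytes.toBytes("q1");
    try (Table table = connection.getTable(TableName.valueOf("testPutAndCheckAndPut"))) {
      // INIT: seed the cell with "10".
      table.put(new Put(row).addColumn(f, q, Bytes.toBytes("10")));
      // PUT_STARTED / PUT_COMPLETED: a second writer replaces it with "50".
      table.put(new Put(row).addColumn(f, q, Bytes.toBytes("50")));
      // CHECKANDPUT_STARTED: "if 10 -> 11" must see the committed "50" and fail,
      // so the cell ends at "50", not "11", which is what the test asserts with a Scan.
      boolean applied = table.checkAndPut(row, f, q, Bytes.toBytes("10"),
          new Put(row).addColumn(f, q, Bytes.toBytes("11")));
      assert !applied;
    }
  }
}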