Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B6C0E18B56 for ; Fri, 12 Feb 2016 21:25:28 +0000 (UTC) Received: (qmail 13805 invoked by uid 500); 12 Feb 2016 21:25:27 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 13490 invoked by uid 500); 12 Feb 2016 21:25:27 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 12982 invoked by uid 99); 12 Feb 2016 21:25:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Feb 2016 21:25:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04FA4E0231; Fri, 12 Feb 2016 21:25:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Fri, 12 Feb 2016 21:25:30 -0000 Message-Id: <591aac3b093e40c4a24eba40f0195750@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/51] [partial] hbase-site git commit: Published site at 85e1d9a109341c5f4aabb0e82c96ab52e99a6d72. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/526c7822/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestAsyncProcess.AsyncProcessWithFailure.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestAsyncProcess.AsyncProcessWithFailure.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestAsyncProcess.AsyncProcessWithFailure.html index 104ec8d..e1a5218 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestAsyncProcess.AsyncProcessWithFailure.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestAsyncProcess.AsyncProcessWithFailure.html @@ -399,734 +399,739 @@ 391 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { 392 return new RegionLocations(loc1); 393 } -394 } -395 -396 /** -397 * Returns our async process. -398 */ -399 static class MyConnectionImpl2 extends MyConnectionImpl { -400 List<HRegionLocation> hrl; -401 final boolean usedRegions[]; -402 -403 protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException { -404 super(conf); -405 this.hrl = hrl; -406 this.usedRegions = new boolean[hrl.size()]; -407 } -408 -409 @Override -410 public RegionLocations locateRegion(TableName tableName, -411 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { -412 int i = 0; -413 for (HRegionLocation hr : hrl){ -414 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { -415 usedRegions[i] = true; -416 return new RegionLocations(hr); -417 } -418 i++; -419 } -420 return null; -421 } -422 } -423 -424 @Test -425 public void testSubmit() throws Exception { -426 ClusterConnection hc = createHConnection(); -427 AsyncProcess ap = new MyAsyncProcess(hc, conf); +394 +395 @Override +396 public boolean hasCellBlockSupport() { +397 return false; +398 } +399 } +400 +401 /** +402 * Returns our async process. +403 */ +404 static class MyConnectionImpl2 extends MyConnectionImpl { +405 List<HRegionLocation> hrl; +406 final boolean usedRegions[]; +407 +408 protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException { +409 super(conf); +410 this.hrl = hrl; +411 this.usedRegions = new boolean[hrl.size()]; +412 } +413 +414 @Override +415 public RegionLocations locateRegion(TableName tableName, +416 byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { +417 int i = 0; +418 for (HRegionLocation hr : hrl){ +419 if (Arrays.equals(row, hr.getRegionInfo().getStartKey())) { +420 usedRegions[i] = true; +421 return new RegionLocations(hr); +422 } +423 i++; +424 } +425 return null; +426 } +427 } 428 -429 List<Put> puts = new ArrayList<Put>(); -430 puts.add(createPut(1, true)); -431 -432 ap.submit(DUMMY_TABLE, puts, false, null, false); -433 Assert.assertTrue(puts.isEmpty()); -434 } -435 -436 @Test -437 public void testSubmitWithCB() throws Exception { -438 ClusterConnection hc = createHConnection(); -439 final AtomicInteger updateCalled = new AtomicInteger(0); -440 Batch.Callback<Object> cb = new Batch.Callback<Object>() { -441 @Override -442 public void update(byte[] region, byte[] row, Object result) { -443 updateCalled.incrementAndGet(); -444 } -445 }; -446 AsyncProcess ap = new MyAsyncProcess(hc, conf); -447 -448 List<Put> puts = new ArrayList<Put>(); -449 puts.add(createPut(1, true)); -450 -451 final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false); -452 Assert.assertTrue(puts.isEmpty()); -453 ars.waitUntilDone(); -454 Assert.assertEquals(updateCalled.get(), 1); -455 } -456 -457 @Test -458 public void testSubmitBusyRegion() throws Exception { -459 ClusterConnection hc = createHConnection(); -460 AsyncProcess ap = new MyAsyncProcess(hc, conf); +429 @Test +430 public void testSubmit() throws Exception { +431 ClusterConnection hc = createHConnection(); +432 AsyncProcess ap = new MyAsyncProcess(hc, conf); +433 +434 List<Put> puts = new ArrayList<Put>(); +435 puts.add(createPut(1, true)); +436 +437 ap.submit(DUMMY_TABLE, puts, false, null, false); +438 Assert.assertTrue(puts.isEmpty()); +439 } +440 +441 @Test +442 public void testSubmitWithCB() throws Exception { +443 ClusterConnection hc = createHConnection(); +444 final AtomicInteger updateCalled = new AtomicInteger(0); +445 Batch.Callback<Object> cb = new Batch.Callback<Object>() { +446 @Override +447 public void update(byte[] region, byte[] row, Object result) { +448 updateCalled.incrementAndGet(); +449 } +450 }; +451 AsyncProcess ap = new MyAsyncProcess(hc, conf); +452 +453 List<Put> puts = new ArrayList<Put>(); +454 puts.add(createPut(1, true)); +455 +456 final AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, cb, false); +457 Assert.assertTrue(puts.isEmpty()); +458 ars.waitUntilDone(); +459 Assert.assertEquals(updateCalled.get(), 1); +460 } 461 -462 List<Put> puts = new ArrayList<Put>(); -463 puts.add(createPut(1, true)); -464 -465 ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); -466 ap.submit(DUMMY_TABLE, puts, false, null, false); -467 Assert.assertEquals(puts.size(), 1); -468 -469 ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn); -470 ap.submit(DUMMY_TABLE, puts, false, null, false); -471 Assert.assertEquals(0, puts.size()); -472 } +462 @Test +463 public void testSubmitBusyRegion() throws Exception { +464 ClusterConnection hc = createHConnection(); +465 AsyncProcess ap = new MyAsyncProcess(hc, conf); +466 +467 List<Put> puts = new ArrayList<Put>(); +468 puts.add(createPut(1, true)); +469 +470 ap.incTaskCounters(Arrays.asList(hri1.getRegionName()), sn); +471 ap.submit(DUMMY_TABLE, puts, false, null, false); +472 Assert.assertEquals(puts.size(), 1); 473 -474 -475 @Test -476 public void testSubmitBusyRegionServer() throws Exception { -477 ClusterConnection hc = createHConnection(); -478 AsyncProcess ap = new MyAsyncProcess(hc, conf); +474 ap.decTaskCounters(Arrays.asList(hri1.getRegionName()), sn); +475 ap.submit(DUMMY_TABLE, puts, false, null, false); +476 Assert.assertEquals(0, puts.size()); +477 } +478 479 -480 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer)); -481 -482 List<Put> puts = new ArrayList<Put>(); -483 puts.add(createPut(1, true)); -484 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy -485 puts.add(createPut(1, true)); // <== this one will make it, the region is already in -486 puts.add(createPut(2, true)); // <== new region, but the rs is ok -487 -488 ap.submit(DUMMY_TABLE, puts, false, null, false); -489 Assert.assertEquals(" puts=" + puts, 1, puts.size()); -490 -491 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); -492 ap.submit(DUMMY_TABLE, puts, false, null, false); -493 Assert.assertTrue(puts.isEmpty()); -494 } +480 @Test +481 public void testSubmitBusyRegionServer() throws Exception { +482 ClusterConnection hc = createHConnection(); +483 AsyncProcess ap = new MyAsyncProcess(hc, conf); +484 +485 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer)); +486 +487 List<Put> puts = new ArrayList<Put>(); +488 puts.add(createPut(1, true)); +489 puts.add(createPut(3, true)); // <== this one won't be taken, the rs is busy +490 puts.add(createPut(1, true)); // <== this one will make it, the region is already in +491 puts.add(createPut(2, true)); // <== new region, but the rs is ok +492 +493 ap.submit(DUMMY_TABLE, puts, false, null, false); +494 Assert.assertEquals(" puts=" + puts, 1, puts.size()); 495 -496 @Test -497 public void testFail() throws Exception { -498 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); -499 -500 List<Put> puts = new ArrayList<Put>(); -501 Put p = createPut(1, false); -502 puts.add(p); -503 -504 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); -505 Assert.assertEquals(0, puts.size()); -506 ars.waitUntilDone(); -507 verifyResult(ars, false); -508 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); -509 -510 Assert.assertEquals(1, ars.getErrors().exceptions.size()); -511 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), -512 failure.equals(ars.getErrors().exceptions.get(0))); -513 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), -514 failure.equals(ars.getErrors().exceptions.get(0))); -515 -516 Assert.assertEquals(1, ars.getFailedOperations().size()); -517 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), -518 p.equals(ars.getFailedOperations().get(0))); -519 } +496 ap.taskCounterPerServer.put(sn2, new AtomicInteger(ap.maxConcurrentTasksPerServer - 1)); +497 ap.submit(DUMMY_TABLE, puts, false, null, false); +498 Assert.assertTrue(puts.isEmpty()); +499 } +500 +501 @Test +502 public void testFail() throws Exception { +503 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); +504 +505 List<Put> puts = new ArrayList<Put>(); +506 Put p = createPut(1, false); +507 puts.add(p); +508 +509 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); +510 Assert.assertEquals(0, puts.size()); +511 ars.waitUntilDone(); +512 verifyResult(ars, false); +513 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); +514 +515 Assert.assertEquals(1, ars.getErrors().exceptions.size()); +516 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), +517 failure.equals(ars.getErrors().exceptions.get(0))); +518 Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), +519 failure.equals(ars.getErrors().exceptions.get(0))); 520 -521 -522 @Test -523 public void testSubmitTrue() throws IOException { -524 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); -525 ap.tasksInProgress.incrementAndGet(); -526 final AtomicInteger ai = new AtomicInteger(1); -527 ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); -528 -529 final AtomicBoolean checkPoint = new AtomicBoolean(false); -530 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); -531 -532 Thread t = new Thread(){ -533 @Override -534 public void run(){ -535 Threads.sleep(1000); -536 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent -537 ai.decrementAndGet(); -538 ap.tasksInProgress.decrementAndGet(); -539 checkPoint2.set(true); -540 } -541 }; -542 -543 List<Put> puts = new ArrayList<Put>(); -544 Put p = createPut(1, true); -545 puts.add(p); -546 -547 ap.submit(DUMMY_TABLE, puts, false, null, false); -548 Assert.assertFalse(puts.isEmpty()); -549 -550 t.start(); +521 Assert.assertEquals(1, ars.getFailedOperations().size()); +522 Assert.assertTrue("was: " + ars.getFailedOperations().get(0), +523 p.equals(ars.getFailedOperations().get(0))); +524 } +525 +526 +527 @Test +528 public void testSubmitTrue() throws IOException { +529 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); +530 ap.tasksInProgress.incrementAndGet(); +531 final AtomicInteger ai = new AtomicInteger(1); +532 ap.taskCounterPerRegion.put(hri1.getRegionName(), ai); +533 +534 final AtomicBoolean checkPoint = new AtomicBoolean(false); +535 final AtomicBoolean checkPoint2 = new AtomicBoolean(false); +536 +537 Thread t = new Thread(){ +538 @Override +539 public void run(){ +540 Threads.sleep(1000); +541 Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent +542 ai.decrementAndGet(); +543 ap.tasksInProgress.decrementAndGet(); +544 checkPoint2.set(true); +545 } +546 }; +547 +548 List<Put> puts = new ArrayList<Put>(); +549 Put p = createPut(1, true); +550 puts.add(p); 551 -552 ap.submit(DUMMY_TABLE, puts, true, null, false); -553 Assert.assertTrue(puts.isEmpty()); +552 ap.submit(DUMMY_TABLE, puts, false, null, false); +553 Assert.assertFalse(puts.isEmpty()); 554 -555 checkPoint.set(true); -556 while (!checkPoint2.get()){ -557 Threads.sleep(1); -558 } -559 } -560 -561 @Test -562 public void testFailAndSuccess() throws Exception { -563 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); -564 -565 List<Put> puts = new ArrayList<Put>(); -566 puts.add(createPut(1, false)); -567 puts.add(createPut(1, true)); -568 puts.add(createPut(1, true)); +555 t.start(); +556 +557 ap.submit(DUMMY_TABLE, puts, true, null, false); +558 Assert.assertTrue(puts.isEmpty()); +559 +560 checkPoint.set(true); +561 while (!checkPoint2.get()){ +562 Threads.sleep(1); +563 } +564 } +565 +566 @Test +567 public void testFailAndSuccess() throws Exception { +568 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); 569 -570 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); -571 Assert.assertTrue(puts.isEmpty()); -572 ars.waitUntilDone(); -573 verifyResult(ars, false, true, true); -574 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); -575 ap.callsCt.set(0); -576 Assert.assertEquals(1, ars.getErrors().actions.size()); -577 -578 puts.add(createPut(1, true)); -579 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. -580 ap.waitUntilDone(); -581 ars = ap.submit(DUMMY_TABLE, puts, false, null, true); -582 Assert.assertEquals(0, puts.size()); -583 ars.waitUntilDone(); -584 Assert.assertEquals(1, ap.callsCt.get()); -585 verifyResult(ars, true); -586 } -587 -588 @Test -589 public void testFlush() throws Exception { -590 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); -591 -592 List<Put> puts = new ArrayList<Put>(); -593 puts.add(createPut(1, false)); -594 puts.add(createPut(1, true)); -595 puts.add(createPut(1, true)); +570 List<Put> puts = new ArrayList<Put>(); +571 puts.add(createPut(1, false)); +572 puts.add(createPut(1, true)); +573 puts.add(createPut(1, true)); +574 +575 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); +576 Assert.assertTrue(puts.isEmpty()); +577 ars.waitUntilDone(); +578 verifyResult(ars, false, true, true); +579 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); +580 ap.callsCt.set(0); +581 Assert.assertEquals(1, ars.getErrors().actions.size()); +582 +583 puts.add(createPut(1, true)); +584 // Wait for AP to be free. While ars might have the result, ap counters are decreased later. +585 ap.waitUntilDone(); +586 ars = ap.submit(DUMMY_TABLE, puts, false, null, true); +587 Assert.assertEquals(0, puts.size()); +588 ars.waitUntilDone(); +589 Assert.assertEquals(1, ap.callsCt.get()); +590 verifyResult(ars, true); +591 } +592 +593 @Test +594 public void testFlush() throws Exception { +595 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); 596 -597 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); -598 ars.waitUntilDone(); -599 verifyResult(ars, false, true, true); -600 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); +597 List<Put> puts = new ArrayList<Put>(); +598 puts.add(createPut(1, false)); +599 puts.add(createPut(1, true)); +600 puts.add(createPut(1, true)); 601 -602 Assert.assertEquals(1, ars.getFailedOperations().size()); -603 } -604 -605 @Test -606 public void testMaxTask() throws Exception { -607 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); -608 -609 for (int i = 0; i < 1000; i++) { -610 ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn); -611 } -612 -613 final Thread myThread = Thread.currentThread(); -614 -615 Thread t = new Thread() { -616 @Override -617 public void run() { -618 Threads.sleep(2000); -619 myThread.interrupt(); -620 } -621 }; -622 -623 List<Put> puts = new ArrayList<Put>(); -624 puts.add(createPut(1, true)); -625 -626 t.start(); +602 AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); +603 ars.waitUntilDone(); +604 verifyResult(ars, false, true, true); +605 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); +606 +607 Assert.assertEquals(1, ars.getFailedOperations().size()); +608 } +609 +610 @Test +611 public void testMaxTask() throws Exception { +612 final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false); +613 +614 for (int i = 0; i < 1000; i++) { +615 ap.incTaskCounters(Arrays.asList("dummy".getBytes()), sn); +616 } +617 +618 final Thread myThread = Thread.currentThread(); +619 +620 Thread t = new Thread() { +621 @Override +622 public void run() { +623 Threads.sleep(2000); +624 myThread.interrupt(); +625 } +626 }; 627 -628 try { -629 ap.submit(DUMMY_TABLE, puts, false, null, false); -630 Assert.fail("We should have been interrupted."); -631 } catch (InterruptedIOException expected) { -632 } -633 -634 final long sleepTime = 2000; -635 -636 Thread t2 = new Thread() { -637 @Override -638 public void run() { -639 Threads.sleep(sleepTime); -640 while (ap.tasksInProgress.get() > 0) { -641 ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn); -642 } -643 } -644 }; -645 t2.start(); -646 -647 long start = System.currentTimeMillis(); -648 ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false); -649 long end = System.currentTimeMillis(); -650 -651 //Adds 100 to secure us against approximate timing. -652 Assert.assertTrue(start + 100L + sleepTime > end); -653 } -654 -655 private static ClusterConnection createHConnection() throws IOException { -656 ClusterConnection hc = createHConnectionCommon(); -657 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); -658 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); -659 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); -660 setMockLocation(hc, FAILS, new RegionLocations(loc2)); -661 return hc; -662 } -663 -664 private static ClusterConnection createHConnectionWithReplicas() throws IOException { -665 ClusterConnection hc = createHConnectionCommon(); -666 setMockLocation(hc, DUMMY_BYTES_1, hrls1); -667 setMockLocation(hc, DUMMY_BYTES_2, hrls2); -668 setMockLocation(hc, DUMMY_BYTES_3, hrls3); -669 return hc; -670 } -671 -672 private static void setMockLocation(ClusterConnection hc, byte[] row, -673 RegionLocations result) throws IOException { -674 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), -675 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); -676 } -677 -678 private static ClusterConnection createHConnectionCommon() { -679 ClusterConnection hc = Mockito.mock(ClusterConnection.class); -680 NonceGenerator ng = Mockito.mock(NonceGenerator.class); -681 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); -682 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); -683 Mockito.when(hc.getConfiguration()).thenReturn(conf); -684 return hc; -685 } -686 -687 @Test -688 public void testHTablePutSuccess() throws Exception { -689 BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); -690 ht.ap = new MyAsyncProcess(createHConnection(), conf, true); +628 List<Put> puts = new ArrayList<Put>(); +629 puts.add(createPut(1, true)); +630 +631 t.start(); +632 +633 try { +634 ap.submit(DUMMY_TABLE, puts, false, null, false); +635 Assert.fail("We should have been interrupted."); +636 } catch (InterruptedIOException expected) { +637 } +638 +639 final long sleepTime = 2000; +640 +641 Thread t2 = new Thread() { +642 @Override +643 public void run() { +644 Threads.sleep(sleepTime); +645 while (ap.tasksInProgress.get() > 0) { +646 ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn); +647 } +648 } +649 }; +650 t2.start(); +651 +652 long start = System.currentTimeMillis(); +653 ap.submit(DUMMY_TABLE, new ArrayList<Row>(), false, null, false); +654 long end = System.currentTimeMillis(); +655 +656 //Adds 100 to secure us against approximate timing. +657 Assert.assertTrue(start + 100L + sleepTime > end); +658 } +659 +660 private static ClusterConnection createHConnection() throws IOException { +661 ClusterConnection hc = createHConnectionCommon(); +662 setMockLocation(hc, DUMMY_BYTES_1, new RegionLocations(loc1)); +663 setMockLocation(hc, DUMMY_BYTES_2, new RegionLocations(loc2)); +664 setMockLocation(hc, DUMMY_BYTES_3, new RegionLocations(loc3)); +665 setMockLocation(hc, FAILS, new RegionLocations(loc2)); +666 return hc; +667 } +668 +669 private static ClusterConnection createHConnectionWithReplicas() throws IOException { +670 ClusterConnection hc = createHConnectionCommon(); +671 setMockLocation(hc, DUMMY_BYTES_1, hrls1); +672 setMockLocation(hc, DUMMY_BYTES_2, hrls2); +673 setMockLocation(hc, DUMMY_BYTES_3, hrls3); +674 return hc; +675 } +676 +677 private static void setMockLocation(ClusterConnection hc, byte[] row, +678 RegionLocations result) throws IOException { +679 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), +680 Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result); +681 } +682 +683 private static ClusterConnection createHConnectionCommon() { +684 ClusterConnection hc = Mockito.mock(ClusterConnection.class); +685 NonceGenerator ng = Mockito.mock(NonceGenerator.class); +686 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); +687 Mockito.when(hc.getNonceGenerator()).thenReturn(ng); +688 Mockito.when(hc.getConfiguration()).thenReturn(conf); +689 return hc; +690 } 691 -692 Put put = createPut(1, true); -693 -694 Assert.assertEquals(0, ht.getWriteBufferSize()); -695 ht.mutate(put); -696 Assert.assertEquals(0, ht.getWriteBufferSize()); -697 } +692 @Test +693 public void testHTablePutSuccess() throws Exception { +694 BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); +695 ht.ap = new MyAsyncProcess(createHConnection(), conf, true); +696 +697 Put put = createPut(1, true); 698 -699 private void doHTableFailedPut(boolean bufferOn) throws Exception { -700 ClusterConnection conn = createHConnection(); -701 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); -702 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); -703 ht.mutator.ap = ap; -704 if (bufferOn) { -705 ht.setWriteBufferSize(1024L * 1024L); -706 } else { -707 ht.setWriteBufferSize(0L); -708 } -709 -710 Put put = createPut(1, false); -711 -712 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); -713 try { -714 ht.put(put); -715 if (bufferOn) { -716 ht.flushCommits(); -717 } -718 Assert.fail(); -719 } catch (RetriesExhaustedException expected) { -720 } -721 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); -722 // The table should have sent one request, maybe after multiple attempts -723 AsyncRequestFuture ars = null; -724 for (AsyncRequestFuture someReqs : ap.allReqs) { -725 if (someReqs.getResults().length == 0) continue; -726 Assert.assertTrue(ars == null); -727 ars = someReqs; -728 } -729 Assert.assertTrue(ars != null); -730 verifyResult(ars, false); -731 -732 // This should not raise any exception, puts have been 'received' before by the catch. -733 ht.close(); -734 } -735 -736 @Test -737 public void testHTableFailedPutWithBuffer() throws Exception { -738 doHTableFailedPut(true); +699 Assert.assertEquals(0, ht.getWriteBufferSize()); +700 ht.mutate(put); +701 Assert.assertEquals(0, ht.getWriteBufferSize()); +702 } +703 +704 private void doHTableFailedPut(boolean bufferOn) throws Exception { +705 ClusterConnection conn = createHConnection(); +706 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); +707 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); +708 ht.mutator.ap = ap; +709 if (bufferOn) { +710 ht.setWriteBufferSize(1024L * 1024L); +711 } else { +712 ht.setWriteBufferSize(0L); +713 } +714 +715 Put put = createPut(1, false); +716 +717 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); +718 try { +719 ht.put(put); +720 if (bufferOn) { +721 ht.flushCommits(); +722 } +723 Assert.fail(); +724 } catch (RetriesExhaustedException expected) { +725 } +726 Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize.get()); +727 // The table should have sent one request, maybe after multiple attempts +728 AsyncRequestFuture ars = null; +729 for (AsyncRequestFuture someReqs : ap.allReqs) { +730 if (someReqs.getResults().length == 0) continue; +731 Assert.assertTrue(ars == null); +732 ars = someReqs; +733 } +734 Assert.assertTrue(ars != null); +735 verifyResult(ars, false); +736 +737 // This should not raise any exception, puts have been 'received' before by the catch. +738 ht.close(); 739 } 740 741 @Test -742 public void testHTableFailedPutWithoutBuffer() throws Exception { -743 doHTableFailedPut(false); +742 public void testHTableFailedPutWithBuffer() throws Exception { +743 doHTableFailedPut(true); 744 } 745 746 @Test -747 public void testHTableFailedPutAndNewPut() throws Exception { -748 ClusterConnection conn = createHConnection(); -749 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, -750 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); -751 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); -752 mutator.ap = ap; -753 -754 Put p = createPut(1, false); -755 mutator.mutate(p); -756 -757 ap.waitUntilDone(); // Let's do all the retries. +747 public void testHTableFailedPutWithoutBuffer() throws Exception { +748 doHTableFailedPut(false); +749 } +750 +751 @Test +752 public void testHTableFailedPutAndNewPut() throws Exception { +753 ClusterConnection conn = createHConnection(); +754 BufferedMutatorImpl mutator = new BufferedMutatorImpl(conn, null, null, +755 new BufferedMutatorParams(DUMMY_TABLE).writeBufferSize(0)); +756 MyAsyncProcess ap = new MyAsyncProcess(conn, conf, true); +757 mutator.ap = ap; 758 -759 // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the -760 // doPut if it fails. -761 // This said, it's not a very easy going behavior. For example, when we insert a list of -762 // puts, we may raise an exception in the middle of the list. It's then up to the caller to -763 // manage what was inserted, what was tried but failed, and what was not even tried. -764 p = createPut(1, true); -765 Assert.assertEquals(0, mutator.writeAsyncBuffer.size()); -766 try { -767 mutator.mutate(p); -768 Assert.fail(); -769 } catch (RetriesExhaustedException expected) { -770 } -771 Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); -772 } -773 -774 @Test -775 public void testBatch() throws IOException, InterruptedException { -776 ClusterConnection conn = new MyConnectionImpl(conf); -777 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); -778 ht.multiAp = new MyAsyncProcess(conn, conf, false); -779 -780 List<Put> puts = new ArrayList<Put>(); -781 puts.add(createPut(1, true)); -782 puts.add(createPut(1, true)); -783 puts.add(createPut(1, true)); -784 puts.add(createPut(1, true)); -785 puts.add(createPut(1, false)); // <=== the bad apple, position 4 +759 Put p = createPut(1, false); +760 mutator.mutate(p); +761 +762 ap.waitUntilDone(); // Let's do all the retries. +763 +764 // We're testing that we're behaving as we were behaving in 0.94: sending exceptions in the +765 // doPut if it fails. +766 // This said, it's not a very easy going behavior. For example, when we insert a list of +767 // puts, we may raise an exception in the middle of the list. It's then up to the caller to +768 // manage what was inserted, what was tried but failed, and what was not even tried. +769 p = createPut(1, true); +770 Assert.assertEquals(0, mutator.writeAsyncBuffer.size()); +771 try { +772 mutator.mutate(p); +773 Assert.fail(); +774 } catch (RetriesExhaustedException expected) { +775 } +776 Assert.assertEquals("the put should not been inserted.", 0, mutator.writeAsyncBuffer.size()); +777 } +778 +779 @Test +780 public void testBatch() throws IOException, InterruptedException { +781 ClusterConnection conn = new MyConnectionImpl(conf); +782 HTable ht = new HTable(conn, new BufferedMutatorParams(DUMMY_TABLE)); +783 ht.multiAp = new MyAsyncProcess(conn, conf, false); +784 +785 List<Put> puts = new ArrayList<Put>(); 786 puts.add(createPut(1, true)); -787 puts.add(createPut(1, false)); // <=== another bad apple, position 6 -788 -789 Object[] res = new Object[puts.size()]; -790 try { -791 ht.processBatch(puts, res); -792 Assert.fail(); -793 } catch (RetriesExhaustedException expected) { -794 } -795 -796 Assert.assertEquals(res[0], success); -797 Assert.assertEquals(res[1], success); -798 Assert.assertEquals(res[2], success); -799 Assert.assertEquals(res[3], success); -800 Assert.assertEquals(res[4], failure); -801 Assert.assertEquals(res[5], success); -802 Assert.assertEquals(res[6], failure); -803 } -804 -805 @Test -806 public void testErrorsServers() throws IOException { -807 Configuration configuration = new Configuration(conf); -808 ClusterConnection conn = new MyConnectionImpl(configuration); -809 BufferedMutatorImpl mutator = -810 new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); -811 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); -812 -813 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); -814 mutator.ap = ap; -815 -816 Assert.assertNotNull(mutator.ap.createServerErrorTracker()); -817 Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); -818 mutator.ap.serverTrackerTimeout = 1; -819 -820 Put p = createPut(1, false); -821 mutator.mutate(p); -822 -823 try { -824 mutator.flush(); -825 Assert.fail(); -826 } catch (RetriesExhaustedWithDetailsException expected) { -827 } -828 // Checking that the ErrorsServers came into play and didn't make us stop immediately -829 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); -830 } -831 -832 @Test -833 public void testGlobalErrors() throws IOException { -834 ClusterConnection conn = new MyConnectionImpl(conf); -835 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); -836 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); -837 mutator.ap = ap; -838 -839 Assert.assertNotNull(mutator.ap.createServerErrorTracker()); -840 -841 Put p = createPut(1, true); -842 mutator.mutate(p); +787 puts.add(createPut(1, true)); +788 puts.add(createPut(1, true)); +789 puts.add(createPut(1, true)); +790 puts.add(createPut(1, false)); // <=== the bad apple, position 4 +791 puts.add(createPut(1, true)); +792 puts.add(createPut(1, false)); // <=== another bad apple, position 6 +793 +794 Object[] res = new Object[puts.size()]; +795 try { +796 ht.processBatch(puts, res); +797 Assert.fail(); +798 } catch (RetriesExhaustedException expected) { +799 } +800 +801 Assert.assertEquals(res[0], success); +802 Assert.assertEquals(res[1], success); +803 Assert.assertEquals(res[2], success); +804 Assert.assertEquals(res[3], success); +805 Assert.assertEquals(res[4], failure); +806 Assert.assertEquals(res[5], success); +807 Assert.assertEquals(res[6], failure); +808 } +809 +810 @Test +811 public void testErrorsServers() throws IOException { +812 Configuration configuration = new Configuration(conf); +813 ClusterConnection conn = new MyConnectionImpl(configuration); +814 BufferedMutatorImpl mutator = +815 new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE)); +816 configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true); +817 +818 MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true); +819 mutator.ap = ap; +820 +821 Assert.assertNotNull(mutator.ap.createServerErrorTracker()); +822 Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); +823 mutator.ap.serverTrackerTimeout = 1; +824 +825 Put p = createPut(1, false); +826 mutator.mutate(p); +827 +828 try { +829 mutator.flush(); +830 Assert.fail(); +831 } catch (RetriesExhaustedWithDetailsException expected) { +832 } +833 // Checking that the ErrorsServers came into play and didn't make us stop immediately +834 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); +835 } +836 +837 @Test +838 public void testGlobalErrors() throws IOException { +839 ClusterConnection conn = new MyConnectionImpl(conf); +840 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); +841 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new IOException("test")); +842 mutator.ap = ap; 843 -844 try { -845 mutator.flush(); -846 Assert.fail(); -847 } catch (RetriesExhaustedWithDetailsException expected) { -848 } -849 // Checking that the ErrorsServers came into play and didn't make us stop immediately -850 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); -851 } -852 -853 -854 @Test -855 public void testCallQueueTooLarge() throws IOException { -856 ClusterConnection conn = new MyConnectionImpl(conf); -857 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); -858 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); -859 mutator.ap = ap; -860 -861 Assert.assertNotNull(mutator.ap.createServerErrorTracker()); -862 -863 Put p = createPut(1, true); -864 mutator.mutate(p); +844 Assert.assertNotNull(mutator.ap.createServerErrorTracker()); +845 +846 Put p = createPut(1, true); +847 mutator.mutate(p); +848 +849 try { +850 mutator.flush(); +851 Assert.fail(); +852 } catch (RetriesExhaustedWithDetailsException expected) { +853 } +854 // Checking that the ErrorsServers came into play and didn't make us stop immediately +855 Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); +856 } +857 +858 +859 @Test +860 public void testCallQueueTooLarge() throws IOException { +861 ClusterConnection conn = new MyConnectionImpl(conf); +862 BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE); +863 AsyncProcessWithFailure ap = new AsyncProcessWithFailure(conn, conf, new CallQueueTooBigException()); +864 mutator.ap = ap; 865 -866 try { -867 mutator.flush(); -868 Assert.fail(); -869 } catch (RetriesExhaustedWithDetailsException expected) { -870 } -871 // Checking that the ErrorsServ