Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0BA66188A3 for ; Tue, 8 Mar 2016 18:05:40 +0000 (UTC) Received: (qmail 51227 invoked by uid 500); 8 Mar 2016 18:05:37 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 51155 invoked by uid 500); 8 Mar 2016 18:05:37 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 50434 invoked by uid 99); 8 Mar 2016 18:05:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Mar 2016 18:05:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BDC91DFA6C; Tue, 8 Mar 2016 18:05:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Tue, 08 Mar 2016 18:05:50 -0000 Message-Id: In-Reply-To: <217926e3b5034dc0aeb5a884dcb03048@git.apache.org> References: <217926e3b5034dc0aeb5a884dcb03048@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/34] hbase-site git commit: Published site at 46cc3d4972d8db478c86e9848afed523cb6dc866. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/58f51408/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestFastFail.MyPreemptiveFastFailInterceptor.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestFastFail.MyPreemptiveFastFailInterceptor.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestFastFail.MyPreemptiveFastFailInterceptor.html index 904fff0..15a2f5b 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestFastFail.MyPreemptiveFastFailInterceptor.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/client/TestFastFail.MyPreemptiveFastFailInterceptor.html @@ -43,277 +43,339 @@ 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.apache.hadoop.conf.Configuration; -038import org.apache.hadoop.hbase.HBaseTestingUtility; -039import org.apache.hadoop.hbase.HColumnDescriptor; -040import org.apache.hadoop.hbase.HConstants; -041import org.apache.hadoop.hbase.HTableDescriptor; -042import org.apache.hadoop.hbase.TableName; -043import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; -044import org.apache.hadoop.hbase.testclassification.ClientTests; -045import org.apache.hadoop.hbase.testclassification.MediumTests; -046import org.apache.hadoop.hbase.util.Bytes; -047import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; -048import org.junit.After; -049import org.junit.AfterClass; -050import org.junit.Before; -051import org.junit.BeforeClass; -052import org.junit.Test; -053import org.junit.Ignore; -054import org.junit.experimental.categories.Category; -055 -056@Category({MediumTests.class, ClientTests.class}) -057public class TestFastFail { -058 private static final Log LOG = LogFactory.getLog(TestFastFail.class); -059 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); -060 private static byte[] FAMILY = Bytes.toBytes("testFamily"); -061 private static final Random random = new Random(); -062 private static int SLAVES = 1; -063 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); -064 private static final int SLEEPTIME = 5000; -065 -066 /** -067 * @throws java.lang.Exception -068 */ -069 @BeforeClass -070 public static void setUpBeforeClass() throws Exception { -071 TEST_UTIL.startMiniCluster(SLAVES); -072 } -073 -074 /** -075 * @throws java.lang.Exception -076 */ -077 @AfterClass -078 public static void tearDownAfterClass() throws Exception { -079 TEST_UTIL.shutdownMiniCluster(); -080 } -081 -082 /** -083 * @throws java.lang.Exception -084 */ -085 @Before -086 public void setUp() throws Exception { -087 MyPreemptiveFastFailInterceptor.numBraveSouls.set(0); -088 } -089 -090 /** -091 * @throws java.lang.Exception -092 */ -093 @After -094 public void tearDown() throws Exception { -095 // Nothing to do. -096 } -097 -098 @Ignore ("Can go zombie -- see HBASE-14421; FIX") @Test -099 public void testFastFail() throws IOException, InterruptedException { -100 Admin admin = TEST_UTIL.getHBaseAdmin(); +038import org.apache.hadoop.hbase.HBaseConfiguration; +039import org.apache.hadoop.hbase.HBaseTestingUtility; +040import org.apache.hadoop.hbase.HColumnDescriptor; +041import org.apache.hadoop.hbase.HConstants; +042import org.apache.hadoop.hbase.HTableDescriptor; +043import org.apache.hadoop.hbase.ServerName; +044import org.apache.hadoop.hbase.TableName; +045import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; +046import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; +047import org.apache.hadoop.hbase.testclassification.ClientTests; +048import org.apache.hadoop.hbase.testclassification.MediumTests; +049import org.apache.hadoop.hbase.util.Bytes; +050import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator; +051import org.junit.After; +052import org.junit.AfterClass; +053import org.junit.Before; +054import org.junit.BeforeClass; +055import org.junit.Test; +056import org.junit.Ignore; +057import org.junit.experimental.categories.Category; +058 +059@Category({MediumTests.class, ClientTests.class}) +060public class TestFastFail { +061 private static final Log LOG = LogFactory.getLog(TestFastFail.class); +062 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); +063 private static byte[] FAMILY = Bytes.toBytes("testFamily"); +064 private static final Random random = new Random(); +065 private static int SLAVES = 1; +066 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); +067 private static final int SLEEPTIME = 5000; +068 +069 /** +070 * @throws java.lang.Exception +071 */ +072 @BeforeClass +073 public static void setUpBeforeClass() throws Exception { +074 TEST_UTIL.startMiniCluster(SLAVES); +075 } +076 +077 /** +078 * @throws java.lang.Exception +079 */ +080 @AfterClass +081 public static void tearDownAfterClass() throws Exception { +082 TEST_UTIL.shutdownMiniCluster(); +083 } +084 +085 /** +086 * @throws java.lang.Exception +087 */ +088 @Before +089 public void setUp() throws Exception { +090 MyPreemptiveFastFailInterceptor.numBraveSouls.set(0); +091 CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0); +092 } +093 +094 /** +095 * @throws java.lang.Exception +096 */ +097 @After +098 public void tearDown() throws Exception { +099 // Nothing to do. +100 } 101 -102 final String tableName = "testClientRelearningExperiment"; -103 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes -104 .toBytes(tableName))); -105 desc.addFamily(new HColumnDescriptor(FAMILY)); -106 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32); -107 final long numRows = 1000; -108 -109 Configuration conf = TEST_UTIL.getConfiguration(); -110 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100); -111 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10); -112 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); -113 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); -114 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, -115 MyPreemptiveFastFailInterceptor.class, -116 PreemptiveFastFailInterceptor.class); -117 -118 final Connection connection = ConnectionFactory.createConnection(conf); -119 -120 /** -121 * Write numRows worth of data, so that the workers can arbitrarily read. -122 */ -123 List<Put> puts = new ArrayList<>(); -124 for (long i = 0; i < numRows; i++) { -125 byte[] rowKey = longToByteArrayKey(i); -126 Put put = new Put(rowKey); -127 byte[] value = rowKey; // value is the same as the row key -128 put.addColumn(FAMILY, QUALIFIER, value); -129 puts.add(put); -130 } -131 try (Table table = connection.getTable(TableName.valueOf(tableName))) { -132 table.put(puts); -133 LOG.info("Written all puts."); +102 @Ignore ("Can go zombie -- see HBASE-14421; FIX") @Test +103 public void testFastFail() throws IOException, InterruptedException { +104 Admin admin = TEST_UTIL.getHBaseAdmin(); +105 +106 final String tableName = "testClientRelearningExperiment"; +107 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes +108 .toBytes(tableName))); +109 desc.addFamily(new HColumnDescriptor(FAMILY)); +110 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32); +111 final long numRows = 1000; +112 +113 Configuration conf = TEST_UTIL.getConfiguration(); +114 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100); +115 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10); +116 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); +117 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); +118 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, +119 MyPreemptiveFastFailInterceptor.class, +120 PreemptiveFastFailInterceptor.class); +121 +122 final Connection connection = ConnectionFactory.createConnection(conf); +123 +124 /** +125 * Write numRows worth of data, so that the workers can arbitrarily read. +126 */ +127 List<Put> puts = new ArrayList<>(); +128 for (long i = 0; i < numRows; i++) { +129 byte[] rowKey = longToByteArrayKey(i); +130 Put put = new Put(rowKey); +131 byte[] value = rowKey; // value is the same as the row key +132 put.addColumn(FAMILY, QUALIFIER, value); +133 puts.add(put); 134 } -135 -136 /** -137 * The number of threads that are going to perform actions against the test -138 * table. -139 */ -140 int nThreads = 100; -141 ExecutorService service = Executors.newFixedThreadPool(nThreads); -142 final CountDownLatch continueOtherHalf = new CountDownLatch(1); -143 final CountDownLatch doneHalfway = new CountDownLatch(nThreads); -144 -145 final AtomicInteger numSuccessfullThreads = new AtomicInteger(0); -146 final AtomicInteger numFailedThreads = new AtomicInteger(0); -147 -148 // The total time taken for the threads to perform the second put; -149 final AtomicLong totalTimeTaken = new AtomicLong(0); -150 final AtomicInteger numBlockedWorkers = new AtomicInteger(0); -151 final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0); -152 -153 List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(); -154 for (int i = 0; i < nThreads; i++) { -155 futures.add(service.submit(new Callable<Boolean>() { -156 /** -157 * The workers are going to perform a couple of reads. The second read -158 * will follow the killing of a regionserver so that we make sure that -159 * some of threads go into PreemptiveFastFailExcception -160 */ -161 public Boolean call() throws Exception { -162 try (Table table = connection.getTable(TableName.valueOf(tableName))) { -163 Thread.sleep(Math.abs(random.nextInt()) % 250); // Add some jitter here -164 byte[] row = longToByteArrayKey(Math.abs(random.nextLong()) -165 % numRows); -166 Get g = new Get(row); -167 g.addColumn(FAMILY, QUALIFIER); -168 try { -169 table.get(g); -170 } catch (Exception e) { -171 LOG.debug("Get failed : ", e); -172 doneHalfway.countDown(); -173 return false; -174 } -175 -176 // Done with one get, proceeding to do the next one. -177 doneHalfway.countDown(); -178 continueOtherHalf.await(); +135 try (Table table = connection.getTable(TableName.valueOf(tableName))) { +136 table.put(puts); +137 LOG.info("Written all puts."); +138 } +139 +140 /** +141 * The number of threads that are going to perform actions against the test +142 * table. +143 */ +144 int nThreads = 100; +145 ExecutorService service = Executors.newFixedThreadPool(nThreads); +146 final CountDownLatch continueOtherHalf = new CountDownLatch(1); +147 final CountDownLatch doneHalfway = new CountDownLatch(nThreads); +148 +149 final AtomicInteger numSuccessfullThreads = new AtomicInteger(0); +150 final AtomicInteger numFailedThreads = new AtomicInteger(0); +151 +152 // The total time taken for the threads to perform the second put; +153 final AtomicLong totalTimeTaken = new AtomicLong(0); +154 final AtomicInteger numBlockedWorkers = new AtomicInteger(0); +155 final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0); +156 +157 List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(); +158 for (int i = 0; i < nThreads; i++) { +159 futures.add(service.submit(new Callable<Boolean>() { +160 /** +161 * The workers are going to perform a couple of reads. The second read +162 * will follow the killing of a regionserver so that we make sure that +163 * some of threads go into PreemptiveFastFailExcception +164 */ +165 public Boolean call() throws Exception { +166 try (Table table = connection.getTable(TableName.valueOf(tableName))) { +167 Thread.sleep(Math.abs(random.nextInt()) % 250); // Add some jitter here +168 byte[] row = longToByteArrayKey(Math.abs(random.nextLong()) +169 % numRows); +170 Get g = new Get(row); +171 g.addColumn(FAMILY, QUALIFIER); +172 try { +173 table.get(g); +174 } catch (Exception e) { +175 LOG.debug("Get failed : ", e); +176 doneHalfway.countDown(); +177 return false; +178 } 179 -180 long startTime = System.currentTimeMillis(); -181 g = new Get(row); -182 g.addColumn(FAMILY, QUALIFIER); -183 try { -184 table.get(g); -185 // The get was successful -186 numSuccessfullThreads.addAndGet(1); -187 } catch (Exception e) { -188 if (e instanceof PreemptiveFastFailException) { -189 // We were issued a PreemptiveFastFailException -190 numPreemptiveFastFailExceptions.addAndGet(1); -191 } -192 // Irrespective of PFFE, the request failed. -193 numFailedThreads.addAndGet(1); -194 return false; -195 } finally { -196 long enTime = System.currentTimeMillis(); -197 totalTimeTaken.addAndGet(enTime - startTime); -198 if ((enTime - startTime) >= SLEEPTIME) { -199 // Considering the slow workers as the blockedWorkers. -200 // This assumes that the threads go full throttle at performing -201 // actions. In case the thread scheduling itself is as slow as -202 // SLEEPTIME, then this test might fail and so, we might have -203 // set it to a higher number on slower machines. -204 numBlockedWorkers.addAndGet(1); -205 } -206 } -207 return true; -208 } catch (Exception e) { -209 LOG.error("Caught unknown exception", e); -210 doneHalfway.countDown(); -211 return false; -212 } -213 } -214 })); -215 } -216 -217 doneHalfway.await(); -218 -219 // Kill a regionserver -220 TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop(); -221 TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing"); +180 // Done with one get, proceeding to do the next one. +181 doneHalfway.countDown(); +182 continueOtherHalf.await(); +183 +184 long startTime = System.currentTimeMillis(); +185 g = new Get(row); +186 g.addColumn(FAMILY, QUALIFIER); +187 try { +188 table.get(g); +189 // The get was successful +190 numSuccessfullThreads.addAndGet(1); +191 } catch (Exception e) { +192 if (e instanceof PreemptiveFastFailException) { +193 // We were issued a PreemptiveFastFailException +194 numPreemptiveFastFailExceptions.addAndGet(1); +195 } +196 // Irrespective of PFFE, the request failed. +197 numFailedThreads.addAndGet(1); +198 return false; +199 } finally { +200 long enTime = System.currentTimeMillis(); +201 totalTimeTaken.addAndGet(enTime - startTime); +202 if ((enTime - startTime) >= SLEEPTIME) { +203 // Considering the slow workers as the blockedWorkers. +204 // This assumes that the threads go full throttle at performing +205 // actions. In case the thread scheduling itself is as slow as +206 // SLEEPTIME, then this test might fail and so, we might have +207 // set it to a higher number on slower machines. +208 numBlockedWorkers.addAndGet(1); +209 } +210 } +211 return true; +212 } catch (Exception e) { +213 LOG.error("Caught unknown exception", e); +214 doneHalfway.countDown(); +215 return false; +216 } +217 } +218 })); +219 } +220 +221 doneHalfway.await(); 222 -223 // Let the threads continue going -224 continueOtherHalf.countDown(); -225 -226 Thread.sleep(2 * SLEEPTIME); -227 // Start a RS in the cluster -228 TEST_UTIL.getHBaseCluster().startRegionServer(); +223 // Kill a regionserver +224 TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop(); +225 TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing"); +226 +227 // Let the threads continue going +228 continueOtherHalf.countDown(); 229 -230 int numThreadsReturnedFalse = 0; -231 int numThreadsReturnedTrue = 0; -232 int numThreadsThrewExceptions = 0; -233 for (Future<Boolean> f : futures) { -234 try { -235 numThreadsReturnedTrue += f.get() ? 1 : 0; -236 numThreadsReturnedFalse += f.get() ? 0 : 1; -237 } catch (Exception e) { -238 numThreadsThrewExceptions++; -239 } -240 } -241 LOG.debug("numThreadsReturnedFalse:" -242 + numThreadsReturnedFalse -243 + " numThreadsReturnedTrue:" -244 + numThreadsReturnedTrue -245 + " numThreadsThrewExceptions:" -246 + numThreadsThrewExceptions -247 + " numFailedThreads:" -248 + numFailedThreads.get() -249 + " numSuccessfullThreads:" -250 + numSuccessfullThreads.get() -251 + " numBlockedWorkers:" -252 + numBlockedWorkers.get() -253 + " totalTimeWaited: " -254 + totalTimeTaken.get() -255 / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers -256 .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get()); -257 -258 assertEquals("The expected number of all the successfull and the failed " -259 + "threads should equal the total number of threads that we spawned", -260 nThreads, numFailedThreads.get() + numSuccessfullThreads.get()); -261 assertEquals( -262 "All the failures should be coming from the secondput failure", -263 numFailedThreads.get(), numThreadsReturnedFalse); -264 assertEquals("Number of threads that threw execution exceptions " -265 + "otherwise should be 0", numThreadsThrewExceptions, 0); -266 assertEquals("The regionservers that returned true should equal to the" -267 + " number of successful threads", numThreadsReturnedTrue, -268 numSuccessfullThreads.get()); -269 assertTrue( -270 "There will be atleast one thread that retried instead of failing", -271 MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0); -272 assertTrue( -273 "There will be atleast one PreemptiveFastFail exception," -274 + " otherwise, the test makes little sense." -275 + "numPreemptiveFastFailExceptions: " -276 + numPreemptiveFastFailExceptions.get(), -277 numPreemptiveFastFailExceptions.get() > 0); -278 -279 assertTrue( -280 "Only few thread should ideally be waiting for the dead " -281 + "regionserver to be coming back. numBlockedWorkers:" -282 + numBlockedWorkers.get() + " threads that retried : " -283 + MyPreemptiveFastFailInterceptor.numBraveSouls.get(), -284 numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls -285 .get()); -286 } -287 -288 public static class MyPreemptiveFastFailInterceptor extends -289 PreemptiveFastFailInterceptor { -290 public static AtomicInteger numBraveSouls = new AtomicInteger(); +230 Thread.sleep(2 * SLEEPTIME); +231 // Start a RS in the cluster +232 TEST_UTIL.getHBaseCluster().startRegionServer(); +233 +234 int numThreadsReturnedFalse = 0; +235 int numThreadsReturnedTrue = 0; +236 int numThreadsThrewExceptions = 0; +237 for (Future<Boolean> f : futures) { +238 try { +239 numThreadsReturnedTrue += f.get() ? 1 : 0; +240 numThreadsReturnedFalse += f.get() ? 0 : 1; +241 } catch (Exception e) { +242 numThreadsThrewExceptions++; +243 } +244 } +245 LOG.debug("numThreadsReturnedFalse:" +246 + numThreadsReturnedFalse +247 + " numThreadsReturnedTrue:" +248 + numThreadsReturnedTrue +249 + " numThreadsThrewExceptions:" +250 + numThreadsThrewExceptions +251 + " numFailedThreads:" +252 + numFailedThreads.get() +253 + " numSuccessfullThreads:" +254 + numSuccessfullThreads.get() +255 + " numBlockedWorkers:" +256 + numBlockedWorkers.get() +257 + " totalTimeWaited: " +258 + totalTimeTaken.get() +259 / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers +260 .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get()); +261 +262 assertEquals("The expected number of all the successfull and the failed " +263 + "threads should equal the total number of threads that we spawned", +264 nThreads, numFailedThreads.get() + numSuccessfullThreads.get()); +265 assertEquals( +266 "All the failures should be coming from the secondput failure", +267 numFailedThreads.get(), numThreadsReturnedFalse); +268 assertEquals("Number of threads that threw execution exceptions " +269 + "otherwise should be 0", numThreadsThrewExceptions, 0); +270 assertEquals("The regionservers that returned true should equal to the" +271 + " number of successful threads", numThreadsReturnedTrue, +272 numSuccessfullThreads.get()); +273 assertTrue( +274 "There will be atleast one thread that retried instead of failing", +275 MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0); +276 assertTrue( +277 "There will be atleast one PreemptiveFastFail exception," +278 + " otherwise, the test makes little sense." +279 + "numPreemptiveFastFailExceptions: " +280 + numPreemptiveFastFailExceptions.get(), +281 numPreemptiveFastFailExceptions.get() > 0); +282 +283 assertTrue( +284 "Only few thread should ideally be waiting for the dead " +285 + "regionserver to be coming back. numBlockedWorkers:" +286 + numBlockedWorkers.get() + " threads that retried : " +287 + MyPreemptiveFastFailInterceptor.numBraveSouls.get(), +288 numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls +289 .get()); +290 } 291 -292 @Override -293 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { -294 boolean ret = super.shouldRetryInspiteOfFastFail(fInfo); -295 if (ret) -296 numBraveSouls.addAndGet(1); -297 return ret; -298 } -299 -300 public MyPreemptiveFastFailInterceptor(Configuration conf) { -301 super(conf); -302 } -303 } -304 -305 private byte[] longToByteArrayKey(long rowKey) { -306 return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); -307 } -308} +292 @Test +293 public void testCallQueueTooBigException() throws Exception { +294 Admin admin = TEST_UTIL.getHBaseAdmin(); +295 +296 final String tableName = "testCallQueueTooBigException"; +297 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes +298 .toBytes(tableName))); +299 desc.addFamily(new HColumnDescriptor(FAMILY)); +300 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3); +301 +302 Configuration conf = TEST_UTIL.getConfiguration(); +303 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100); +304 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500); +305 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); +306 +307 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); +308 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); +309 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, +310 CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class); +311 +312 final Connection connection = ConnectionFactory.createConnection(conf); +313 +314 //Set max call queues size to 0 +315 SimpleRpcScheduler srs = (SimpleRpcScheduler) +316 TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler(); +317 Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); +318 newConf.setInt("hbase.ipc.server.max.callqueue.length", 0); +319 srs.onConfigurationChange(newConf); +320 +321 try (Table table = connection.getTable(TableName.valueOf(tableName))) { +322 Get get = new Get(new byte[1]); +323 table.get(get); +324 } catch (Throwable ex) { +325 } +326 +327 assertEquals("There should have been 1 hit", 1, +328 CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get()); +329 +330 newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); +331 newConf.setInt("hbase.ipc.server.max.callqueue.length", 250); +332 srs.onConfigurationChange(newConf); +333 } +334 +335 public static class MyPreemptiveFastFailInterceptor extends +336 PreemptiveFastFailInterceptor { +337 public static AtomicInteger numBraveSouls = new AtomicInteger(); +338 +339 @Override +340 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { +341 boolean ret = super.shouldRetryInspiteOfFastFail(fInfo); +342 if (ret) +343 numBraveSouls.addAndGet(1); +344 return ret; +345 } +346 +347 public MyPreemptiveFastFailInterceptor(Configuration conf) { +348 super(conf); +349 } +350 } +351 +352 private byte[] longToByteArrayKey(long rowKey) { +353 return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); +354 } +355 +356 public static class CallQueueTooBigPffeInterceptor extends +357 PreemptiveFastFailInterceptor { +358 public static AtomicInteger numCallQueueTooBig = new AtomicInteger(); +359 +360 @Override +361 protected void handleFailureToServer(ServerName serverName, Throwable t) { +362 super.handleFailureToServer(serverName, t); +363 numCallQueueTooBig.incrementAndGet(); +364 } +365 +366 public CallQueueTooBigPffeInterceptor(Configuration conf) { +367 super(conf); +368 } +369 } +370}