Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6C4D418484 for ; Tue, 10 Nov 2015 05:31:31 +0000 (UTC) Received: (qmail 26516 invoked by uid 500); 10 Nov 2015 05:31:30 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 26448 invoked by uid 500); 10 Nov 2015 05:31:30 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 23208 invoked by uid 99); 10 Nov 2015 05:31:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Nov 2015 05:31:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F8F9E0ACA; Tue, 10 Nov 2015 05:31:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Tue, 10 Nov 2015 05:31:52 -0000 Message-Id: <21e50b85fe0c45229f598554b98621d2@git.apache.org> In-Reply-To: <98f3b99677f845f58c406fd26c1ca64e@git.apache.org> References: <98f3b99677f845f58c406fd26c1ca64e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [26/51] [partial] hbase git commit: Published website at 112900d0425a8157b89041f0e353ebf5cc259c69 with HBASE-14785 http://git-wip-us.apache.org/repos/asf/hbase/blob/565d7685/devapidocs/src-html/org/apache/hadoop/hbase/client/ConnectionImplementation.ServerErrorTracker.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/ConnectionImplementation.ServerErrorTracker.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/ConnectionImplementation.ServerErrorTracker.html index 0429521..7da7f31 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/ConnectionImplementation.ServerErrorTracker.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/ConnectionImplementation.ServerErrorTracker.html @@ -178,2176 +178,2152 @@ 170 private final MetaCache metaCache; 171 private final MetricsConnection metrics; 172 -173 private int refCount; +173 protected User user; 174 -175 protected User user; +175 private RpcRetryingCallerFactory rpcCallerFactory; 176 -177 private RpcRetryingCallerFactory rpcCallerFactory; +177 private RpcControllerFactory rpcControllerFactory; 178 -179 private RpcControllerFactory rpcControllerFactory; +179 private final RetryingCallerInterceptor interceptor; 180 -181 private final RetryingCallerInterceptor interceptor; -182 -183 /** -184 * Cluster registry of basic info such as clusterid and meta region location. -185 */ -186 Registry registry; +181 /** +182 * Cluster registry of basic info such as clusterid and meta region location. +183 */ +184 Registry registry; +185 +186 private final ClientBackoffPolicy backoffPolicy; 187 -188 private final ClientBackoffPolicy backoffPolicy; -189 -190 /** -191 * constructor -192 * @param conf Configuration object -193 */ -194 ConnectionImplementation(Configuration conf, -195 ExecutorService pool, User user) throws IOException { -196 this.conf = conf; -197 this.user = user; -198 this.batchPool = pool; -199 this.tableConfig = new TableConfiguration(conf); -200 this.closed = false; -201 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, -202 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); -203 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, -204 HConstants.DEFAULT_USE_META_REPLICAS); -205 // how many times to try, one more than max *retry* time -206 this.numTries = tableConfig.getRetriesNumber() + 1; -207 this.rpcTimeout = conf.getInt( -208 HConstants.HBASE_RPC_TIMEOUT_KEY, -209 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); -210 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { -211 synchronized (nonceGeneratorCreateLock) { -212 if (nonceGenerator == null) { -213 nonceGenerator = new PerClientRandomNonceGenerator(); -214 } -215 } -216 } else { -217 nonceGenerator = new NoNonceGenerator(); -218 } -219 -220 this.stats = ServerStatisticTracker.create(conf); -221 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); -222 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); -223 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); -224 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); -225 this.asyncProcess = createAsyncProcess(this.conf); -226 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { -227 this.metrics = new MetricsConnection(this); -228 } else { -229 this.metrics = null; -230 } -231 this.metaCache = new MetaCache(this.metrics); -232 -233 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, -234 HConstants.STATUS_PUBLISHED_DEFAULT); -235 this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); -236 Class<? extends ClusterStatusListener.Listener> listenerClass = -237 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, -238 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, -239 ClusterStatusListener.Listener.class); -240 -241 try { -242 this.registry = setupRegistry(); -243 retrieveClusterId(); +188 /** +189 * constructor +190 * @param conf Configuration object +191 */ +192 ConnectionImplementation(Configuration conf, +193 ExecutorService pool, User user) throws IOException { +194 this.conf = conf; +195 this.user = user; +196 this.batchPool = pool; +197 this.tableConfig = new TableConfiguration(conf); +198 this.closed = false; +199 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, +200 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); +201 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, +202 HConstants.DEFAULT_USE_META_REPLICAS); +203 // how many times to try, one more than max *retry* time +204 this.numTries = tableConfig.getRetriesNumber() + 1; +205 this.rpcTimeout = conf.getInt( +206 HConstants.HBASE_RPC_TIMEOUT_KEY, +207 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); +208 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { +209 synchronized (nonceGeneratorCreateLock) { +210 if (nonceGenerator == null) { +211 nonceGenerator = new PerClientRandomNonceGenerator(); +212 } +213 } +214 } else { +215 nonceGenerator = new NoNonceGenerator(); +216 } +217 +218 this.stats = ServerStatisticTracker.create(conf); +219 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); +220 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); +221 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); +222 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); +223 this.asyncProcess = createAsyncProcess(this.conf); +224 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { +225 this.metrics = new MetricsConnection(this); +226 } else { +227 this.metrics = null; +228 } +229 this.metaCache = new MetaCache(this.metrics); +230 +231 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, +232 HConstants.STATUS_PUBLISHED_DEFAULT); +233 this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); +234 Class<? extends ClusterStatusListener.Listener> listenerClass = +235 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, +236 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, +237 ClusterStatusListener.Listener.class); +238 +239 try { +240 this.registry = setupRegistry(); +241 retrieveClusterId(); +242 +243 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); 244 -245 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId); -246 -247 // Do we publish the status? -248 if (shouldListen) { -249 if (listenerClass == null) { -250 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + -251 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); -252 } else { -253 clusterStatusListener = new ClusterStatusListener( -254 new ClusterStatusListener.DeadServerHandler() { -255 @Override -256 public void newDead(ServerName sn) { -257 clearCaches(sn); -258 rpcClient.cancelConnections(sn); -259 } -260 }, conf, listenerClass); -261 } -262 } -263 } catch (Throwable e) { -264 // avoid leaks: registry, rpcClient, ... -265 LOG.debug("connection construction failed", e); -266 close(); -267 throw e; -268 } -269 } -270 -271 /** -272 * @param conn The connection for which to replace the generator. -273 * @param cnm Replaces the nonce generator used, for testing. -274 * @return old nonce generator. -275 */ -276 @VisibleForTesting -277 static NonceGenerator injectNonceGeneratorForTesting( -278 ClusterConnection conn, NonceGenerator cnm) { -279 ConnectionImplementation connImpl = (ConnectionImplementation)conn; -280 NonceGenerator ng = connImpl.getNonceGenerator(); -281 LOG.warn("Nonce generator is being replaced by test code for " -282 + cnm.getClass().getName()); -283 nonceGenerator = cnm; -284 return ng; -285 } -286 -287 /** -288 * Look for an exception we know in the remote exception: -289 * - hadoop.ipc wrapped exceptions -290 * - nested exceptions -291 * -292 * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException / -293 * ThrottlingException -294 * @return null if we didn't find the exception, the exception otherwise. -295 */ -296 public static Throwable findException(Object exception) { -297 if (exception == null || !(exception instanceof Throwable)) { -298 return null; -299 } -300 Throwable cur = (Throwable) exception; -301 while (cur != null) { -302 if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException -303 || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException) { -304 return cur; -305 } -306 if (cur instanceof RemoteException) { -307 RemoteException re = (RemoteException) cur; -308 cur = re.unwrapRemoteException( -309 RegionOpeningException.class, RegionMovedException.class, -310 RegionTooBusyException.class); -311 if (cur == null) { -312 cur = re.unwrapRemoteException(); -313 } -314 // unwrapRemoteException can return the exception given as a parameter when it cannot -315 // unwrap it. In this case, there is no need to look further -316 // noinspection ObjectEquality -317 if (cur == re) { -318 return null; -319 } -320 } else { -321 cur = cur.getCause(); -322 } -323 } -324 -325 return null; -326 } -327 -328 @Override -329 public HTableInterface getTable(String tableName) throws IOException { -330 return getTable(TableName.valueOf(tableName)); -331 } -332 -333 @Override -334 public HTableInterface getTable(byte[] tableName) throws IOException { -335 return getTable(TableName.valueOf(tableName)); -336 } -337 -338 @Override -339 public HTableInterface getTable(TableName tableName) throws IOException { -340 return getTable(tableName, getBatchPool()); -341 } -342 -343 @Override -344 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException { -345 return getTable(TableName.valueOf(tableName), pool); -346 } -347 -348 @Override -349 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { -350 return getTable(TableName.valueOf(tableName), pool); -351 } -352 -353 @Override -354 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException { -355 return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); -356 } -357 -358 @Override -359 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { -360 if (params.getTableName() == null) { -361 throw new IllegalArgumentException("TableName cannot be null."); -362 } -363 if (params.getPool() == null) { -364 params.pool(HTable.getDefaultExecutor(getConfiguration())); -365 } -366 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { -367 params.writeBufferSize(tableConfig.getWriteBufferSize()); -368 } -369 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { -370 params.maxKeyValueSize(tableConfig.getMaxKeyValueSize()); -371 } -372 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); -373 } -374 -375 @Override -376 public BufferedMutator getBufferedMutator(TableName tableName) { -377 return getBufferedMutator(new BufferedMutatorParams(tableName)); -378 } -379 -380 @Override -381 public RegionLocator getRegionLocator(TableName tableName) throws IOException { -382 return new HRegionLocator(tableName, this); -383 } -384 -385 @Override -386 public Admin getAdmin() throws IOException { -387 return new HBaseAdmin(this); -388 } -389 -390 @Override -391 public MetricsConnection getConnectionMetrics() { -392 return this.metrics; -393 } -394 -395 private ExecutorService getBatchPool() { -396 if (batchPool == null) { -397 synchronized (this) { -398 if (batchPool == null) { -399 this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), -400 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); -401 this.cleanupPool = true; -402 } -403 } -404 } -405 return this.batchPool; -406 } -407 -408 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint, -409 BlockingQueue<Runnable> passedWorkQueue) { -410 // shared HTable thread executor not yet initialized -411 if (maxThreads == 0) { -412 maxThreads = Runtime.getRuntime().availableProcessors() * 8; -413 } -414 if (coreThreads == 0) { -415 coreThreads = Runtime.getRuntime().availableProcessors() * 8; -416 } -417 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); -418 BlockingQueue<Runnable> workQueue = passedWorkQueue; -419 if (workQueue == null) { -420 workQueue = -421 new LinkedBlockingQueue<Runnable>(maxThreads * -422 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, -423 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); -424 } -425 ThreadPoolExecutor tpe = new ThreadPoolExecutor( -426 coreThreads, -427 maxThreads, -428 keepAliveTime, -429 TimeUnit.SECONDS, -430 workQueue, -431 Threads.newDaemonThreadFactory(toString() + nameHint)); -432 tpe.allowCoreThreadTimeOut(true); -433 return tpe; -434 } -435 -436 private ExecutorService getMetaLookupPool() { -437 if (this.metaLookupPool == null) { -438 synchronized (this) { -439 if (this.metaLookupPool == null) { -440 //Some of the threads would be used for meta replicas -441 //To start with, threads.max.core threads can hit the meta (including replicas). -442 //After that, requests will get queued up in the passed queue, and only after -443 //the queue is full, a new thread will be started -444 this.metaLookupPool = getThreadPool( -445 conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), -446 conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), -447 "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>()); -448 } -449 } -450 } -451 return this.metaLookupPool; -452 } -453 -454 protected ExecutorService getCurrentMetaLookupPool() { -455 return metaLookupPool; -456 } -457 -458 protected ExecutorService getCurrentBatchPool() { -459 return batchPool; -460 } -461 -462 private void shutdownPools() { -463 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { -464 shutdownBatchPool(this.batchPool); -465 } -466 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { -467 shutdownBatchPool(this.metaLookupPool); -468 } -469 } -470 -471 private void shutdownBatchPool(ExecutorService pool) { -472 pool.shutdown(); -473 try { -474 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { -475 pool.shutdownNow(); -476 } -477 } catch (InterruptedException e) { -478 pool.shutdownNow(); -479 } -480 } -481 -482 /** -483 * @return The cluster registry implementation to use. -484 * @throws java.io.IOException -485 */ -486 private Registry setupRegistry() throws IOException { -487 return RegistryFactory.getRegistry(this); -488 } -489 -490 /** -491 * For tests only. -492 */ -493 @VisibleForTesting -494 RpcClient getRpcClient() { -495 return rpcClient; -496 } -497 -498 /** -499 * An identifier that will remain the same for a given connection. -500 */ -501 @Override -502 public String toString(){ -503 return "hconnection-0x" + Integer.toHexString(hashCode()); -504 } +245 // Do we publish the status? +246 if (shouldListen) { +247 if (listenerClass == null) { +248 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + +249 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); +250 } else { +251 clusterStatusListener = new ClusterStatusListener( +252 new ClusterStatusListener.DeadServerHandler() { +253 @Override +254 public void newDead(ServerName sn) { +255 clearCaches(sn); +256 rpcClient.cancelConnections(sn); +257 } +258 }, conf, listenerClass); +259 } +260 } +261 } catch (Throwable e) { +262 // avoid leaks: registry, rpcClient, ... +263 LOG.debug("connection construction failed", e); +264 close(); +265 throw e; +266 } +267 } +268 +269 /** +270 * @param conn The connection for which to replace the generator. +271 * @param cnm Replaces the nonce generator used, for testing. +272 * @return old nonce generator. +273 */ +274 @VisibleForTesting +275 static NonceGenerator injectNonceGeneratorForTesting( +276 ClusterConnection conn, NonceGenerator cnm) { +277 ConnectionImplementation connImpl = (ConnectionImplementation)conn; +278 NonceGenerator ng = connImpl.getNonceGenerator(); +279 LOG.warn("Nonce generator is being replaced by test code for " +280 + cnm.getClass().getName()); +281 nonceGenerator = cnm; +282 return ng; +283 } +284 +285 /** +286 * Look for an exception we know in the remote exception: +287 * - hadoop.ipc wrapped exceptions +288 * - nested exceptions +289 * +290 * Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException / +291 * ThrottlingException +292 * @return null if we didn't find the exception, the exception otherwise. +293 */ +294 public static Throwable findException(Object exception) { +295 if (exception == null || !(exception instanceof Throwable)) { +296 return null; +297 } +298 Throwable cur = (Throwable) exception; +299 while (cur != null) { +300 if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException +301 || cur instanceof RegionTooBusyException || cur instanceof ThrottlingException) { +302 return cur; +303 } +304 if (cur instanceof RemoteException) { +305 RemoteException re = (RemoteException) cur; +306 cur = re.unwrapRemoteException( +307 RegionOpeningException.class, RegionMovedException.class, +308 RegionTooBusyException.class); +309 if (cur == null) { +310 cur = re.unwrapRemoteException(); +311 } +312 // unwrapRemoteException can return the exception given as a parameter when it cannot +313 // unwrap it. In this case, there is no need to look further +314 // noinspection ObjectEquality +315 if (cur == re) { +316 return null; +317 } +318 } else { +319 cur = cur.getCause(); +320 } +321 } +322 +323 return null; +324 } +325 +326 @Override +327 public HTableInterface getTable(String tableName) throws IOException { +328 return getTable(TableName.valueOf(tableName)); +329 } +330 +331 @Override +332 public HTableInterface getTable(byte[] tableName) throws IOException { +333 return getTable(TableName.valueOf(tableName)); +334 } +335 +336 @Override +337 public HTableInterface getTable(TableName tableName) throws IOException { +338 return getTable(tableName, getBatchPool()); +339 } +340 +341 @Override +342 public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException { +343 return getTable(TableName.valueOf(tableName), pool); +344 } +345 +346 @Override +347 public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { +348 return getTable(TableName.valueOf(tableName), pool); +349 } +350 +351 @Override +352 public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException { +353 return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool); +354 } +355 +356 @Override +357 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { +358 if (params.getTableName() == null) { +359 throw new IllegalArgumentException("TableName cannot be null."); +360 } +361 if (params.getPool() == null) { +362 params.pool(HTable.getDefaultExecutor(getConfiguration())); +363 } +364 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { +365 params.writeBufferSize(tableConfig.getWriteBufferSize()); +366 } +367 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { +368 params.maxKeyValueSize(tableConfig.getMaxKeyValueSize()); +369 } +370 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); +371 } +372 +373 @Override +374 public BufferedMutator getBufferedMutator(TableName tableName) { +375 return getBufferedMutator(new BufferedMutatorParams(tableName)); +376 } +377 +378 @Override +379 public RegionLocator getRegionLocator(TableName tableName) throws IOException { +380 return new HRegionLocator(tableName, this); +381 } +382 +383 @Override +384 public Admin getAdmin() throws IOException { +385 return new HBaseAdmin(this); +386 } +387 +388 @Override +389 public MetricsConnection getConnectionMetrics() { +390 return this.metrics; +391 } +392 +393 private ExecutorService getBatchPool() { +394 if (batchPool == null) { +395 synchronized (this) { +396 if (batchPool == null) { +397 this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), +398 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); +399 this.cleanupPool = true; +400 } +401 } +402 } +403 return this.batchPool; +404 } +405 +406 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint, +407 BlockingQueue<Runnable> passedWorkQueue) { +408 // shared HTable thread executor not yet initialized +409 if (maxThreads == 0) { +410 maxThreads = Runtime.getRuntime().availableProcessors() * 8; +411 } +412 if (coreThreads == 0) { +413 coreThreads = Runtime.getRuntime().availableProcessors() * 8; +414 } +415 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); +416 BlockingQueue<Runnable> workQueue = passedWorkQueue; +417 if (workQueue == null) { +418 workQueue = +419 new LinkedBlockingQueue<Runnable>(maxThreads * +420 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, +421 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); +422 } +423 ThreadPoolExecutor tpe = new ThreadPoolExecutor( +424 coreThreads, +425 maxThreads, +426 keepAliveTime, +427 TimeUnit.SECONDS, +428 workQueue, +429 Threads.newDaemonThreadFactory(toString() + nameHint)); +430 tpe.allowCoreThreadTimeOut(true); +431 return tpe; +432 } +433 +434 private ExecutorService getMetaLookupPool() { +435 if (this.metaLookupPool == null) { +436 synchronized (this) { +437 if (this.metaLookupPool == null) { +438 //Some of the threads would be used for meta replicas +439 //To start with, threads.max.core threads can hit the meta (including replicas). +440 //After that, requests will get queued up in the passed queue, and only after +441 //the queue is full, a new thread will be started +442 this.metaLookupPool = getThreadPool( +443 conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), +444 conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), +445 "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>()); +446 } +447 } +448 } +449 return this.metaLookupPool; +450 } +451 +452 protected ExecutorService getCurrentMetaLookupPool() { +453 return metaLookupPool; +454 } +455 +456 protected ExecutorService getCurrentBatchPool() { +457 return batchPool; +458 } +459 +460 private void shutdownPools() { +461 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { +462 shutdownBatchPool(this.batchPool); +463 } +464 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { +465 shutdownBatchPool(this.metaLookupPool); +466 } +467 } +468 +469 private void shutdownBatchPool(ExecutorService pool) { +470 pool.shutdown(); +471 try { +472 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { +473 pool.shutdownNow(); +474 } +475 } catch (InterruptedException e) { +476 pool.shutdownNow(); +477 } +478 } +479 +480 /** +481 * @return The cluster registry implementation to use. +482 * @throws java.io.IOException +483 */ +484 private Registry setupRegistry() throws IOException { +485 return RegistryFactory.getRegistry(this); +486 } +487 +488 /** +489 * For tests only. +490 */ +491 @VisibleForTesting +492 RpcClient getRpcClient() { +493 return rpcClient; +494 } +495 +496 /** +497 * An identifier that will remain the same for a given connection. +498 */ +499 @Override +500 public String toString(){ +501 return "hconnection-0x" + Integer.toHexString(hashCode()); +502 } +503 +504 protected String clusterId = null; 505 -506 protected String clusterId = null; -507 -508 protected void retrieveClusterId() { -509 if (clusterId != null) return; -510 this.clusterId = this.registry.getClusterId(); -511 if (clusterId == null) { -512 clusterId = HConstants.CLUSTER_ID_DEFAULT; -513 LOG.debug("clusterid came back null, using default " + clusterId); -514 } -515 } -516 -517 @Override -518 public Configuration getConfiguration() { -519 return this.conf; -520 } -521 -522 private void checkIfBaseNodeAvailable(ZooKeeperWatcher zkw) -523 throws MasterNotRunningException { -524 String errorMsg; -525 try { -526 if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) { -527 errorMsg = "The node " + zkw.baseZNode+" is not in ZooKeeper. " -528 + "It should have been written by the master. " -529 + "Check the value configured in 'zookeeper.znode.parent'. " -530 + "There could be a mismatch with the one configured in the master."; -531 LOG.error(errorMsg); -532 throw new MasterNotRunningException(errorMsg); -533 } -534 } catch (KeeperException e) { -535 errorMsg = "Can't get connection to ZooKeeper: " + e.getMessage(); -536 LOG.error(errorMsg); -537 throw new MasterNotRunningException(errorMsg, e); -538 } -539 } -540 -541 /** -542 * @return true if the master is running, throws an exception otherwise -543 * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running -544 * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException -545 * @deprecated this has been deprecated without a replacement -546 */ -547 @Deprecated -548 @Override -549 public boolean isMasterRunning() -550 throws MasterNotRunningException, ZooKeeperConnectionException { -551 // When getting the master connection, we check it's running, -552 // so if there is no exception, it means we've been able to get a -553 // connection on a running master -554 MasterKeepAliveConnection m = getKeepAliveMasterService(); -555 m.close(); -556 return true; -557 } -558 -559 @Override -560 public HRegionLocation getRegionLocation(final TableName tableName, -561 final byte [] row, boolean reload) -562 throws IOException { -563 return reload? relocateRegion(tableName, row): locateRegion(tableName, row); -564 } -565 -566 @Override -567 public HRegionLocation getRegionLocation(final byte[] tableName, -568 final byte [] row, boolean reload) -569 throws IOException { -570 return getRegionLocation(TableName.valueOf(tableName), row, reload); -571 } -572 -573 @Override -574 public boolean isTableEnabled(TableName tableName) throws IOException { -575 return getTableState(tableName).inStates(TableState.State.ENABLED); -576 } -577 -578 @Override -579 public boolean isTableEnabled(byte[] tableName) throws IOException { -580 return isTableEnabled(TableName.valueOf(tableName)); -581 } -582 -583 @Override -584 public boolean isTableDisabled(TableName tableName) throws IOException { -585 return getTableState(tableName).inStates(TableState.State.DISABLED); -586 } -587 -588 @Override -589 public boolean isTableDisabled(byte[] tableName) throws IOException { -590 return isTableDisabled(TableName.valueOf(tableName)); -591 } -592 -593 @Override -594 public boolean isTableAvailable(final TableName tableName) throws IOException { -595 return isTableAvailable(tableName, null); -596 } -597 -598 @Override -599 public boolean isTableAvailable(final byte[] tableName) throws IOException { -600 return isTableAvailable(TableName.valueOf(tableName)); -601 } -602 -603 @Override -604 public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys) -605 throws IOException { -606 if (this.closed) throw new IOException(toString() + " closed"); -607 try { -608 if (!isTableEnabled(tableName)) { -609 LOG.debug("Table " + tableName + " not enabled"); -610 return false; -611 } -612 List<Pair<HRegionInfo, ServerName>> locations = -613 MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true); -614 -615 int notDeployed = 0; -616 int regionCount = 0; -617 for (Pair<HRegionInfo, ServerName> pair : locations) { -618 HRegionInfo info = pair.getFirst(); -619 if (pair.getSecond() == null) { -620 if (LOG.isDebugEnabled()) { -621 LOG.debug("Table " + tableName + " has not deployed region " + pair.getFirst() -622 .getEncodedName()); -623 } -624 notDeployed++; -625 } else if (splitKeys != null -626 && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { -627 for (byte[] splitKey : splitKeys) { -628 // Just check if the splitkey is available -629 if (Bytes.equals(info.getStartKey(), splitKey)) { -630 regionCount++; -631 break; -632 } -633 } -634 } else { -635 // Always empty start row should be counted -636 regionCount++; -637 } -638 } -639 if (notDeployed > 0) { -640 if (LOG.isDebugEnabled()) { -641 LOG.debug("Table " + tableName + " has " + notDeployed + " regions"); -642 } -643 return false; -644 } else if (splitKeys != null && regionCount != splitKeys.length + 1) { -645 if (LOG.isDebugEnabled()) { -646 LOG.debug("Table " + tableName + " expected to have " + (splitKeys.length + 1) -647 + " regions, but only " + regionCount + " available"); -648 } -649 return false; -650 } else { -651 if (LOG.isDebugEnabled()) { -652 LOG.debug("Table " + tableName + " should be available"); -653 } -654 return true; -655 } -656 } catch (TableNotFoundException tnfe) { -657 LOG.warn("Table " + tableName + " not enabled, it is not exists"); -658 return false; -659 } -660 } -661 -662 @Override -663 public boolean isTableAvailable(final byte[] tableName, final byte[][] splitKeys) -664 throws IOException { -665 return isTableAvailable(TableName.valueOf(tableName), splitKeys); -666 } -667 -668 @Override -669 public HRegionLocation locateRegion(final byte[] regionName) throws IOException { -670 RegionLocations locations = locateRegion(HRegionInfo.getTable(regionName), -671 HRegionInfo.getStartKey(regionName), false, true); -672 return locations == null ? null : locations.getRegionLocation(); -673 } -674 -675 @Override -676 public boolean isDeadServer(ServerName sn) { -677 if (clusterStatusListener == null) { -678 return false; -679 } else { -680 return clusterStatusListener.isDeadServer(sn); -681 } -682 } -683 -684 @Override -685 public List<HRegionLocation> locateRegions(final TableName tableName) -686 throws IOException { -687 return locateRegions(tableName, false, true); -688 } -689 -690 @Override -691 public List<HRegionLocation> locateRegions(final byte[] tableName) -692 throws IOException { -693 return locateRegions(TableName.valueOf(tableName)); -694 } -695 -696 @Override -697 public List<HRegionLocation> locateRegions(final TableName tableName, -698 final boolean useCache, final boolean offlined) throws IOException { -699 List<HRegionInfo> regions = MetaTableAccessor -700 .getTableRegions(this, tableName, !offlined); -701 final List<HRegionLocation> locations = new ArrayList<HRegionLocation>(); -702 for (HRegionInfo regionInfo : regions) { -703 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true); -704 if (list != null) { -705 for (HRegionLocation loc : list.getRegionLocations()) { -706 if (loc != null) { -707 locations.add(loc); -708 } -709 } -710 } -711 } -712 return locations; -713 } -714 -715 @Override -716 public List<HRegionLocation> locateRegions(final byte[] tableName, -717 final boolean useCache, final boolean offlined) throws IOException { -718 return locateRegions(TableName.valueOf(tableName), useCache, offlined); -719 } -720 -721 @Override -722 public HRegionLocation locateRegion( -723 final TableName tableName, final byte[] row) throws IOException{ -724 RegionLocations locations = locateRegion(tableName, row, true, true); -725 return locations == null ? null : locations.getRegionLocation(); -726 } -727 -728 @Override -729 public HRegionLocation locateRegion(final byte[] tableName, -730 final byte [] row) -731 throws IOException{ -732 return locateRegion(TableName.valueOf(tableName), row); -733 } -734 -735 @Override -736 public HRegionLocation relocateRegion(final TableName tableName, -737 final byte [] row) throws IOException{ -738 RegionLocations locations = relocateRegion(tableName, row, -739 RegionReplicaUtil.DEFAULT_REPLICA_ID); -740 return locations == null ? null : -741 locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID); -742 } -743 -744 @Override -745 public RegionLocations relocateRegion(final TableName tableName, -746 final byte [] row, int replicaId) throws IOException{ -747 // Since this is an explicit request not to use any caching, finding -748 // disabled tables should not be desirable. This will ensure that an exception is thrown when -749 // the first time a disabled table is interacted with. -750 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) { -751 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); -752 } -753 -754 return locateRegion(tableName, row, false, true, replicaId); -755 } -756 -757 @Override -758 public HRegionLocation relocateRegion(final byte[] tableName, -759 final byte [] row) throws IOException { -760 return relocateRegion(TableName.valueOf(tableName), row); -761 } -762 -763 @Override -764 public RegionLocations locateRegion(final TableName tableName, -765 final byte [] row, boolean useCache, boolean retry) -766 throws IOException { -767 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); -768 } -769 -770 @Override -771 public RegionLocations locateRegion(final TableName tableName, -772 final byte [] row, boolean useCache, boolean retry, int replicaId) -