Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 44AFF18B5D for ; Tue, 22 Dec 2015 23:24:33 +0000 (UTC) Received: (qmail 77288 invoked by uid 500); 22 Dec 2015 23:24:26 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 77190 invoked by uid 500); 22 Dec 2015 23:24:26 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 75544 invoked by uid 99); 22 Dec 2015 23:24:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Dec 2015 23:24:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EEFD4E0525; Tue, 22 Dec 2015 23:24:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Tue, 22 Dec 2015 23:25:00 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [37/51] [partial] hbase-site git commit: Published site at 95a13b51ee052eb73882682e8f009bfa1e914866. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/32d40534/devapidocs/src-html/org/apache/hadoop/hbase/client/HTable.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/HTable.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/HTable.html index fa3eec2..0fc2c34 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/client/HTable.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/HTable.html @@ -136,1195 +136,1196 @@ 128 if (maxThreads == 0) { 129 maxThreads = 1; // is there a better default? 130 } -131 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); -132 -133 // Using the "direct handoff" approach, new threads will only be created -134 // if it is necessary and will grow unbounded. This could be bad but in HCM -135 // we only create as many Runnables as there are region servers. It means -136 // it also scales when new region servers are added. -137 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, -138 new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable")); -139 pool.allowCoreThreadTimeOut(true); -140 return pool; -141 } -142 -143 /** -144 * Creates an object to access a HBase table. -145 * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to -146 * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). -147 * @param tableName Name of the table. -148 * @param connection HConnection to be used. -149 * @param pool ExecutorService to be used. -150 * @throws IOException if a remote or network exception occurs -151 */ -152 @InterfaceAudience.Private -153 protected HTable(TableName tableName, final ClusterConnection connection, -154 final TableConfiguration tableConfig, -155 final RpcRetryingCallerFactory rpcCallerFactory, -156 final RpcControllerFactory rpcControllerFactory, -157 final ExecutorService pool) throws IOException { -158 if (connection == null || connection.isClosed()) { -159 throw new IllegalArgumentException("Connection is null or closed."); -160 } -161 this.tableName = tableName; -162 this.cleanupConnectionOnClose = false; -163 this.connection = connection; -164 this.configuration = connection.getConfiguration(); -165 this.tableConfiguration = tableConfig; -166 this.pool = pool; -167 if (pool == null) { -168 this.pool = getDefaultExecutor(this.configuration); -169 this.cleanupPoolOnClose = true; -170 } else { -171 this.cleanupPoolOnClose = false; -172 } -173 -174 this.rpcCallerFactory = rpcCallerFactory; -175 this.rpcControllerFactory = rpcControllerFactory; -176 -177 this.finishSetup(); -178 } -179 -180 /** -181 * For internal testing. Uses Connection provided in {@code params}. -182 * @throws IOException -183 */ -184 @VisibleForTesting -185 protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { -186 connection = conn; -187 tableName = params.getTableName(); -188 tableConfiguration = new TableConfiguration(connection.getConfiguration()); -189 cleanupPoolOnClose = false; -190 cleanupConnectionOnClose = false; -191 // used from tests, don't trust the connection is real -192 this.mutator = new BufferedMutatorImpl(conn, null, null, params); -193 } -194 -195 /** -196 * @return maxKeyValueSize from configuration. -197 */ -198 public static int getMaxKeyValueSize(Configuration conf) { -199 return conf.getInt("hbase.client.keyvalue.maxsize", -1); -200 } -201 -202 /** -203 * setup this HTable's parameter based on the passed configuration -204 */ -205 private void finishSetup() throws IOException { -206 if (tableConfiguration == null) { -207 tableConfiguration = new TableConfiguration(configuration); -208 } -209 -210 this.operationTimeout = tableName.isSystemTable() ? -211 tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); -212 this.scannerCaching = tableConfiguration.getScannerCaching(); -213 this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); -214 if (this.rpcCallerFactory == null) { -215 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); -216 } -217 if (this.rpcControllerFactory == null) { -218 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); -219 } -220 -221 // puts need to track errors globally due to how the APIs currently work. -222 multiAp = this.connection.getAsyncProcess(); -223 this.locator = new HRegionLocator(getName(), connection); -224 } -225 -226 /** -227 * {@inheritDoc} -228 */ -229 @Override -230 public Configuration getConfiguration() { -231 return configuration; -232 } -233 -234 /** -235 * {@inheritDoc} -236 */ -237 @Override -238 public byte [] getTableName() { -239 return this.tableName.getName(); -240 } -241 -242 @Override -243 public TableName getName() { -244 return tableName; -245 } -246 -247 /** -248 * <em>INTERNAL</em> Used by unit tests and tools to do low-level -249 * manipulations. -250 * @return An HConnection instance. -251 * @deprecated This method will be changed from public to package protected. -252 */ -253 // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers. -254 @Deprecated -255 @VisibleForTesting -256 public HConnection getConnection() { -257 return this.connection; -258 } -259 -260 /** -261 * {@inheritDoc} -262 */ -263 @Override -264 public HTableDescriptor getTableDescriptor() throws IOException { -265 HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout); -266 if (htd != null) { -267 return new UnmodifyableHTableDescriptor(htd); -268 } -269 return null; -270 } -271 -272 private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException { -273 RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(); -274 try { -275 return caller.callWithRetries(callable, operationTimeout); -276 } finally { -277 callable.close(); -278 } -279 } -280 -281 /** -282 * Get the corresponding start keys and regions for an arbitrary range of -283 * keys. -284 * <p> -285 * @param startKey Starting row in range, inclusive -286 * @param endKey Ending row in range -287 * @param includeEndKey true if endRow is inclusive, false if exclusive -288 * @return A pair of list of start keys and list of HRegionLocations that -289 * contain the specified range -290 * @throws IOException if a remote or network exception occurs -291 */ -292 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( -293 final byte[] startKey, final byte[] endKey, final boolean includeEndKey) -294 throws IOException { -295 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); -296 } -297 -298 /** -299 * Get the corresponding start keys and regions for an arbitrary range of -300 * keys. -301 * <p> -302 * @param startKey Starting row in range, inclusive -303 * @param endKey Ending row in range -304 * @param includeEndKey true if endRow is inclusive, false if exclusive -305 * @param reload true to reload information or false to use cached information -306 * @return A pair of list of start keys and list of HRegionLocations that -307 * contain the specified range -308 * @throws IOException if a remote or network exception occurs -309 */ -310 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( -311 final byte[] startKey, final byte[] endKey, final boolean includeEndKey, -312 final boolean reload) throws IOException { -313 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); -314 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { -315 throw new IllegalArgumentException( -316 "Invalid range: " + Bytes.toStringBinary(startKey) + -317 " > " + Bytes.toStringBinary(endKey)); -318 } -319 List<byte[]> keysInRange = new ArrayList<byte[]>(); -320 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>(); -321 byte[] currentKey = startKey; -322 do { -323 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); -324 keysInRange.add(currentKey); -325 regionsInRange.add(regionLocation); -326 currentKey = regionLocation.getRegionInfo().getEndKey(); -327 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) -328 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 -329 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); -330 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange, -331 regionsInRange); -332 } -333 -334 /** -335 * The underlying {@link HTable} must not be closed. -336 * {@link HTableInterface#getScanner(Scan)} has other usage details. -337 */ -338 @Override -339 public ResultScanner getScanner(final Scan scan) throws IOException { -340 if (scan.getBatch() > 0 && scan.isSmall()) { -341 throw new IllegalArgumentException("Small scan should not be used with batching"); -342 } -343 -344 if (scan.getCaching() <= 0) { -345 scan.setCaching(scannerCaching); -346 } -347 if (scan.getMaxResultSize() <= 0) { -348 scan.setMaxResultSize(scannerMaxResultSize); -349 } -350 -351 Boolean async = scan.isAsyncPrefetch(); -352 if (async == null) { -353 async = tableConfiguration.isClientScannerAsyncPrefetch(); -354 } -355 -356 if (scan.isReversed()) { -357 if (scan.isSmall()) { -358 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), -359 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, -360 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); -361 } else { -362 return new ReversedClientScanner(getConfiguration(), scan, getName(), -363 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, -364 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); -365 } -366 } -367 -368 if (scan.isSmall()) { -369 return new ClientSmallScanner(getConfiguration(), scan, getName(), -370 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, -371 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); -372 } else { -373 if (async) { -374 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, -375 this.rpcCallerFactory, this.rpcControllerFactory, -376 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); -377 } else { -378 return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, -379 this.rpcCallerFactory, this.rpcControllerFactory, -380 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); -381 } -382 } -383 } -384 -385 /** -386 * The underlying {@link HTable} must not be closed. -387 * {@link HTableInterface#getScanner(byte[])} has other usage details. -388 */ -389 @Override -390 public ResultScanner getScanner(byte [] family) throws IOException { -391 Scan scan = new Scan(); -392 scan.addFamily(family); -393 return getScanner(scan); -394 } -395 -396 /** -397 * The underlying {@link HTable} must not be closed. -398 * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details. -399 */ -400 @Override -401 public ResultScanner getScanner(byte [] family, byte [] qualifier) -402 throws IOException { -403 Scan scan = new Scan(); -404 scan.addColumn(family, qualifier); -405 return getScanner(scan); -406 } -407 -408 /** -409 * {@inheritDoc} -410 */ -411 @Override -412 public Result get(final Get get) throws IOException { -413 return get(get, get.isCheckExistenceOnly()); -414 } -415 -416 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { -417 // if we are changing settings to the get, clone it. -418 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { -419 get = ReflectionUtils.newInstance(get.getClass(), get); -420 get.setCheckExistenceOnly(checkExistenceOnly); -421 if (get.getConsistency() == null){ -422 get.setConsistency(defaultConsistency); -423 } -424 } -425 -426 if (get.getConsistency() == Consistency.STRONG) { -427 // Good old call. -428 final Get getReq = get; -429 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, -430 getName(), get.getRow()) { -431 @Override -432 public Result call(int callTimeout) throws IOException { -433 ClientProtos.GetRequest request = -434 RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); -435 PayloadCarryingRpcController controller = rpcControllerFactory.newController(); -436 controller.setPriority(tableName); -437 controller.setCallTimeout(callTimeout); -438 try { -439 ClientProtos.GetResponse response = getStub().get(controller, request); -440 if (response == null) return null; -441 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); -442 } catch (ServiceException se) { -443 throw ProtobufUtil.getRemoteException(se); -444 } -445 } -446 }; -447 return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout); -448 } -449 -450 // Call that takes into account the replica -451 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( -452 rpcControllerFactory, tableName, this.connection, get, pool, -453 tableConfiguration.getRetriesNumber(), -454 operationTimeout, -455 tableConfiguration.getPrimaryCallTimeoutMicroSecond()); -456 return callable.call(); -457 } -458 +131 int corePoolSize = conf.getInt("hbase.htable.threads.coresize", 1); +132 long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); +133 +134 // Using the "direct handoff" approach, new threads will only be created +135 // if it is necessary and will grow unbounded. This could be bad but in HCM +136 // we only create as many Runnables as there are region servers. It means +137 // it also scales when new region servers are added. +138 ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxThreads, keepAliveTime, +139 TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable")); +140 pool.allowCoreThreadTimeOut(true); +141 return pool; +142 } +143 +144 /** +145 * Creates an object to access a HBase table. +146 * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to +147 * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). +148 * @param tableName Name of the table. +149 * @param connection HConnection to be used. +150 * @param pool ExecutorService to be used. +151 * @throws IOException if a remote or network exception occurs +152 */ +153 @InterfaceAudience.Private +154 protected HTable(TableName tableName, final ClusterConnection connection, +155 final TableConfiguration tableConfig, +156 final RpcRetryingCallerFactory rpcCallerFactory, +157 final RpcControllerFactory rpcControllerFactory, +158 final ExecutorService pool) throws IOException { +159 if (connection == null || connection.isClosed()) { +160 throw new IllegalArgumentException("Connection is null or closed."); +161 } +162 this.tableName = tableName; +163 this.cleanupConnectionOnClose = false; +164 this.connection = connection; +165 this.configuration = connection.getConfiguration(); +166 this.tableConfiguration = tableConfig; +167 this.pool = pool; +168 if (pool == null) { +169 this.pool = getDefaultExecutor(this.configuration); +170 this.cleanupPoolOnClose = true; +171 } else { +172 this.cleanupPoolOnClose = false; +173 } +174 +175 this.rpcCallerFactory = rpcCallerFactory; +176 this.rpcControllerFactory = rpcControllerFactory; +177 +178 this.finishSetup(); +179 } +180 +181 /** +182 * For internal testing. Uses Connection provided in {@code params}. +183 * @throws IOException +184 */ +185 @VisibleForTesting +186 protected HTable(ClusterConnection conn, BufferedMutatorParams params) throws IOException { +187 connection = conn; +188 tableName = params.getTableName(); +189 tableConfiguration = new TableConfiguration(connection.getConfiguration()); +190 cleanupPoolOnClose = false; +191 cleanupConnectionOnClose = false; +192 // used from tests, don't trust the connection is real +193 this.mutator = new BufferedMutatorImpl(conn, null, null, params); +194 } +195 +196 /** +197 * @return maxKeyValueSize from configuration. +198 */ +199 public static int getMaxKeyValueSize(Configuration conf) { +200 return conf.getInt("hbase.client.keyvalue.maxsize", -1); +201 } +202 +203 /** +204 * setup this HTable's parameter based on the passed configuration +205 */ +206 private void finishSetup() throws IOException { +207 if (tableConfiguration == null) { +208 tableConfiguration = new TableConfiguration(configuration); +209 } +210 +211 this.operationTimeout = tableName.isSystemTable() ? +212 tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); +213 this.scannerCaching = tableConfiguration.getScannerCaching(); +214 this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); +215 if (this.rpcCallerFactory == null) { +216 this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration); +217 } +218 if (this.rpcControllerFactory == null) { +219 this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration); +220 } +221 +222 // puts need to track errors globally due to how the APIs currently work. +223 multiAp = this.connection.getAsyncProcess(); +224 this.locator = new HRegionLocator(getName(), connection); +225 } +226 +227 /** +228 * {@inheritDoc} +229 */ +230 @Override +231 public Configuration getConfiguration() { +232 return configuration; +233 } +234 +235 /** +236 * {@inheritDoc} +237 */ +238 @Override +239 public byte [] getTableName() { +240 return this.tableName.getName(); +241 } +242 +243 @Override +244 public TableName getName() { +245 return tableName; +246 } +247 +248 /** +249 * <em>INTERNAL</em> Used by unit tests and tools to do low-level +250 * manipulations. +251 * @return An HConnection instance. +252 * @deprecated This method will be changed from public to package protected. +253 */ +254 // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers. +255 @Deprecated +256 @VisibleForTesting +257 public HConnection getConnection() { +258 return this.connection; +259 } +260 +261 /** +262 * {@inheritDoc} +263 */ +264 @Override +265 public HTableDescriptor getTableDescriptor() throws IOException { +266 HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, operationTimeout); +267 if (htd != null) { +268 return new UnmodifyableHTableDescriptor(htd); +269 } +270 return null; +271 } +272 +273 private <V> V executeMasterCallable(MasterCallable<V> callable) throws IOException { +274 RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(); +275 try { +276 return caller.callWithRetries(callable, operationTimeout); +277 } finally { +278 callable.close(); +279 } +280 } +281 +282 /** +283 * Get the corresponding start keys and regions for an arbitrary range of +284 * keys. +285 * <p> +286 * @param startKey Starting row in range, inclusive +287 * @param endKey Ending row in range +288 * @param includeEndKey true if endRow is inclusive, false if exclusive +289 * @return A pair of list of start keys and list of HRegionLocations that +290 * contain the specified range +291 * @throws IOException if a remote or network exception occurs +292 */ +293 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( +294 final byte[] startKey, final byte[] endKey, final boolean includeEndKey) +295 throws IOException { +296 return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); +297 } +298 +299 /** +300 * Get the corresponding start keys and regions for an arbitrary range of +301 * keys. +302 * <p> +303 * @param startKey Starting row in range, inclusive +304 * @param endKey Ending row in range +305 * @param includeEndKey true if endRow is inclusive, false if exclusive +306 * @param reload true to reload information or false to use cached information +307 * @return A pair of list of start keys and list of HRegionLocations that +308 * contain the specified range +309 * @throws IOException if a remote or network exception occurs +310 */ +311 private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange( +312 final byte[] startKey, final byte[] endKey, final boolean includeEndKey, +313 final boolean reload) throws IOException { +314 final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); +315 if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { +316 throw new IllegalArgumentException( +317 "Invalid range: " + Bytes.toStringBinary(startKey) + +318 " > " + Bytes.toStringBinary(endKey)); +319 } +320 List<byte[]> keysInRange = new ArrayList<byte[]>(); +321 List<HRegionLocation> regionsInRange = new ArrayList<HRegionLocation>(); +322 byte[] currentKey = startKey; +323 do { +324 HRegionLocation regionLocation = getRegionLocator().getRegionLocation(currentKey, reload); +325 keysInRange.add(currentKey); +326 regionsInRange.add(regionLocation); +327 currentKey = regionLocation.getRegionInfo().getEndKey(); +328 } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) +329 && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 +330 || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); +331 return new Pair<List<byte[]>, List<HRegionLocation>>(keysInRange, +332 regionsInRange); +333 } +334 +335 /** +336 * The underlying {@link HTable} must not be closed. +337 * {@link HTableInterface#getScanner(Scan)} has other usage details. +338 */ +339 @Override +340 public ResultScanner getScanner(final Scan scan) throws IOException { +341 if (scan.getBatch() > 0 && scan.isSmall()) { +342 throw new IllegalArgumentException("Small scan should not be used with batching"); +343 } +344 +345 if (scan.getCaching() <= 0) { +346 scan.setCaching(scannerCaching); +347 } +348 if (scan.getMaxResultSize() <= 0) { +349 scan.setMaxResultSize(scannerMaxResultSize); +350 } +351 +352 Boolean async = scan.isAsyncPrefetch(); +353 if (async == null) { +354 async = tableConfiguration.isClientScannerAsyncPrefetch(); +355 } +356 +357 if (scan.isReversed()) { +358 if (scan.isSmall()) { +359 return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), +360 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, +361 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); +362 } else { +363 return new ReversedClientScanner(getConfiguration(), scan, getName(), +364 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, +365 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); +366 } +367 } +368 +369 if (scan.isSmall()) { +370 return new ClientSmallScanner(getConfiguration(), scan, getName(), +371 this.connection, this.rpcCallerFactory, this.rpcControllerFactory, +372 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); +373 } else { +374 if (async) { +375 return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, +376 this.rpcCallerFactory, this.rpcControllerFactory, +377 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); +378 } else { +379 return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, +380 this.rpcCallerFactory, this.rpcControllerFactory, +381 pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); +382 } +383 } +384 } +385 +386 /** +387 * The underlying {@link HTable} must not be closed. +388 * {@link HTableInterface#getScanner(byte[])} has other usage details. +389 */ +390 @Override +391 public ResultScanner getScanner(byte [] family) throws IOException { +392 Scan scan = new Scan(); +393 scan.addFamily(family); +394 return getScanner(scan); +395 } +396 +397 /** +398 * The underlying {@link HTable} must not be closed. +399 * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details. +400 */ +401 @Override +402 public ResultScanner getScanner(byte [] family, byte [] qualifier) +403 throws IOException { +404 Scan scan = new Scan(); +405 scan.addColumn(family, qualifier); +406 return getScanner(scan); +407 } +408 +409 /** +410 * {@inheritDoc} +411 */ +412 @Override +413 public Result get(final Get get) throws IOException { +414 return get(get, get.isCheckExistenceOnly()); +415 } +416 +417 private Result get(Get get, final boolean checkExistenceOnly) throws IOException { +418 // if we are changing settings to the get, clone it. +419 if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) { +420 get = ReflectionUtils.newInstance(get.getClass(), get); +421 get.setCheckExistenceOnly(checkExistenceOnly); +422 if (get.getConsistency() == null){ +423 get.setConsistency(defaultConsistency); +424 } +425 } +426 +427 if (get.getConsistency() == Consistency.STRONG) { +428 // Good old call. +429 final Get getReq = get; +430 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, +431 getName(), get.getRow()) { +432 @Override +433 public Result call(int callTimeout) throws IOException { +434 ClientProtos.GetRequest request = +435 RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); +436 PayloadCarryingRpcController controller = rpcControllerFactory.newController(); +437 controller.setPriority(tableName); +438 controller.setCallTimeout(callTimeout); +439 try { +440 ClientProtos.GetResponse response = getStub().get(controller, request); +441 if (response == null) return null; +442 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); +443 } catch (ServiceException se) { +444 throw ProtobufUtil.getRemoteException(se); +445 } +446 } +447 }; +448 return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout); +449 } +450 +451 // Call that takes into account the replica +452 RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( +453 rpcControllerFactory, tableName, this.connection, get, pool, +454 tableConfiguration.getRetriesNumber(), +455 operationTimeout, +456 tableConfiguration.getPrimaryCallTimeoutMicroSecond()); +457 return callable.call(); +458 } 459 -460 /** -461 * {@inheritDoc} -462 */ -463 @Override -464 public Result[] get(List<Get> gets) throws IOException { -465 if (gets.size() == 1) { -466 return new Result[]{get(gets.get(0))}; -467 } -468 try { -469 Object[] r1 = new Object[gets.size()]; -470 batch((List) gets, r1); -471 -472 // translate. -473 Result [] results = new Result[r1.length]; -474 int i=0; -475 for (Object o : r1) { -476 // batch ensures if there is a failure we get an exception instead -477 results[i++] = (Result) o; -478 } -479 -480 return results; -481 } catch (InterruptedException e) { -482 throw (InterruptedIOException)new InterruptedIOException().initCause(e); -483 } -484 } -485 -486 /** -487 * {@inheritDoc} -488 */ -489 @Override -490 public void batch(final List<? extends Row> actions, final Object[] results) -491 throws InterruptedException, IOException { -492 AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results); -493 ars.waitUntilDone(); -494 if (ars.hasError()) { -495 throw ars.getErrors(); -496 } -497 } -498 -499 /** -500 * {@inheritDoc} -501 */ -502 @Override -503 public <R> void batchCallback( -504 final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback) -505 throws IOException, InterruptedException { -506 connection.processBatchCallback(actions, tableName, pool, results, callback); -507 } -508 -509 /** -510 * {@inheritDoc} -511 */ -512 @Override -513 public void delete(final Delete delete) -514 throws IOException { -515 RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, -516 tableName, delete.getRow()) { -517 @Override -518 public Boolean call(int callTimeout) throws IOException { -519 PayloadCarryingRpcController controller = rpcControllerFactory.newController(); -520 controller.setPriority(tableName); -521 controller.setCallTimeout(callTimeout); -522 -523 try { -524 MutateRequest request = RequestConverter.buildMutateRequest( -525 getLocation().getRegionInfo().getRegionName(), delete); -526 MutateResponse response = getStub().mutate(controller, request); -527 return Boolean.valueOf(response.getProcessed()); -528 } catch (ServiceException se) { -529 throw ProtobufUtil.getRemoteException(se); -530 } -531 } -532 }; -533 rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout); -534 } -535 -536 /** -537 * {@inheritDoc} -538 */ -539 @Override -540 public void delete(final List<Delete> deletes) -541 throws IOException { -542 Object[] results = new Object[deletes.size()]; -543 try { -544 batch(deletes, results); -545 } catch (InterruptedException e) { -546 throw (InterruptedIOException)new InterruptedIOException().initCause(e); -547 } finally { -548 // mutate list so that it is empty for complete success, or contains only failed records -549 // results are returned in the same order as the requests in list walk the list backwards, -550 // so we can remove from list without impacting the indexes of earlier members -551 for (int i = results.length - 1; i>=0; i--) { -552 // if result is not null, it succeeded -553 if (results[i] instanceof Result) { -554 deletes.remove(i); -555 } -556 } -557 } -558 } -559 -560 /** -561 * {@inheritDoc} -562 * @throws IOException -563 */ -564 @Override -565 public void put(final Put put) throws IOException { -566 getBufferedMutator().mutate(put); -567 if (autoFlush) { -568 flushCommits(); -569 } -570 } -571 -572 /** -573 * {@inheritDoc} -574 * @throws IOException -575 */ -576 @Override -577 public void put(final List<Put> puts) throws IOException { -578 getBufferedMutator().mutate(puts); -579 if (autoFlush) { -580 flushCommits(); -581 } -582 } -583 -584 /** -585 * {@inheritDoc} -586 */ -587 @Override -588 public void mutateRow(final RowMutations rm) throws IOException { -589 RegionServerCallable<Void> callable = -590 new RegionServerCallable<Void>(connection, getName(), rm.getRow()) { -591 @Override -592 public Void call(int callTimeout) throws IOException { -593 PayloadCarryingRpcController controller = rpcControllerFactory.newController(); -594 controller.setPriority(tableName); -595 controller.setCallTimeout(callTimeout); -596 try { -597 RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( -598 getLocation().getRegionInfo().getRegionName(), rm); -599 regionMutationBuilder.setAtomic(true); -600 MultiRequest request = -601 MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); -602 ClientProtos.MultiResponse response = getStub().multi(controller, request); -603 ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); -604 if (res.hasException()) { -605 Throwable ex = ProtobufUtil.toException(res.getException()); -606 if(ex instanceof IOException) { -607 throw (IOException)ex; -608 } -609 throw new IOException("Failed to mutate row: "+Bytes.toStringBinary(rm.getRow()), ex); -610 } -611 } catch (ServiceException se) { -612 throw ProtobufUtil.getRemoteException(se); -613 } -614 return null; -615 } -616 }; -617 rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout); -618 } -619 -620 /** -621 * {@inheritDoc} -622 */ -623 @Override -624 public Result append(final Append append) throws IOException { -625 if (append.numFamilies() == 0) { -626 throw new IOException( -627 "Invalid arguments to append, no columns specified"); -628 } -629 -630 NonceGenerator ng = this.connection.getNonceGenerator(); -631 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); -632 RegionServerCallable<Result> callable = -633 new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { -634 @Override -635 public Result call(int callTimeout) throws IOException { -636 PayloadCarryingRpcController controller = rpcControllerFactory.newController(); -637 controller.setPriority(getTableName()); -638 controller.setCallTimeout(callTimeout); -639 try { -640 MutateRequest request = RequestConverter.buildMutateRequest( -641 getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); -642 MutateResponse response = getStub().mutate(controller, request); -643 if (!response.hasResult()) return null; -644 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); -645 } catch (ServiceException se) { -646 throw ProtobufUtil.getRemoteException(se); -647 } -648 } -649 }; -650 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); -651 } -652 -653 /** -654 * {@inheritDoc} -655 */ -656 @Override -657 public Result increment(final Increment increment) throws IOException { -658 if (!increment.hasFamilies()) { -659 throw new IOException( -660 "Invalid arguments to increment, no columns specified"); -661 } -662 NonceGenerator ng = this.connection.getNonceGenerator(); -663 final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); -664 RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, -665 getName(), increment.getRow()) { -666 @Override -667 public Result call(int callTimeout) throws IOException { -668 PayloadCarryingRpcController controller = rpcControllerFactory.newController(); -669 controller.setPriority(getTableName()); -670 controller.setCallTimeout(callTimeout); -671 try { -672 MutateRequest request = RequestConverter.buildMutateRequest( -673 getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); -674 MutateResponse response = getStub().mutate(controller, request); -675 return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); -676 } catch (ServiceException se) { -677 throw ProtobufUtil.getRemoteException(se); -678 } -679 } -680 }; -681 return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); -682 } -683 -684 /** -685 * {@inheritDoc} -686 */ -687 @Override -688 public long incrementColumnValue(final byte [] row, final byte [] family, -689 final byte [] qualifier, final long amount) -690 throws IOException { -691 return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); -692 } -693 -694 /** -695 * {@inheritDoc} -696 */ -697 @Override -698 public long incrementColumnValue(final byte [] row, final byte [] family, -699 final byte [] qualifier, final long amount, final Durability durability) -700 throws IOException { -701 NullPointerExcepti