Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D8FB7200CC1 for ; Sun, 25 Jun 2017 17:01:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D5EEA160BE0; Sun, 25 Jun 2017 15:01:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8702C160BF7 for ; Sun, 25 Jun 2017 17:01:20 +0200 (CEST) Received: (qmail 11773 invoked by uid 500); 25 Jun 2017 15:01:19 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 11620 invoked by uid 99); 25 Jun 2017 15:01:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 25 Jun 2017 15:01:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 160C9DFA3B; Sun, 25 Jun 2017 15:01:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Sun, 25 Jun 2017 15:01:27 -0000 Message-Id: <0829375e50624fa1833cab86680595bc@git.apache.org> In-Reply-To: <4269656aec024d9ca6a3695cf919610c@git.apache.org> References: <4269656aec024d9ca6a3695cf919610c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/21] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Sun, 25 Jun 2017 15:01:23 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6bd22543/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannerHolder.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannerHolder.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannerHolder.html index 04ecf44..513a635 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannerHolder.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.RegionScannerHolder.html @@ -262,1950 +262,1950 @@ 254 */ 255 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10; 256 -257 // Request counter. (Includes requests that are not serviced by regions.) -258 final LongAdder requestCount = new LongAdder(); -259 -260 // Request counter for rpc get -261 final LongAdder rpcGetRequestCount = new LongAdder(); -262 -263 // Request counter for rpc scan -264 final LongAdder rpcScanRequestCount = new LongAdder(); +257 /** +258 * Number of rows in a batch operation above which a warning will be logged. +259 */ +260 static final String BATCH_ROWS_THRESHOLD_NAME = "hbase.rpc.rows.warning.threshold"; +261 /** +262 * Default value of {@link RSRpcServices#BATCH_ROWS_THRESHOLD_NAME} +263 */ +264 static final int BATCH_ROWS_THRESHOLD_DEFAULT = 1000; 265 -266 // Request counter for rpc multi -267 final LongAdder rpcMultiRequestCount = new LongAdder(); +266 // Request counter. (Includes requests that are not serviced by regions.) +267 final LongAdder requestCount = new LongAdder(); 268 -269 // Request counter for rpc mutate -270 final LongAdder rpcMutateRequestCount = new LongAdder(); +269 // Request counter for rpc get +270 final LongAdder rpcGetRequestCount = new LongAdder(); 271 -272 // Server to handle client requests. -273 final RpcServerInterface rpcServer; -274 final InetSocketAddress isa; -275 -276 private final HRegionServer regionServer; -277 private final long maxScannerResultSize; -278 -279 // The reference to the priority extraction function -280 private final PriorityFunction priority; -281 -282 private ScannerIdGenerator scannerIdGenerator; -283 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); -284 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients -285 // which may send next or close request to a region scanner which has already been exhausted. The -286 // entries will be removed automatically after scannerLeaseTimeoutPeriod. -287 private final Cache<String, String> closedScanners; -288 /** -289 * The lease timeout period for client scanners (milliseconds). -290 */ -291 private final int scannerLeaseTimeoutPeriod; -292 -293 /** -294 * The RPC timeout period (milliseconds) -295 */ -296 private final int rpcTimeout; -297 -298 /** -299 * The minimum allowable delta to use for the scan limit -300 */ -301 private final long minimumScanTimeLimitDelta; -302 -303 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); -304 -305 /** -306 * An Rpc callback for closing a RegionScanner. -307 */ -308 private static final class RegionScannerCloseCallBack implements RpcCallback { -309 -310 private final RegionScanner scanner; +272 // Request counter for rpc scan +273 final LongAdder rpcScanRequestCount = new LongAdder(); +274 +275 // Request counter for rpc multi +276 final LongAdder rpcMultiRequestCount = new LongAdder(); +277 +278 // Request counter for rpc mutate +279 final LongAdder rpcMutateRequestCount = new LongAdder(); +280 +281 // Server to handle client requests. +282 final RpcServerInterface rpcServer; +283 final InetSocketAddress isa; +284 +285 private final HRegionServer regionServer; +286 private final long maxScannerResultSize; +287 +288 // The reference to the priority extraction function +289 private final PriorityFunction priority; +290 +291 private ScannerIdGenerator scannerIdGenerator; +292 private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>(); +293 // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients +294 // which may send next or close request to a region scanner which has already been exhausted. The +295 // entries will be removed automatically after scannerLeaseTimeoutPeriod. +296 private final Cache<String, String> closedScanners; +297 /** +298 * The lease timeout period for client scanners (milliseconds). +299 */ +300 private final int scannerLeaseTimeoutPeriod; +301 +302 /** +303 * The RPC timeout period (milliseconds) +304 */ +305 private final int rpcTimeout; +306 +307 /** +308 * The minimum allowable delta to use for the scan limit +309 */ +310 private final long minimumScanTimeLimitDelta; 311 -312 public RegionScannerCloseCallBack(RegionScanner scanner) { -313 this.scanner = scanner; -314 } -315 -316 @Override -317 public void run() throws IOException { -318 this.scanner.close(); -319 } -320 } -321 -322 /** -323 * An Rpc callback for doing shipped() call on a RegionScanner. -324 */ -325 private class RegionScannerShippedCallBack implements RpcCallback { -326 -327 private final String scannerName; -328 private final RegionScanner scanner; -329 private final Lease lease; -330 -331 public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) { -332 this.scannerName = scannerName; -333 this.scanner = scanner; -334 this.lease = lease; -335 } -336 -337 @Override -338 public void run() throws IOException { -339 this.scanner.shipped(); -340 // We're done. On way out re-add the above removed lease. The lease was temp removed for this -341 // Rpc call and we are at end of the call now. Time to add it back. -342 if (scanners.containsKey(scannerName)) { -343 if (lease != null) regionServer.leases.addLease(lease); -344 } -345 } -346 } -347 -348 /** -349 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on -350 * completion of multiGets. -351 */ -352 static class RegionScannersCloseCallBack implements RpcCallback { -353 private final List<RegionScanner> scanners = new ArrayList<>(); -354 -355 public void addScanner(RegionScanner scanner) { -356 this.scanners.add(scanner); -357 } -358 -359 @Override -360 public void run() { -361 for (RegionScanner scanner : scanners) { -362 try { -363 scanner.close(); -364 } catch (IOException e) { -365 LOG.error("Exception while closing the scanner " + scanner, e); -366 } -367 } -368 } -369 } -370 -371 /** -372 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. -373 */ -374 private static final class RegionScannerHolder { -375 -376 private final AtomicLong nextCallSeq = new AtomicLong(0); -377 private final String scannerName; -378 private final RegionScanner s; -379 private final Region r; -380 private final RpcCallback closeCallBack; -381 private final RpcCallback shippedCallback; -382 private byte[] rowOfLastPartialResult; -383 private boolean needCursor; +312 /** +313 * Row size threshold for multi requests above which a warning is logged +314 */ +315 private final int rowSizeWarnThreshold; +316 +317 final AtomicBoolean clearCompactionQueues = new AtomicBoolean(false); +318 +319 /** +320 * An Rpc callback for closing a RegionScanner. +321 */ +322 private static final class RegionScannerCloseCallBack implements RpcCallback { +323 +324 private final RegionScanner scanner; +325 +326 public RegionScannerCloseCallBack(RegionScanner scanner) { +327 this.scanner = scanner; +328 } +329 +330 @Override +331 public void run() throws IOException { +332 this.scanner.close(); +333 } +334 } +335 +336 /** +337 * An Rpc callback for doing shipped() call on a RegionScanner. +338 */ +339 private class RegionScannerShippedCallBack implements RpcCallback { +340 +341 private final String scannerName; +342 private final RegionScanner scanner; +343 private final Lease lease; +344 +345 public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) { +346 this.scannerName = scannerName; +347 this.scanner = scanner; +348 this.lease = lease; +349 } +350 +351 @Override +352 public void run() throws IOException { +353 this.scanner.shipped(); +354 // We're done. On way out re-add the above removed lease. The lease was temp removed for this +355 // Rpc call and we are at end of the call now. Time to add it back. +356 if (scanners.containsKey(scannerName)) { +357 if (lease != null) regionServer.leases.addLease(lease); +358 } +359 } +360 } +361 +362 /** +363 * An RpcCallBack that creates a list of scanners that needs to perform callBack operation on +364 * completion of multiGets. +365 */ +366 static class RegionScannersCloseCallBack implements RpcCallback { +367 private final List<RegionScanner> scanners = new ArrayList<>(); +368 +369 public void addScanner(RegionScanner scanner) { +370 this.scanners.add(scanner); +371 } +372 +373 @Override +374 public void run() { +375 for (RegionScanner scanner : scanners) { +376 try { +377 scanner.close(); +378 } catch (IOException e) { +379 LOG.error("Exception while closing the scanner " + scanner, e); +380 } +381 } +382 } +383 } 384 -385 public RegionScannerHolder(String scannerName, RegionScanner s, Region r, -386 RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { -387 this.scannerName = scannerName; -388 this.s = s; -389 this.r = r; -390 this.closeCallBack = closeCallBack; -391 this.shippedCallback = shippedCallback; -392 this.needCursor = needCursor; -393 } -394 -395 public long getNextCallSeq() { -396 return nextCallSeq.get(); -397 } +385 /** +386 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. +387 */ +388 private static final class RegionScannerHolder { +389 +390 private final AtomicLong nextCallSeq = new AtomicLong(0); +391 private final String scannerName; +392 private final RegionScanner s; +393 private final Region r; +394 private final RpcCallback closeCallBack; +395 private final RpcCallback shippedCallback; +396 private byte[] rowOfLastPartialResult; +397 private boolean needCursor; 398 -399 public boolean incNextCallSeq(long currentSeq) { -400 // Use CAS to prevent multiple scan request running on the same scanner. -401 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); -402 } -403 } -404 -405 /** -406 * Instantiated as a scanner lease. If the lease times out, the scanner is -407 * closed -408 */ -409 private class ScannerListener implements LeaseListener { -410 private final String scannerName; -411 -412 ScannerListener(final String n) { -413 this.scannerName = n; -414 } -415 -416 @Override -417 public void leaseExpired() { -418 RegionScannerHolder rsh = scanners.remove(this.scannerName); -419 if (rsh != null) { -420 RegionScanner s = rsh.s; -421 LOG.info("Scanner " + this.scannerName + " lease expired on region " -422 + s.getRegionInfo().getRegionNameAsString()); -423 Region region = null; -424 try { -425 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); -426 if (region != null && region.getCoprocessorHost() != null) { -427 region.getCoprocessorHost().preScannerClose(s); -428 } -429 } catch (IOException e) { -430 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); -431 } finally { -432 try { -433 s.close(); -434 if (region != null && region.getCoprocessorHost() != null) { -435 region.getCoprocessorHost().postScannerClose(s); -436 } -437 } catch (IOException e) { -438 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); -439 } -440 } -441 } else { -442 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + -443 " scanner found, hence no chance to close that related scanner!"); -444 } -445 } -446 } -447 -448 private static ResultOrException getResultOrException(final ClientProtos.Result r, -449 final int index){ -450 return getResultOrException(ResponseConverter.buildActionResult(r), index); -451 } -452 -453 private static ResultOrException getResultOrException(final Exception e, final int index) { -454 return getResultOrException(ResponseConverter.buildActionResult(e), index); -455 } -456 -457 private static ResultOrException getResultOrException( -458 final ResultOrException.Builder builder, final int index) { -459 return builder.setIndex(index).build(); +399 public RegionScannerHolder(String scannerName, RegionScanner s, Region r, +400 RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor) { +401 this.scannerName = scannerName; +402 this.s = s; +403 this.r = r; +404 this.closeCallBack = closeCallBack; +405 this.shippedCallback = shippedCallback; +406 this.needCursor = needCursor; +407 } +408 +409 public long getNextCallSeq() { +410 return nextCallSeq.get(); +411 } +412 +413 public boolean incNextCallSeq(long currentSeq) { +414 // Use CAS to prevent multiple scan request running on the same scanner. +415 return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); +416 } +417 } +418 +419 /** +420 * Instantiated as a scanner lease. If the lease times out, the scanner is +421 * closed +422 */ +423 private class ScannerListener implements LeaseListener { +424 private final String scannerName; +425 +426 ScannerListener(final String n) { +427 this.scannerName = n; +428 } +429 +430 @Override +431 public void leaseExpired() { +432 RegionScannerHolder rsh = scanners.remove(this.scannerName); +433 if (rsh != null) { +434 RegionScanner s = rsh.s; +435 LOG.info("Scanner " + this.scannerName + " lease expired on region " +436 + s.getRegionInfo().getRegionNameAsString()); +437 Region region = null; +438 try { +439 region = regionServer.getRegion(s.getRegionInfo().getRegionName()); +440 if (region != null && region.getCoprocessorHost() != null) { +441 region.getCoprocessorHost().preScannerClose(s); +442 } +443 } catch (IOException e) { +444 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); +445 } finally { +446 try { +447 s.close(); +448 if (region != null && region.getCoprocessorHost() != null) { +449 region.getCoprocessorHost().postScannerClose(s); +450 } +451 } catch (IOException e) { +452 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e); +453 } +454 } +455 } else { +456 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" + +457 " scanner found, hence no chance to close that related scanner!"); +458 } +459 } 460 } 461 -462 /** -463 * Starts the nonce operation for a mutation, if needed. -464 * @param mutation Mutation. -465 * @param nonceGroup Nonce group from the request. -466 * @returns whether to proceed this mutation. -467 */ -468 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) -469 throws IOException { -470 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; -471 boolean canProceed = false; -472 try { -473 canProceed = regionServer.nonceManager.startOperation( -474 nonceGroup, mutation.getNonce(), regionServer); -475 } catch (InterruptedException ex) { -476 throw new InterruptedIOException("Nonce start operation interrupted"); -477 } -478 return canProceed; -479 } -480 -481 /** -482 * Ends nonce operation for a mutation, if needed. -483 * @param mutation Mutation. -484 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. -485 * @param success Whether the operation for this nonce has succeeded. -486 */ -487 private void endNonceOperation(final MutationProto mutation, -488 long nonceGroup, boolean success) { -489 if (regionServer.nonceManager != null && mutation.hasNonce()) { -490 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success); +462 private static ResultOrException getResultOrException(final ClientProtos.Result r, +463 final int index){ +464 return getResultOrException(ResponseConverter.buildActionResult(r), index); +465 } +466 +467 private static ResultOrException getResultOrException(final Exception e, final int index) { +468 return getResultOrException(ResponseConverter.buildActionResult(e), index); +469 } +470 +471 private static ResultOrException getResultOrException( +472 final ResultOrException.Builder builder, final int index) { +473 return builder.setIndex(index).build(); +474 } +475 +476 /** +477 * Starts the nonce operation for a mutation, if needed. +478 * @param mutation Mutation. +479 * @param nonceGroup Nonce group from the request. +480 * @returns whether to proceed this mutation. +481 */ +482 private boolean startNonceOperation(final MutationProto mutation, long nonceGroup) +483 throws IOException { +484 if (regionServer.nonceManager == null || !mutation.hasNonce()) return true; +485 boolean canProceed = false; +486 try { +487 canProceed = regionServer.nonceManager.startOperation( +488 nonceGroup, mutation.getNonce(), regionServer); +489 } catch (InterruptedException ex) { +490 throw new InterruptedIOException("Nonce start operation interrupted"); 491 } -492 } -493 -494 private boolean isClientCellBlockSupport(RpcCallContext context) { -495 return context != null && context.isClientCellBlockSupported(); -496 } -497 -498 private void addResult(final MutateResponse.Builder builder, final Result result, -499 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { -500 if (result == null) return; -501 if (clientCellBlockSupported) { -502 builder.setResult(ProtobufUtil.toResultNoData(result)); -503 rpcc.setCellScanner(result.cellScanner()); -504 } else { -505 ClientProtos.Result pbr = ProtobufUtil.toResult(result); -506 builder.setResult(pbr); -507 } -508 } -509 -510 private void addResults(ScanResponse.Builder builder, List<Result> results, -511 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { -512 builder.setStale(!isDefaultRegion); -513 if (results.isEmpty()) { -514 return; -515 } -516 if (clientCellBlockSupported) { -517 for (Result res : results) { -518 builder.addCellsPerResult(res.size()); -519 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); -520 } -521 controller.setCellScanner(CellUtil.createCellScanner(results)); -522 } else { -523 for (Result res : results) { -524 ClientProtos.Result pbr = ProtobufUtil.toResult(res); -525 builder.addResults(pbr); -526 } -527 } -528 } -529 -530 /** -531 * Mutate a list of rows atomically. -532 * -533 * @param region -534 * @param actions -535 * @param cellScanner if non-null, the mutation data -- the Cell content. -536 * @throws IOException -537 */ -538 private void mutateRows(final Region region, -539 final List<ClientProtos.Action> actions, -540 final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { -541 if (!region.getRegionInfo().isMetaTable()) { -542 regionServer.cacheFlusher.reclaimMemStoreMemory(); -543 } -544 RowMutations rm = null; -545 int i = 0; -546 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = -547 ClientProtos.ResultOrException.newBuilder(); -548 for (ClientProtos.Action action: actions) { -549 if (action.hasGet()) { -550 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + -551 action.getGet()); -552 } -553 MutationType type = action.getMutation().getMutateType(); -554 if (rm == null) { -555 rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); -556 } -557 switch (type) { -558 case PUT: -559 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); -560 checkCellSizeLimit(region, put); -561 rm.add(put); -562 break; -563 case DELETE: -564 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); -565 break; -566 default: -567 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); -568 } -569 // To unify the response format with doNonAtomicRegionMutation and read through client's -570 // AsyncProcess we have to add an empty result instance per operation -571 resultOrExceptionOrBuilder.clear(); -572 resultOrExceptionOrBuilder.setIndex(i++); -573 builder.addResultOrException( -574 resultOrExceptionOrBuilder.build()); -575 } -576 region.mutateRow(rm); -577 } -578 -579 /** -580 * Mutate a list of rows atomically. -581 * -582 * @param region -583 * @param actions -584 * @param cellScanner if non-null, the mutation data -- the Cell content. -585 * @param row -586 * @param family -587 * @param qualifier -588 * @param compareOp -589 * @param comparator @throws IOException -590 */ -591 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, -592 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, -593 CompareOp compareOp, ByteArrayComparable comparator, RegionActionResult.Builder builder, -594 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { -595 if (!region.getRegionInfo().isMetaTable()) { -596 regionServer.cacheFlusher.reclaimMemStoreMemory(); -597 } -598 RowMutations rm = null; -599 int i = 0; -600 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = -601 ClientProtos.ResultOrException.newBuilder(); -602 for (ClientProtos.Action action: actions) { -603 if (action.hasGet()) { -604 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + -605 action.getGet()); -606 } -607 MutationType type = action.getMutation().getMutateType(); -608 if (rm == null) { -609 rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); -610 } -611 switch (type) { -612 case PUT: -613 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); -614 checkCellSizeLimit(region, put); -615 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); -616 rm.add(put); -617 break; -618 case DELETE: -619 Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner); -620 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del); -621 rm.add(del); -622 break; -623 default: -624 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); -625 } -626 // To unify the response format with doNonAtomicRegionMutation and read through client's -627 // AsyncProcess we have to add an empty result instance per operation -628 resultOrExceptionOrBuilder.clear(); -629 resultOrExceptionOrBuilder.setIndex(i++); -630 builder.addResultOrException( -631 resultOrExceptionOrBuilder.build()); -632 } -633 return region.checkAndRowMutate(row, family, qualifier, compareOp, -634 comparator, rm, Boolean.TRUE); -635 } -636 -637 /** -638 * Execute an append mutation. -639 * -640 * @param region -641 * @param m -642 * @param cellScanner -643 * @return result to return to client if default operation should be -644 * bypassed as indicated by RegionObserver, null otherwise -645 * @throws IOException -646 */ -647 private Result append(final Region region, final OperationQuota quota, -648 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, -649 ActivePolicyEnforcement spaceQuota) -650 throws IOException { -651 long before = EnvironmentEdgeManager.currentTime(); -652 Append append = ProtobufUtil.toAppend(mutation, cellScanner); -653 checkCellSizeLimit(region, append); -654 spaceQuota.getPolicyEnforcement(region).check(append); -655 quota.addMutation(append); -656 Result r = null; -657 if (region.getCoprocessorHost() != null) { -658 r = region.getCoprocessorHost().preAppend(append); -659 } -660 if (r == null) { -661 boolean canProceed = startNonceOperation(mutation, nonceGroup); -662 boolean success = false; -663 try { -664 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; -665 if (canProceed) { -666 r = region.append(append, nonceGroup, nonce); -667 } else { -668 // convert duplicate append to get -669 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, -670 nonceGroup, nonce); -671 r = Result.create(results); -672 } -673 success = true; -674 } finally { -675 if (canProceed) { -676 endNonceOperation(mutation, nonceGroup, success); -677 } -678 } -679 if (region.getCoprocessorHost() != null) { -680 region.getCoprocessorHost().postAppend(append, r); -681 } -682 } -683 if (regionServer.metricsRegionServer != null) { -684 regionServer.metricsRegionServer.updateAppend( -685 EnvironmentEdgeManager.currentTime() - before); -686 } -687 return r; -688 } -689 -690 /** -691 * Execute an increment mutation. -692 * -693 * @param region -694 * @param mutation -695 * @return the Result -696 * @throws IOException -697 */ -698 private Result increment(final Region region, final OperationQuota quota, -699 final MutationProto mutation, final CellScanner cells, long nonceGroup, -700 ActivePolicyEnforcement spaceQuota) -701 throws IOException { -702 long before = EnvironmentEdgeManager.currentTime(); -703 Increment increment = ProtobufUtil.toIncrement(mutation, cells); -704 checkCellSizeLimit(region, increment); -705 spaceQuota.getPolicyEnforcement(region).check(increment); -706 quota.addMutation(increment); -707 Result r = null; -708 if (region.getCoprocessorHost() != null) { -709 r = region.getCoprocessorHost().preIncrement(increment); -710 } -711 if (r == null) { -712 boolean canProceed = startNonceOperation(mutation, nonceGroup); -713 boolean success = false; -714 try { -715 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; -716 if (canProceed) { -717 r = region.increment(increment, nonceGroup, nonce); -718 } else { -719 // convert duplicate increment to get -720 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, -721 nonce); -722 r = Result.create(results); -723 } -724 success = true; -725 } finally { -726 if (canProceed) { -727 endNonceOperation(mutation, nonceGroup, success); -728 } -729 } -730 if (region.getCoprocessorHost() != null) { -731 r = region.getCoprocessorHost().postIncrement(increment, r); -732 } -733 } -734 if (regionServer.metricsRegionServer != null) { -735 regionServer.metricsRegionServer.updateIncrement( -736 EnvironmentEdgeManager.currentTime() - before); -737 } -738 return r; -739 } -740 -741 /** -742 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when -743 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. -744 * @param region -745 * @param actions -746 * @param cellScanner -747 * @param builder -748 * @param cellsToReturn Could be null. May be allocated in this method. This is what this -749 * method returns as a 'result'. -750 * @param closeCallBack the callback to be used with multigets -751 * @param context the current RpcCallContext -752 * @return Return the <code>cellScanner</code> passed -753 */ -754 private List<CellScannable> doNonAtomicRegionMutation(final Region region, -755 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, -756 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, -757 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, -758 ActivePolicyEnforcement spaceQuotaEnforcement) { -759 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do -760 // one at a time, we instead pass them in batch. Be aware that the corresponding -761 // ResultOrException instance that matches each Put or Delete is then added down in the -762 // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched -763 List<ClientProtos.Action> mutations = null; -764 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); -765 IOException sizeIOE = null; -766 Object lastBlock = null; -767 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); -768 boolean hasResultOrException = false; -769 for (ClientProtos.Action action : actions.getActionList()) { -770 hasResultOrException = false; -771 resultOrExceptionBuilder.clear(); -772 try { -773 Result r = null; -774 -775 if (context != null -776 && context.isRetryImmediatelySupported() -777 && (context.getResponseCellSize() > maxQuotaResultSize -778 || context.getResponseBlockSize() + context.getResponseExceptionSize() -779 > maxQuotaResultSize)) { -780 -781 // We're storing the exception since the exception and reason string won't -782 // change after the response size limit is reached. -783 if (sizeIOE == null ) { -784 // We don't need the stack un-winding do don't throw the exception. -785 // Throwing will kill the JVM's JIT. -786 // -787 // Instead just create the exception and then store it. -788 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" -789 + " CellSize: " + context.getResponseCellSize() -790 + " BlockSize: " + context.getResponseBlockSize()); -791 -792 // Only report the exception once since there's only one request that -793 // caused the exception. Otherwise this number will dominate the exceptions count. -794 rpcServer.getMetrics().exception(sizeIOE); -795 } -796 -797 // Now that there's an exception is known to be created -798 // use it for the response. -799 // -800 // This will create a copy in the builder. -801 hasResultOrException = true; -802 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); -803 resultOrExceptionBuilder.setException(pair); -804 context.incrementResponseExceptionSize(pair.getSerializedSize()); -805 resultOrExceptionBuilder.setIndex(action.getIndex()); -806 builder.addResultOrException(resultOrExceptionBuilder.build()); -807 if (cellScanner != null) { -808 skipCellsForMutation(action, cellScanner); +492 return canProceed; +493 } +494 +495 /** +496 * Ends nonce operation for a mutation, if needed. +497 * @param mutation Mutation. +498 * @param nonceGroup Nonce group from the request. Always 0 in initial implementation. +499 * @param success Whether the operation for this nonce has succeeded. +500 */ +501 private void endNonceOperation(final MutationProto mutation, +502 long nonceGroup, boolean success) { +503 if (regionServer.nonceManager != null && mutation.hasNonce()) { +504 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success); +505 } +506 } +507 +508 private boolean isClientCellBlockSupport(RpcCallContext context) { +509 return context != null && context.isClientCellBlockSupported(); +510 } +511 +512 private void addResult(final MutateResponse.Builder builder, final Result result, +513 final HBaseRpcController rpcc, boolean clientCellBlockSupported) { +514 if (result == null) return; +515 if (clientCellBlockSupported) { +516 builder.setResult(ProtobufUtil.toResultNoData(result)); +517 rpcc.setCellScanner(result.cellScanner()); +518 } else { +519 ClientProtos.Result pbr = ProtobufUtil.toResult(result); +520 builder.setResult(pbr); +521 } +522 } +523 +524 private void addResults(ScanResponse.Builder builder, List<Result> results, +525 HBaseRpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { +526 builder.setStale(!isDefaultRegion); +527 if (results.isEmpty()) { +528 return; +529 } +530 if (clientCellBlockSupported) { +531 for (Result res : results) { +532 builder.addCellsPerResult(res.size()); +533 builder.addPartialFlagPerResult(res.mayHaveMoreCellsInRow()); +534 } +535 controller.setCellScanner(CellUtil.createCellScanner(results)); +536 } else { +537 for (Result res : results) { +538 ClientProtos.Result pbr = ProtobufUtil.toResult(res); +539 builder.addResults(pbr); +540 } +541 } +542 } +543 +544 /** +545 * Mutate a list of rows atomically. +546 * +547 * @param region +548 * @param actions +549 * @param cellScanner if non-null, the mutation data -- the Cell content. +550 * @throws IOException +551 */ +552 private void mutateRows(final Region region, +553 final List<ClientProtos.Action> actions, +554 final CellScanner cellScanner, RegionActionResult.Builder builder) throws IOException { +555 if (!region.getRegionInfo().isMetaTable()) { +556 regionServer.cacheFlusher.reclaimMemStoreMemory(); +557 } +558 RowMutations rm = null; +559 int i = 0; +560 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder = +561 ClientProtos.ResultOrException.newBuilder(); +562 for (ClientProtos.Action action: actions) { +563 if (action.hasGet()) { +564 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" + +565 action.getGet()); +566 } +567 MutationType type = action.getMutation().getMutateType(); +568 if (rm == null) { +569 rm = new RowMutations(action.getMutation().getRow().toByteArray(), actions.size()); +570 } +571 switch (type) { +572 case PUT: +573 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner); +574 checkCellSizeLimit(region, put); +575 rm.add(put); +576 break; +577 case DELETE: +578 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner)); +579 break; +580 default: +581 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name()); +582 } +583 // To unify the response format with doNonAtomicRegionMutation and read through client's +584 // AsyncProcess we have to add an empty result instance per operation +585 resultOrExceptionOrBuilder.clear(); +586 resultOrExceptionOrBuilder.setIndex(i++); +587 builder.addResultOrException( +588 resultOrExceptionOrBuilder.build()); +589 } +590 region.mutateRow(rm); +591 } +592 +593 /** +594 * Mutate a list of rows atomically. +595 * +596 * @param region +597 * @param actions +598 * @param cellScanner if non-null, the mutation data -- the Cell content. +599 * @param row +600 * @param family +601 * @param qualifier +602 * @param compareOp +603 * @param comparator @throws IOException +604 */ +605 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, +606 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, +607 CompareOp compareOp, ByteArrayComparable comparator, RegionActionResult.Builder builder, +608 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { +609 if (!region.getRegionInfo().isMetaTable()) { +610 regionServer.cacheFlusher.reclaimMemStoreMemory(); +611 } +612 RowMutations rm = null; +613 int i = 0; +614 ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =