Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1FE62200D08 for ; Thu, 21 Sep 2017 17:13:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1E703160BCB; Thu, 21 Sep 2017 15:13:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A1CD11609B8 for ; Thu, 21 Sep 2017 17:13:38 +0200 (CEST) Received: (qmail 97046 invoked by uid 500); 21 Sep 2017 15:13:36 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 96305 invoked by uid 99); 21 Sep 2017 15:13:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Sep 2017 15:13:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8875BF5A54; Thu, 21 Sep 2017 15:13:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Thu, 21 Sep 2017 15:13:47 -0000 Message-Id: <687dd939738c4daa9b8d261085716fe3@git.apache.org> In-Reply-To: <8d5689eca5904cd1bafec1556f5885a0@git.apache.org> References: <8d5689eca5904cd1bafec1556f5885a0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [14/51] [partial] hbase-site git commit: Published site at . archived-at: Thu, 21 Sep 2017 15:13:41 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/fa7d6c0c/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.ScannerListener.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.ScannerListener.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.ScannerListener.html index e0b1774..f3d4c5e 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.ScannerListener.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/RSRpcServices.ScannerListener.html @@ -605,13 +605,13 @@ 597 * @param row 598 * @param family 599 * @param qualifier -600 * @param op +600 * @param compareOp 601 * @param comparator @throws IOException 602 */ 603 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions, -604 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, -605 CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, -606 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { +604 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, +605 CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, +606 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { 607 if (!region.getRegionInfo().isMetaTable()) { 608 regionServer.cacheFlusher.reclaimMemStoreMemory(); 609 } @@ -656,2842 +656,2846 @@ 648 649 /** 650 * Execute an append mutation. -651 * @return result to return to client if default operation should be -652 * bypassed as indicated by RegionObserver, null otherwise -653 * @throws IOException -654 */ -655 private Result append(final Region region, final OperationQuota quota, -656 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, -657 ActivePolicyEnforcement spaceQuota) -658 throws IOException { -659 long before = EnvironmentEdgeManager.currentTime(); -660 Append append = ProtobufUtil.toAppend(mutation, cellScanner); -661 checkCellSizeLimit(region, append); -662 spaceQuota.getPolicyEnforcement(region).check(append); -663 quota.addMutation(append); -664 Result r = null; -665 if (region.getCoprocessorHost() != null) { -666 r = region.getCoprocessorHost().preAppend(append); -667 } -668 if (r == null) { -669 boolean canProceed = startNonceOperation(mutation, nonceGroup); -670 boolean success = false; -671 try { -672 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; -673 if (canProceed) { -674 r = region.append(append, nonceGroup, nonce); -675 } else { -676 // convert duplicate append to get -677 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, -678 nonceGroup, nonce); -679 r = Result.create(results); -680 } -681 success = true; -682 } finally { -683 if (canProceed) { -684 endNonceOperation(mutation, nonceGroup, success); -685 } -686 } -687 if (region.getCoprocessorHost() != null) { -688 r = region.getCoprocessorHost().postAppend(append, r); -689 } -690 } -691 if (regionServer.metricsRegionServer != null) { -692 regionServer.metricsRegionServer.updateAppend( -693 EnvironmentEdgeManager.currentTime() - before); +651 * +652 * @param region +653 * @param m +654 * @param cellScanner +655 * @return result to return to client if default operation should be +656 * bypassed as indicated by RegionObserver, null otherwise +657 * @throws IOException +658 */ +659 private Result append(final Region region, final OperationQuota quota, +660 final MutationProto mutation, final CellScanner cellScanner, long nonceGroup, +661 ActivePolicyEnforcement spaceQuota) +662 throws IOException { +663 long before = EnvironmentEdgeManager.currentTime(); +664 Append append = ProtobufUtil.toAppend(mutation, cellScanner); +665 checkCellSizeLimit(region, append); +666 spaceQuota.getPolicyEnforcement(region).check(append); +667 quota.addMutation(append); +668 Result r = null; +669 if (region.getCoprocessorHost() != null) { +670 r = region.getCoprocessorHost().preAppend(append); +671 } +672 if (r == null) { +673 boolean canProceed = startNonceOperation(mutation, nonceGroup); +674 boolean success = false; +675 try { +676 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; +677 if (canProceed) { +678 r = region.append(append, nonceGroup, nonce); +679 } else { +680 // convert duplicate append to get +681 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cellScanner), false, +682 nonceGroup, nonce); +683 r = Result.create(results); +684 } +685 success = true; +686 } finally { +687 if (canProceed) { +688 endNonceOperation(mutation, nonceGroup, success); +689 } +690 } +691 if (region.getCoprocessorHost() != null) { +692 r = region.getCoprocessorHost().postAppend(append, r); +693 } 694 } -695 return r; -696 } -697 -698 /** -699 * Execute an increment mutation. -700 * -701 * @param region -702 * @param mutation -703 * @return the Result -704 * @throws IOException -705 */ -706 private Result increment(final Region region, final OperationQuota quota, -707 final MutationProto mutation, final CellScanner cells, long nonceGroup, -708 ActivePolicyEnforcement spaceQuota) -709 throws IOException { -710 long before = EnvironmentEdgeManager.currentTime(); -711 Increment increment = ProtobufUtil.toIncrement(mutation, cells); -712 checkCellSizeLimit(region, increment); -713 spaceQuota.getPolicyEnforcement(region).check(increment); -714 quota.addMutation(increment); -715 Result r = null; -716 if (region.getCoprocessorHost() != null) { -717 r = region.getCoprocessorHost().preIncrement(increment); -718 } -719 if (r == null) { -720 boolean canProceed = startNonceOperation(mutation, nonceGroup); -721 boolean success = false; -722 try { -723 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; -724 if (canProceed) { -725 r = region.increment(increment, nonceGroup, nonce); -726 } else { -727 // convert duplicate increment to get -728 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, -729 nonce); -730 r = Result.create(results); -731 } -732 success = true; -733 } finally { -734 if (canProceed) { -735 endNonceOperation(mutation, nonceGroup, success); -736 } -737 } -738 if (region.getCoprocessorHost() != null) { -739 r = region.getCoprocessorHost().postIncrement(increment, r); -740 } -741 } -742 if (regionServer.metricsRegionServer != null) { -743 regionServer.metricsRegionServer.updateIncrement( -744 EnvironmentEdgeManager.currentTime() - before); +695 if (regionServer.metricsRegionServer != null) { +696 regionServer.metricsRegionServer.updateAppend( +697 EnvironmentEdgeManager.currentTime() - before); +698 } +699 return r; +700 } +701 +702 /** +703 * Execute an increment mutation. +704 * +705 * @param region +706 * @param mutation +707 * @return the Result +708 * @throws IOException +709 */ +710 private Result increment(final Region region, final OperationQuota quota, +711 final MutationProto mutation, final CellScanner cells, long nonceGroup, +712 ActivePolicyEnforcement spaceQuota) +713 throws IOException { +714 long before = EnvironmentEdgeManager.currentTime(); +715 Increment increment = ProtobufUtil.toIncrement(mutation, cells); +716 checkCellSizeLimit(region, increment); +717 spaceQuota.getPolicyEnforcement(region).check(increment); +718 quota.addMutation(increment); +719 Result r = null; +720 if (region.getCoprocessorHost() != null) { +721 r = region.getCoprocessorHost().preIncrement(increment); +722 } +723 if (r == null) { +724 boolean canProceed = startNonceOperation(mutation, nonceGroup); +725 boolean success = false; +726 try { +727 long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE; +728 if (canProceed) { +729 r = region.increment(increment, nonceGroup, nonce); +730 } else { +731 // convert duplicate increment to get +732 List<Cell> results = region.get(ProtobufUtil.toGet(mutation, cells), false, nonceGroup, +733 nonce); +734 r = Result.create(results); +735 } +736 success = true; +737 } finally { +738 if (canProceed) { +739 endNonceOperation(mutation, nonceGroup, success); +740 } +741 } +742 if (region.getCoprocessorHost() != null) { +743 r = region.getCoprocessorHost().postIncrement(increment, r); +744 } 745 } -746 return r; -747 } -748 -749 /** -750 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when -751 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. -752 * @param region -753 * @param actions -754 * @param cellScanner -755 * @param builder -756 * @param cellsToReturn Could be null. May be allocated in this method. This is what this -757 * method returns as a 'result'. -758 * @param closeCallBack the callback to be used with multigets -759 * @param context the current RpcCallContext -760 * @return Return the <code>cellScanner</code> passed -761 */ -762 private List<CellScannable> doNonAtomicRegionMutation(final Region region, -763 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, -764 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, -765 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, -766 ActivePolicyEnforcement spaceQuotaEnforcement) { -767 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do -768 // one at a time, we instead pass them in batch. Be aware that the corresponding -769 // ResultOrException instance that matches each Put or Delete is then added down in the -770 // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched -771 List<ClientProtos.Action> mutations = null; -772 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); -773 IOException sizeIOE = null; -774 Object lastBlock = null; -775 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); -776 boolean hasResultOrException = false; -777 for (ClientProtos.Action action : actions.getActionList()) { -778 hasResultOrException = false; -779 resultOrExceptionBuilder.clear(); -780 try { -781 Result r = null; -782 -783 if (context != null -784 && context.isRetryImmediatelySupported() -785 && (context.getResponseCellSize() > maxQuotaResultSize -786 || context.getResponseBlockSize() + context.getResponseExceptionSize() -787 > maxQuotaResultSize)) { -788 -789 // We're storing the exception since the exception and reason string won't -790 // change after the response size limit is reached. -791 if (sizeIOE == null ) { -792 // We don't need the stack un-winding do don't throw the exception. -793 // Throwing will kill the JVM's JIT. -794 // -795 // Instead just create the exception and then store it. -796 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" -797 + " CellSize: " + context.getResponseCellSize() -798 + " BlockSize: " + context.getResponseBlockSize()); -799 -800 // Only report the exception once since there's only one request that -801 // caused the exception. Otherwise this number will dominate the exceptions count. -802 rpcServer.getMetrics().exception(sizeIOE); -803 } -804 -805 // Now that there's an exception is known to be created -806 // use it for the response. -807 // -808 // This will create a copy in the builder. -809 hasResultOrException = true; -810 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); -811 resultOrExceptionBuilder.setException(pair); -812 context.incrementResponseExceptionSize(pair.getSerializedSize()); -813 resultOrExceptionBuilder.setIndex(action.getIndex()); -814 builder.addResultOrException(resultOrExceptionBuilder.build()); -815 if (cellScanner != null) { -816 skipCellsForMutation(action, cellScanner); -817 } -818 continue; -819 } -820 if (action.hasGet()) { -821 long before = EnvironmentEdgeManager.currentTime(); -822 try { -823 Get get = ProtobufUtil.toGet(action.getGet()); -824 if (context != null) { -825 r = get(get, ((HRegion) region), closeCallBack, context); -826 } else { -827 r = region.get(get); -828 } -829 } finally { -830 if (regionServer.metricsRegionServer != null) { -831 regionServer.metricsRegionServer.updateGet( -832 EnvironmentEdgeManager.currentTime() - before); -833 } -834 } -835 } else if (action.hasServiceCall()) { -836 hasResultOrException = true; -837 try { -838 com.google.protobuf.Message result = -839 execServiceOnRegion(region, action.getServiceCall()); -840 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = -841 ClientProtos.CoprocessorServiceResult.newBuilder(); -842 resultOrExceptionBuilder.setServiceResult( -843 serviceResultBuilder.setValue( -844 serviceResultBuilder.getValueBuilder() -845 .setName(result.getClass().getName()) -846 // TODO: Copy!!! -847 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); -848 } catch (IOException ioe) { -849 rpcServer.getMetrics().exception(ioe); -850 NameBytesPair pair = ResponseConverter.buildException(ioe); -851 resultOrExceptionBuilder.setException(pair); -852 context.incrementResponseExceptionSize(pair.getSerializedSize()); -853 } -854 } else if (action.hasMutation()) { -855 MutationType type = action.getMutation().getMutateType(); -856 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && -857 !mutations.isEmpty()) { -858 // Flush out any Puts or Deletes already collected. -859 doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); -860 mutations.clear(); -861 } -862 switch (type) { -863 case APPEND: -864 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, -865 spaceQuotaEnforcement); -866 break; -867 case INCREMENT: -868 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, +746 if (regionServer.metricsRegionServer != null) { +747 regionServer.metricsRegionServer.updateIncrement( +748 EnvironmentEdgeManager.currentTime() - before); +749 } +750 return r; +751 } +752 +753 /** +754 * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when +755 * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation. +756 * @param region +757 * @param actions +758 * @param cellScanner +759 * @param builder +760 * @param cellsToReturn Could be null. May be allocated in this method. This is what this +761 * method returns as a 'result'. +762 * @param closeCallBack the callback to be used with multigets +763 * @param context the current RpcCallContext +764 * @return Return the <code>cellScanner</code> passed +765 */ +766 private List<CellScannable> doNonAtomicRegionMutation(final Region region, +767 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner, +768 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup, +769 final RegionScannersCloseCallBack closeCallBack, RpcCallContext context, +770 ActivePolicyEnforcement spaceQuotaEnforcement) { +771 // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do +772 // one at a time, we instead pass them in batch. Be aware that the corresponding +773 // ResultOrException instance that matches each Put or Delete is then added down in the +774 // doBatchOp call. We should be staying aligned though the Put and Delete are deferred/batched +775 List<ClientProtos.Action> mutations = null; +776 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); +777 IOException sizeIOE = null; +778 Object lastBlock = null; +779 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); +780 boolean hasResultOrException = false; +781 for (ClientProtos.Action action : actions.getActionList()) { +782 hasResultOrException = false; +783 resultOrExceptionBuilder.clear(); +784 try { +785 Result r = null; +786 +787 if (context != null +788 && context.isRetryImmediatelySupported() +789 && (context.getResponseCellSize() > maxQuotaResultSize +790 || context.getResponseBlockSize() + context.getResponseExceptionSize() +791 > maxQuotaResultSize)) { +792 +793 // We're storing the exception since the exception and reason string won't +794 // change after the response size limit is reached. +795 if (sizeIOE == null ) { +796 // We don't need the stack un-winding do don't throw the exception. +797 // Throwing will kill the JVM's JIT. +798 // +799 // Instead just create the exception and then store it. +800 sizeIOE = new MultiActionResultTooLarge("Max size exceeded" +801 + " CellSize: " + context.getResponseCellSize() +802 + " BlockSize: " + context.getResponseBlockSize()); +803 +804 // Only report the exception once since there's only one request that +805 // caused the exception. Otherwise this number will dominate the exceptions count. +806 rpcServer.getMetrics().exception(sizeIOE); +807 } +808 +809 // Now that there's an exception is known to be created +810 // use it for the response. +811 // +812 // This will create a copy in the builder. +813 hasResultOrException = true; +814 NameBytesPair pair = ResponseConverter.buildException(sizeIOE); +815 resultOrExceptionBuilder.setException(pair); +816 context.incrementResponseExceptionSize(pair.getSerializedSize()); +817 resultOrExceptionBuilder.setIndex(action.getIndex()); +818 builder.addResultOrException(resultOrExceptionBuilder.build()); +819 if (cellScanner != null) { +820 skipCellsForMutation(action, cellScanner); +821 } +822 continue; +823 } +824 if (action.hasGet()) { +825 long before = EnvironmentEdgeManager.currentTime(); +826 try { +827 Get get = ProtobufUtil.toGet(action.getGet()); +828 if (context != null) { +829 r = get(get, ((HRegion) region), closeCallBack, context); +830 } else { +831 r = region.get(get); +832 } +833 } finally { +834 if (regionServer.metricsRegionServer != null) { +835 regionServer.metricsRegionServer.updateGet( +836 EnvironmentEdgeManager.currentTime() - before); +837 } +838 } +839 } else if (action.hasServiceCall()) { +840 hasResultOrException = true; +841 try { +842 com.google.protobuf.Message result = +843 execServiceOnRegion(region, action.getServiceCall()); +844 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder = +845 ClientProtos.CoprocessorServiceResult.newBuilder(); +846 resultOrExceptionBuilder.setServiceResult( +847 serviceResultBuilder.setValue( +848 serviceResultBuilder.getValueBuilder() +849 .setName(result.getClass().getName()) +850 // TODO: Copy!!! +851 .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray())))); +852 } catch (IOException ioe) { +853 rpcServer.getMetrics().exception(ioe); +854 NameBytesPair pair = ResponseConverter.buildException(ioe); +855 resultOrExceptionBuilder.setException(pair); +856 context.incrementResponseExceptionSize(pair.getSerializedSize()); +857 } +858 } else if (action.hasMutation()) { +859 MutationType type = action.getMutation().getMutateType(); +860 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null && +861 !mutations.isEmpty()) { +862 // Flush out any Puts or Deletes already collected. +863 doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); +864 mutations.clear(); +865 } +866 switch (type) { +867 case APPEND: +868 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup, 869 spaceQuotaEnforcement); 870 break; -871 case PUT: -872 case DELETE: -873 // Collect the individual mutations and apply in a batch -874 if (mutations == null) { -875 mutations = new ArrayList<>(actions.getActionCount()); -876 } -877 mutations.add(action); -878 break; -879 default: -880 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); -881 } -882 } else { -883 throw new HBaseIOException("Unexpected Action type"); -884 } -885 if (r != null) { -886 ClientProtos.Result pbResult = null; -887 if (isClientCellBlockSupport(context)) { -888 pbResult = ProtobufUtil.toResultNoData(r); -889 // Hard to guess the size here. Just make a rough guess. -890 if (cellsToReturn == null) { -891 cellsToReturn = new ArrayList<>(); -892 } -893 cellsToReturn.add(r); -894 } else { -895 pbResult = ProtobufUtil.toResult(r); -896 } -897 lastBlock = addSize(context, r, lastBlock); -898 hasResultOrException = true; -899 resultOrExceptionBuilder.setResult(pbResult); -900 } -901 // Could get to here and there was no result and no exception. Presumes we added -902 // a Put or Delete to the collecting Mutations List for adding later. In this -903 // case the corresponding ResultOrException instance for the Put or Delete will be added -904 // down in the doBatchOp method call rather than up here. -905 } catch (IOException ie) { -906 rpcServer.getMetrics().exception(ie); -907 hasResultOrException = true; -908 NameBytesPair pair = ResponseConverter.buildException(ie); -909 resultOrExceptionBuilder.setException(pair); -910 context.incrementResponseExceptionSize(pair.getSerializedSize()); -911 } -912 if (hasResultOrException) { -913 // Propagate index. -914 resultOrExceptionBuilder.setIndex(action.getIndex()); -915 builder.addResultOrException(resultOrExceptionBuilder.build()); -916 } -917 } -918 // Finish up any outstanding mutations -919 if (mutations != null && !mutations.isEmpty()) { -920 doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); +871 case INCREMENT: +872 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup, +873 spaceQuotaEnforcement); +874 break; +875 case PUT: +876 case DELETE: +877 // Collect the individual mutations and apply in a batch +878 if (mutations == null) { +879 mutations = new ArrayList<>(actions.getActionCount()); +880 } +881 mutations.add(action); +882 break; +883 default: +884 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); +885 } +886 } else { +887 throw new HBaseIOException("Unexpected Action type"); +888 } +889 if (r != null) { +890 ClientProtos.Result pbResult = null; +891 if (isClientCellBlockSupport(context)) { +892 pbResult = ProtobufUtil.toResultNoData(r); +893 // Hard to guess the size here. Just make a rough guess. +894 if (cellsToReturn == null) { +895 cellsToReturn = new ArrayList<>(); +896 } +897 cellsToReturn.add(r); +898 } else { +899 pbResult = ProtobufUtil.toResult(r); +900 } +901 lastBlock = addSize(context, r, lastBlock); +902 hasResultOrException = true; +903 resultOrExceptionBuilder.setResult(pbResult); +904 } +905 // Could get to here and there was no result and no exception. Presumes we added +906 // a Put or Delete to the collecting Mutations List for adding later. In this +907 // case the corresponding ResultOrException instance for the Put or Delete will be added +908 // down in the doBatchOp method call rather than up here. +909 } catch (IOException ie) { +910 rpcServer.getMetrics().exception(ie); +911 hasResultOrException = true; +912 NameBytesPair pair = ResponseConverter.buildException(ie); +913 resultOrExceptionBuilder.setException(pair); +914 context.incrementResponseExceptionSize(pair.getSerializedSize()); +915 } +916 if (hasResultOrException) { +917 // Propagate index. +918 resultOrExceptionBuilder.setIndex(action.getIndex()); +919 builder.addResultOrException(resultOrExceptionBuilder.build()); +920 } 921 } -922 return cellsToReturn; -923 } -924 -925 private void checkCellSizeLimit(final Region region, final Mutation m) throws IOException { -926 if (!(region instanceof HRegion)) { -927 return; -928 } -929 HRegion r = (HRegion)region; -930 if (r.maxCellSize > 0) { -931 CellScanner cells = m.cellScanner(); -932 while (cells.advance()) { -933 int size = CellUtil.estimatedSerializedSizeOf(cells.current()); -934 if (size > r.maxCellSize) { -935 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; -936 if (LOG.isDebugEnabled()) { -937 LOG.debug(msg); -938 } -939 throw new DoNotRetryIOException(msg); -940 } -941 } -942 } -943 } -944 -945 /** -946 * Execute a list of Put/Delete mutations. -947 * -948 * @param builder -949 * @param region -950 * @param mutations -951 */ -952 private void doBatchOp(final RegionActionResult.Builder builder, final Region region, -953 final OperationQuota quota, final List<ClientProtos.Action> mutations, -954 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { -955 Mutation[] mArray = new Mutation[mutations.size()]; -956 long before = EnvironmentEdgeManager.currentTime(); -957 boolean batchContainsPuts = false, batchContainsDelete = false; -958 try { -959 /** HBASE-17924 -960 * mutationActionMap is a map to map the relation between mutations and actions -961 * since mutation array may have been reoredered.In order to return the right -962 * result or exception to the corresponding actions, We need to know which action -963 * is the mutation belong to. We can't sort ClientProtos.Action array, since they -964 * are bonded to cellscanners. -965 */ -966 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<Mutation, ClientProtos.Action>(); -967 int i = 0; -968 for (ClientProtos.Action action: mutations) { -969 MutationProto m = action.getMutation(); -970 Mutation mutation; -971 if (m.getMutateType() == MutationType.PUT) { -972 mutation = ProtobufUtil.toPut(m, cells); -973 batchContainsPuts = true; -974 } else { -975 mutation = ProtobufUtil.toDelete(m, cells); -976 batchContainsDelete = true; -977 } -978 mutationActionMap.put(mutation, action); -979 mArray[i++] = mutation; -980 checkCellSizeLimit(region, mutation); -981 // Check if a space quota disallows this mutation -982 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); -983 quota.addMutation(mutation); -984 } -985 -986 if (!region.getRegionInfo().isMetaTable()) { -987 regionServer.cacheFlusher.reclaimMemStoreMemory(); +922 // Finish up any outstanding mutations +923 if (mutations != null && !mutations.isEmpty()) { +924 doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement); +925 } +926 return cellsToReturn; +927 } +928 +929 private void checkCellSizeLimit(final Region region, final Mutation m) throws IOException { +930 if (!(region instanceof HRegion)) { +931 return; +932 } +933 HRegion r = (HRegion)region; +934 if (r.maxCellSize > 0) { +935 CellScanner cells = m.cellScanner(); +936 while (cells.advance()) { +937 int size = CellUtil.estimatedSerializedSizeOf(cells.current()); +938 if (size > r.maxCellSize) { +939 String msg = "Cell with size " + size + " exceeds limit of " + r.maxCellSize + " bytes"; +940 if (LOG.isDebugEnabled()) { +941 LOG.debug(msg); +942 } +943 throw new DoNotRetryIOException(msg); +944 } +945 } +946 } +947 } +948 +949 /** +950 * Execute a list of Put/Delete mutations. +951 * +952 * @param builder +953 * @param region +954 * @param mutations +955 */ +956 private void doBatchOp(final RegionActionResult.Builder builder, final Region region, +957 final OperationQuota quota, final List<ClientProtos.Action> mutations, +958 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) { +959 Mutation[] mArray = new Mutation[mutations.size()]; +960 long before = EnvironmentEdgeManager.currentTime(); +961 boolean batchContainsPuts = false, batchContainsDelete = false; +962 try { +963 /** HBASE-17924 +964 * mutationActionMap is a map to map the relation between mutations and actions +965 * since mutation array may have been reoredered.In order to return the right +966 * result or exception to the corresponding actions, We need to know which action +967 * is the mutation belong to. We can't sort ClientProtos.Action array, since they +968 * are bonded to cellscanners. +969 */ +970 Map<Mutation, ClientProtos.Action> mutationActionMap = new HashMap<Mutation, ClientProtos.Action>(); +971 int i = 0; +972 for (ClientProtos.Action action: mutations) { +973 MutationProto m = action.getMutation(); +974 Mutation mutation; +975 if (m.getMutateType() == MutationType.PUT) { +976 mutation = ProtobufUtil.toPut(m, cells); +977 batchContainsPuts = true; +978 } else { +979 mutation = ProtobufUtil.toDelete(m, cells); +980 batchContainsDelete = true; +981 } +982 mutationActionMap.put(mutation, action); +983 mArray[i++] = mutation; +984 checkCellSizeLimit(region, mutation); +985 // Check if a space quota disallows this mutation +986 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation); +987 quota.addMutation(mutation); 988 } 989 -990 // HBASE-17924 -991 // sort to improve lock efficiency -992 Arrays.sort(mArray); +990 if (!region.getRegionInfo().isMetaTable()) { +991 regionServer.cacheFlusher.reclaimMemStoreMemory(); +992 } 993 -994 OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, -995 HConstants.NO_NONCE); -996 for (i = 0; i < codes.length; i++) { -997 Mutation currentMutation = mArray[i]; -998 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); -999 int index = currentAction.getIndex(); -1000 Exception e = null; -1001 switch (codes[i].getOperationStatusCode()) { -1002 case BAD_FAMILY: -1003 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); -1004 builder.addResultOrException(getResultOrException(e, index)); -1005 break; -1006 -1007 case SANITY_CHECK_FAILURE: -1008 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); -1009 builder.addResultOrException(getResultOrException(e, index)); -1010 break; -1011 -1012 default: -1013 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); -1014 builder.addResultOrException(getResultOrException(e, index)); -1015 break; -1016 -1017 case SUCCESS: -1018 builder.addResultOrException(getResultOrException( -1019 ClientProtos.Result.getDefaultInstance(), index)); -1020 break; -1021 } -1022 } -1023 } catch (IOException ie) { -1024 for (int i = 0; i < mutations.size(); i++) { -1025 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex())); +994 // HBASE-17924 +995 // sort to improve lock efficiency +996 Arrays.sort(mArray); +997 +998 OperationStatus[] codes = region.batchMutate(mArray, HConstants.NO_NONCE, +999 HConstants.NO_NONCE); +1000 for (i = 0; i < codes.length; i++) { +1001 Mutation currentMutation = mArray[i]; +1002 ClientProtos.Action currentAction = mutationActionMap.get(currentMutation); +1003 int index = currentAction.getIndex(); +1004 Exception e = null; +1005 switch (codes[i].getOperationStatusCode()) { +1006 case BAD_FAMILY: +1007 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg()); +1008 builder.addResultOrException(getResultOrException(e, index)); +1009 break; +1010 +1011 case SANITY_CHECK_FAILURE: +1012 e = new FailedSanityCheckException(codes[i].getExceptionMsg()); +1013 builder.addResultOrException(getResultOrException(e, index)); +1014 break; +1015 +1016 default: +1017 e = new DoNotRetryIOException(codes[i].getExceptionMsg()); +1018 builder.addResultOrException(getResultOrException(e, index)); +1019 break; +1020 +1021 case SUCCESS: +1022 builder.addResultOrException(getResultOrException( +1023 ClientProtos.Result.getDefaultInstance(), index)); +1024 break; +1025 } 1026 } -1027 } -1028 if (regionServer.metricsRegionServer != null) { -1029 long after = EnvironmentEdgeManager.currentTime(); -1030 if (batchContainsPuts) { -1031 regionServer.metricsRegionServer.updatePutBatch(after - before); -1032 } -1033 if (batchContainsDelete) { -1034 regionServer.metricsRegionServer.updateDeleteBatch(after - before); -1035 } -1036 } -1037 } -1038 -1039 /** -1040 * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of -1041 * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. -1042 * @param region -1043 * @param mutations -1044 * @param replaySeqId -1045 * @return an array of OperationStatus which internally contains the OperationStatusCode and the -1046 * exceptionMessage if any -1047 * @throws IOException -1048 */ -1049 private OperationStatus [] doReplayBatchOp(final Region region, -1050 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException { -1051 long before = EnvironmentEdgeManager.currentTime(); -1052 boolean batchContainsPuts = false, batchContainsDelete = false; -1053 try { -1054 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) { -1055 WALSplitter.MutationReplay m = it.next(); -1056 -1057 if (m.type == MutationType.PUT) { -1058 batchContainsPuts = true; -1059 } else { -1060 batchContainsDelete = true; -1061 } -1062 -1063 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap(); -1064 List<Cell> metaCells = map.get(WALEdit.METAFAMILY); -1065 if (metaCells != null && !metaCells.isEmpty()) { -1066 for (Cell metaCell : metaCells) { -1067 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell); -1068 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()); -1069 HRegion hRegion = (HRegion)region; -1070 if (compactionDesc != null) { -1071 // replay the compaction. Remove the files from stores only if we are the primary -1072 // region replica (thus own the files) -1073 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica, -1074 replaySeqId); -1075 continue; -1076 } -1077 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell); -1078 if (flushDesc != null && !isDefaultReplica) { -1079 hRegion.replayWALFlushMarker(flushDesc, replaySeqId); -1080 continue; -1081 } -1082 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell); -1083 if (regionEvent != null && !isDefaultReplica) { -1084 hRegion.replayWALRegionEventMarker(regionEvent); -1085 continue; -1086 } -1087 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell); -1088 if (bulkLoadEvent != null) { -1089 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent); -1090 continue; -1091 } -1092 } -1093 it.remove(); -1094 } -1095 } -1096 requestCount.increment(); -1097 requestRowActionCount.add(mutations.size()); -1098 if (!region.getRegionInfo().isMetaTable()) { -1099 regionServer.cacheFlusher.reclaimMemStoreMemory(); -1100 } -1101 return region.batchReplay(mutations.toArray( -1102 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); -1103 } finally { -1104 if (regionServer.metricsRegionServer != null) { -1105 long after = EnvironmentEdgeManager.currentTime(); -1106 if (batchContainsPuts) { -1107 regionServer.metricsRegionServer.updatePutBatch(after - before); -1108 } -1109 if (batchContainsDelete) { -1110 regionServer.metricsRegionServer.updateDeleteBatch(after - before); -1111 } -1112 } -1113 } -1114 }<