Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F2EC7200CD9 for ; Thu, 20 Jul 2017 00:07:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F183116A097; Wed, 19 Jul 2017 22:07:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9C7B916A0AB for ; Thu, 20 Jul 2017 00:07:23 +0200 (CEST) Received: (qmail 47664 invoked by uid 500); 19 Jul 2017 22:07:21 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 45258 invoked by uid 99); 19 Jul 2017 22:07:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2017 22:07:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A9FFEF5592; Wed, 19 Jul 2017 22:07:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 19 Jul 2017 22:07:38 -0000 Message-Id: <3d10d1692ca34d109dfb1538b438059a@git.apache.org> In-Reply-To: <06eabf77f2c545f393210f621f2f13a5@git.apache.org> References: <06eabf77f2c545f393210f621f2f13a5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd. archived-at: Wed, 19 Jul 2017 22:07:26 -0000 http://git-wip-us.apache.org/repos/asf/hbase-site/blob/9eba7fcf/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html index 504e470..38667c0 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html @@ -2866,5375 +2866,5371 @@ 2858 checkResources(); 2859 startRegionOperation(Operation.DELETE); 2860 try { -2861 delete.getRow(); -2862 // All edits for the given row (across all column families) must happen atomically. -2863 doBatchMutate(delete); -2864 } finally { -2865 closeRegionOperation(Operation.DELETE); -2866 } -2867 } -2868 -2869 /** -2870 * Row needed by below method. -2871 */ -2872 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly"); -2873 -2874 /** -2875 * This is used only by unit tests. Not required to be a public API. -2876 * @param familyMap map of family to edits for the given family. -2877 * @throws IOException -2878 */ -2879 void delete(NavigableMap<byte[], List<Cell>> familyMap, -2880 Durability durability) throws IOException { -2881 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY); -2882 delete.setFamilyCellMap(familyMap); -2883 delete.setDurability(durability); -2884 doBatchMutate(delete); -2885 } -2886 -2887 @Override -2888 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap, -2889 byte[] byteNow) throws IOException { -2890 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { -2891 -2892 byte[] family = e.getKey(); -2893 List<Cell> cells = e.getValue(); -2894 assert cells instanceof RandomAccess; -2895 -2896 Map<byte[], Integer> kvCount = new TreeMap<>(Bytes.BYTES_COMPARATOR); -2897 int listSize = cells.size(); -2898 for (int i=0; i < listSize; i++) { -2899 Cell cell = cells.get(i); -2900 // Check if time is LATEST, change to time of most recent addition if so -2901 // This is expensive. -2902 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) { -2903 byte[] qual = CellUtil.cloneQualifier(cell); -2904 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; -2905 -2906 Integer count = kvCount.get(qual); -2907 if (count == null) { -2908 kvCount.put(qual, 1); -2909 } else { -2910 kvCount.put(qual, count + 1); -2911 } -2912 count = kvCount.get(qual); -2913 -2914 Get get = new Get(CellUtil.cloneRow(cell)); -2915 get.setMaxVersions(count); -2916 get.addColumn(family, qual); -2917 if (coprocessorHost != null) { -2918 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, -2919 byteNow, get)) { -2920 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); -2921 } -2922 } else { -2923 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); -2924 } -2925 } else { -2926 CellUtil.updateLatestStamp(cell, byteNow, 0); -2927 } -2928 } -2929 } -2930 } -2931 -2932 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow) -2933 throws IOException { -2934 List<Cell> result = get(get, false); -2935 -2936 if (result.size() < count) { -2937 // Nothing to delete -2938 CellUtil.updateLatestStamp(cell, byteNow, 0); -2939 return; -2940 } -2941 if (result.size() > count) { -2942 throw new RuntimeException("Unexpected size: " + result.size()); -2943 } -2944 Cell getCell = result.get(count - 1); -2945 CellUtil.setTimestamp(cell, getCell.getTimestamp()); -2946 } -2947 -2948 @Override -2949 public void put(Put put) throws IOException { -2950 checkReadOnly(); -2951 -2952 // Do a rough check that we have resources to accept a write. The check is -2953 // 'rough' in that between the resource check and the call to obtain a -2954 // read lock, resources may run out. For now, the thought is that this -2955 // will be extremely rare; we'll deal with it when it happens. -2956 checkResources(); -2957 startRegionOperation(Operation.PUT); -2958 try { -2959 // All edits for the given row (across all column families) must happen atomically. -2960 doBatchMutate(put); -2961 } finally { -2962 closeRegionOperation(Operation.PUT); -2963 } -2964 } -2965 -2966 /** -2967 * Struct-like class that tracks the progress of a batch operation, -2968 * accumulating status codes and tracking the index at which processing -2969 * is proceeding. -2970 */ -2971 private abstract static class BatchOperation<T> { -2972 T[] operations; -2973 int nextIndexToProcess = 0; -2974 OperationStatus[] retCodeDetails; -2975 WALEdit[] walEditsFromCoprocessors; -2976 -2977 public BatchOperation(T[] operations) { -2978 this.operations = operations; -2979 this.retCodeDetails = new OperationStatus[operations.length]; -2980 this.walEditsFromCoprocessors = new WALEdit[operations.length]; -2981 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); -2982 } -2983 -2984 public abstract Mutation getMutation(int index); -2985 public abstract long getNonceGroup(int index); -2986 public abstract long getNonce(int index); -2987 /** This method is potentially expensive and should only be used for non-replay CP path. */ -2988 public abstract Mutation[] getMutationsForCoprocs(); -2989 public abstract boolean isInReplay(); -2990 public abstract long getReplaySequenceId(); -2991 -2992 public boolean isDone() { -2993 return nextIndexToProcess == operations.length; -2994 } -2995 } -2996 -2997 private static class MutationBatch extends BatchOperation<Mutation> { -2998 private long nonceGroup; -2999 private long nonce; -3000 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { -3001 super(operations); -3002 this.nonceGroup = nonceGroup; -3003 this.nonce = nonce; -3004 } -3005 -3006 @Override -3007 public Mutation getMutation(int index) { -3008 return this.operations[index]; -3009 } -3010 -3011 @Override -3012 public long getNonceGroup(int index) { -3013 return nonceGroup; -3014 } -3015 -3016 @Override -3017 public long getNonce(int index) { -3018 return nonce; -3019 } -3020 -3021 @Override -3022 public Mutation[] getMutationsForCoprocs() { -3023 return this.operations; -3024 } -3025 -3026 @Override -3027 public boolean isInReplay() { -3028 return false; -3029 } -3030 -3031 @Override -3032 public long getReplaySequenceId() { -3033 return 0; -3034 } -3035 } -3036 -3037 private static class ReplayBatch extends BatchOperation<MutationReplay> { -3038 private long replaySeqId = 0; -3039 public ReplayBatch(MutationReplay[] operations, long seqId) { -3040 super(operations); -3041 this.replaySeqId = seqId; -3042 } -3043 -3044 @Override -3045 public Mutation getMutation(int index) { -3046 return this.operations[index].mutation; -3047 } -3048 -3049 @Override -3050 public long getNonceGroup(int index) { -3051 return this.operations[index].nonceGroup; -3052 } -3053 -3054 @Override -3055 public long getNonce(int index) { -3056 return this.operations[index].nonce; -3057 } -3058 -3059 @Override -3060 public Mutation[] getMutationsForCoprocs() { -3061 assert false; -3062 throw new RuntimeException("Should not be called for replay batch"); -3063 } -3064 -3065 @Override -3066 public boolean isInReplay() { -3067 return true; -3068 } -3069 -3070 @Override -3071 public long getReplaySequenceId() { -3072 return this.replaySeqId; -3073 } -3074 } -3075 -3076 @Override -3077 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) -3078 throws IOException { -3079 // As it stands, this is used for 3 things -3080 // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. -3081 // * coprocessor calls (see ex. BulkDeleteEndpoint). -3082 // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... -3083 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); -3084 } -3085 -3086 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { -3087 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); -3088 } -3089 -3090 @Override -3091 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) -3092 throws IOException { -3093 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) -3094 && replaySeqId < lastReplayedOpenRegionSeqId) { -3095 // if it is a secondary replica we should ignore these entries silently -3096 // since they are coming out of order -3097 if (LOG.isTraceEnabled()) { -3098 LOG.trace(getRegionInfo().getEncodedName() + " : " -3099 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId -3100 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId); -3101 for (MutationReplay mut : mutations) { -3102 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation); -3103 } -3104 } -3105 -3106 OperationStatus[] statuses = new OperationStatus[mutations.length]; -3107 for (int i = 0; i < statuses.length; i++) { -3108 statuses[i] = OperationStatus.SUCCESS; -3109 } -3110 return statuses; -3111 } -3112 return batchMutate(new ReplayBatch(mutations, replaySeqId)); -3113 } -3114 -3115 /** -3116 * Perform a batch of mutations. -3117 * It supports only Put and Delete mutations and will ignore other types passed. -3118 * @param batchOp contains the list of mutations -3119 * @return an array of OperationStatus which internally contains the -3120 * OperationStatusCode and the exceptionMessage if any. -3121 * @throws IOException -3122 */ -3123 OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException { -3124 boolean initialized = false; -3125 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; -3126 startRegionOperation(op); -3127 try { -3128 while (!batchOp.isDone()) { -3129 if (!batchOp.isInReplay()) { -3130 checkReadOnly(); -3131 } -3132 checkResources(); -3133 -3134 if (!initialized) { -3135 this.writeRequestsCount.add(batchOp.operations.length); -3136 if (!batchOp.isInReplay()) { -3137 doPreBatchMutateHook(batchOp); -3138 } -3139 initialized = true; -3140 } -3141 doMiniBatchMutate(batchOp); -3142 long newSize = this.getMemstoreSize(); -3143 requestFlushIfNeeded(newSize); -3144 } -3145 } finally { -3146 closeRegionOperation(op); -3147 } -3148 return batchOp.retCodeDetails; -3149 } -3150 -3151 private void doPreBatchMutateHook(BatchOperation<?> batchOp) -3152 throws IOException { -3153 /* Run coprocessor pre hook outside of locks to avoid deadlock */ -3154 WALEdit walEdit = new WALEdit(); -3155 if (coprocessorHost != null) { -3156 for (int i = 0 ; i < batchOp.operations.length; i++) { -3157 Mutation m = batchOp.getMutation(i); -3158 if (m instanceof Put) { -3159 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { -3160 // pre hook says skip this Put -3161 // mark as success and skip in doMiniBatchMutation -3162 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -3163 } -3164 } else if (m instanceof Delete) { -3165 Delete curDel = (Delete) m; -3166 if (curDel.getFamilyCellMap().isEmpty()) { -3167 // handle deleting a row case -3168 prepareDelete(curDel); -3169 } -3170 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { -3171 // pre hook says skip this Delete -3172 // mark as success and skip in doMiniBatchMutation -3173 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -3174 } -3175 } else { -3176 // In case of passing Append mutations along with the Puts and Deletes in batchMutate -3177 // mark the operation return code as failure so that it will not be considered in -3178 // the doMiniBatchMutation -3179 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, -3180 "Put/Delete mutations only supported in batchMutate() now"); -3181 } -3182 if (!walEdit.isEmpty()) { -3183 batchOp.walEditsFromCoprocessors[i] = walEdit; -3184 walEdit = new WALEdit(); -3185 } -3186 } -3187 } -3188 } -3189 -3190 /** -3191 * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} -3192 * In here we also handle replay of edits on region recover. -3193 * @return Change in size brought about by applying <code>batchOp</code> -3194 */ -3195 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", -3196 justification="Findbugs seems to be confused on this.") -3197 @SuppressWarnings("unchecked") -3198 // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120 -3199 private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException { -3200 boolean replay = batchOp.isInReplay(); -3201 long currentNonceGroup = HConstants.NO_NONCE; -3202 long currentNonce = HConstants.NO_NONCE; -3203 WALEdit walEdit = null; -3204 boolean locked = false; -3205 // reference family maps directly so coprocessors can mutate them if desired -3206 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; -3207 // We try to set up a batch in the range [firstIndex,lastIndexExclusive) -3208 int firstIndex = batchOp.nextIndexToProcess; -3209 int lastIndexExclusive = firstIndex; -3210 boolean success = false; -3211 int noOfPuts = 0; -3212 int noOfDeletes = 0; -3213 WriteEntry writeEntry = null; -3214 int cellCount = 0; -3215 /** Keep track of the locks we hold so we can release them in finally clause */ -3216 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); -3217 MemstoreSize memstoreSize = new MemstoreSize(); -3218 final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); -3219 try { -3220 // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. -3221 int numReadyToWrite = 0; -3222 long now = EnvironmentEdgeManager.currentTime(); -3223 while (lastIndexExclusive < batchOp.operations.length) { -3224 if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { -3225 lastIndexExclusive++; -3226 continue; -3227 } -3228 Mutation mutation = batchOp.getMutation(lastIndexExclusive); -3229 // If we haven't got any rows in our batch, we should block to get the next one. -3230 RowLock rowLock = null; -3231 try { -3232 rowLock = getRowLockInternal(mutation.getRow(), true); -3233 } catch (TimeoutIOException e) { -3234 // We will retry when other exceptions, but we should stop if we timeout . -3235 throw e; -3236 } catch (IOException ioe) { -3237 LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); -3238 } -3239 if (rowLock == null) { -3240 // We failed to grab another lock -3241 break; // Stop acquiring more rows for this batch -3242 } else { -3243 acquiredRowLocks.add(rowLock); -3244 } -3245 -3246 lastIndexExclusive++; -3247 numReadyToWrite++; -3248 if (replay) { -3249 for (List<Cell> cells : mutation.getFamilyCellMap().values()) { -3250 cellCount += cells.size(); -3251 } -3252 } -3253 } -3254 -3255 // We've now grabbed as many mutations off the list as we can -3256 -3257 // STEP 2. Update any LATEST_TIMESTAMP timestamps -3258 // We should record the timestamp only after we have acquired the rowLock, -3259 // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp -3260 now = EnvironmentEdgeManager.currentTime(); -3261 byte[] byteNow = Bytes.toBytes(now); -3262 -3263 // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? -3264 if (numReadyToWrite <= 0) { -3265 return; -3266 } -3267 -3268 for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { -3269 // skip invalid -3270 if (batchOp.retCodeDetails[i].getOperationStatusCode() -3271 != OperationStatusCode.NOT_RUN) { -3272 // lastIndexExclusive was incremented above. -3273 continue; -3274 } -3275 -3276 Mutation mutation = batchOp.getMutation(i); -3277 if (mutation instanceof Put) { -3278 updateCellTimestamps(familyMaps[i].values(), byteNow); -3279 noOfPuts++; -3280 } else { -3281 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); -3282 noOfDeletes++; -3283 } -3284 rewriteCellTags(familyMaps[i], mutation); -3285 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; -3286 if (fromCP != null) { -3287 cellCount += fromCP.size(); -3288 } -3289 if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { -3290 for (List<Cell> cells : familyMaps[i].values()) { -3291 cellCount += cells.size(); -3292 } -3293 } -3294 } -3295 lock(this.updatesLock.readLock(), numReadyToWrite); -3296 locked = true; -3297 -3298 // calling the pre CP hook for batch mutation -3299 if (!replay && coprocessorHost != null) { -3300 MiniBatchOperationInProgress<Mutation> miniBatchOp = -3301 new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), -3302 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); -3303 if (coprocessorHost.preBatchMutate(miniBatchOp)) { -3304 return; -3305 } else { -3306 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3307 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { -3308 // lastIndexExclusive was incremented above. -3309 continue; -3310 } -3311 // we pass (i - firstIndex) below since the call expects a relative index -3312 Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); -3313 if (cpMutations == null) { -3314 continue; -3315 } -3316 Mutation mutation = batchOp.getMutation(i); -3317 boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; -3318 // Else Coprocessor added more Mutations corresponding to the Mutation at this index. -3319 for (int j = 0; j < cpMutations.length; j++) { -3320 Mutation cpMutation = cpMutations[j]; -3321 Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap(); -3322 checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); -3323 -3324 // Acquire row locks. If not, the whole batch will fail. -3325 acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); +2861 // All edits for the given row (across all column families) must happen atomically. +2862 doBatchMutate(delete); +2863 } finally { +2864 closeRegionOperation(Operation.DELETE); +2865 } +2866 } +2867 +2868 /** +2869 * Row needed by below method. +2870 */ +2871 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly"); +2872 +2873 /** +2874 * This is used only by unit tests. Not required to be a public API. +2875 * @param familyMap map of family to edits for the given family. +2876 * @throws IOException +2877 */ +2878 void delete(NavigableMap<byte[], List<Cell>> familyMap, +2879 Durability durability) throws IOException { +2880 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY); +2881 delete.setFamilyCellMap(familyMap); +2882 delete.setDurability(durability); +2883 doBatchMutate(delete); +2884 } +2885 +2886 @Override +2887 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap, +2888 byte[] byteNow) throws IOException { +2889 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { +2890 +2891 byte[] family = e.getKey(); +2892 List<Cell> cells = e.getValue(); +2893 assert cells instanceof RandomAccess; +2894 +2895 Map<byte[], Integer> kvCount = new TreeMap<>(Bytes.BYTES_COMPARATOR); +2896 int listSize = cells.size(); +2897 for (int i=0; i < listSize; i++) { +2898 Cell cell = cells.get(i); +2899 // Check if time is LATEST, change to time of most recent addition if so +2900 // This is expensive. +2901 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) { +2902 byte[] qual = CellUtil.cloneQualifier(cell); +2903 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; +2904 +2905 Integer count = kvCount.get(qual); +2906 if (count == null) { +2907 kvCount.put(qual, 1); +2908 } else { +2909 kvCount.put(qual, count + 1); +2910 } +2911 count = kvCount.get(qual); +2912 +2913 Get get = new Get(CellUtil.cloneRow(cell)); +2914 get.setMaxVersions(count); +2915 get.addColumn(family, qual); +2916 if (coprocessorHost != null) { +2917 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, +2918 byteNow, get)) { +2919 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); +2920 } +2921 } else { +2922 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); +2923 } +2924 } else { +2925 CellUtil.updateLatestStamp(cell, byteNow, 0); +2926 } +2927 } +2928 } +2929 } +2930 +2931 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow) +2932 throws IOException { +2933 List<Cell> result = get(get, false); +2934 +2935 if (result.size() < count) { +2936 // Nothing to delete +2937 CellUtil.updateLatestStamp(cell, byteNow, 0); +2938 return; +2939 } +2940 if (result.size() > count) { +2941 throw new RuntimeException("Unexpected size: " + result.size()); +2942 } +2943 Cell getCell = result.get(count - 1); +2944 CellUtil.setTimestamp(cell, getCell.getTimestamp()); +2945 } +2946 +2947 @Override +2948 public void put(Put put) throws IOException { +2949 checkReadOnly(); +2950 +2951 // Do a rough check that we have resources to accept a write. The check is +2952 // 'rough' in that between the resource check and the call to obtain a +2953 // read lock, resources may run out. For now, the thought is that this +2954 // will be extremely rare; we'll deal with it when it happens. +2955 checkResources(); +2956 startRegionOperation(Operation.PUT); +2957 try { +2958 // All edits for the given row (across all column families) must happen atomically. +2959 doBatchMutate(put); +2960 } finally { +2961 closeRegionOperation(Operation.PUT); +2962 } +2963 } +2964 +2965 /** +2966 * Struct-like class that tracks the progress of a batch operation, +2967 * accumulating status codes and tracking the index at which processing +2968 * is proceeding. +2969 */ +2970 private abstract static class BatchOperation<T> { +2971 T[] operations; +2972 int nextIndexToProcess = 0; +2973 OperationStatus[] retCodeDetails; +2974 WALEdit[] walEditsFromCoprocessors; +2975 +2976 public BatchOperation(T[] operations) { +2977 this.operations = operations; +2978 this.retCodeDetails = new OperationStatus[operations.length]; +2979 this.walEditsFromCoprocessors = new WALEdit[operations.length]; +2980 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); +2981 } +2982 +2983 public abstract Mutation getMutation(int index); +2984 public abstract long getNonceGroup(int index); +2985 public abstract long getNonce(int index); +2986 /** This method is potentially expensive and should only be used for non-replay CP path. */ +2987 public abstract Mutation[] getMutationsForCoprocs(); +2988 public abstract boolean isInReplay(); +2989 public abstract long getReplaySequenceId(); +2990 +2991 public boolean isDone() { +2992 return nextIndexToProcess == operations.length; +2993 } +2994 } +2995 +2996 private static class MutationBatch extends BatchOperation<Mutation> { +2997 private long nonceGroup; +2998 private long nonce; +2999 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { +3000 super(operations); +3001 this.nonceGroup = nonceGroup; +3002 this.nonce = nonce; +3003 } +3004 +3005 @Override +3006 public Mutation getMutation(int index) { +3007 return this.operations[index]; +3008 } +3009 +3010 @Override +3011 public long getNonceGroup(int index) { +3012 return nonceGroup; +3013 } +3014 +3015 @Override +3016 public long getNonce(int index) { +3017 return nonce; +3018 } +3019 +3020 @Override +3021 public Mutation[] getMutationsForCoprocs() { +3022 return this.operations; +3023 } +3024 +3025 @Override +3026 public boolean isInReplay() { +3027 return false; +3028 } +3029 +3030 @Override +3031 public long getReplaySequenceId() { +3032 return 0; +3033 } +3034 } +3035 +3036 private static class ReplayBatch extends BatchOperation<MutationReplay> { +3037 private long replaySeqId = 0; +3038 public ReplayBatch(MutationReplay[] operations, long seqId) { +3039 super(operations); +3040 this.replaySeqId = seqId; +3041 } +3042 +3043 @Override +3044 public Mutation getMutation(int index) { +3045 return this.operations[index].mutation; +3046 } +3047 +3048 @Override +3049 public long getNonceGroup(int index) { +3050 return this.operations[index].nonceGroup; +3051 } +3052 +3053 @Override +3054 public long getNonce(int index) { +3055 return this.operations[index].nonce; +3056 } +3057 +3058 @Override +3059 public Mutation[] getMutationsForCoprocs() { +3060 assert false; +3061 throw new RuntimeException("Should not be called for replay batch"); +3062 } +3063 +3064 @Override +3065 public boolean isInReplay() { +3066 return true; +3067 } +3068 +3069 @Override +3070 public long getReplaySequenceId() { +3071 return this.replaySeqId; +3072 } +3073 } +3074 +3075 @Override +3076 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) +3077 throws IOException { +3078 // As it stands, this is used for 3 things +3079 // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. +3080 // * coprocessor calls (see ex. BulkDeleteEndpoint). +3081 // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... +3082 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); +3083 } +3084 +3085 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { +3086 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); +3087 } +3088 +3089 @Override +3090 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) +3091 throws IOException { +3092 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) +3093 && replaySeqId < lastReplayedOpenRegionSeqId) { +3094 // if it is a secondary replica we should ignore these entries silently +3095 // since they are coming out of order +3096 if (LOG.isTraceEnabled()) { +3097 LOG.trace(getRegionInfo().getEncodedName() + " : " +3098 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId +3099 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId); +3100 for (MutationReplay mut : mutations) { +3101 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation); +3102 } +3103 } +3104 +3105 OperationStatus[] statuses = new OperationStatus[mutations.length]; +3106 for (int i = 0; i < statuses.length; i++) { +3107 statuses[i] = OperationStatus.SUCCESS; +3108 } +3109 return statuses; +3110 } +3111 return batchMutate(new ReplayBatch(mutations, replaySeqId)); +3112 } +3113 +3114 /** +3115 * Perform a batch of mutations. +3116 * It supports only Put and Delete mutations and will ignore other types passed. +3117 * @param batchOp contains the list of mutations +3118 * @return an array of OperationStatus which internally contains the +3119 * OperationStatusCode and the exceptionMessage if any. +3120 * @throws IOException +3121 */ +3122 OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException { +3123 boolean initialized = false; +3124 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; +3125 startRegionOperation(op); +3126 try { +3127 while (!batchOp.isDone()) { +3128 if (!batchOp.isInReplay()) { +3129 checkReadOnly(); +3130 } +3131 checkResources(); +3132 +3133 if (!initialized) { +3134 this.writeRequestsCount.add(batchOp.operations.length); +3135 if (!batchOp.isInReplay()) { +3136 doPreBatchMutateHook(batchOp); +3137 } +3138 initialized = true; +3139 } +3140 doMiniBatchMutate(batchOp); +3141 long newSize = this.getMemstoreSize(); +3142 requestFlushIfNeeded(newSize); +3143 } +3144 } finally { +3145 closeRegionOperation(op); +3146 } +3147 return batchOp.retCodeDetails; +3148 } +3149 +3150 private void doPreBatchMutateHook(BatchOperation<?> batchOp) +3151 throws IOException { +3152 /* Run coprocessor pre hook outside of locks to avoid deadlock */ +3153 WALEdit walEdit = new WALEdit(); +3154 if (coprocessorHost != null) { +3155 for (int i = 0 ; i < batchOp.operations.length; i++) { +3156 Mutation m = batchOp.getMutation(i); +3157 if (m instanceof Put) { +3158 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { +3159 // pre hook says skip this Put +3160 // mark as success and skip in doMiniBatchMutation +3161 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; +3162 } +3163 } else if (m instanceof Delete) { +3164 Delete curDel = (Delete) m; +3165 if (curDel.getFamilyCellMap().isEmpty()) { +3166 // handle deleting a row case +3167 prepareDelete(curDel); +3168 } +3169 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { +3170 // pre hook says skip this Delete +3171 // mark as success and skip in doMiniBatchMutation +3172 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; +3173 } +3174 } else { +3175 // In case of passing Append mutations along with the Puts and Deletes in batchMutate +3176 // mark the operation return code as failure so that it will not be considered in +3177 // the doMiniBatchMutation +3178 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, +3179 "Put/Delete mutations only supported in batchMutate() now"); +3180 } +3181 if (!walEdit.isEmpty()) { +3182 batchOp.walEditsFromCoprocessors[i] = walEdit; +3183 walEdit = new WALEdit(); +3184 } +3185 } +3186 } +3187 } +3188 +3189 /** +3190 * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} +3191 * In here we also handle replay of edits on region recover. +3192 * @return Change in size brought about by applying <code>batchOp</code> +3193 */ +3194 // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120 +3195 private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException { +3196 boolean replay = batchOp.isInReplay(); +3197 long currentNonceGroup = HConstants.NO_NONCE; +3198 long currentNonce = HConstants.NO_NONCE; +3199 WALEdit walEdit = null; +3200 boolean locked = false; +3201 // reference family maps directly so coprocessors can mutate them if desired +3202 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; +3203 // We try to set up a batch in the range [firstIndex,lastIndexExclusive) +3204 int firstIndex = batchOp.nextIndexToProcess; +3205 int lastIndexExclusive = firstIndex; +3206 boolean success = false; +3207 int noOfPuts = 0; +3208 int noOfDeletes = 0; +3209 WriteEntry writeEntry = null; +3210 int cellCount = 0; +3211 /** Keep track of the locks we hold so we can release them in finally clause */ +3212 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); +3213 MemstoreSize memstoreSize = new MemstoreSize(); +3214 final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); +3215 try { +3216 // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. +3217 int numReadyToWrite = 0; +3218 long now = EnvironmentEdgeManager.currentTime(); +3219 while (lastIndexExclusive < batchOp.operations.length) { +3220 if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { +3221 lastIndexExclusive++; +3222 continue; +3223 } +3224 Mutation mutation = batchOp.getMutation(lastIndexExclusive); +3225 // If we haven't got any rows in our batch, we should block to get the next one. +3226 RowLock rowLock = null; +3227 try { +3228 rowLock = getRowLockInternal(mutation.getRow(), true); +3229 } catch (TimeoutIOException e) { +3230 // We will retry when other exceptions, but we should stop if we timeout . +3231 throw e; +3232 } catch (IOException ioe) { +3233 LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); +3234 } +3235 if (rowLock == null) { +3236 // We failed to grab another lock +3237 break; // Stop acquiring more rows for this batch +3238 } else { +3239 acquiredRowLocks.add(rowLock); +3240 } +3241 +3242 lastIndexExclusive++; +3243 numReadyToWrite++; +3244 if (replay) { +3245 for (List<Cell> cells : mutation.getFamilyCellMap().values()) { +3246 cellCount += cells.size(); +3247 } +3248 } +3249 } +3250 +3251 // We've now grabbed as many mutations off the list as we can +3252 +3253 // STEP 2. Update any LATEST_TIMESTAMP timestamps +3254 // We should record the timestamp only after we have acquired the rowLock, +3255 // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp +3256 now = EnvironmentEdgeManager.currentTime(); +3257 byte[] byteNow = Bytes.toBytes(now); +3258 +3259 // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? +3260 if (numReadyToWrite <= 0) { +3261 return; +3262 } +3263 +3264 for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { +3265 // skip invalid +3266 if (batchOp.retCodeDetails[i].getOperationStatusCode() +3267 != OperationStatusCode.NOT_RUN) { +3268 // lastIndexExclusive was incremented above. +3269 continue; +3270 } +3271 +3272 Mutation mutation = batchOp.getMutation(i); +3273 if (mutation instanceof Put) { +3274 updateCellTimestamps(familyMaps[i].values(), byteNow); +3275 noOfPuts++; +3276 } else { +3277 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); +3278 noOfDeletes++; +3279 } +3280 rewriteCellTags(familyMaps[i], mutation); +3281 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; +3282 if (fro