From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Mon, 16 Oct 2017 15:16:22 -0000
Message-Id: <5e38b85e87de48828bff926bbd2a2af6@git.apache.org>
In-Reply-To: <645193abe9df4d0a95d3df532bee202f@git.apache.org>
Subject: [03/51] [partial] hbase-site git commit: Published site at .

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/47abd8e6/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html
index 81d256e..73c1480 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.FlushResultImpl.html
@@ -2964,5322 +2964,5321 @@
 2956 // This is expensive.
2957 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) { 2958 byte[] qual = CellUtil.cloneQualifier(cell); -2959 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY; -2960 -2961 Integer count = kvCount.get(qual); -2962 if (count == null) { -2963 kvCount.put(qual, 1); -2964 } else { -2965 kvCount.put(qual, count + 1); -2966 } -2967 count = kvCount.get(qual); -2968 -2969 Get get = new Get(CellUtil.cloneRow(cell)); -2970 get.setMaxVersions(count); -2971 get.addColumn(family, qual); -2972 if (coprocessorHost != null) { -2973 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell, -2974 byteNow, get)) { -2975 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); -2976 } -2977 } else { -2978 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow); -2979 } -2980 } else { -2981 CellUtil.updateLatestStamp(cell, byteNow, 0); -2982 } -2983 } -2984 } -2985 } -2986 -2987 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow) -2988 throws IOException { -2989 List<Cell> result = get(get, false); -2990 -2991 if (result.size() < count) { -2992 // Nothing to delete -2993 CellUtil.updateLatestStamp(cell, byteNow, 0); -2994 return; -2995 } -2996 if (result.size() > count) { -2997 throw new RuntimeException("Unexpected size: " + result.size()); -2998 } -2999 Cell getCell = result.get(count - 1); -3000 CellUtil.setTimestamp(cell, getCell.getTimestamp()); -3001 } -3002 -3003 @Override -3004 public void put(Put put) throws IOException { -3005 checkReadOnly(); -3006 -3007 // Do a rough check that we have resources to accept a write. The check is -3008 // 'rough' in that between the resource check and the call to obtain a -3009 // read lock, resources may run out. For now, the thought is that this -3010 // will be extremely rare; we'll deal with it when it happens. -3011 checkResources(); -3012 startRegionOperation(Operation.PUT); -3013 try { -3014 // All edits for the given row (across all column families) must happen atomically. -3015 doBatchMutate(put); -3016 } finally { -3017 closeRegionOperation(Operation.PUT); -3018 } -3019 } -3020 -3021 /** -3022 * Struct-like class that tracks the progress of a batch operation, -3023 * accumulating status codes and tracking the index at which processing -3024 * is proceeding. -3025 */ -3026 private abstract static class BatchOperation<T> { -3027 T[] operations; -3028 int nextIndexToProcess = 0; -3029 OperationStatus[] retCodeDetails; -3030 WALEdit[] walEditsFromCoprocessors; -3031 -3032 public BatchOperation(T[] operations) { -3033 this.operations = operations; -3034 this.retCodeDetails = new OperationStatus[operations.length]; -3035 this.walEditsFromCoprocessors = new WALEdit[operations.length]; -3036 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN); -3037 } -3038 -3039 public abstract Mutation getMutation(int index); -3040 public abstract long getNonceGroup(int index); -3041 public abstract long getNonce(int index); -3042 /** This method is potentially expensive and should only be used for non-replay CP path. 
*/ -3043 public abstract Mutation[] getMutationsForCoprocs(); -3044 public abstract boolean isInReplay(); -3045 public abstract long getReplaySequenceId(); -3046 -3047 public boolean isDone() { -3048 return nextIndexToProcess == operations.length; -3049 } -3050 } -3051 -3052 private static class MutationBatch extends BatchOperation<Mutation> { -3053 private long nonceGroup; -3054 private long nonce; -3055 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { -3056 super(operations); -3057 this.nonceGroup = nonceGroup; -3058 this.nonce = nonce; -3059 } -3060 -3061 @Override -3062 public Mutation getMutation(int index) { -3063 return this.operations[index]; -3064 } -3065 -3066 @Override -3067 public long getNonceGroup(int index) { -3068 return nonceGroup; -3069 } -3070 -3071 @Override -3072 public long getNonce(int index) { -3073 return nonce; -3074 } -3075 -3076 @Override -3077 public Mutation[] getMutationsForCoprocs() { -3078 return this.operations; -3079 } -3080 -3081 @Override -3082 public boolean isInReplay() { -3083 return false; -3084 } -3085 -3086 @Override -3087 public long getReplaySequenceId() { -3088 return 0; -3089 } -3090 } -3091 -3092 private static class ReplayBatch extends BatchOperation<MutationReplay> { -3093 private long replaySeqId = 0; -3094 public ReplayBatch(MutationReplay[] operations, long seqId) { -3095 super(operations); -3096 this.replaySeqId = seqId; -3097 } -3098 -3099 @Override -3100 public Mutation getMutation(int index) { -3101 return this.operations[index].mutation; -3102 } -3103 -3104 @Override -3105 public long getNonceGroup(int index) { -3106 return this.operations[index].nonceGroup; -3107 } -3108 -3109 @Override -3110 public long getNonce(int index) { -3111 return this.operations[index].nonce; -3112 } -3113 -3114 @Override -3115 public Mutation[] getMutationsForCoprocs() { -3116 assert false; -3117 throw new RuntimeException("Should not be called for replay batch"); -3118 } -3119 -3120 @Override -3121 public boolean isInReplay() { -3122 return true; -3123 } -3124 -3125 @Override -3126 public long getReplaySequenceId() { -3127 return this.replaySeqId; -3128 } -3129 } -3130 -3131 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce) -3132 throws IOException { -3133 // As it stands, this is used for 3 things -3134 // * batchMutate with single mutation - put/delete, separate or from checkAndMutate. -3135 // * coprocessor calls (see ex. BulkDeleteEndpoint). -3136 // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd... 
-3137 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce)); -3138 } -3139 -3140 @Override -3141 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException { -3142 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); -3143 } -3144 -3145 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) -3146 throws IOException { -3147 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) -3148 && replaySeqId < lastReplayedOpenRegionSeqId) { -3149 // if it is a secondary replica we should ignore these entries silently -3150 // since they are coming out of order -3151 if (LOG.isTraceEnabled()) { -3152 LOG.trace(getRegionInfo().getEncodedName() + " : " -3153 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId -3154 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId); -3155 for (MutationReplay mut : mutations) { -3156 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation); -3157 } -3158 } -3159 -3160 OperationStatus[] statuses = new OperationStatus[mutations.length]; -3161 for (int i = 0; i < statuses.length; i++) { -3162 statuses[i] = OperationStatus.SUCCESS; -3163 } -3164 return statuses; -3165 } -3166 return batchMutate(new ReplayBatch(mutations, replaySeqId)); -3167 } -3168 -3169 /** -3170 * Perform a batch of mutations. -3171 * It supports only Put and Delete mutations and will ignore other types passed. -3172 * @param batchOp contains the list of mutations -3173 * @return an array of OperationStatus which internally contains the -3174 * OperationStatusCode and the exceptionMessage if any. -3175 * @throws IOException -3176 */ -3177 OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException { -3178 boolean initialized = false; -3179 Operation op = batchOp.isInReplay() ? 
Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; -3180 startRegionOperation(op); -3181 try { -3182 while (!batchOp.isDone()) { -3183 if (!batchOp.isInReplay()) { -3184 checkReadOnly(); -3185 } -3186 checkResources(); -3187 -3188 if (!initialized) { -3189 this.writeRequestsCount.add(batchOp.operations.length); -3190 if (!batchOp.isInReplay()) { -3191 doPreBatchMutateHook(batchOp); -3192 } -3193 initialized = true; -3194 } -3195 doMiniBatchMutate(batchOp); -3196 long newSize = this.getMemStoreSize(); -3197 requestFlushIfNeeded(newSize); -3198 } -3199 } finally { -3200 closeRegionOperation(op); -3201 } -3202 return batchOp.retCodeDetails; -3203 } -3204 -3205 private void doPreBatchMutateHook(BatchOperation<?> batchOp) -3206 throws IOException { -3207 /* Run coprocessor pre hook outside of locks to avoid deadlock */ -3208 WALEdit walEdit = new WALEdit(); -3209 if (coprocessorHost != null) { -3210 for (int i = 0 ; i < batchOp.operations.length; i++) { -3211 Mutation m = batchOp.getMutation(i); -3212 if (m instanceof Put) { -3213 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { -3214 // pre hook says skip this Put -3215 // mark as success and skip in doMiniBatchMutation -3216 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -3217 } -3218 } else if (m instanceof Delete) { -3219 Delete curDel = (Delete) m; -3220 if (curDel.getFamilyCellMap().isEmpty()) { -3221 // handle deleting a row case -3222 prepareDelete(curDel); -3223 } -3224 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { -3225 // pre hook says skip this Delete -3226 // mark as success and skip in doMiniBatchMutation -3227 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -3228 } -3229 } else { -3230 // In case of passing Append mutations along with the Puts and Deletes in batchMutate -3231 // mark the operation return code as failure so that it will not be considered in -3232 // the doMiniBatchMutation -3233 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, -3234 "Put/Delete mutations only supported in batchMutate() now"); -3235 } -3236 if (!walEdit.isEmpty()) { -3237 batchOp.walEditsFromCoprocessors[i] = walEdit; -3238 walEdit = new WALEdit(); -3239 } -3240 } -3241 } -3242 } -3243 -3244 /** -3245 * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} -3246 * In here we also handle replay of edits on region recover. -3247 * @return Change in size brought about by applying <code>batchOp</code> -3248 */ -3249 // TODO: This needs a rewrite. Doesn't have to be this long. 
St.Ack 20160120 -3250 private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException { -3251 boolean replay = batchOp.isInReplay(); -3252 long currentNonceGroup = HConstants.NO_NONCE; -3253 long currentNonce = HConstants.NO_NONCE; -3254 WALEdit walEdit = null; -3255 boolean locked = false; -3256 // reference family maps directly so coprocessors can mutate them if desired -3257 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; -3258 // We try to set up a batch in the range [firstIndex,lastIndexExclusive) -3259 int firstIndex = batchOp.nextIndexToProcess; -3260 int lastIndexExclusive = firstIndex; -3261 boolean success = false; -3262 int noOfPuts = 0; -3263 int noOfDeletes = 0; -3264 WriteEntry writeEntry = null; -3265 int cellCount = 0; -3266 /** Keep track of the locks we hold so we can release them in finally clause */ -3267 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); -3268 MemStoreSize memstoreSize = new MemStoreSize(); -3269 final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch(); -3270 try { -3271 // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. -3272 int numReadyToWrite = 0; -3273 long now = EnvironmentEdgeManager.currentTime(); -3274 while (lastIndexExclusive < batchOp.operations.length) { -3275 if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now, observedExceptions)) { -3276 lastIndexExclusive++; -3277 continue; -3278 } -3279 Mutation mutation = batchOp.getMutation(lastIndexExclusive); -3280 // If we haven't got any rows in our batch, we should block to get the next one. -3281 RowLock rowLock = null; -3282 try { -3283 rowLock = getRowLockInternal(mutation.getRow(), true); -3284 } catch (TimeoutIOException e) { -3285 // We will retry when other exceptions, but we should stop if we timeout . -3286 throw e; -3287 } catch (IOException ioe) { -3288 LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); -3289 } -3290 if (rowLock == null) { -3291 // We failed to grab another lock -3292 break; // Stop acquiring more rows for this batch -3293 } else { -3294 acquiredRowLocks.add(rowLock); -3295 } -3296 -3297 lastIndexExclusive++; -3298 numReadyToWrite++; -3299 if (replay) { -3300 for (List<Cell> cells : mutation.getFamilyCellMap().values()) { -3301 cellCount += cells.size(); -3302 } -3303 } -3304 } -3305 -3306 // We've now grabbed as many mutations off the list as we can -3307 -3308 // STEP 2. Update any LATEST_TIMESTAMP timestamps -3309 // We should record the timestamp only after we have acquired the rowLock, -3310 // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp -3311 now = EnvironmentEdgeManager.currentTime(); -3312 byte[] byteNow = Bytes.toBytes(now); -3313 -3314 // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? -3315 if (numReadyToWrite <= 0) { -3316 return; -3317 } -3318 -3319 for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { -3320 // skip invalid -3321 if (batchOp.retCodeDetails[i].getOperationStatusCode() -3322 != OperationStatusCode.NOT_RUN) { -3323 // lastIndexExclusive was incremented above. 
-3324 continue; -3325 } -3326 -3327 Mutation mutation = batchOp.getMutation(i); -3328 if (mutation instanceof Put) { -3329 updateCellTimestamps(familyMaps[i].values(), byteNow); -3330 noOfPuts++; -3331 } else { -3332 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); -3333 noOfDeletes++; -3334 } -3335 rewriteCellTags(familyMaps[i], mutation); -3336 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; -3337 if (fromCP != null) { -3338 cellCount += fromCP.size(); -3339 } -3340 if (getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) { -3341 for (List<Cell> cells : familyMaps[i].values()) { -3342 cellCount += cells.size(); -3343 } -3344 } -3345 } -3346 lock(this.updatesLock.readLock(), numReadyToWrite); -3347 locked = true; -3348 -3349 // calling the pre CP hook for batch mutation -3350 if (!replay && coprocessorHost != null) { -3351 MiniBatchOperationInProgress<Mutation> miniBatchOp = -3352 new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), -3353 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); -3354 if (coprocessorHost.preBatchMutate(miniBatchOp)) { -3355 return; -3356 } else { -3357 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3358 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { -3359 // lastIndexExclusive was incremented above. -3360 continue; -3361 } -3362 // we pass (i - firstIndex) below since the call expects a relative index -3363 Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex); -3364 if (cpMutations == null) { -3365 continue; -3366 } -3367 Mutation mutation = batchOp.getMutation(i); -3368 boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL; -3369 // Else Coprocessor added more Mutations corresponding to the Mutation at this index. -3370 for (int j = 0; j < cpMutations.length; j++) { -3371 Mutation cpMutation = cpMutations[j]; -3372 Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap(); -3373 checkAndPrepareMutation(cpMutation, replay, cpFamilyMap, now); -3374 -3375 // Acquire row locks. If not, the whole batch will fail. -3376 acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true)); -3377 -3378 // Returned mutations from coprocessor correspond to the Mutation at index i. We can -3379 // directly add the cells from those mutations to the familyMaps of this mutation. -3380 mergeFamilyMaps(familyMaps[i], cpFamilyMap); // will get added to the memstore later -3381 -3382 // The durability of returned mutation is replaced by the corresponding mutation. -3383 // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the -3384 // cells of returned mutation. -3385 if (!skipWal) { -3386 for (List<Cell> cells : cpFamilyMap.values()) { -3387 cellCount += cells.size(); -3388 } -3389 } -3390 } -3391 } -3392 } -3393 } -3394 -3395 // STEP 3. 
Build WAL edit -3396 walEdit = new WALEdit(cellCount, replay); -3397 Durability durability = Durability.USE_DEFAULT; -3398 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3399 // Skip puts that were determined to be invalid during preprocessing -3400 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { -3401 continue; -3402 } -3403 -3404 Mutation m = batchOp.getMutation(i); -3405 Durability tmpDur = getEffectiveDurability(m.getDurability()); -3406 if (tmpDur.ordinal() > durability.ordinal()) { -3407 durability = tmpDur; -3408 } -3409 // we use durability of the original mutation for the mutation passed by CP. -3410 if (tmpDur == Durability.SKIP_WAL) { -3411 recordMutationWithoutWal(m.getFamilyCellMap()); -3412 continue; -3413 } -3414 -3415 long nonceGroup = batchOp.getNonceGroup(i); -3416 long nonce = batchOp.getNonce(i); -3417 // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. -3418 // Given how nonces are originally written, these should be contiguous. -3419 // They don't have to be, it will still work, just write more WALEdits than needed. -3420 if (nonceGroup != currentNonceGroup || nonce != currentNonce) { -3421 // Write what we have so far for nonces out to WAL -3422 appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce); -3423 walEdit = new WALEdit(cellCount, replay); -3424 currentNonceGroup = nonceGroup; -3425 currentNonce = nonce; -3426 } -3427 -3428 // Add WAL edits by CP -3429 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; -3430 if (fromCP != null) { -3431 for (Cell cell : fromCP.getCells()) { -3432 walEdit.add(cell); -3433 } -3434 } -3435 addFamilyMapToWALEdit(familyMaps[i], walEdit); -3436 } -3437 -3438 // STEP 4. Append the final edit to WAL and sync. -3439 Mutation mutation = batchOp.getMutation(firstIndex); -3440 WALKey walKey = null; -3441 long txid; -3442 if (replay) { -3443 // use wal key from the original -3444 walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), -3445 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, -3446 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); -3447 walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); -3448 if (!walEdit.isEmpty()) { -3449 txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); -3450 if (txid != 0) { -3451 sync(txid, durability); -3452 } -3453 } -3454 } else { -3455 try { -3456 if (!walEdit.isEmpty()) { -3457 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. -3458 walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), -3459 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, -3460 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, -3461 this.getReplicationScope()); -3462 // TODO: Use the doAppend methods below... complicated by the replay stuff above. -3463 txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); -3464 if (txid != 0) { -3465 sync(txid, durability); -3466 } -3467 if (writeEntry == null) { -3468 // if MVCC not preassigned, wait here until assigned -3469 writeEntry = walKey.getWriteEntry(); -3470 } -3471 } -3472 } catch (IOException ioe) { -3473 if (walKey != null && writeEntry == null) { -3474 // the writeEntry is not preassigned and error occurred during append or sync -3475 mvcc.complete(walKey.getWriteEntry()); -3476 } -3477 throw ioe; -3478 } -3479 } -3480 if (walKey == null) { -3481 // If no walKey, then not in replay and skipping WAL or some such. 
Begin an MVCC transaction -3482 // to get sequence id. -3483 writeEntry = mvcc.begin(); -3484 } -3485 -3486 // STEP 5. Write back to memstore -3487 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3488 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { -3489 continue; -3490 } -3491 // We need to update the sequence id for following reasons. -3492 // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id. -3493 // 2) If no WAL, FSWALEntry won't be used -3494 // we use durability of the original mutation for the mutation passed by CP. -3495 boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL; -3496 if (updateSeqId) { -3497 this.updateSequenceId(familyMaps[i].values(), -3498 replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); -3499 } -3500 applyFamilyMapToMemStore(familyMaps[i], memstoreSize); -3501 } -3502 -3503 // update memstore size -3504 this.addAndGetMemStoreSize(memstoreSize); -3505 -3506 // calling the post CP hook for batch mutation -3507 if (!replay && coprocessorHost != null) { -3508 MiniBatchOperationInProgress<Mutation> miniBatchOp = -3509 new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), -3510 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); -3511 coprocessorHost.postBatchMutate(miniBatchOp); -3512 } -3513 -3514 // STEP 6. Complete mvcc. -3515 if (replay) { -3516 this.mvcc.advanceTo(batchOp.getReplaySequenceId()); -3517 } else { -3518 // writeEntry won't be empty if not in replay mode -3519 mvcc.completeAndWait(writeEntry); -3520 writeEntry = null; -3521 } -3522 -3523 // STEP 7. Release row locks, etc. -3524 if (locked) { -3525 this.updatesLock.readLock().unlock(); -3526 locked = false; -3527 } -3528 releaseRowLocks(acquiredRowLocks); -3529 -3530 for (int i = firstIndex; i < lastIndexExclusive; i ++) { -3531 if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { -3532 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -3533 } -3534 } -3535 -3536 // STEP 8. Run coprocessor post hooks. This should be done after the wal is -3537 // synced so that the coprocessor contract is adhered to. -3538 if (!replay && coprocessorHost != null) { -3539 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3540 // only for successful puts -3541 if (batchOp.retCodeDetails[i].getOperationStatusCode() -3542 != OperationStatusCode.SUCCESS) { -3543 continue; -3544 } -3545 Mutation m = batchOp.getMutation(i); -3546 if (m instanceof Put) { -3547 coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); -3548 } else { -3549 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); -3550 } -3551 } -3552 } -3553 -3554 success = true; -3555 } finally { -3556 // Call complete rather than completeAndWait because we probably had error if walKey != null -3557 if (writeEntry != null) mvcc.complete(writeEntry); -3558 if (locked) { -3559 this.updatesLock.readLock().unlock(); -3560 } -3561 releaseRowLocks(acquiredRowLocks); -3562 -3563 // See if the column families were consistent through the whole thing. -3564 // if they were then keep them. If they were not then pass a null. -3565 // null will be treated as unknown. -3566 // Total time taken might be involving Puts and Deletes. -3567 // Split the time for puts and deletes based on the total number of Puts and Deletes. -3568 -3569 if (noOfPuts > 0) { -3570 // There were some Puts in the batch. 
-3571 if (this.metricsRegion != null) { -3572 this.metricsRegion.updatePut(); -3573 } -3574 } -3575 if (noOfDeletes > 0) { -3576 // There were some Deletes in the batch. -3577 if (this.metricsRegion != null) { -3578 this.metricsRegion.updateDelete(); -3579 } -3580 } -3581 if (!success) { -3582 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3583 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) { -3584 batchOp.retCodeDetails[i] = OperationStatus.FAILURE; -3585 } -3586 } -3587 } -3588 if (coprocessorHost != null && !batchOp.isInReplay()) { -3589 // call the coprocessor hook to do any finalization steps -3590 // after the put is done -3591 MiniBatchOperationInProgress<Mutation> miniBatchOp = -3592 new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(), -3593 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); -3594 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success); -3595 } -3596 -3597 batchOp.nextIndexToProcess = lastIndexExclusive; -3598 } -3599 } -3600 -3601 private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap, -3602 Map<byte[], List<Cell>> toBeMerged) { -3603 for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) { -3604 List<Cell> cells = familyMap.get(entry.getKey()); -3605 if (cells == null) { -3606 familyMap.put(entry.getKey(), entry.getValue()); -3607 } else { -3608 cells.addAll(entry.getValue()); -3609 } -3610 } -3611 } -3612 -3613 private void appendCurrentNonces(final Mutation mutation, final boolean replay, -3614 final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) -3615 throws IOException { -3616 if (walEdit.isEmpty()) return; -3617 if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); -3618 WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), -3619 this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), -3620 currentNonceGroup, currentNonce, mvcc, this.getReplicationScope()); -3621 this.wal.append(this.getRegionInfo(), walKey, walEdit, true); -3622 // Complete the mvcc transaction started down in append else it will block others -3623 this.mvcc.complete(walKey.getWriteEntry()); -3624 } -3625 -3626 private boolean checkBatchOp(BatchOperation<?> batchOp, final int lastIndexExclusive, -3627 final Map<byte[], List<Cell>>[] familyMaps, final long now, -3628 final ObservedExceptionsInBatch observedExceptions) -3629 throws IOException { -3630 boolean skip = false; -3631 // Skip anything that "ran" already -3632 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() -3633 != OperationStatusCode.NOT_RUN) { -3634 return true; -3635 } -3636 Mutation mutation = batchOp.getMutation(lastIndexExclusive); -3637 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); -3638 // store the family map reference to allow for mutations -3639 familyMaps[lastIndexExclusive] = familyMap; -3640 -3641 try { -3642 checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now); -3643 } catch (NoSuchColumnFamilyException nscf) { -3644 final String msg = "No such column family in batch mutation. 
"; -3645 if (observedExceptions.hasSeenNoSuchFamily()) { -3646 LOG.warn(msg + nscf.getMessage()); -3647 } else { -3648 LOG.warn(msg, nscf); -3649 observedExceptions.sawNoSuchFamily(); -3650 } -3651 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( -3652 OperationStatusCode.BAD_FAMILY, nscf.getMessage()); -3653 skip = true; -3654 } catch (FailedSanityCheckException fsce) { -3655 final String msg = "Batch Mutation did not pass sanity check. "; -3656 if (observedExceptions.hasSeenFailedSanityCheck()) { -3657 LOG.warn(msg + fsce.getMessage()); -3658 } else { -3659 LOG.warn(msg, fsce); -3660 observedExceptions.sawFailedSanityCheck(); -3661 } -3662 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( -3663 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); -3664 skip = true; -3665 } catch (WrongRegionException we) { -3666 final String msg = "Batch mutation had a row that does not belong to this region. "; -3667 if (observedExceptions.hasSeenWrongRegion()) { -3668 LOG.warn(msg + we.getMessage()); -3669 } else { -3670 LOG.warn(msg, we); -3671 observedExceptions.sawWrongRegion(); -3672 } -3673 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( -3674 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); -3675 skip = true; -3676 } -3677 return skip; -3678 } -3679 -3680 private void checkAndPrepareMutation(Mutation mutation, boolean replay, -3681 final Map<byte[], List<Cell>> familyMap, final long now) -3682 throws IOException { -3683 if (mutation instanceof Put) { -3684 // Check the families in the put. If bad, skip this one. -3685 if (replay) { -3686 removeNonExistentColumnFamilyForReplay(familyMap); -3687 } else { -3688 checkFamilies(familyMap.keySet()); -3689 } -3690 checkTimestamps(mutation.getFamilyCellMap(), now); -3691 } else { -3692 prepareDelete((Delete)mutation); -3693 } -3694 checkRow(mutation.getRow(), "doMiniBatchMutation"); -3695 } -3696 -3697 /** -3698 * During replay, there could exist column families which are removed between region server -3699 * failure and replay -3700 */ -3701 private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) { -3702 List<byte[]> nonExistentList = null; -3703 for (byte[] family : familyMap.keySet()) { -3704 if (!this.htableDescriptor.hasColumnFamily(family)) { -3705 if (nonExistentList == null) { -3706 nonExistentList = new ArrayList<>(); -3707 } -3708 nonExistentList.add(family); -3709 } -3710 } -3711 if (nonExistentList != null) { -3712 for (byte[] family : nonExistentList) { -3713 // Perhaps schema was changed between crash and replay -3714 LOG.info("No family for " + Bytes.toString(family) + " omit from reply."); -3715 familyMap.remove(family); -3716 } -3717 } -3718 } -3719 -3720 /** -3721 * Returns effective durability from the passed durability and -3722 * the table descriptor. -3723 */ -3724 protected Durability getEffectiveDurability(Durability d) { -3725 return d == Durability.USE_DEFAULT ? 
this.durability : d; -3726 } -3727 -3728 @Override -3729 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, -3730 CompareOperator op, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) -3731 throws IOException{ -3732 checkMutationType(mutation, row); -3733 return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, -3734 mutation, writeToWAL); -3735 } -3736 -3737 @Override -3738 public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, -3739 CompareOperator op, ByteArrayComparable comparator, RowMutations rm, -3740 boolean writeToWAL) -3741 throws IOException { -3742 return doCheckAndRowMutate(row, family, qualifier, op, comparator, rm, null, -3743 writeToWAL); -3744 } -3745 -3746 /** -3747 * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has -3748 * switches in the few places where there is deviation. -3749 */ -3750 private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier, -3751 CompareOperator op, ByteArrayComparable comparator, RowMutations rowMutations, -3752 Mutation mutation, boolean writeToWAL) -3753 throws IOException { -3754 // Could do the below checks but seems wacky with two callers only. Just comment out for now. -3755 // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't -3756 // need these commented out checks. -3757 // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null"); -3758 // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set"); -3759 checkReadOnly(); -3760 // TODO, add check for value length also move this check to the client -3761 checkResources(); -3762 startRegionOperation(); -3763 try { -3764 Get get = new Get(row); -3765 checkFamily(family); -3766 get.addColumn(family, qualifier); -3767 // Lock row - note that doBatchMutate will relock this row if called -3768 checkRow(row, "doCheckAndRowMutate"); -3769 RowLock rowLock = getRowLockInternal(get.getRow(), false); -3770 try { -3771 if (mutation != null && this.getCoprocessorHost() != null) { -3772 // Call coprocessor. -3773 Boolean processed = null; -3774 if (mutation instanceof Put) { -3775 processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, -3776 qualifier, op, comparator, (Put)mutation); -3777 } else if (mutation instanceof Delete) { -3778 processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, -3779 qualifier, op, comparator, (Delete)mutation); -3780 } -3781 if (processed != null) { -3782 return processed; -3783 } -3784 } -3785 // NOTE: We used to wait here until mvcc caught up: mvcc.await(); -3786 // Supposition is that now all changes are done under row locks, then when we go to read, -3787 // we'll get the latest on this row. 
-3788 List<Cell> result = get(get, false); -3789 boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; -3790 boolean matches = false; -3791 long cellTs = 0; -3792 if (result.isEmpty() && valueIsNull) { -3793 matches = true; -3794 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { -3795 matches = true; -3796 cellTs = result.get(0).getTimestamp(); -3797 } else if (result.size() == 1 && !valueIsNull) { -3798 Cell kv = result.get(0); -3799 cellTs = kv.getTimestamp(); -3800 int compareResult = CellComparator.compareValue(kv, comparator); -3801 matches = matches(op, compareResult); -3802 } -3803 // If matches put the new put or delete the new delete -3804 if (matches) { -3805 // We have acquired the row lock already. If the system clock is NOT monotonically -3806 // non-decreasing (see HBASE-14070) we should make sure that the mutation has a -3807 // larger timestamp than what was observed via Get. doBatchMutate already does this, but -3808 // there is no way to pass the cellTs. See HBASE-14054. -3809 long now = EnvironmentEdgeManager.currentTime(); -3810 long ts = Math.max(now, cellTs); // ensure write is not eclipsed -3811 byte[] byteTs = Bytes.toBytes(ts); -3812 if (mutation != null) { -3813 if (mutation instanceof Put) { -3814