Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6ED001826D for ; Mon, 8 Feb 2016 16:54:42 +0000 (UTC) Received: (qmail 88177 invoked by uid 500); 8 Feb 2016 16:54:34 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 88128 invoked by uid 500); 8 Feb 2016 16:54:34 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 86572 invoked by uid 99); 8 Feb 2016 16:54:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Feb 2016 16:54:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E3A12E0B11; Mon, 8 Feb 2016 16:54:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Mon, 08 Feb 2016 16:55:04 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [34/51] [partial] hbase-site git commit: Published site at eacf7bcf97f09c9a6e68baf9a4a9ceb1d83c9fb0. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/670bf1f0/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html index 658fe8f..d266952 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RegionScannerImpl.html @@ -2914,5347 +2914,5340 @@ 2906 * OperationStatusCode and the exceptionMessage if any. 2907 * @throws IOException 2908 */ -2909 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) -2910 throws IOException { -2911 boolean initialized = false; -2912 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; -2913 startRegionOperation(op); -2914 int cellCountFromCP = 0; -2915 try { -2916 while (!batchOp.isDone()) { -2917 if (!batchOp.isInReplay()) { -2918 checkReadOnly(); -2919 } -2920 checkResources(); -2921 if (!initialized) { -2922 this.writeRequestsCount.add(batchOp.operations.length); -2923 if (!batchOp.isInReplay()) { -2924 cellCountFromCP = doPreMutationHook(batchOp); -2925 } -2926 initialized = true; -2927 } -2928 long addedSize = doMiniBatchMutation(batchOp, cellCountFromCP); -2929 long newSize = this.addAndGetGlobalMemstoreSize(addedSize); -2930 if (isFlushSize(newSize)) { -2931 requestFlush(); -2932 } -2933 } -2934 } finally { -2935 closeRegionOperation(op); -2936 } -2937 return batchOp.retCodeDetails; -2938 } +2909 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException { +2910 boolean initialized = false; +2911 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; +2912 startRegionOperation(op); +2913 try { +2914 while (!batchOp.isDone()) { +2915 if (!batchOp.isInReplay()) { +2916 checkReadOnly(); +2917 } +2918 checkResources(); +2919 +2920 if (!initialized) { +2921 this.writeRequestsCount.add(batchOp.operations.length); +2922 if (!batchOp.isInReplay()) { +2923 doPreMutationHook(batchOp); +2924 } +2925 initialized = true; +2926 } +2927 long addedSize = doMiniBatchMutation(batchOp); +2928 long newSize = this.addAndGetGlobalMemstoreSize(addedSize); +2929 if (isFlushSize(newSize)) { +2930 requestFlush(); +2931 } +2932 } +2933 } finally { +2934 closeRegionOperation(op); +2935 } +2936 return batchOp.retCodeDetails; +2937 } +2938 2939 -2940 -2941 private int doPreMutationHook(BatchOperationInProgress<?> batchOp) -2942 throws IOException { -2943 /* Run coprocessor pre hook outside of locks to avoid deadlock */ -2944 WALEdit walEdit = new WALEdit(); -2945 int cellCount = 0; -2946 if (coprocessorHost != null) { -2947 for (int i = 0 ; i < batchOp.operations.length; i++) { -2948 Mutation m = batchOp.getMutation(i); -2949 if (m instanceof Put) { -2950 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { -2951 // pre hook says skip this Put -2952 // mark as success and skip in doMiniBatchMutation -2953 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -2954 } -2955 } else if (m instanceof Delete) { -2956 Delete curDel = (Delete) m; -2957 if (curDel.getFamilyCellMap().isEmpty()) { -2958 // handle deleting a row case -2959 prepareDelete(curDel); -2960 } -2961 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { -2962 // pre hook says skip this Delete -2963 // mark as success and skip in doMiniBatchMutation -2964 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -2965 } -2966 } else { -2967 // In case of passing Append mutations along with the Puts and Deletes in batchMutate -2968 // mark the operation return code as failure so that it will not be considered in -2969 // the doMiniBatchMutation -2970 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, -2971 "Put/Delete mutations only supported in batchMutate() now"); -2972 } -2973 if (!walEdit.isEmpty()) { -2974 batchOp.walEditsFromCoprocessors[i] = walEdit; -2975 cellCount += walEdit.size(); -2976 walEdit = new WALEdit(); -2977 } -2978 } -2979 } -2980 return cellCount; -2981 } -2982 -2983 @SuppressWarnings("unchecked") -2984 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp, int cellCount) -2985 throws IOException { -2986 boolean isInReplay = batchOp.isInReplay(); -2987 // variable to note if all Put items are for the same CF -- metrics related -2988 boolean putsCfSetConsistent = true; -2989 //The set of columnFamilies first seen for Put. -2990 Set<byte[]> putsCfSet = null; -2991 // variable to note if all Delete items are for the same CF -- metrics related -2992 boolean deletesCfSetConsistent = true; -2993 //The set of columnFamilies first seen for Delete. -2994 Set<byte[]> deletesCfSet = null; -2995 -2996 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; -2997 WALEdit walEdit = null; -2998 MultiVersionConcurrencyControl.WriteEntry writeEntry = null; -2999 long txid = 0; -3000 boolean doRollBackMemstore = false; -3001 boolean locked = false; -3002 -3003 /** Keep track of the locks we hold so we can release them in finally clause */ -3004 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); -3005 // reference family maps directly so coprocessors can mutate them if desired -3006 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; -3007 // We try to set up a batch in the range [firstIndex,lastIndexExclusive) -3008 int firstIndex = batchOp.nextIndexToProcess; -3009 int lastIndexExclusive = firstIndex; -3010 boolean success = false; -3011 int noOfPuts = 0, noOfDeletes = 0; -3012 WALKey walKey = null; -3013 long mvccNum = 0; -3014 try { -3015 // ------------------------------------ -3016 // STEP 1. Try to acquire as many locks as we can, and ensure -3017 // we acquire at least one. -3018 // ---------------------------------- -3019 int numReadyToWrite = 0; -3020 long now = EnvironmentEdgeManager.currentTime(); -3021 while (lastIndexExclusive < batchOp.operations.length) { -3022 Mutation mutation = batchOp.getMutation(lastIndexExclusive); -3023 boolean isPutMutation = mutation instanceof Put; -3024 -3025 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); -3026 // store the family map reference to allow for mutations -3027 familyMaps[lastIndexExclusive] = familyMap; -3028 // skip anything that "ran" already -3029 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() -3030 != OperationStatusCode.NOT_RUN) { -3031 lastIndexExclusive++; -3032 continue; -3033 } -3034 -3035 try { -3036 if (isPutMutation) { -3037 // Check the families in the put. If bad, skip this one. -3038 if (isInReplay) { -3039 removeNonExistentColumnFamilyForReplay(familyMap); -3040 } else { -3041 checkFamilies(familyMap.keySet()); -3042 } -3043 checkTimestamps(mutation.getFamilyCellMap(), now); -3044 } else { -3045 prepareDelete((Delete) mutation); -3046 } -3047 checkRow(mutation.getRow(), "doMiniBatchMutation"); -3048 } catch (NoSuchColumnFamilyException nscf) { -3049 LOG.warn("No such column family in batch mutation", nscf); -3050 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( -3051 OperationStatusCode.BAD_FAMILY, nscf.getMessage()); -3052 lastIndexExclusive++; -3053 continue; -3054 } catch (FailedSanityCheckException fsce) { -3055 LOG.warn("Batch Mutation did not pass sanity check", fsce); -3056 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( -3057 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); -3058 lastIndexExclusive++; -3059 continue; -3060 } catch (WrongRegionException we) { -3061 LOG.warn("Batch mutation had a row that does not belong to this region", we); -3062 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( -3063 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); -3064 lastIndexExclusive++; -3065 continue; -3066 } -3067 -3068 // If we haven't got any rows in our batch, we should block to -3069 // get the next one. -3070 RowLock rowLock = null; -3071 try { -3072 rowLock = getRowLock(mutation.getRow(), true); -3073 } catch (IOException ioe) { -3074 LOG.warn("Failed getting lock in batch put, row=" -3075 + Bytes.toStringBinary(mutation.getRow()), ioe); -3076 } -3077 if (rowLock == null) { -3078 // We failed to grab another lock -3079 break; // stop acquiring more rows for this batch -3080 } else { -3081 acquiredRowLocks.add(rowLock); -3082 } -3083 -3084 lastIndexExclusive++; -3085 numReadyToWrite++; -3086 -3087 if (isPutMutation) { -3088 // If Column Families stay consistent through out all of the -3089 // individual puts then metrics can be reported as a mutliput across -3090 // column families in the first put. -3091 if (putsCfSet == null) { -3092 putsCfSet = mutation.getFamilyCellMap().keySet(); -3093 } else { -3094 putsCfSetConsistent = putsCfSetConsistent -3095 && mutation.getFamilyCellMap().keySet().equals(putsCfSet); -3096 } -3097 } else { -3098 if (deletesCfSet == null) { -3099 deletesCfSet = mutation.getFamilyCellMap().keySet(); -3100 } else { -3101 deletesCfSetConsistent = deletesCfSetConsistent -3102 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet); -3103 } -3104 } -3105 } -3106 -3107 // we should record the timestamp only after we have acquired the rowLock, -3108 // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp -3109 now = EnvironmentEdgeManager.currentTime(); -3110 byte[] byteNow = Bytes.toBytes(now); -3111 -3112 // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? -3113 if (numReadyToWrite <= 0) return 0L; -3114 -3115 // We've now grabbed as many mutations off the list as we can -3116 -3117 // ------------------------------------ -3118 // STEP 2. Update any LATEST_TIMESTAMP timestamps -3119 // ---------------------------------- -3120 for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { -3121 // skip invalid -3122 if (batchOp.retCodeDetails[i].getOperationStatusCode() -3123 != OperationStatusCode.NOT_RUN) continue; -3124 -3125 Mutation mutation = batchOp.getMutation(i); -3126 if (mutation instanceof Put) { -3127 updateCellTimestamps(familyMaps[i].values(), byteNow); -3128 noOfPuts++; -3129 } else { -3130 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); -3131 noOfDeletes++; -3132 } -3133 rewriteCellTags(familyMaps[i], mutation); -3134 for (List<Cell> cells : familyMaps[i].values()) { -3135 cellCount += cells.size(); -3136 } -3137 } -3138 walEdit = new WALEdit(cellCount); -3139 lock(this.updatesLock.readLock(), numReadyToWrite); -3140 locked = true; -3141 -3142 // calling the pre CP hook for batch mutation -3143 if (!isInReplay && coprocessorHost != null) { -3144 MiniBatchOperationInProgress<Mutation> miniBatchOp = -3145 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), -3146 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); -3147 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; -3148 } -3149 -3150 // ------------------------------------ -3151 // STEP 3. Build WAL edit -3152 // ---------------------------------- -3153 Durability durability = Durability.USE_DEFAULT; -3154 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3155 // Skip puts that were determined to be invalid during preprocessing -3156 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { -3157 continue; -3158 } -3159 -3160 Mutation m = batchOp.getMutation(i); -3161 Durability tmpDur = getEffectiveDurability(m.getDurability()); -3162 if (tmpDur.ordinal() > durability.ordinal()) { -3163 durability = tmpDur; -3164 } -3165 if (tmpDur == Durability.SKIP_WAL) { -3166 recordMutationWithoutWal(m.getFamilyCellMap()); -3167 continue; -3168 } -3169 -3170 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i); -3171 // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. -3172 // Given how nonces are originally written, these should be contiguous. -3173 // They don't have to be, it will still work, just write more WALEdits than needed. -3174 if (nonceGroup != currentNonceGroup || nonce != currentNonce) { -3175 if (walEdit.size() > 0) { -3176 assert isInReplay; -3177 if (!isInReplay) { -3178 throw new IOException("Multiple nonces per batch and not in replay"); -3179 } -3180 // txid should always increase, so having the one from the last call is ok. -3181 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. -3182 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), -3183 this.htableDescriptor.getTableName(), now, m.getClusterIds(), -3184 currentNonceGroup, currentNonce, mvcc); -3185 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, -3186 walEdit, true); -3187 walEdit = new WALEdit(isInReplay); -3188 walKey = null; -3189 } -3190 currentNonceGroup = nonceGroup; -3191 currentNonce = nonce; -3192 } -3193 -3194 // Add WAL edits by CP -3195 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; -3196 if (fromCP != null) { -3197 for (Cell cell : fromCP.getCells()) { -3198 walEdit.add(cell); -3199 } -3200 } -3201 addFamilyMapToWALEdit(familyMaps[i], walEdit); -3202 } -3203 -3204 // ------------------------- -3205 // STEP 4. Append the final edit to WAL. Do not sync wal. -3206 // ------------------------- -3207 Mutation mutation = batchOp.getMutation(firstIndex); -3208 if (isInReplay) { -3209 // use wal key from the original -3210 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), -3211 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, -3212 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); -3213 long replaySeqId = batchOp.getReplaySequenceId(); -3214 walKey.setOrigLogSeqNum(replaySeqId); -3215 } -3216 if (walEdit.size() > 0) { -3217 if (!isInReplay) { -3218 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. -3219 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), -3220 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, -3221 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); -3222 } -3223 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); +2940 private void doPreMutationHook(BatchOperationInProgress<?> batchOp) +2941 throws IOException { +2942 /* Run coprocessor pre hook outside of locks to avoid deadlock */ +2943 WALEdit walEdit = new WALEdit(); +2944 if (coprocessorHost != null) { +2945 for (int i = 0 ; i < batchOp.operations.length; i++) { +2946 Mutation m = batchOp.getMutation(i); +2947 if (m instanceof Put) { +2948 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) { +2949 // pre hook says skip this Put +2950 // mark as success and skip in doMiniBatchMutation +2951 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; +2952 } +2953 } else if (m instanceof Delete) { +2954 Delete curDel = (Delete) m; +2955 if (curDel.getFamilyCellMap().isEmpty()) { +2956 // handle deleting a row case +2957 prepareDelete(curDel); +2958 } +2959 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) { +2960 // pre hook says skip this Delete +2961 // mark as success and skip in doMiniBatchMutation +2962 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; +2963 } +2964 } else { +2965 // In case of passing Append mutations along with the Puts and Deletes in batchMutate +2966 // mark the operation return code as failure so that it will not be considered in +2967 // the doMiniBatchMutation +2968 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE, +2969 "Put/Delete mutations only supported in batchMutate() now"); +2970 } +2971 if (!walEdit.isEmpty()) { +2972 batchOp.walEditsFromCoprocessors[i] = walEdit; +2973 walEdit = new WALEdit(); +2974 } +2975 } +2976 } +2977 } +2978 +2979 @SuppressWarnings("unchecked") +2980 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException { +2981 boolean isInReplay = batchOp.isInReplay(); +2982 // variable to note if all Put items are for the same CF -- metrics related +2983 boolean putsCfSetConsistent = true; +2984 //The set of columnFamilies first seen for Put. +2985 Set<byte[]> putsCfSet = null; +2986 // variable to note if all Delete items are for the same CF -- metrics related +2987 boolean deletesCfSetConsistent = true; +2988 //The set of columnFamilies first seen for Delete. +2989 Set<byte[]> deletesCfSet = null; +2990 +2991 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; +2992 WALEdit walEdit = new WALEdit(isInReplay); +2993 MultiVersionConcurrencyControl.WriteEntry writeEntry = null; +2994 long txid = 0; +2995 boolean doRollBackMemstore = false; +2996 boolean locked = false; +2997 +2998 /** Keep track of the locks we hold so we can release them in finally clause */ +2999 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); +3000 // reference family maps directly so coprocessors can mutate them if desired +3001 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; +3002 // We try to set up a batch in the range [firstIndex,lastIndexExclusive) +3003 int firstIndex = batchOp.nextIndexToProcess; +3004 int lastIndexExclusive = firstIndex; +3005 boolean success = false; +3006 int noOfPuts = 0, noOfDeletes = 0; +3007 WALKey walKey = null; +3008 long mvccNum = 0; +3009 try { +3010 // ------------------------------------ +3011 // STEP 1. Try to acquire as many locks as we can, and ensure +3012 // we acquire at least one. +3013 // ---------------------------------- +3014 int numReadyToWrite = 0; +3015 long now = EnvironmentEdgeManager.currentTime(); +3016 while (lastIndexExclusive < batchOp.operations.length) { +3017 Mutation mutation = batchOp.getMutation(lastIndexExclusive); +3018 boolean isPutMutation = mutation instanceof Put; +3019 +3020 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap(); +3021 // store the family map reference to allow for mutations +3022 familyMaps[lastIndexExclusive] = familyMap; +3023 +3024 // skip anything that "ran" already +3025 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() +3026 != OperationStatusCode.NOT_RUN) { +3027 lastIndexExclusive++; +3028 continue; +3029 } +3030 +3031 try { +3032 if (isPutMutation) { +3033 // Check the families in the put. If bad, skip this one. +3034 if (isInReplay) { +3035 removeNonExistentColumnFamilyForReplay(familyMap); +3036 } else { +3037 checkFamilies(familyMap.keySet()); +3038 } +3039 checkTimestamps(mutation.getFamilyCellMap(), now); +3040 } else { +3041 prepareDelete((Delete) mutation); +3042 } +3043 checkRow(mutation.getRow(), "doMiniBatchMutation"); +3044 } catch (NoSuchColumnFamilyException nscf) { +3045 LOG.warn("No such column family in batch mutation", nscf); +3046 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( +3047 OperationStatusCode.BAD_FAMILY, nscf.getMessage()); +3048 lastIndexExclusive++; +3049 continue; +3050 } catch (FailedSanityCheckException fsce) { +3051 LOG.warn("Batch Mutation did not pass sanity check", fsce); +3052 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( +3053 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); +3054 lastIndexExclusive++; +3055 continue; +3056 } catch (WrongRegionException we) { +3057 LOG.warn("Batch mutation had a row that does not belong to this region", we); +3058 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( +3059 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); +3060 lastIndexExclusive++; +3061 continue; +3062 } +3063 +3064 // If we haven't got any rows in our batch, we should block to +3065 // get the next one. +3066 RowLock rowLock = null; +3067 try { +3068 rowLock = getRowLock(mutation.getRow(), true); +3069 } catch (IOException ioe) { +3070 LOG.warn("Failed getting lock in batch put, row=" +3071 + Bytes.toStringBinary(mutation.getRow()), ioe); +3072 } +3073 if (rowLock == null) { +3074 // We failed to grab another lock +3075 break; // stop acquiring more rows for this batch +3076 } else { +3077 acquiredRowLocks.add(rowLock); +3078 } +3079 +3080 lastIndexExclusive++; +3081 numReadyToWrite++; +3082 +3083 if (isPutMutation) { +3084 // If Column Families stay consistent through out all of the +3085 // individual puts then metrics can be reported as a mutliput across +3086 // column families in the first put. +3087 if (putsCfSet == null) { +3088 putsCfSet = mutation.getFamilyCellMap().keySet(); +3089 } else { +3090 putsCfSetConsistent = putsCfSetConsistent +3091 && mutation.getFamilyCellMap().keySet().equals(putsCfSet); +3092 } +3093 } else { +3094 if (deletesCfSet == null) { +3095 deletesCfSet = mutation.getFamilyCellMap().keySet(); +3096 } else { +3097 deletesCfSetConsistent = deletesCfSetConsistent +3098 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet); +3099 } +3100 } +3101 } +3102 +3103 // we should record the timestamp only after we have acquired the rowLock, +3104 // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp +3105 now = EnvironmentEdgeManager.currentTime(); +3106 byte[] byteNow = Bytes.toBytes(now); +3107 +3108 // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? +3109 if (numReadyToWrite <= 0) return 0L; +3110 +3111 // We've now grabbed as many mutations off the list as we can +3112 +3113 // ------------------------------------ +3114 // STEP 2. Update any LATEST_TIMESTAMP timestamps +3115 // ---------------------------------- +3116 for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { +3117 // skip invalid +3118 if (batchOp.retCodeDetails[i].getOperationStatusCode() +3119 != OperationStatusCode.NOT_RUN) continue; +3120 +3121 Mutation mutation = batchOp.getMutation(i); +3122 if (mutation instanceof Put) { +3123 updateCellTimestamps(familyMaps[i].values(), byteNow); +3124 noOfPuts++; +3125 } else { +3126 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow); +3127 noOfDeletes++; +3128 } +3129 rewriteCellTags(familyMaps[i], mutation); +3130 } +3131 +3132 lock(this.updatesLock.readLock(), numReadyToWrite); +3133 locked = true; +3134 +3135 // calling the pre CP hook for batch mutation +3136 if (!isInReplay && coprocessorHost != null) { +3137 MiniBatchOperationInProgress<Mutation> miniBatchOp = +3138 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), +3139 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); +3140 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; +3141 } +3142 +3143 // ------------------------------------ +3144 // STEP 3. Build WAL edit +3145 // ---------------------------------- +3146 Durability durability = Durability.USE_DEFAULT; +3147 for (int i = firstIndex; i < lastIndexExclusive; i++) { +3148 // Skip puts that were determined to be invalid during preprocessing +3149 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { +3150 continue; +3151 } +3152 +3153 Mutation m = batchOp.getMutation(i); +3154 Durability tmpDur = getEffectiveDurability(m.getDurability()); +3155 if (tmpDur.ordinal() > durability.ordinal()) { +3156 durability = tmpDur; +3157 } +3158 if (tmpDur == Durability.SKIP_WAL) { +3159 recordMutationWithoutWal(m.getFamilyCellMap()); +3160 continue; +3161 } +3162 +3163 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i); +3164 // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. +3165 // Given how nonces are originally written, these should be contiguous. +3166 // They don't have to be, it will still work, just write more WALEdits than needed. +3167 if (nonceGroup != currentNonceGroup || nonce != currentNonce) { +3168 if (walEdit.size() > 0) { +3169 assert isInReplay; +3170 if (!isInReplay) { +3171 throw new IOException("Multiple nonces per batch and not in replay"); +3172 } +3173 // txid should always increase, so having the one from the last call is ok. +3174 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. +3175 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), +3176 this.htableDescriptor.getTableName(), now, m.getClusterIds(), +3177 currentNonceGroup, currentNonce, mvcc); +3178 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, +3179 walEdit, true); +3180 walEdit = new WALEdit(isInReplay); +3181 walKey = null; +3182 } +3183 currentNonceGroup = nonceGroup; +3184 currentNonce = nonce; +3185 } +3186 +3187 // Add WAL edits by CP +3188 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i]; +3189 if (fromCP != null) { +3190 for (Cell cell : fromCP.getCells()) { +3191 walEdit.add(cell); +3192 } +3193 } +3194 addFamilyMapToWALEdit(familyMaps[i], walEdit); +3195 } +3196 +3197 // ------------------------- +3198 // STEP 4. Append the final edit to WAL. Do not sync wal. +3199 // ------------------------- +3200 Mutation mutation = batchOp.getMutation(firstIndex); +3201 if (isInReplay) { +3202 // use wal key from the original +3203 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), +3204 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, +3205 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); +3206 long replaySeqId = batchOp.getReplaySequenceId(); +3207 walKey.setOrigLogSeqNum(replaySeqId); +3208 } +3209 if (walEdit.size() > 0) { +3210 if (!isInReplay) { +3211 // we use HLogKey here instead of WALKey directly to support legacy coprocessors. +3212 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), +3213 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, +3214 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); +3215 } +3216 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); +3217 } +3218 // ------------------------------------ +3219 // Acquire the latest mvcc number +3220 // ---------------------------------- +3221 if (walKey == null) { +3222 // If this is a skip wal operation just get the read point from mvcc +3223 walKey = this.appendEmptyEdit(this.wal); 3224 } -3225 // ------------------------------------ -3226 // Acquire the latest mvcc number -3227 // ---------------------------------- -3228 if (walKey == null) { -3229 // If this is a skip wal operation just get the read point from mvcc -3230 walKey = this.appendEmptyEdit(this.wal); -3231 } -3232 if (!isInReplay) { -3233 writeEntry = walKey.getWriteEntry(); -3234 mvccNum = writeEntry.getWriteNumber(); -3235 } else { -3236 mvccNum = batchOp.getReplaySequenceId(); -3237 } -3238 -3239 // ------------------------------------ -3240 // STEP 5. Write back to memstore -3241 // Write to memstore. It is ok to write to memstore -3242 // first without syncing the WAL because we do not roll -3243 // forward the memstore MVCC. The MVCC will be moved up when -3244 // the complete operation is done. These changes are not yet -3245 // visible to scanners till we update the MVCC. The MVCC is -3246 // moved only when the sync is complete. -3247 // ---------------------------------- -3248 long addedSize = 0; -3249 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3250 if (batchOp.retCodeDetails[i].getOperationStatusCode() -3251 != OperationStatusCode.NOT_RUN) { -3252 continue; -3253 } -3254 doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote -3255 addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); -3256 } -3257 -3258 // ------------------------------- -3259 // STEP 6. Release row locks, etc. -3260 // ------------------------------- -3261 if (locked) { -3262 this.updatesLock.readLock().unlock(); -3263 locked = false; -3264 } -3265 releaseRowLocks(acquiredRowLocks); +3225 if (!isInReplay) { +3226 writeEntry = walKey.getWriteEntry(); +3227 mvccNum = writeEntry.getWriteNumber(); +3228 } else { +3229 mvccNum = batchOp.getReplaySequenceId(); +3230 } +3231 +3232 // ------------------------------------ +3233 // STEP 5. Write back to memstore +3234 // Write to memstore. It is ok to write to memstore +3235 // first without syncing the WAL because we do not roll +3236 // forward the memstore MVCC. The MVCC will be moved up when +3237 // the complete operation is done. These changes are not yet +3238 // visible to scanners till we update the MVCC. The MVCC is +3239 // moved only when the sync is complete. +3240 // ---------------------------------- +3241 long addedSize = 0; +3242 for (int i = firstIndex; i < lastIndexExclusive; i++) { +3243 if (batchOp.retCodeDetails[i].getOperationStatusCode() +3244 != OperationStatusCode.NOT_RUN) { +3245 continue; +3246 } +3247 doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote +3248 addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); +3249 } +3250 +3251 // ------------------------------- +3252 // STEP 6. Release row locks, etc. +3253 // ------------------------------- +3254 if (locked) { +3255 this.updatesLock.readLock().unlock(); +3256 locked = false; +3257 } +3258 releaseRowLocks(acquiredRowLocks); +3259 +3260 // ------------------------- +3261 // STEP 7. Sync wal. +3262 // ------------------------- +3263 if (txid != 0) { +3264 syncOrDefer(txid, durability); +3265 } 3266 -3267 // ------------------------- -3268 // STEP 7. Sync wal. -3269 // ------------------------- -3270 if (txid != 0) { -3271 syncOrDefer(txid, durability); -3272 } -3273 -3274 doRollBackMemstore = false; -3275 // calling the post CP hook for batch mutation -3276 if (!isInReplay && coprocessorHost != null) { -3277 MiniBatchOperationInProgress<Mutation> miniBatchOp = -3278 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), -3279 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); -3280 coprocessorHost.postBatchMutate(miniBatchOp); -3281 } -3282 -3283 // ------------------------------------------------------------------ -3284 // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. -3285 // ------------------------------------------------------------------ -3286 if (writeEntry != null) { -3287 mvcc.completeAndWait(writeEntry); -3288 writeEntry = null; -3289 } else if (isInReplay) { -3290 // ensure that the sequence id of the region is at least as big as orig log seq id -3291 mvcc.advanceTo(mvccNum); -3292 } -3293 -3294 for (int i = firstIndex; i < lastIndexExclusive; i ++) { -3295 if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { -3296 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; -3297 } -3298 } -3299 -3300 // ------------------------------------ -3301 // STEP 9. Run coprocessor post hooks. This should be done after the wal is -3302 // synced so that the coprocessor contract is adhered to. -3303 // ------------------------------------ -3304 if (!isInReplay && coprocessorHost != null) { -3305 for (int i = firstIndex; i < lastIndexExclusive; i++) { -3306 // only for successful puts -3307 if (batchOp.retCodeDetails[i].getOperationStatusCode() -3308 != OperationStatusCode.SUCCESS) { -3309 continue; -3310 } -3311 Mutation m = batchOp.getMutation(i); -3312 if (m instanceof Put) { -3313 coprocessorHost.postPut((Put) m, walEdit, m.getDurability()); -3314 } else { -3315 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); -3316 } -3317 } -3318 } -3319 -3320 success = true; -3321 return addedSize; -3322 } finally { -3323 // if the wal sync was unsuccessful, remove keys from memstore -3324 if (doRollBackMemstore) { -3325 for (int j = 0; j < familyMaps.length; j++) { -3326 for(List<Cell> cells:familyMaps[j].values()) { -3327 rollbackMemstore(cells); -3328 } -3329 } -3330 if (writeEntry != null) mvcc.complete(writeEntry); -3331 } else if (writeEntry != null) { -3332 mvcc.completeAndWait(writeEntry); -3333 } -3334 -3335 if (locked) { -3336 this.updatesLock.readLock().unlock(); -3337 } -3338 releaseRowLocks(acquiredRowLocks); -3339 -3340 // See if the column families were consistent through the whole thing. -3341 // if they were then keep them. If they were not then pass a null. -3342 // null will be treated as unknown. -3343 // Total time taken might be involving Puts and Deletes. -3344 // Split the time for puts and deletes based on the total number of Puts and Deletes. -3345 -3346 if (noOfPuts > 0) { -3347 // There were some Puts in the batch. -3348 if (this.metricsRegion != null) { -3349 this.metricsRegion.updatePut(); -3350 } -3351 } -3352 if (noOfDeletes > 0) { -3353 // There were some Deletes in the batch. -3354 if (this.metricsRegion != null) { -3355 this.metricsRegion.updateDelete(); +3267 doRollBackMemstore = false; +3268 // calling the post CP hook for batch mutation +3269 if (!isInReplay && coprocessorHost != null) { +3270 MiniBatchOperationInProgress<Mutation> miniBatchOp = +3271 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(), +3272 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); +3273 coprocessorHost.postBatchMutate(miniBatchOp); +3274 } +3275 +3276 // ------------------------------------------------------------------ +3277 // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. +3278 // ------------------------------------------------------------------ +3279 if (writeEntry != null) { +3280 mvcc.completeAndWait(writeEntry); +3281 writeEntry = null; +3282 } else if (isInReplay) { +3283 // ensure that the sequence id of the region is at least as big as orig log seq id +3284 mvcc.advanceTo(mvccNum); +3285 } +3286 +3287 for (int i = firstIndex; i < lastIndexExclusive; i ++) { +3288 if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { +3289 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; +3290 } +3291 } +3292 +3293 // ------------------------------------ +3294 // STEP 9. Run coprocessor post hooks. This should be done after the wal is +3295 // synced so that the coprocessor contract is adhered to. +3296 // ------------------------------------ +3297 if (!isInReplay && coprocessorHost != null) { +3298 for (int i = firstIndex; i < lastIndexExclusive; i++) { +3299 // only for successful puts +3300 if (batchOp.retCodeDetails[i].getOperationStatusCode() +3301 != OperationStatusCode.SUCCESS) { +3302 continue; +3303 } +3304 Mutation m = batchOp.getMutation(i); +3305 if (m instanceof Put) { +