hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From git-site-r...@apache.org
Subject [11/51] [partial] hbase-site git commit: Published site at .
Date Fri, 10 Nov 2017 15:29:58 GMT
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a108018f/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BulkLoadListener.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BulkLoadListener.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BulkLoadListener.html
index 5c5e0eb..7b41db2 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BulkLoadListener.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.BulkLoadListener.html
@@ -2960,5416 +2960,5447 @@
 <span class="sourceLineNo">2952</span>    protected final ObservedExceptionsInBatch observedExceptions;<a name="line.2952"></a>
 <span class="sourceLineNo">2953</span>    //Durability of the batch (highest durability of all operations)<a name="line.2953"></a>
 <span class="sourceLineNo">2954</span>    protected Durability durability;<a name="line.2954"></a>
-<span class="sourceLineNo">2955</span><a name="line.2955"></a>
-<span class="sourceLineNo">2956</span>    public BatchOperation(final HRegion region, T[] operations) {<a name="line.2956"></a>
-<span class="sourceLineNo">2957</span>      this.operations = operations;<a name="line.2957"></a>
-<span class="sourceLineNo">2958</span>      this.retCodeDetails = new OperationStatus[operations.length];<a name="line.2958"></a>
-<span class="sourceLineNo">2959</span>      Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);<a name="line.2959"></a>
-<span class="sourceLineNo">2960</span>      this.walEditsFromCoprocessors = new WALEdit[operations.length];<a name="line.2960"></a>
-<span class="sourceLineNo">2961</span>      familyCellMaps = new Map[operations.length];<a name="line.2961"></a>
-<span class="sourceLineNo">2962</span><a name="line.2962"></a>
-<span class="sourceLineNo">2963</span>      this.region = region;<a name="line.2963"></a>
-<span class="sourceLineNo">2964</span>      observedExceptions = new ObservedExceptionsInBatch();<a name="line.2964"></a>
-<span class="sourceLineNo">2965</span>      durability = Durability.USE_DEFAULT;<a name="line.2965"></a>
-<span class="sourceLineNo">2966</span>    }<a name="line.2966"></a>
-<span class="sourceLineNo">2967</span><a name="line.2967"></a>
-<span class="sourceLineNo">2968</span>    /**<a name="line.2968"></a>
-<span class="sourceLineNo">2969</span>     * Visitor interface for batch operations<a name="line.2969"></a>
-<span class="sourceLineNo">2970</span>     */<a name="line.2970"></a>
-<span class="sourceLineNo">2971</span>    @FunctionalInterface<a name="line.2971"></a>
-<span class="sourceLineNo">2972</span>    public interface Visitor {<a name="line.2972"></a>
-<span class="sourceLineNo">2973</span>      /**<a name="line.2973"></a>
-<span class="sourceLineNo">2974</span>       * @param index operation index<a name="line.2974"></a>
-<span class="sourceLineNo">2975</span>       * @return If true continue visiting remaining entries, break otherwise<a name="line.2975"></a>
-<span class="sourceLineNo">2976</span>       */<a name="line.2976"></a>
-<span class="sourceLineNo">2977</span>      boolean visit(int index) throws IOException;<a name="line.2977"></a>
-<span class="sourceLineNo">2978</span>    }<a name="line.2978"></a>
-<span class="sourceLineNo">2979</span><a name="line.2979"></a>
-<span class="sourceLineNo">2980</span>    /**<a name="line.2980"></a>
-<span class="sourceLineNo">2981</span>     * Helper method for visiting pending/ all batch operations<a name="line.2981"></a>
-<span class="sourceLineNo">2982</span>     */<a name="line.2982"></a>
-<span class="sourceLineNo">2983</span>    public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor)<a name="line.2983"></a>
-<span class="sourceLineNo">2984</span>        throws IOException {<a name="line.2984"></a>
-<span class="sourceLineNo">2985</span>      assert lastIndexExclusive &lt;= this.size();<a name="line.2985"></a>
-<span class="sourceLineNo">2986</span>      for (int i = nextIndexToProcess; i &lt; lastIndexExclusive; i++) {<a name="line.2986"></a>
-<span class="sourceLineNo">2987</span>        if (!pendingOnly || isOperationPending(i)) {<a name="line.2987"></a>
-<span class="sourceLineNo">2988</span>          if (!visitor.visit(i)) {<a name="line.2988"></a>
-<span class="sourceLineNo">2989</span>            break;<a name="line.2989"></a>
-<span class="sourceLineNo">2990</span>          }<a name="line.2990"></a>
-<span class="sourceLineNo">2991</span>        }<a name="line.2991"></a>
-<span class="sourceLineNo">2992</span>      }<a name="line.2992"></a>
-<span class="sourceLineNo">2993</span>    }<a name="line.2993"></a>
-<span class="sourceLineNo">2994</span><a name="line.2994"></a>
-<span class="sourceLineNo">2995</span>    public abstract Mutation getMutation(int index);<a name="line.2995"></a>
-<span class="sourceLineNo">2996</span>    public abstract long getNonceGroup(int index);<a name="line.2996"></a>
-<span class="sourceLineNo">2997</span>    public abstract long getNonce(int index);<a name="line.2997"></a>
-<span class="sourceLineNo">2998</span>    /** This method is potentially expensive and useful mostly for non-replay CP path. */<a name="line.2998"></a>
-<span class="sourceLineNo">2999</span>    public abstract Mutation[] getMutationsForCoprocs();<a name="line.2999"></a>
-<span class="sourceLineNo">3000</span>    public abstract boolean isInReplay();<a name="line.3000"></a>
-<span class="sourceLineNo">3001</span>    public abstract long getOrigLogSeqNum();<a name="line.3001"></a>
-<span class="sourceLineNo">3002</span>    public abstract void startRegionOperation() throws IOException;<a name="line.3002"></a>
-<span class="sourceLineNo">3003</span>    public abstract void closeRegionOperation() throws IOException;<a name="line.3003"></a>
-<span class="sourceLineNo">3004</span><a name="line.3004"></a>
-<span class="sourceLineNo">3005</span>    /**<a name="line.3005"></a>
-<span class="sourceLineNo">3006</span>     * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs<a name="line.3006"></a>
-<span class="sourceLineNo">3007</span>     * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on<a name="line.3007"></a>
-<span class="sourceLineNo">3008</span>     * entire batch and will be called from outside of class to check and prepare batch. This can<a name="line.3008"></a>
-<span class="sourceLineNo">3009</span>     * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a<a name="line.3009"></a>
-<span class="sourceLineNo">3010</span>     * 'for' loop over mutations.<a name="line.3010"></a>
-<span class="sourceLineNo">3011</span>     */<a name="line.3011"></a>
-<span class="sourceLineNo">3012</span>    public abstract void checkAndPrepare() throws IOException;<a name="line.3012"></a>
-<span class="sourceLineNo">3013</span><a name="line.3013"></a>
-<span class="sourceLineNo">3014</span>    /**<a name="line.3014"></a>
-<span class="sourceLineNo">3015</span>     * Implement any Put request specific check and prepare logic here. Please refer to<a name="line.3015"></a>
-<span class="sourceLineNo">3016</span>     * {@link #checkAndPrepareMutation(Mutation, long)} for how its used.<a name="line.3016"></a>
-<span class="sourceLineNo">3017</span>     */<a name="line.3017"></a>
-<span class="sourceLineNo">3018</span>    protected abstract void checkAndPreparePut(final Put p) throws IOException;<a name="line.3018"></a>
-<span class="sourceLineNo">3019</span><a name="line.3019"></a>
-<span class="sourceLineNo">3020</span>    /**<a name="line.3020"></a>
-<span class="sourceLineNo">3021</span>     *  If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell<a name="line.3021"></a>
-<span class="sourceLineNo">3022</span>     *  count, tags and timestamp for all cells of all operations in a mini-batch.<a name="line.3022"></a>
-<span class="sourceLineNo">3023</span>     */<a name="line.3023"></a>
-<span class="sourceLineNo">3024</span>    public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress&lt;Mutation&gt;<a name="line.3024"></a>
-<span class="sourceLineNo">3025</span>        miniBatchOp, long timestamp, final List&lt;RowLock&gt; acquiredRowLocks) throws IOException;<a name="line.3025"></a>
-<span class="sourceLineNo">3026</span><a name="line.3026"></a>
-<span class="sourceLineNo">3027</span>    /**<a name="line.3027"></a>
-<span class="sourceLineNo">3028</span>     * Write mini-batch operations to MemStore<a name="line.3028"></a>
-<span class="sourceLineNo">3029</span>     */<a name="line.3029"></a>
-<span class="sourceLineNo">3030</span>    public abstract WriteEntry writeMiniBatchOperationsToMemStore(<a name="line.3030"></a>
-<span class="sourceLineNo">3031</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final WriteEntry writeEntry)<a name="line.3031"></a>
-<span class="sourceLineNo">3032</span>        throws IOException;<a name="line.3032"></a>
-<span class="sourceLineNo">3033</span><a name="line.3033"></a>
-<span class="sourceLineNo">3034</span>    protected void writeMiniBatchOperationsToMemStore(<a name="line.3034"></a>
-<span class="sourceLineNo">3035</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final long writeNumber)<a name="line.3035"></a>
-<span class="sourceLineNo">3036</span>        throws IOException {<a name="line.3036"></a>
-<span class="sourceLineNo">3037</span>      MemStoreSizing memStoreAccounting = new MemStoreSizing();<a name="line.3037"></a>
-<span class="sourceLineNo">3038</span>      visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -&gt; {<a name="line.3038"></a>
-<span class="sourceLineNo">3039</span>        // We need to update the sequence id for following reasons.<a name="line.3039"></a>
-<span class="sourceLineNo">3040</span>        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.<a name="line.3040"></a>
-<span class="sourceLineNo">3041</span>        // 2) If no WAL, FSWALEntry won't be used<a name="line.3041"></a>
-<span class="sourceLineNo">3042</span>        // we use durability of the original mutation for the mutation passed by CP.<a name="line.3042"></a>
-<span class="sourceLineNo">3043</span>        if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) {<a name="line.3043"></a>
-<span class="sourceLineNo">3044</span>          region.updateSequenceId(familyCellMaps[index].values(), writeNumber);<a name="line.3044"></a>
-<span class="sourceLineNo">3045</span>        }<a name="line.3045"></a>
-<span class="sourceLineNo">3046</span>        applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);<a name="line.3046"></a>
-<span class="sourceLineNo">3047</span>        return true;<a name="line.3047"></a>
-<span class="sourceLineNo">3048</span>      });<a name="line.3048"></a>
-<span class="sourceLineNo">3049</span>      // update memStore size<a name="line.3049"></a>
-<span class="sourceLineNo">3050</span>      region.addAndGetMemStoreSize(memStoreAccounting);<a name="line.3050"></a>
-<span class="sourceLineNo">3051</span>    }<a name="line.3051"></a>
-<span class="sourceLineNo">3052</span><a name="line.3052"></a>
-<span class="sourceLineNo">3053</span>    public boolean isDone() {<a name="line.3053"></a>
-<span class="sourceLineNo">3054</span>      return nextIndexToProcess == operations.length;<a name="line.3054"></a>
-<span class="sourceLineNo">3055</span>    }<a name="line.3055"></a>
-<span class="sourceLineNo">3056</span><a name="line.3056"></a>
-<span class="sourceLineNo">3057</span>    public int size() {<a name="line.3057"></a>
-<span class="sourceLineNo">3058</span>      return operations.length;<a name="line.3058"></a>
-<span class="sourceLineNo">3059</span>    }<a name="line.3059"></a>
-<span class="sourceLineNo">3060</span><a name="line.3060"></a>
-<span class="sourceLineNo">3061</span>    public boolean isOperationPending(int index) {<a name="line.3061"></a>
-<span class="sourceLineNo">3062</span>      return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN;<a name="line.3062"></a>
-<span class="sourceLineNo">3063</span>    }<a name="line.3063"></a>
-<span class="sourceLineNo">3064</span><a name="line.3064"></a>
-<span class="sourceLineNo">3065</span>    public List&lt;UUID&gt; getClusterIds() {<a name="line.3065"></a>
-<span class="sourceLineNo">3066</span>      assert size() != 0;<a name="line.3066"></a>
-<span class="sourceLineNo">3067</span>      return getMutation(0).getClusterIds();<a name="line.3067"></a>
-<span class="sourceLineNo">3068</span>    }<a name="line.3068"></a>
-<span class="sourceLineNo">3069</span><a name="line.3069"></a>
-<span class="sourceLineNo">3070</span>    /**<a name="line.3070"></a>
-<span class="sourceLineNo">3071</span>     * Helper method that checks and prepares only one mutation. This can be used to implement<a name="line.3071"></a>
-<span class="sourceLineNo">3072</span>     * {@link #checkAndPrepare()} for entire Batch.<a name="line.3072"></a>
-<span class="sourceLineNo">3073</span>     * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called<a name="line.3073"></a>
-<span class="sourceLineNo">3074</span>     * after prePut()/ preDelete() CP hooks are run for the mutation<a name="line.3074"></a>
-<span class="sourceLineNo">3075</span>     */<a name="line.3075"></a>
-<span class="sourceLineNo">3076</span>    protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)<a name="line.3076"></a>
-<span class="sourceLineNo">3077</span>        throws IOException {<a name="line.3077"></a>
-<span class="sourceLineNo">3078</span>      region.checkRow(mutation.getRow(), "batchMutate");<a name="line.3078"></a>
-<span class="sourceLineNo">3079</span>      if (mutation instanceof Put) {<a name="line.3079"></a>
-<span class="sourceLineNo">3080</span>        // Check the families in the put. If bad, skip this one.<a name="line.3080"></a>
-<span class="sourceLineNo">3081</span>        checkAndPreparePut((Put) mutation);<a name="line.3081"></a>
-<span class="sourceLineNo">3082</span>        region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);<a name="line.3082"></a>
-<span class="sourceLineNo">3083</span>      } else {<a name="line.3083"></a>
-<span class="sourceLineNo">3084</span>        region.prepareDelete((Delete) mutation);<a name="line.3084"></a>
-<span class="sourceLineNo">3085</span>      }<a name="line.3085"></a>
-<span class="sourceLineNo">3086</span>    }<a name="line.3086"></a>
-<span class="sourceLineNo">3087</span><a name="line.3087"></a>
-<span class="sourceLineNo">3088</span>    protected void checkAndPrepareMutation(int index, long timestamp) throws IOException {<a name="line.3088"></a>
-<span class="sourceLineNo">3089</span>      Mutation mutation = getMutation(index);<a name="line.3089"></a>
-<span class="sourceLineNo">3090</span>      try {<a name="line.3090"></a>
-<span class="sourceLineNo">3091</span>        this.checkAndPrepareMutation(mutation, timestamp);<a name="line.3091"></a>
+<span class="sourceLineNo">2955</span>    protected boolean atomic = false;<a name="line.2955"></a>
+<span class="sourceLineNo">2956</span><a name="line.2956"></a>
+<span class="sourceLineNo">2957</span>    public BatchOperation(final HRegion region, T[] operations) {<a name="line.2957"></a>
+<span class="sourceLineNo">2958</span>      this.operations = operations;<a name="line.2958"></a>
+<span class="sourceLineNo">2959</span>      this.retCodeDetails = new OperationStatus[operations.length];<a name="line.2959"></a>
+<span class="sourceLineNo">2960</span>      Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);<a name="line.2960"></a>
+<span class="sourceLineNo">2961</span>      this.walEditsFromCoprocessors = new WALEdit[operations.length];<a name="line.2961"></a>
+<span class="sourceLineNo">2962</span>      familyCellMaps = new Map[operations.length];<a name="line.2962"></a>
+<span class="sourceLineNo">2963</span><a name="line.2963"></a>
+<span class="sourceLineNo">2964</span>      this.region = region;<a name="line.2964"></a>
+<span class="sourceLineNo">2965</span>      observedExceptions = new ObservedExceptionsInBatch();<a name="line.2965"></a>
+<span class="sourceLineNo">2966</span>      durability = Durability.USE_DEFAULT;<a name="line.2966"></a>
+<span class="sourceLineNo">2967</span>    }<a name="line.2967"></a>
+<span class="sourceLineNo">2968</span><a name="line.2968"></a>
+<span class="sourceLineNo">2969</span>    /**<a name="line.2969"></a>
+<span class="sourceLineNo">2970</span>     * Visitor interface for batch operations<a name="line.2970"></a>
+<span class="sourceLineNo">2971</span>     */<a name="line.2971"></a>
+<span class="sourceLineNo">2972</span>    @FunctionalInterface<a name="line.2972"></a>
+<span class="sourceLineNo">2973</span>    public interface Visitor {<a name="line.2973"></a>
+<span class="sourceLineNo">2974</span>      /**<a name="line.2974"></a>
+<span class="sourceLineNo">2975</span>       * @param index operation index<a name="line.2975"></a>
+<span class="sourceLineNo">2976</span>       * @return If true continue visiting remaining entries, break otherwise<a name="line.2976"></a>
+<span class="sourceLineNo">2977</span>       */<a name="line.2977"></a>
+<span class="sourceLineNo">2978</span>      boolean visit(int index) throws IOException;<a name="line.2978"></a>
+<span class="sourceLineNo">2979</span>    }<a name="line.2979"></a>
+<span class="sourceLineNo">2980</span><a name="line.2980"></a>
+<span class="sourceLineNo">2981</span>    /**<a name="line.2981"></a>
+<span class="sourceLineNo">2982</span>     * Helper method for visiting pending/ all batch operations<a name="line.2982"></a>
+<span class="sourceLineNo">2983</span>     */<a name="line.2983"></a>
+<span class="sourceLineNo">2984</span>    public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor)<a name="line.2984"></a>
+<span class="sourceLineNo">2985</span>        throws IOException {<a name="line.2985"></a>
+<span class="sourceLineNo">2986</span>      assert lastIndexExclusive &lt;= this.size();<a name="line.2986"></a>
+<span class="sourceLineNo">2987</span>      for (int i = nextIndexToProcess; i &lt; lastIndexExclusive; i++) {<a name="line.2987"></a>
+<span class="sourceLineNo">2988</span>        if (!pendingOnly || isOperationPending(i)) {<a name="line.2988"></a>
+<span class="sourceLineNo">2989</span>          if (!visitor.visit(i)) {<a name="line.2989"></a>
+<span class="sourceLineNo">2990</span>            break;<a name="line.2990"></a>
+<span class="sourceLineNo">2991</span>          }<a name="line.2991"></a>
+<span class="sourceLineNo">2992</span>        }<a name="line.2992"></a>
+<span class="sourceLineNo">2993</span>      }<a name="line.2993"></a>
+<span class="sourceLineNo">2994</span>    }<a name="line.2994"></a>
+<span class="sourceLineNo">2995</span><a name="line.2995"></a>
+<span class="sourceLineNo">2996</span>    public abstract Mutation getMutation(int index);<a name="line.2996"></a>
+<span class="sourceLineNo">2997</span>    public abstract long getNonceGroup(int index);<a name="line.2997"></a>
+<span class="sourceLineNo">2998</span>    public abstract long getNonce(int index);<a name="line.2998"></a>
+<span class="sourceLineNo">2999</span>    /** This method is potentially expensive and useful mostly for non-replay CP path. */<a name="line.2999"></a>
+<span class="sourceLineNo">3000</span>    public abstract Mutation[] getMutationsForCoprocs();<a name="line.3000"></a>
+<span class="sourceLineNo">3001</span>    public abstract boolean isInReplay();<a name="line.3001"></a>
+<span class="sourceLineNo">3002</span>    public abstract long getOrigLogSeqNum();<a name="line.3002"></a>
+<span class="sourceLineNo">3003</span>    public abstract void startRegionOperation() throws IOException;<a name="line.3003"></a>
+<span class="sourceLineNo">3004</span>    public abstract void closeRegionOperation() throws IOException;<a name="line.3004"></a>
+<span class="sourceLineNo">3005</span><a name="line.3005"></a>
+<span class="sourceLineNo">3006</span>    /**<a name="line.3006"></a>
+<span class="sourceLineNo">3007</span>     * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs<a name="line.3007"></a>
+<span class="sourceLineNo">3008</span>     * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on<a name="line.3008"></a>
+<span class="sourceLineNo">3009</span>     * entire batch and will be called from outside of class to check and prepare batch. This can<a name="line.3009"></a>
+<span class="sourceLineNo">3010</span>     * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a<a name="line.3010"></a>
+<span class="sourceLineNo">3011</span>     * 'for' loop over mutations.<a name="line.3011"></a>
+<span class="sourceLineNo">3012</span>     */<a name="line.3012"></a>
+<span class="sourceLineNo">3013</span>    public abstract void checkAndPrepare() throws IOException;<a name="line.3013"></a>
+<span class="sourceLineNo">3014</span><a name="line.3014"></a>
+<span class="sourceLineNo">3015</span>    /**<a name="line.3015"></a>
+<span class="sourceLineNo">3016</span>     * Implement any Put request specific check and prepare logic here. Please refer to<a name="line.3016"></a>
+<span class="sourceLineNo">3017</span>     * {@link #checkAndPrepareMutation(Mutation, long)} for how its used.<a name="line.3017"></a>
+<span class="sourceLineNo">3018</span>     */<a name="line.3018"></a>
+<span class="sourceLineNo">3019</span>    protected abstract void checkAndPreparePut(final Put p) throws IOException;<a name="line.3019"></a>
+<span class="sourceLineNo">3020</span><a name="line.3020"></a>
+<span class="sourceLineNo">3021</span>    /**<a name="line.3021"></a>
+<span class="sourceLineNo">3022</span>     *  If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell<a name="line.3022"></a>
+<span class="sourceLineNo">3023</span>     *  count, tags and timestamp for all cells of all operations in a mini-batch.<a name="line.3023"></a>
+<span class="sourceLineNo">3024</span>     */<a name="line.3024"></a>
+<span class="sourceLineNo">3025</span>    public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress&lt;Mutation&gt;<a name="line.3025"></a>
+<span class="sourceLineNo">3026</span>        miniBatchOp, long timestamp, final List&lt;RowLock&gt; acquiredRowLocks) throws IOException;<a name="line.3026"></a>
+<span class="sourceLineNo">3027</span><a name="line.3027"></a>
+<span class="sourceLineNo">3028</span>    /**<a name="line.3028"></a>
+<span class="sourceLineNo">3029</span>     * Write mini-batch operations to MemStore<a name="line.3029"></a>
+<span class="sourceLineNo">3030</span>     */<a name="line.3030"></a>
+<span class="sourceLineNo">3031</span>    public abstract WriteEntry writeMiniBatchOperationsToMemStore(<a name="line.3031"></a>
+<span class="sourceLineNo">3032</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final WriteEntry writeEntry)<a name="line.3032"></a>
+<span class="sourceLineNo">3033</span>        throws IOException;<a name="line.3033"></a>
+<span class="sourceLineNo">3034</span><a name="line.3034"></a>
+<span class="sourceLineNo">3035</span>    protected void writeMiniBatchOperationsToMemStore(<a name="line.3035"></a>
+<span class="sourceLineNo">3036</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final long writeNumber)<a name="line.3036"></a>
+<span class="sourceLineNo">3037</span>        throws IOException {<a name="line.3037"></a>
+<span class="sourceLineNo">3038</span>      MemStoreSizing memStoreAccounting = new MemStoreSizing();<a name="line.3038"></a>
+<span class="sourceLineNo">3039</span>      visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -&gt; {<a name="line.3039"></a>
+<span class="sourceLineNo">3040</span>        // We need to update the sequence id for following reasons.<a name="line.3040"></a>
+<span class="sourceLineNo">3041</span>        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.<a name="line.3041"></a>
+<span class="sourceLineNo">3042</span>        // 2) If no WAL, FSWALEntry won't be used<a name="line.3042"></a>
+<span class="sourceLineNo">3043</span>        // we use durability of the original mutation for the mutation passed by CP.<a name="line.3043"></a>
+<span class="sourceLineNo">3044</span>        if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) {<a name="line.3044"></a>
+<span class="sourceLineNo">3045</span>          region.updateSequenceId(familyCellMaps[index].values(), writeNumber);<a name="line.3045"></a>
+<span class="sourceLineNo">3046</span>        }<a name="line.3046"></a>
+<span class="sourceLineNo">3047</span>        applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);<a name="line.3047"></a>
+<span class="sourceLineNo">3048</span>        return true;<a name="line.3048"></a>
+<span class="sourceLineNo">3049</span>      });<a name="line.3049"></a>
+<span class="sourceLineNo">3050</span>      // update memStore size<a name="line.3050"></a>
+<span class="sourceLineNo">3051</span>      region.addAndGetMemStoreSize(memStoreAccounting);<a name="line.3051"></a>
+<span class="sourceLineNo">3052</span>    }<a name="line.3052"></a>
+<span class="sourceLineNo">3053</span><a name="line.3053"></a>
+<span class="sourceLineNo">3054</span>    public boolean isDone() {<a name="line.3054"></a>
+<span class="sourceLineNo">3055</span>      return nextIndexToProcess == operations.length;<a name="line.3055"></a>
+<span class="sourceLineNo">3056</span>    }<a name="line.3056"></a>
+<span class="sourceLineNo">3057</span><a name="line.3057"></a>
+<span class="sourceLineNo">3058</span>    public int size() {<a name="line.3058"></a>
+<span class="sourceLineNo">3059</span>      return operations.length;<a name="line.3059"></a>
+<span class="sourceLineNo">3060</span>    }<a name="line.3060"></a>
+<span class="sourceLineNo">3061</span><a name="line.3061"></a>
+<span class="sourceLineNo">3062</span>    public boolean isOperationPending(int index) {<a name="line.3062"></a>
+<span class="sourceLineNo">3063</span>      return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN;<a name="line.3063"></a>
+<span class="sourceLineNo">3064</span>    }<a name="line.3064"></a>
+<span class="sourceLineNo">3065</span><a name="line.3065"></a>
+<span class="sourceLineNo">3066</span>    public List&lt;UUID&gt; getClusterIds() {<a name="line.3066"></a>
+<span class="sourceLineNo">3067</span>      assert size() != 0;<a name="line.3067"></a>
+<span class="sourceLineNo">3068</span>      return getMutation(0).getClusterIds();<a name="line.3068"></a>
+<span class="sourceLineNo">3069</span>    }<a name="line.3069"></a>
+<span class="sourceLineNo">3070</span><a name="line.3070"></a>
+<span class="sourceLineNo">3071</span>    boolean isAtomic() {<a name="line.3071"></a>
+<span class="sourceLineNo">3072</span>      return atomic;<a name="line.3072"></a>
+<span class="sourceLineNo">3073</span>    }<a name="line.3073"></a>
+<span class="sourceLineNo">3074</span><a name="line.3074"></a>
+<span class="sourceLineNo">3075</span>    /**<a name="line.3075"></a>
+<span class="sourceLineNo">3076</span>     * Helper method that checks and prepares only one mutation. This can be used to implement<a name="line.3076"></a>
+<span class="sourceLineNo">3077</span>     * {@link #checkAndPrepare()} for entire Batch.<a name="line.3077"></a>
+<span class="sourceLineNo">3078</span>     * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called<a name="line.3078"></a>
+<span class="sourceLineNo">3079</span>     * after prePut()/ preDelete() CP hooks are run for the mutation<a name="line.3079"></a>
+<span class="sourceLineNo">3080</span>     */<a name="line.3080"></a>
+<span class="sourceLineNo">3081</span>    protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)<a name="line.3081"></a>
+<span class="sourceLineNo">3082</span>        throws IOException {<a name="line.3082"></a>
+<span class="sourceLineNo">3083</span>      region.checkRow(mutation.getRow(), "batchMutate");<a name="line.3083"></a>
+<span class="sourceLineNo">3084</span>      if (mutation instanceof Put) {<a name="line.3084"></a>
+<span class="sourceLineNo">3085</span>        // Check the families in the put. If bad, skip this one.<a name="line.3085"></a>
+<span class="sourceLineNo">3086</span>        checkAndPreparePut((Put) mutation);<a name="line.3086"></a>
+<span class="sourceLineNo">3087</span>        region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);<a name="line.3087"></a>
+<span class="sourceLineNo">3088</span>      } else {<a name="line.3088"></a>
+<span class="sourceLineNo">3089</span>        region.prepareDelete((Delete) mutation);<a name="line.3089"></a>
+<span class="sourceLineNo">3090</span>      }<a name="line.3090"></a>
+<span class="sourceLineNo">3091</span>    }<a name="line.3091"></a>
 <span class="sourceLineNo">3092</span><a name="line.3092"></a>
-<span class="sourceLineNo">3093</span>        // store the family map reference to allow for mutations<a name="line.3093"></a>
-<span class="sourceLineNo">3094</span>        familyCellMaps[index] = mutation.getFamilyCellMap();<a name="line.3094"></a>
-<span class="sourceLineNo">3095</span>        // store durability for the batch (highest durability of all operations in the batch)<a name="line.3095"></a>
-<span class="sourceLineNo">3096</span>        Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());<a name="line.3096"></a>
-<span class="sourceLineNo">3097</span>        if (tmpDur.ordinal() &gt; durability.ordinal()) {<a name="line.3097"></a>
-<span class="sourceLineNo">3098</span>          durability = tmpDur;<a name="line.3098"></a>
-<span class="sourceLineNo">3099</span>        }<a name="line.3099"></a>
-<span class="sourceLineNo">3100</span>      } catch (NoSuchColumnFamilyException nscf) {<a name="line.3100"></a>
-<span class="sourceLineNo">3101</span>        final String msg = "No such column family in batch mutation. ";<a name="line.3101"></a>
-<span class="sourceLineNo">3102</span>        if (observedExceptions.hasSeenNoSuchFamily()) {<a name="line.3102"></a>
-<span class="sourceLineNo">3103</span>          LOG.warn(msg + nscf.getMessage());<a name="line.3103"></a>
-<span class="sourceLineNo">3104</span>        } else {<a name="line.3104"></a>
-<span class="sourceLineNo">3105</span>          LOG.warn(msg, nscf);<a name="line.3105"></a>
-<span class="sourceLineNo">3106</span>          observedExceptions.sawNoSuchFamily();<a name="line.3106"></a>
-<span class="sourceLineNo">3107</span>        }<a name="line.3107"></a>
-<span class="sourceLineNo">3108</span>        retCodeDetails[index] = new OperationStatus(<a name="line.3108"></a>
-<span class="sourceLineNo">3109</span>            OperationStatusCode.BAD_FAMILY, nscf.getMessage());<a name="line.3109"></a>
-<span class="sourceLineNo">3110</span>      } catch (FailedSanityCheckException fsce) {<a name="line.3110"></a>
-<span class="sourceLineNo">3111</span>        final String msg = "Batch Mutation did not pass sanity check. ";<a name="line.3111"></a>
-<span class="sourceLineNo">3112</span>        if (observedExceptions.hasSeenFailedSanityCheck()) {<a name="line.3112"></a>
-<span class="sourceLineNo">3113</span>          LOG.warn(msg + fsce.getMessage());<a name="line.3113"></a>
-<span class="sourceLineNo">3114</span>        } else {<a name="line.3114"></a>
-<span class="sourceLineNo">3115</span>          LOG.warn(msg, fsce);<a name="line.3115"></a>
-<span class="sourceLineNo">3116</span>          observedExceptions.sawFailedSanityCheck();<a name="line.3116"></a>
+<span class="sourceLineNo">3093</span>    protected void checkAndPrepareMutation(int index, long timestamp) throws IOException {<a name="line.3093"></a>
+<span class="sourceLineNo">3094</span>      Mutation mutation = getMutation(index);<a name="line.3094"></a>
+<span class="sourceLineNo">3095</span>      try {<a name="line.3095"></a>
+<span class="sourceLineNo">3096</span>        this.checkAndPrepareMutation(mutation, timestamp);<a name="line.3096"></a>
+<span class="sourceLineNo">3097</span><a name="line.3097"></a>
+<span class="sourceLineNo">3098</span>        // store the family map reference to allow for mutations<a name="line.3098"></a>
+<span class="sourceLineNo">3099</span>        familyCellMaps[index] = mutation.getFamilyCellMap();<a name="line.3099"></a>
+<span class="sourceLineNo">3100</span>        // store durability for the batch (highest durability of all operations in the batch)<a name="line.3100"></a>
+<span class="sourceLineNo">3101</span>        Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());<a name="line.3101"></a>
+<span class="sourceLineNo">3102</span>        if (tmpDur.ordinal() &gt; durability.ordinal()) {<a name="line.3102"></a>
+<span class="sourceLineNo">3103</span>          durability = tmpDur;<a name="line.3103"></a>
+<span class="sourceLineNo">3104</span>        }<a name="line.3104"></a>
+<span class="sourceLineNo">3105</span>      } catch (NoSuchColumnFamilyException nscfe) {<a name="line.3105"></a>
+<span class="sourceLineNo">3106</span>        final String msg = "No such column family in batch mutation. ";<a name="line.3106"></a>
+<span class="sourceLineNo">3107</span>        if (observedExceptions.hasSeenNoSuchFamily()) {<a name="line.3107"></a>
+<span class="sourceLineNo">3108</span>          LOG.warn(msg + nscfe.getMessage());<a name="line.3108"></a>
+<span class="sourceLineNo">3109</span>        } else {<a name="line.3109"></a>
+<span class="sourceLineNo">3110</span>          LOG.warn(msg, nscfe);<a name="line.3110"></a>
+<span class="sourceLineNo">3111</span>          observedExceptions.sawNoSuchFamily();<a name="line.3111"></a>
+<span class="sourceLineNo">3112</span>        }<a name="line.3112"></a>
+<span class="sourceLineNo">3113</span>        retCodeDetails[index] = new OperationStatus(<a name="line.3113"></a>
+<span class="sourceLineNo">3114</span>            OperationStatusCode.BAD_FAMILY, nscfe.getMessage());<a name="line.3114"></a>
+<span class="sourceLineNo">3115</span>        if (isAtomic()) { // fail, atomic means all or none<a name="line.3115"></a>
+<span class="sourceLineNo">3116</span>          throw nscfe;<a name="line.3116"></a>
 <span class="sourceLineNo">3117</span>        }<a name="line.3117"></a>
-<span class="sourceLineNo">3118</span>        retCodeDetails[index] = new OperationStatus(<a name="line.3118"></a>
-<span class="sourceLineNo">3119</span>            OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3119"></a>
-<span class="sourceLineNo">3120</span>      } catch (WrongRegionException we) {<a name="line.3120"></a>
-<span class="sourceLineNo">3121</span>        final String msg = "Batch mutation had a row that does not belong to this region. ";<a name="line.3121"></a>
-<span class="sourceLineNo">3122</span>        if (observedExceptions.hasSeenWrongRegion()) {<a name="line.3122"></a>
-<span class="sourceLineNo">3123</span>          LOG.warn(msg + we.getMessage());<a name="line.3123"></a>
-<span class="sourceLineNo">3124</span>        } else {<a name="line.3124"></a>
-<span class="sourceLineNo">3125</span>          LOG.warn(msg, we);<a name="line.3125"></a>
-<span class="sourceLineNo">3126</span>          observedExceptions.sawWrongRegion();<a name="line.3126"></a>
-<span class="sourceLineNo">3127</span>        }<a name="line.3127"></a>
-<span class="sourceLineNo">3128</span>        retCodeDetails[index] = new OperationStatus(<a name="line.3128"></a>
-<span class="sourceLineNo">3129</span>            OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3129"></a>
-<span class="sourceLineNo">3130</span>      }<a name="line.3130"></a>
-<span class="sourceLineNo">3131</span>    }<a name="line.3131"></a>
-<span class="sourceLineNo">3132</span><a name="line.3132"></a>
-<span class="sourceLineNo">3133</span>    /**<a name="line.3133"></a>
-<span class="sourceLineNo">3134</span>     * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which<a name="line.3134"></a>
-<span class="sourceLineNo">3135</span>     * a row lock can be acquired. All mutations with locked rows are considered to be<a name="line.3135"></a>
-<span class="sourceLineNo">3136</span>     * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch<a name="line.3136"></a>
-<span class="sourceLineNo">3137</span>     * is window over {@link BatchOperation} and contains contiguous pending operations.<a name="line.3137"></a>
-<span class="sourceLineNo">3138</span>     *<a name="line.3138"></a>
-<span class="sourceLineNo">3139</span>     * @param acquiredRowLocks keeps track of rowLocks acquired.<a name="line.3139"></a>
-<span class="sourceLineNo">3140</span>     */<a name="line.3140"></a>
-<span class="sourceLineNo">3141</span>    public MiniBatchOperationInProgress&lt;Mutation&gt; lockRowsAndBuildMiniBatch(<a name="line.3141"></a>
-<span class="sourceLineNo">3142</span>        List&lt;RowLock&gt; acquiredRowLocks) throws IOException {<a name="line.3142"></a>
-<span class="sourceLineNo">3143</span>      int readyToWriteCount = 0;<a name="line.3143"></a>
-<span class="sourceLineNo">3144</span>      int lastIndexExclusive = 0;<a name="line.3144"></a>
-<span class="sourceLineNo">3145</span>      for (; lastIndexExclusive &lt; size(); lastIndexExclusive++) {<a name="line.3145"></a>
-<span class="sourceLineNo">3146</span>        if (!isOperationPending(lastIndexExclusive)) {<a name="line.3146"></a>
-<span class="sourceLineNo">3147</span>          continue;<a name="line.3147"></a>
-<span class="sourceLineNo">3148</span>        }<a name="line.3148"></a>
-<span class="sourceLineNo">3149</span>        Mutation mutation = getMutation(lastIndexExclusive);<a name="line.3149"></a>
-<span class="sourceLineNo">3150</span>        // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.3150"></a>
-<span class="sourceLineNo">3151</span>        RowLock rowLock = null;<a name="line.3151"></a>
-<span class="sourceLineNo">3152</span>        try {<a name="line.3152"></a>
-<span class="sourceLineNo">3153</span>          rowLock = region.getRowLockInternal(mutation.getRow(), true);<a name="line.3153"></a>
-<span class="sourceLineNo">3154</span>        } catch (TimeoutIOException e) {<a name="line.3154"></a>
-<span class="sourceLineNo">3155</span>          // We will retry when other exceptions, but we should stop if we timeout .<a name="line.3155"></a>
-<span class="sourceLineNo">3156</span>          throw e;<a name="line.3156"></a>
-<span class="sourceLineNo">3157</span>        } catch (IOException ioe) {<a name="line.3157"></a>
-<span class="sourceLineNo">3158</span>          LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3158"></a>
-<span class="sourceLineNo">3159</span>        }<a name="line.3159"></a>
-<span class="sourceLineNo">3160</span>        if (rowLock == null) {<a name="line.3160"></a>
-<span class="sourceLineNo">3161</span>          // We failed to grab another lock<a name="line.3161"></a>
-<span class="sourceLineNo">3162</span>          break; // Stop acquiring more rows for this batch<a name="line.3162"></a>
-<span class="sourceLineNo">3163</span>        } else {<a name="line.3163"></a>
-<span class="sourceLineNo">3164</span>          acquiredRowLocks.add(rowLock);<a name="line.3164"></a>
-<span class="sourceLineNo">3165</span>        }<a name="line.3165"></a>
-<span class="sourceLineNo">3166</span>        readyToWriteCount++;<a name="line.3166"></a>
-<span class="sourceLineNo">3167</span>      }<a name="line.3167"></a>
-<span class="sourceLineNo">3168</span>      return createMiniBatch(lastIndexExclusive, readyToWriteCount);<a name="line.3168"></a>
-<span class="sourceLineNo">3169</span>    }<a name="line.3169"></a>
-<span class="sourceLineNo">3170</span><a name="line.3170"></a>
-<span class="sourceLineNo">3171</span>    protected MiniBatchOperationInProgress&lt;Mutation&gt; createMiniBatch(final int lastIndexExclusive,<a name="line.3171"></a>
-<span class="sourceLineNo">3172</span>        final int readyToWriteCount) {<a name="line.3172"></a>
-<span class="sourceLineNo">3173</span>      return new MiniBatchOperationInProgress&lt;&gt;(getMutationsForCoprocs(), retCodeDetails,<a name="line.3173"></a>
-<span class="sourceLineNo">3174</span>          walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);<a name="line.3174"></a>
-<span class="sourceLineNo">3175</span>    }<a name="line.3175"></a>
-<span class="sourceLineNo">3176</span><a name="line.3176"></a>
-<span class="sourceLineNo">3177</span>    /**<a name="line.3177"></a>
-<span class="sourceLineNo">3178</span>     * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are<a name="line.3178"></a>
-<span class="sourceLineNo">3179</span>     * present, they are merged to result WALEdit.<a name="line.3179"></a>
-<span class="sourceLineNo">3180</span>     */<a name="line.3180"></a>
-<span class="sourceLineNo">3181</span>    public List&lt;Pair&lt;NonceKey, WALEdit&gt;&gt; buildWALEdits(<a name="line.3181"></a>
-<span class="sourceLineNo">3182</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp) throws IOException {<a name="line.3182"></a>
-<span class="sourceLineNo">3183</span>      List&lt;Pair&lt;NonceKey, WALEdit&gt;&gt; walEdits = new ArrayList&lt;&gt;();<a name="line.3183"></a>
-<span class="sourceLineNo">3184</span><a name="line.3184"></a>
-<span class="sourceLineNo">3185</span>      visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {<a name="line.3185"></a>
-<span class="sourceLineNo">3186</span>        private Pair&lt;NonceKey, WALEdit&gt; curWALEditForNonce;<a name="line.3186"></a>
-<span class="sourceLineNo">3187</span>        @Override<a name="line.3187"></a>
-<span class="sourceLineNo">3188</span>        public boolean visit(int index) throws IOException {<a name="line.3188"></a>
-<span class="sourceLineNo">3189</span>          Mutation m = getMutation(index);<a name="line.3189"></a>
-<span class="sourceLineNo">3190</span>          // we use durability of the original mutation for the mutation passed by CP.<a name="line.3190"></a>
-<span class="sourceLineNo">3191</span>          if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {<a name="line.3191"></a>
-<span class="sourceLineNo">3192</span>            region.recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3192"></a>
-<span class="sourceLineNo">3193</span>            return true;<a name="line.3193"></a>
-<span class="sourceLineNo">3194</span>          }<a name="line.3194"></a>
-<span class="sourceLineNo">3195</span><a name="line.3195"></a>
-<span class="sourceLineNo">3196</span>          // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each.<a name="line.3196"></a>
-<span class="sourceLineNo">3197</span>          // Given how nonce keys are originally written, these should be contiguous.<a name="line.3197"></a>
-<span class="sourceLineNo">3198</span>          // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3198"></a>
-<span class="sourceLineNo">3199</span>          long nonceGroup = getNonceGroup(index);<a name="line.3199"></a>
-<span class="sourceLineNo">3200</span>          long nonce = getNonce(index);<a name="line.3200"></a>
-<span class="sourceLineNo">3201</span>          if (curWALEditForNonce == null ||<a name="line.3201"></a>
-<span class="sourceLineNo">3202</span>              curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup ||<a name="line.3202"></a>
-<span class="sourceLineNo">3203</span>              curWALEditForNonce.getFirst().getNonce() != nonce) {<a name="line.3203"></a>
-<span class="sourceLineNo">3204</span>            curWALEditForNonce = new Pair&lt;&gt;(new NonceKey(nonceGroup, nonce),<a name="line.3204"></a>
-<span class="sourceLineNo">3205</span>                new WALEdit(miniBatchOp.getCellCount(), isInReplay()));<a name="line.3205"></a>
-<span class="sourceLineNo">3206</span>            walEdits.add(curWALEditForNonce);<a name="line.3206"></a>
-<span class="sourceLineNo">3207</span>          }<a name="line.3207"></a>
-<span class="sourceLineNo">3208</span>          WALEdit walEdit = curWALEditForNonce.getSecond();<a name="line.3208"></a>
-<span class="sourceLineNo">3209</span><a name="line.3209"></a>
-<span class="sourceLineNo">3210</span>          // Add WAL edits by CP<a name="line.3210"></a>
-<span class="sourceLineNo">3211</span>          WALEdit fromCP = walEditsFromCoprocessors[index];<a name="line.3211"></a>
-<span class="sourceLineNo">3212</span>          if (fromCP != null) {<a name="line.3212"></a>
-<span class="sourceLineNo">3213</span>            for (Cell cell : fromCP.getCells()) {<a name="line.3213"></a>
-<span class="sourceLineNo">3214</span>              walEdit.add(cell);<a name="line.3214"></a>
-<span class="sourceLineNo">3215</span>            }<a name="line.3215"></a>
-<span class="sourceLineNo">3216</span>          }<a name="line.3216"></a>
-<span class="sourceLineNo">3217</span>          addFamilyMapToWALEdit(familyCellMaps[index], walEdit);<a name="line.3217"></a>
-<span class="sourceLineNo">3218</span><a name="line.3218"></a>
-<span class="sourceLineNo">3219</span>          return true;<a name="line.3219"></a>
-<span class="sourceLineNo">3220</span>        }<a name="line.3220"></a>
-<span class="sourceLineNo">3221</span>      });<a name="line.3221"></a>
-<span class="sourceLineNo">3222</span>      return walEdits;<a name="line.3222"></a>
-<span class="sourceLineNo">3223</span>    }<a name="line.3223"></a>
-<span class="sourceLineNo">3224</span><a name="line.3224"></a>
-<span class="sourceLineNo">3225</span>    /**<a name="line.3225"></a>
-<span class="sourceLineNo">3226</span>     * This method completes mini-batch operations by calling postBatchMutate() CP hook (if<a name="line.3226"></a>
-<span class="sourceLineNo">3227</span>     * required) and completing mvcc.<a name="line.3227"></a>
-<span class="sourceLineNo">3228</span>     */<a name="line.3228"></a>
-<span class="sourceLineNo">3229</span>    public void completeMiniBatchOperations(<a name="line.3229"></a>
-<span class="sourceLineNo">3230</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final WriteEntry writeEntry)<a name="line.3230"></a>
-<span class="sourceLineNo">3231</span>        throws IOException {<a name="line.3231"></a>
-<span class="sourceLineNo">3232</span>      if (writeEntry != null) {<a name="line.3232"></a>
-<span class="sourceLineNo">3233</span>        region.mvcc.completeAndWait(writeEntry);<a name="line.3233"></a>
-<span class="sourceLineNo">3234</span>      }<a name="line.3234"></a>
-<span class="sourceLineNo">3235</span>    }<a name="line.3235"></a>
-<span class="sourceLineNo">3236</span><a name="line.3236"></a>
-<span class="sourceLineNo">3237</span>    public void doPostOpCleanupForMiniBatch(<a name="line.3237"></a>
-<span class="sourceLineNo">3238</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final WALEdit walEdit,<a name="line.3238"></a>
-<span class="sourceLineNo">3239</span>        boolean success) throws IOException {}<a name="line.3239"></a>
-<span class="sourceLineNo">3240</span><a name="line.3240"></a>
-<span class="sourceLineNo">3241</span>    /**<a name="line.3241"></a>
-<span class="sourceLineNo">3242</span>     * Atomically apply the given map of family-&gt;edits to the memstore.<a name="line.3242"></a>
-<span class="sourceLineNo">3243</span>     * This handles the consistency control on its own, but the caller<a name="line.3243"></a>
-<span class="sourceLineNo">3244</span>     * should already have locked updatesLock.readLock(). This also does<a name="line.3244"></a>
-<span class="sourceLineNo">3245</span>     * &lt;b&gt;not&lt;/b&gt; check the families for validity.<a name="line.3245"></a>
-<span class="sourceLineNo">3246</span>     *<a name="line.3246"></a>
-<span class="sourceLineNo">3247</span>     * @param familyMap Map of Cells by family<a name="line.3247"></a>
-<span class="sourceLineNo">3248</span>     */<a name="line.3248"></a>
-<span class="sourceLineNo">3249</span>    protected void applyFamilyMapToMemStore(Map&lt;byte[], List&lt;Cell&gt;&gt; familyMap,<a name="line.3249"></a>
-<span class="sourceLineNo">3250</span>        MemStoreSizing memstoreAccounting) throws IOException {<a name="line.3250"></a>
-<span class="sourceLineNo">3251</span>      for (Map.Entry&lt;byte[], List&lt;Cell&gt;&gt; e : familyMap.entrySet()) {<a name="line.3251"></a>
-<span class="sourceLineNo">3252</span>        byte[] family = e.getKey();<a name="line.3252"></a>
-<span class="sourceLineNo">3253</span>        List&lt;Cell&gt; cells = e.getValue();<a name="line.3253"></a>
-<span class="sourceLineNo">3254</span>        assert cells instanceof RandomAccess;<a name="line.3254"></a>
-<span class="sourceLineNo">3255</span>        region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);<a name="line.3255"></a>
-<span class="sourceLineNo">3256</span>      }<a name="line.3256"></a>
-<span class="sourceLineNo">3257</span>    }<a name="line.3257"></a>
-<span class="sourceLineNo">3258</span><a name="line.3258"></a>
-<span class="sourceLineNo">3259</span>    /**<a name="line.3259"></a>
-<span class="sourceLineNo">3260</span>     * Append the given map of family-&gt;edits to a WALEdit data structure.<a name="line.3260"></a>
-<span class="sourceLineNo">3261</span>     * This does not write to the WAL itself.<a name="line.3261"></a>
-<span class="sourceLineNo">3262</span>     * @param familyMap map of family-&gt;edits<a name="line.3262"></a>
-<span class="sourceLineNo">3263</span>     * @param walEdit the destination entry to append into<a name="line.3263"></a>
-<span class="sourceLineNo">3264</span>     */<a name="line.3264"></a>
-<span class="sourceLineNo">3265</span>    private void addFamilyMapToWALEdit(Map&lt;byte[], List&lt;Cell&gt;&gt; familyMap,<a name="line.3265"></a>
-<span class="sourceLineNo">3266</span>        WALEdit walEdit) {<a name="line.3266"></a>
-<span class="sourceLineNo">3267</span>      for (List&lt;Cell&gt; edits : familyMap.values()) {<a name="line.3267"></a>
-<span class="sourceLineNo">3268</span>        assert edits instanceof RandomAccess;<a name="line.3268"></a>
-<span class="sourceLineNo">3269</span>        int listSize = edits.size();<a name="line.3269"></a>
-<span class="sourceLineNo">3270</span>        for (int i=0; i &lt; listSize; i++) {<a name="line.3270"></a>
-<span class="sourceLineNo">3271</span>          Cell cell = edits.get(i);<a name="line.3271"></a>
-<span class="sourceLineNo">3272</span>          walEdit.add(cell);<a name="line.3272"></a>
-<span class="sourceLineNo">3273</span>        }<a name="line.3273"></a>
-<span class="sourceLineNo">3274</span>      }<a name="line.3274"></a>
-<span class="sourceLineNo">3275</span>    }<a name="line.3275"></a>
-<span class="sourceLineNo">3276</span>  }<a name="line.3276"></a>
-<span class="sourceLineNo">3277</span><a name="line.3277"></a>
-<span class="sourceLineNo">3278</span>  /**<a name="line.3278"></a>
-<span class="sourceLineNo">3279</span>   * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most<a name="line.3279"></a>
-<span class="sourceLineNo">3280</span>   * of the logic is same.<a name="line.3280"></a>
-<span class="sourceLineNo">3281</span>   */<a name="line.3281"></a>
-<span class="sourceLineNo">3282</span>  private static class MutationBatchOperation extends BatchOperation&lt;Mutation&gt; {<a name="line.3282"></a>
-<span class="sourceLineNo">3283</span>    private long nonceGroup;<a name="line.3283"></a>
-<span class="sourceLineNo">3284</span>    private long nonce;<a name="line.3284"></a>
-<span class="sourceLineNo">3285</span>    public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup,<a name="line.3285"></a>
-<span class="sourceLineNo">3286</span>        long nonce) {<a name="line.3286"></a>
-<span class="sourceLineNo">3287</span>      super(region, operations);<a name="line.3287"></a>
-<span class="sourceLineNo">3288</span>      this.nonceGroup = nonceGroup;<a name="line.3288"></a>
-<span class="sourceLineNo">3289</span>      this.nonce = nonce;<a name="line.3289"></a>
-<span class="sourceLineNo">3290</span>    }<a name="line.3290"></a>
-<span class="sourceLineNo">3291</span><a name="line.3291"></a>
-<span class="sourceLineNo">3292</span>    @Override<a name="line.3292"></a>
-<span class="sourceLineNo">3293</span>    public Mutation getMutation(int index) {<a name="line.3293"></a>
-<span class="sourceLineNo">3294</span>      return this.operations[index];<a name="line.3294"></a>
-<span class="sourceLineNo">3295</span>    }<a name="line.3295"></a>
-<span class="sourceLineNo">3296</span><a name="line.3296"></a>
-<span class="sourceLineNo">3297</span>    @Override<a name="line.3297"></a>
-<span class="sourceLineNo">3298</span>    public long getNonceGroup(int index) {<a name="line.3298"></a>
-<span class="sourceLineNo">3299</span>      return nonceGroup;<a name="line.3299"></a>
-<span class="sourceLineNo">3300</span>    }<a name="line.3300"></a>
-<span class="sourceLineNo">3301</span><a name="line.3301"></a>
-<span class="sourceLineNo">3302</span>    @Override<a name="line.3302"></a>
-<span class="sourceLineNo">3303</span>    public long getNonce(int index) {<a name="line.3303"></a>
-<span class="sourceLineNo">3304</span>      return nonce;<a name="line.3304"></a>
-<span class="sourceLineNo">3305</span>    }<a name="line.3305"></a>
-<span class="sourceLineNo">3306</span><a name="line.3306"></a>
-<span class="sourceLineNo">3307</span>    @Override<a name="line.3307"></a>
-<span class="sourceLineNo">3308</span>    public Mutation[] getMutationsForCoprocs() {<a name="line.3308"></a>
-<span class="sourceLineNo">3309</span>      return this.operations;<a name="line.3309"></a>
-<span class="sourceLineNo">3310</span>    }<a name="line.3310"></a>
-<span class="sourceLineNo">3311</span><a name="line.3311"></a>
-<span class="sourceLineNo">3312</span>    @Override<a name="line.3312"></a>
-<span class="sourceLineNo">3313</span>    public boolean isInReplay() {<a name="line.3313"></a>
-<span class="sourceLineNo">3314</span>      return false;<a name="line.3314"></a>
-<span class="sourceLineNo">3315</span>    }<a name="line.3315"></a>
-<span class="sourceLineNo">3316</span><a name="line.3316"></a>
-<span class="sourceLineNo">3317</span>    @Override<a name="line.3317"></a>
-<span class="sourceLineNo">3318</span>    public long getOrigLogSeqNum() {<a name="line.3318"></a>
-<span class="sourceLineNo">3319</span>      return WALKey.NO_SEQUENCE_ID;<a name="line.3319"></a>
-<span class="sourceLineNo">3320</span>    }<a name="line.3320"></a>
-<span class="sourceLineNo">3321</span><a name="line.3321"></a>
-<span class="sourceLineNo">3322</span>    @Override<a name="line.3322"></a>
-<span class="sourceLineNo">3323</span>    public void startRegionOperation() throws IOException {<a name="line.3323"></a>
-<span class="sourceLineNo">3324</span>      region.startRegionOperation(Operation.BATCH_MUTATE);<a name="line.3324"></a>
-<span class="sourceLineNo">3325</span>    }<a name="line.3325"></a>
-<span class="sourceLineNo">3326</span><a name="line.3326"></a>
-<span class="sourceLineNo">3327</span>    @Override<a name="line.3327"></a>
-<span class="sourceLineNo">3328</span>    public void closeRegionOperation() throws IOException {<a name="line.3328"></a>
-<span class="sourceLineNo">3329</span>      region.closeRegionOperation(Operation.BATCH_MUTATE);<a name="line.3329"></a>
-<span class="sourceLineNo">3330</span>    }<a name="line.3330"></a>
-<span class="sourceLineNo">3331</span><a name="line.3331"></a>
-<span class="sourceLineNo">3332</span>    @Override<a name="line.3332"></a>
-<span class="sourceLineNo">3333</span>    public void checkAndPreparePut(Put p) throws IOException {<a name="line.3333"></a>
-<span class="sourceLineNo">3334</span>      region.checkFamilies(p.getFamilyCellMap().keySet());<a name="line.3334"></a>
-<span class="sourceLineNo">3335</span>    }<a name="line.3335"></a>
-<span class="sourceLineNo">3336</span><a name="line.3336"></a>
-<span class="sourceLineNo">3337</span>    @Override<a name="line.3337"></a>
-<span class="sourceLineNo">3338</span>    public void checkAndPrepare() throws IOException {<a name="line.3338"></a>
-<span class="sourceLineNo">3339</span>      final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes<a name="line.3339"></a>
-<span class="sourceLineNo">3340</span>      visitBatchOperations(true, this.size(), new Visitor() {<a name="line.3340"></a>
-<span class="sourceLineNo">3341</span>        private long now = EnvironmentEdgeManager.currentTime();<a name="line.3341"></a>
-<span class="sourceLineNo">3342</span>        private WALEdit walEdit;<a name="line.3342"></a>
-<span class="sourceLineNo">3343</span>        @Override<a name="line.3343"></a>
-<span class="sourceLineNo">3344</span>        public boolean visit(int index) throws IOException {<a name="line.3344"></a>
-<span class="sourceLineNo">3345</span>          // Run coprocessor pre hook outside of locks to avoid deadlock<a name="line.3345"></a>
-<span class="sourceLineNo">3346</span>          if (region.coprocessorHost != null) {<a name="line.3346"></a>
-<span class="sourceLineNo">3347</span>            if (walEdit == null) {<a name="line.3347"></a>
-<span class="sourceLineNo">3348</span>              walEdit = new WALEdit();<a name="line.3348"></a>
-<span class="sourceLineNo">3349</span>            }<a name="line.3349"></a>
-<span class="sourceLineNo">3350</span>            callPreMutateCPHook(index, walEdit, metrics);<a name="line.3350"></a>
-<span class="sourceLineNo">3351</span>            if (!walEdit.isEmpty()) {<a name="line.3351"></a>
-<span class="sourceLineNo">3352</span>              walEditsFromCoprocessors[index] = walEdit;<a name="line.3352"></a>
-<span class="sourceLineNo">3353</span>              walEdit = null;<a name="line.3353"></a>
-<span class="sourceLineNo">3354</span>            }<a name="line.3354"></a>
-<span class="sourceLineNo">3355</span>          }<a name="line.3355"></a>
-<span class="sourceLineNo">3356</span>          if (isOperationPending(index)) {<a name="line.3356"></a>
-<span class="sourceLineNo">3357</span>            // TODO: Currently validation is done with current time before acquiring locks and<a name="line.3357"></a>
-<span class="sourceLineNo">3358</span>            // updates are done with different timestamps after acquiring locks. This behavior is<a name="line.3358"></a>
-<span class="sourceLineNo">3359</span>            // inherited from the code prior to this change. Can this be changed?<a name="line.3359"></a>
-<span class="sourceLineNo">3360</span>            checkAndPrepareMutation(index, now);<a name="line.3360"></a>
-<span class="sourceLineNo">3361</span>          }<a name="line.3361"></a>
-<span class="sourceLineNo">3362</span>          return true;<a name="line.3362"></a>
-<span class="sourceLineNo">3363</span>        }<a name="line.3363"></a>
-<span class="sourceLineNo">3364</span>      });<a name="line.3364"></a>
-<span class="sourceLineNo">3365</span><a name="line.3365"></a>
-<span class="sourceLineNo">3366</span>      // FIXME: we may update metrics twice! here for all operations bypassed by CP and later in<a name="line.3366"></a>
-<span class="sourceLineNo">3367</span>      // normal processing.<a name="line.3367"></a>
-<span class="sourceLineNo">3368</span>      // Update metrics in same way as it is done when we go the normal processing route (we now<a name="line.3368"></a>
-<span class="sourceLineNo">3369</span>      // update general metrics though a Coprocessor did the work).<a name="line.3369"></a>
-<span class="sourceLineNo">3370</span>      if (region.metricsRegion != null) {<a name="line.3370"></a>
-<span class="sourceLineNo">3371</span>        if (metrics[0] &gt; 0) {<a name="line.3371"></a>
-<span class="sourceLineNo">3372</span>          // There were some Puts in the batch.<a name="line.3372"></a>
-<span class="sourceLineNo">3373</span>          region.metricsRegion.updatePut();<a name="line.3373"></a>
-<span class="sourceLineNo">3374</span>        }<a name="line.3374"></a>
-<span class="sourceLineNo">3375</span>        if (metrics[1] &gt; 0) {<a name="line.3375"></a>
-<span class="sourceLineNo">3376</span>          // There were some Deletes in the batch.<a name="line.3376"></a>
-<span class="sourceLineNo">3377</span>          region.metricsRegion.updateDelete();<a name="line.3377"></a>
-<span class="sourceLineNo">3378</span>        }<a name="line.3378"></a>
-<span class="sourceLineNo">3379</span>      }<a name="line.3379"></a>
-<span class="sourceLineNo">3380</span>    }<a name="line.3380"></a>
-<span class="sourceLineNo">3381</span><a name="line.3381"></a>
-<span class="sourceLineNo">3382</span>    @Override<a name="line.3382"></a>
-<span class="sourceLineNo">3383</span>    public void prepareMiniBatchOperations(MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp,<a name="line.3383"></a>
-<span class="sourceLineNo">3384</span>        long timestamp, final List&lt;RowLock&gt; acquiredRowLocks) throws IOException {<a name="line.3384"></a>
-<span class="sourceLineNo">3385</span>      byte[] byteTS = Bytes.toBytes(timestamp);<a name="line.3385"></a>
-<span class="sourceLineNo">3386</span>      visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -&gt; {<a name="line.3386"></a>
-<span class="sourceLineNo">3387</span>        Mutation mutation = getMutation(index);<a name="line.3387"></a>
-<span class="sourceLineNo">3388</span>        if (mutation instanceof Put) {<a name="line.3388"></a>
-<span class="sourceLineNo">3389</span>          region.updateCellTimestamps(familyCellMaps[index].values(), byteTS);<a name="line.3389"></a>
-<span class="sourceLineNo">3390</span>          miniBatchOp.incrementNumOfPuts();<a name="line.3390"></a>
-<span class="sourceLineNo">3391</span>        } else {<a name="line.3391"></a>
-<span class="sourceLineNo">3392</span>          region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);<a name="line.3392"></a>
-<span class="sourceLineNo">3393</span>          miniBatchOp.incrementNumOfDeletes();<a name="line.3393"></a>
-<span class="sourceLineNo">3394</span>        }<a name="line.3394"></a>
-<span class="sourceLineNo">3395</span>        region.rewriteCellTags(familyCellMaps[index], mutation);<a name="line.3395"></a>
-<span class="sourceLineNo">3396</span><a name="line.3396"></a>
-<span class="sourceLineNo">3397</span>        // update cell count<a name="line.3397"></a>
-<span class="sourceLineNo">3398</span>        if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {<a name="line.3398"></a>
-<span class="sourceLineNo">3399</span>          for (List&lt;Cell&gt; cells : mutation.getFamilyCellMap().values()) {<a name="line.3399"></a>
-<span class="sourceLineNo">3400</span>            miniBatchOp.addCellCount(cells.size());<a name="line.3400"></a>
-<span class="sourceLineNo">3401</span>          }<a name="line.3401"></a>
-<span class="sourceLineNo">3402</span>        }<a name="line.3402"></a>
+<span class="sourceLineNo">3118</span>      } catch (FailedSanityCheckException fsce) {<a name="line.3118"></a>
+<span class="sourceLineNo">3119</span>        final String msg = "Batch Mutation did not pass sanity check. ";<a name="line.3119"></a>
+<span class="sourceLineNo">3120</span>        if (observedExceptions.hasSeenFailedSanityCheck()) {<a name="line.3120"></a>
+<span class="sourceLineNo">3121</span>          LOG.warn(msg + fsce.getMessage());<a name="line.3121"></a>
+<span class="sourceLineNo">3122</span>        } else {<a name="line.3122"></a>
+<span class="sourceLineNo">3123</span>          LOG.warn(msg, fsce);<a name="line.3123"></a>
+<span class="sourceLineNo">3124</span>          observedExceptions.sawFailedSanityCheck();<a name="line.3124"></a>
+<span class="sourceLineNo">3125</span>        }<a name="line.3125"></a>
+<span class="sourceLineNo">3126</span>        retCodeDetails[index] = new OperationStatus(<a name="line.3126"></a>
+<span class="sourceLineNo">3127</span>            OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3127"></a>
+<span class="sourceLineNo">3128</span>        if (isAtomic()) {<a name="line.3128"></a>
+<span class="sourceLineNo">3129</span>          throw fsce;<a name="line.3129"></a>
+<span class="sourceLineNo">3130</span>        }<a name="line.3130"></a>
+<span class="sourceLineNo">3131</span>      } catch (WrongRegionException we) {<a name="line.3131"></a>
+<span class="sourceLineNo">3132</span>        final String msg = "Batch mutation had a row that does not belong to this region. ";<a name="line.3132"></a>
+<span class="sourceLineNo">3133</span>        if (observedExceptions.hasSeenWrongRegion()) {<a name="line.3133"></a>
+<span class="sourceLineNo">3134</span>          LOG.warn(msg + we.getMessage());<a name="line.3134"></a>
+<span class="sourceLineNo">3135</span>        } else {<a name="line.3135"></a>
+<span class="sourceLineNo">3136</span>          LOG.warn(msg, we);<a name="line.3136"></a>
+<span class="sourceLineNo">3137</span>          observedExceptions.sawWrongRegion();<a name="line.3137"></a>
+<span class="sourceLineNo">3138</span>        }<a name="line.3138"></a>
+<span class="sourceLineNo">3139</span>        retCodeDetails[index] = new OperationStatus(<a name="line.3139"></a>
+<span class="sourceLineNo">3140</span>            OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3140"></a>
+<span class="sourceLineNo">3141</span>        if (isAtomic()) {<a name="line.3141"></a>
+<span class="sourceLineNo">3142</span>          throw we;<a name="line.3142"></a>
+<span class="sourceLineNo">3143</span>        }<a name="line.3143"></a>
+<span class="sourceLineNo">3144</span>      }<a name="line.3144"></a>
+<span class="sourceLineNo">3145</span>    }<a name="line.3145"></a>
+<span class="sourceLineNo">3146</span><a name="line.3146"></a>
+<span class="sourceLineNo">3147</span>    /**<a name="line.3147"></a>
+<span class="sourceLineNo">3148</span>     * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which<a name="line.3148"></a>
+<span class="sourceLineNo">3149</span>     * a row lock can be acquired. All mutations with locked rows are considered to be<a name="line.3149"></a>
+<span class="sourceLineNo">3150</span>     * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch<a name="line.3150"></a>
+<span class="sourceLineNo">3151</span>     * is window over {@link BatchOperation} and contains contiguous pending operations.<a name="line.3151"></a>
+<span class="sourceLineNo">3152</span>     *<a name="line.3152"></a>
+<span class="sourceLineNo">3153</span>     * @param acquiredRowLocks keeps track of rowLocks acquired.<a name="line.3153"></a>
+<span class="sourceLineNo">3154</span>     */<a name="line.3154"></a>
+<span class="sourceLineNo">3155</span>    public MiniBatchOperationInProgress&lt;Mutation&gt; lockRowsAndBuildMiniBatch(<a name="line.3155"></a>
+<span class="sourceLineNo">3156</span>        List&lt;RowLock&gt; acquiredRowLocks) throws IOException {<a name="line.3156"></a>
+<span class="sourceLineNo">3157</span>      int readyToWriteCount = 0;<a name="line.3157"></a>
+<span class="sourceLineNo">3158</span>      int lastIndexExclusive = 0;<a name="line.3158"></a>
+<span class="sourceLineNo">3159</span>      for (; lastIndexExclusive &lt; size(); lastIndexExclusive++) {<a name="line.3159"></a>
+<span class="sourceLineNo">3160</span>        if (!isOperationPending(lastIndexExclusive)) {<a name="line.3160"></a>
+<span class="sourceLineNo">3161</span>          continue;<a name="line.3161"></a>
+<span class="sourceLineNo">3162</span>        }<a name="line.3162"></a>
+<span class="sourceLineNo">3163</span>        Mutation mutation = getMutation(lastIndexExclusive);<a name="line.3163"></a>
+<span class="sourceLineNo">3164</span>        // If we haven't got any rows in our batch, we should block to get the next one.<a name="line.3164"></a>
+<span class="sourceLineNo">3165</span>        RowLock rowLock = null;<a name="line.3165"></a>
+<span class="sourceLineNo">3166</span>        try {<a name="line.3166"></a>
+<span class="sourceLineNo">3167</span>          // if atomic then get exclusive lock, else shared lock<a name="line.3167"></a>
+<span class="sourceLineNo">3168</span>          rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic());<a name="line.3168"></a>
+<span class="sourceLineNo">3169</span>        } catch (TimeoutIOException e) {<a name="line.3169"></a>
+<span class="sourceLineNo">3170</span>          // We will retry when other exceptions, but we should stop if we timeout .<a name="line.3170"></a>
+<span class="sourceLineNo">3171</span>          throw e;<a name="line.3171"></a>
+<span class="sourceLineNo">3172</span>        } catch (IOException ioe) {<a name="line.3172"></a>
+<span class="sourceLineNo">3173</span>          LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3173"></a>
+<span class="sourceLineNo">3174</span>          if (isAtomic()) { // fail, atomic means all or none<a name="line.3174"></a>
+<span class="sourceLineNo">3175</span>            throw ioe;<a name="line.3175"></a>
+<span class="sourceLineNo">3176</span>          }<a name="line.3176"></a>
+<span class="sourceLineNo">3177</span>        }<a name="line.3177"></a>
+<span class="sourceLineNo">3178</span>        if (rowLock == null) {<a name="line.3178"></a>
+<span class="sourceLineNo">3179</span>          // We failed to grab another lock<a name="line.3179"></a>
+<span class="sourceLineNo">3180</span>          if (isAtomic()) {<a name="line.3180"></a>
+<span class="sourceLineNo">3181</span>            throw new IOException("Can't apply all operations atomically!");<a name="line.3181"></a>
+<span class="sourceLineNo">3182</span>          }<a name="line.3182"></a>
+<span class="sourceLineNo">3183</span>          break; // Stop acquiring more rows for this batch<a name="line.3183"></a>
+<span class="sourceLineNo">3184</span>        } else {<a name="line.3184"></a>
+<span class="sourceLineNo">3185</span>          acquiredRowLocks.add(rowLock);<a name="line.3185"></a>
+<span class="sourceLineNo">3186</span>        }<a name="line.3186"></a>
+<span class="sourceLineNo">3187</span>        readyToWriteCount++;<a name="line.3187"></a>
+<span class="sourceLineNo">3188</span>      }<a name="line.3188"></a>
+<span class="sourceLineNo">3189</span>      return createMiniBatch(lastIndexExclusive, readyToWriteCount);<a name="line.3189"></a>
+<span class="sourceLineNo">3190</span>    }<a name="line.3190"></a>
+<span class="sourceLineNo">3191</span><a name="line.3191"></a>
+<span class="sourceLineNo">3192</span>    protected MiniBatchOperationInProgress&lt;Mutation&gt; createMiniBatch(final int lastIndexExclusive,<a name="line.3192"></a>
+<span class="sourceLineNo">3193</span>        final int readyToWriteCount) {<a name="line.3193"></a>
+<span class="sourceLineNo">3194</span>      return new MiniBatchOperationInProgress&lt;&gt;(getMutationsForCoprocs(), retCodeDetails,<a name="line.3194"></a>
+<span class="sourceLineNo">3195</span>          walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);<a name="line.3195"></a>
+<span class="sourceLineNo">3196</span>    }<a name="line.3196"></a>
+<span class="sourceLineNo">3197</span><a name="line.3197"></a>
+<span class="sourceLineNo">3198</span>    /**<a name="line.3198"></a>
+<span class="sourceLineNo">3199</span>     * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are<a name="line.3199"></a>
+<span class="sourceLineNo">3200</span>     * present, they are merged to result WALEdit.<a name="line.3200"></a>
+<span class="sourceLineNo">3201</span>     */<a name="line.3201"></a>
+<span class="sourceLineNo">3202</span>    public List&lt;Pair&lt;NonceKey, WALEdit&gt;&gt; buildWALEdits(<a name="line.3202"></a>
+<span class="sourceLineNo">3203</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp) throws IOException {<a name="line.3203"></a>
+<span class="sourceLineNo">3204</span>      List&lt;Pair&lt;NonceKey, WALEdit&gt;&gt; walEdits = new ArrayList&lt;&gt;();<a name="line.3204"></a>
+<span class="sourceLineNo">3205</span><a name="line.3205"></a>
+<span class="sourceLineNo">3206</span>      visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {<a name="line.3206"></a>
+<span class="sourceLineNo">3207</span>        private Pair&lt;NonceKey, WALEdit&gt; curWALEditForNonce;<a name="line.3207"></a>
+<span class="sourceLineNo">3208</span>        @Override<a name="line.3208"></a>
+<span class="sourceLineNo">3209</span>        public boolean visit(int index) throws IOException {<a name="line.3209"></a>
+<span class="sourceLineNo">3210</span>          Mutation m = getMutation(index);<a name="line.3210"></a>
+<span class="sourceLineNo">3211</span>          // we use durability of the original mutation for the mutation passed by CP.<a name="line.3211"></a>
+<span class="sourceLineNo">3212</span>          if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {<a name="line.3212"></a>
+<span class="sourceLineNo">3213</span>            region.recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3213"></a>
+<span class="sourceLineNo">3214</span>            return true;<a name="line.3214"></a>
+<span class="sourceLineNo">3215</span>          }<a name="line.3215"></a>
+<span class="sourceLineNo">3216</span><a name="line.3216"></a>
+<span class="sourceLineNo">3217</span>          // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each.<a name="line.3217"></a>
+<span class="sourceLineNo">3218</span>          // Given how nonce keys are originally written, these should be contiguous.<a name="line.3218"></a>
+<span class="sourceLineNo">3219</span>          // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3219"></a>
+<span class="sourceLineNo">3220</span>          long nonceGroup = getNonceGroup(index);<a name="line.3220"></a>
+<span class="sourceLineNo">3221</span>          long nonce = getNonce(index);<a name="line.3221"></a>
+<span class="sourceLineNo">3222</span>          if (curWALEditForNonce == null ||<a name="line.3222"></a>
+<span class="sourceLineNo">3223</span>              curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup ||<a name="line.3223"></a>
+<span class="sourceLineNo">3224</span>              curWALEditForNonce.getFirst().getNonce() != nonce) {<a name="line.3224"></a>
+<span class="sourceLineNo">3225</span>            curWALEditForNonce = new Pair&lt;&gt;(new NonceKey(nonceGroup, nonce),<a name="line.3225"></a>
+<span class="sourceLineNo">3226</span>                new WALEdit(miniBatchOp.getCellCount(), isInReplay()));<a name="line.3226"></a>
+<span class="sourceLineNo">3227</span>            walEdits.add(curWALEditForNonce);<a name="line.3227"></a>
+<span class="sourceLineNo">3228</span>          }<a name="line.3228"></a>
+<span class="sourceLineNo">3229</span>          WALEdit walEdit = curWALEditForNonce.getSecond();<a name="line.3229"></a>
+<span class="sourceLineNo">3230</span><a name="line.3230"></a>
+<span class="sourceLineNo">3231</span>          // Add WAL edits by CP<a name="line.3231"></a>
+<span class="sourceLineNo">3232</span>          WALEdit fromCP = walEditsFromCoprocessors[index];<a name="line.3232"></a>
+<span class="sourceLineNo">3233</span>          if (fromCP != null) {<a name="line.3233"></a>
+<span class="sourceLineNo">3234</span>            for (Cell cell : fromCP.getCells()) {<a name="line.3234"></a>
+<span class="sourceLineNo">3235</span>              walEdit.add(cell);<a name="line.3235"></a>
+<span class="sourceLineNo">3236</span>            }<a name="line.3236"></a>
+<span class="sourceLineNo">3237</span>          }<a name="line.3237"></a>
+<span class="sourceLineNo">3238</span>          addFamilyMapToWALEdit(familyCellMaps[index], walEdit);<a name="line.3238"></a>
+<span class="sourceLineNo">3239</span><a name="line.3239"></a>
+<span class="sourceLineNo">3240</span>          return true;<a name="line.3240"></a>
+<span class="sourceLineNo">3241</span>        }<a name="line.3241"></a>
+<span class="sourceLineNo">3242</span>      });<a name="line.3242"></a>
+<span class="sourceLineNo">3243</span>      return walEdits;<a name="line.3243"></a>
+<span class="sourceLineNo">3244</span>    }<a name="line.3244"></a>
+<span class="sourceLineNo">3245</span><a name="line.3245"></a>
+<span class="sourceLineNo">3246</span>    /**<a name="line.3246"></a>
+<span class="sourceLineNo">3247</span>     * This method completes mini-batch operations by calling postBatchMutate() CP hook (if<a name="line.3247"></a>
+<span class="sourceLineNo">3248</span>     * required) and completing mvcc.<a name="line.3248"></a>
+<span class="sourceLineNo">3249</span>     */<a name="line.3249"></a>
+<span class="sourceLineNo">3250</span>    public void completeMiniBatchOperations(<a name="line.3250"></a>
+<span class="sourceLineNo">3251</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final WriteEntry writeEntry)<a name="line.3251"></a>
+<span class="sourceLineNo">3252</span>        throws IOException {<a name="line.3252"></a>
+<span class="sourceLineNo">3253</span>      if (writeEntry != null) {<a name="line.3253"></a>
+<span class="sourceLineNo">3254</span>        region.mvcc.completeAndWait(writeEntry);<a name="line.3254"></a>
+<span class="sourceLineNo">3255</span>      }<a name="line.3255"></a>
+<span class="sourceLineNo">3256</span>    }<a name="line.3256"></a>
+<span class="sourceLineNo">3257</span><a name="line.3257"></a>
+<span class="sourceLineNo">3258</span>    public void doPostOpCleanupForMiniBatch(<a name="line.3258"></a>
+<span class="sourceLineNo">3259</span>        final MiniBatchOperationInProgress&lt;Mutation&gt; miniBatchOp, final WALEdit walEdit,<a name="line.3259"></a>
+<span class="sourceLineNo">3260</span>        boolean success) throws IOException {}<a name="line.3260"></a>
+<span class="sourceLineNo">3261</span><a name="line.3261"></a>
+<span class="sourceLineNo">3262</span>    /**<a name="line.3262"></a>
+<span class="sourceLineNo">3263</span>     * Atomically apply the given map of family-&gt;edits to the memstore.<a name="line.3263"></a>
+<span class="sourceLineNo">3264</span>     * This handles the consistency control on its own, but the caller<a name="line.3264"></a>
+<span class="sourceLineNo">3265</span>     * should already have locked updatesLock.readLock(). This also does<a name="line.3265"></a>
+<span class="sourceLineNo">3266</span>     * &lt;b&gt;not&lt;/b&gt; check the families for validity.<a name="line.3266"></a>
+<span class="sourceLineNo">3267</span>     *<a name="line.3267"></a>
+<span class="sourceLineNo">3268</span>     * @param familyMap Map of Cells by family<a name="line.3268"></a>
+<span class="sourceLineNo">3269</span>     */<a name="line.3269"></a>
+<span class="sourceLineNo">3270</span>    protected void applyFamilyMapToMemStore(Map&lt;byte[], List&lt;Cell&gt;&gt; familyMap,<a name="line.3270"></a>
+<span class="sourceLineNo">3271</span>        MemStoreSizing memstoreAccounting) throws IOException {<a name="line.3271"></a>
+<span class="sourceLineNo">3272</span>      for (Map.Entry&lt;byte[], List&lt;Cell&gt;&gt; e : familyMap.entrySet()) {<a name="line.3272"></a>
+<span class="sourceLineNo">3273</span>        byte[] family = e.getKey();<a name="line.3273"></a>
+<span class="sourceLineNo">3274</span>        List&lt;Cell&gt; cells = e.getValue();<a name="line.3274"></a>
+<span class="sourceLineNo">3275</span>        assert cells instanceof RandomAccess;<a name="line.3275"></a>
+<span class="sourceLineNo">3276</span>        region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);<a name="line.3276"></a>
+<span class="sourceLineNo">3277</span>      }<a name="line.3277"></a>
+<span class="sourceLineNo">3278</span>    }<a name="line.3278"></a>
+<span class="sourceLineNo">3279</span><a name="line.3279"></a>
+<span class="sourceLineNo">3280</span>    /**<a name="line.3280"></a>
+<span class="sourceLineNo">3281</span>     * Append the given map of family-&gt;edits to a WALEdit data structure.<a name="line.3281"></a>
+<span class="sourceLineNo">3282</span>     * This does not write to the WAL itself.<a name="line.3282"></a>
+<span class="sourceLineNo">3283</span>     * @param familyMap map of family-&gt;edits<a name="line.3283"></a>
+<span class="sourceLineNo">3284</span>     * @param walEdit the destination entry to append into<a name="line.3284"></a>
+<span class="sourceLineNo">3285</span>     */<a name="line.3285"></a>
+<span class="sourceLineNo">3286</span>    private void addFamilyMapToWALEdit(Map&lt;byte[], List&lt;Cell&gt;&gt; familyMap,<a name="line.3286"></a>
+<span class="sourceLineNo">3287</span>        WALEdit walEdit) {<a name="line.3287"></a>
+<span class="sourceLineNo">3288</span>      for (List&lt;Cell&gt; edits : familyMap.values()) {<a name="line.3288"></a>
+<span class="sourceLineNo">3289</span>        assert edits instanceof RandomAccess;<a name="line.3289"></a>
+<span class="sourceLineNo">3290</span>        int listSize = edits.size();<a name="line.3290"></a>
+<span class="sourceLineNo">3291</span>        for (int i=0; i &lt; listSize; i++) {<a name="line.3291"></a>
+<span class="sourceLineNo">3292</span>          Cell cell = edits.get(i);<a name="line.3292"></a>
+<span class="sourceLineNo">3293</span>          walEdit.add(cell);<a name="line.3293"></a>
+<span class="sourceLineNo">3294</span>        }<a name="line.3294"></a>
+<span class="sourceLineNo">3295</span>      }<a name="line.3295"></a>
+<span class="sourceLineNo">3296</span>    }<a name="line.3296"></a>
+<span class="sourceLineNo">3297</span>  }<a name="line.3297"></a>
+<span class="sourceLineNo">3298</span><a name="line.3298"></a>
+<span class="sourceLineNo">3299</span>  /**<a name="line.3299"></a>
+<span class="sourceLineNo">3300</span>   * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most<a name="line.3300"></a>
+<span class="sourceLineNo">3301</span>   * of the logic is same.<a name="line.3301"></a>
+<span class="sourceLineNo">3302</span>   */<a name="line.3302"></a>
+<span class="sourceLineNo">3303</span>  static class MutationBatchOperation extends BatchOperation&lt;Mutation&gt; {<a name="line.3303"></a>
+<span class="sourceLineNo">3304</span>    private long nonceGroup;<a name="line.3304"></a>
+<span class="sourceLineNo">3305</span>    private long nonce;<a name="line.3305"></a>
+<span class="sourceLineNo">3306</span>    public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,<a name="line.3306"></a>
+<span class="sourceLineNo">3307</span>        long nonceGroup, long nonce) {<a name="line.3307"></a>
+<span class="sourceLineNo">3308</span>      super(region, operations);<a name="line.3308"></a>
+<span class="sourceLineNo">3309</span>      this.atomic = atomic;<a name="line.3309"></a>
+<span class="sourceLineNo">3310</span>      this.nonceGroup = nonceGroup;<a name="line.3310"></a>
+<span class="sourceLineNo">3311</span>      this.nonce = nonce;<a name="line.3311"></a>
+<span class="sourceLineNo">3312</span>    }<a name="line.3312"></a>
+<span class="sourceLineNo">3313</span><a name="line.3313"></a>
+<span class="sourceLineNo">3314</span>    @Override<a name="line.3314"></a>
+<span class="sourceLineNo">3315</span>    public Mutation getMutation(int index) {<a name="line.3315"></a>
+<span class="sourceLineNo">3316</span>      return this.operations[index];<a name="line.3316"></a>
+<span class="sourceLineNo">3317</span>    }<a name="line.3317"></a>
+<span class="sourceLineNo">3318</span><a name="line.3318"></a>
+<span class="sourceLineNo">3319</span>    @Override<a name="line.3319"></a>
+<span class="sourceLineNo">3320</span>    public long getNonceGroup(int index) {<a name="line.3320"></a>
+<span class="sourceLineNo">3321</span>      return nonceGroup;<a name="line.3321"></a>
+<span class="sourceLineNo">3322</span>    }<a name="line.3322"></a>
+<span class="sourceLineNo">3323</span><a name="line.3323"></a>
+<span class="sourceLineNo">3324</span>    @Override<a name="line.3324"></a>
+<span class="sourceLineNo">3325</span>    public long getNonce(int index) {<a name="line.3325"></a>
+<span class="sourceLineNo">3326</span>      return nonce;<a name="line.3326"></a>
+<span class="sourceLineNo">3327</span>    }<a name="line.3327"></a>
+<span class="sourceLineNo">3328</span><a name="line.3328"></a>
+<span class="sourceLineNo">3329</span>    @Override<a name="line.3329"></a>
+<span class="sourceLineNo">3330</span>    public Mutation[] getMutationsForCoprocs() {<a name="line.3330"></a>
+<span class="sourceLineNo">3331</span>      return this.operations;<a name="line.3331"></a>
+<span class="sourceLineNo">3332</span>    }<a name="line.3332"></a>
+<span class="sourceLineNo">3333</span><a name="line.3333"></a>
+<span class="sourceLineNo">3334</span>    @Override<a name="line.3334"></a>
+<span class="sourceLineNo">3335</span>    public boolean isInReplay() {<a name="line.3335"></a>
+<span class="sourceLineNo">3336</span>      return false;<a name="line.3336"></a>
+<span class="sourceLineNo">3337</span>    }<a name="line.3337"></a>
+<span class="sourceLineNo">3338</span><a name="line.3338"></a>
+<span class="sourceLineNo">3339</span>    @Override<a name="line.3339"></a>
+<span class="sourceLineNo">3340</span>    public long getOrigLogSeqNum() {<a name="line.3340"></a>
+<span class="sourceLineNo">3341</span>      return WALKey.NO_SEQUENCE_ID;<a name="line.3341"></a>
+<span class="sourceLineNo">3342</span>    }<a name="line.3342"></a>
+<span class="sourceLineNo">3343</span><a name="line.3343"></a>
+<span class="sourceLineNo">3344</span>    @Override<a name="line.3344"></a>
+<span class="sourceLineNo">3345</span>    public void startRegionOperation() throws IOException {<a name="line.3345"></a>
+<span class="sourceLineNo">3346</span>      region.startRegionOperation(Operation.BATCH_MUTATE);<a name="line.3346"></a>
+<span class="sourceLineNo">3347</span>    }<a name="line.3347"></a>
+<span class="sourceLineNo">3348</span><a name="line.3348"></a>
+<span class="sourceLineNo">3349</span>    @Override<a name="line.3349"></a>
+<span class="sourceLineNo">3350</span>    public void closeRegionOperation() throws IOException {<a name="line.3350"></a>
+<span class="sourceLineNo">3351</span>      region.closeRegionOperation(Operation.BATCH_MUTATE);<a name="line.3351"></a>
+<span class="sourceLineNo">3352</span>    }<a name="line.3352"></a>
+<span class="sourceLineNo">3353</span><a name="line.3353"></a>
+<span class="sourceLineNo">3354</span>    @Override<a name="line.3354"></a>
+<span class="sourceLineNo">3355</span>    public void checkAndPreparePut(Put p) throws IOException {<a name="line.3355"></a>
+<span class="sourceLineNo">3356</span>      region.checkFamilies(p.getFamilyCellMap().keySet());<a name="line.3356"></a>
+<span class="sourceLineNo">3357</span>    }<a name="line.3357"></a>
+<span class="sourceLineNo">3358</span><a name="line.3358"></a>
+<span class="sourceLineNo">3359</span>    @Override<a name="line.3359"></a>
+<span class="sourceLineNo">3360</span>    public void checkAndPrepare() throws IOException {<a name="line.3360"></a>
+<span class="sourceLineNo">3361</span>      final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes<a name="line.3361"></a>
+<span class="sourceLineNo">3362</span>      visitBatchOperations(true, this.size(), new Visitor() {<a name="line.3362"></a>
+<span class="sourceLineNo">3363</span>        private long now = EnvironmentEdgeManager.currentTime();<a name="line.3363"></a>
+<span class="sourceLineNo">3364</span>        private WALEdit walEdit;<a name="line.3364"></a>
+<span class="sourceLineNo">3365</span>        @Override<a name="line.3365"></a>
+<span class="sourceLineNo">3366</span>        public boolean visit(int index) throws IOException {<a name="line.3366"></a>
+<span class="sourceLineNo">3367</span>          // Run coprocessor pre hook outside of locks to avoid de

<TRUNCATED>

Mime
View raw message