hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jerry...@apache.org
Subject hbase git commit: HBASE-19096 Add RowMutations batch support in AsyncTable
Date Wed, 29 Nov 2017 04:41:56 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 8688da9e9 -> 0c4c39553


HBASE-19096 Add RowMutations batch support in AsyncTable

Signed-off-by: Jerry He <jerryjch@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0c4c3955
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0c4c3955
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0c4c3955

Branch: refs/heads/branch-2
Commit: 0c4c3955380e1927311a8f4b092e23532d2e795f
Parents: 8688da9
Author: Jerry He <jerryjch@apache.org>
Authored: Tue Nov 28 18:41:23 2017 -0800
Committer: Jerry He <jerryjch@apache.org>
Committed: Tue Nov 28 18:49:08 2017 -0800

----------------------------------------------------------------------
 .../client/AsyncBatchRpcRetryingCaller.java     |  36 +++--
 .../apache/hadoop/hbase/client/AsyncTable.java  |  12 +-
 .../hbase/client/MultiServerCallable.java       |  62 +++------
 .../hbase/shaded/protobuf/RequestConverter.java | 136 +++++++++++++++----
 .../hbase/client/TestAsyncTableBatch.java       |  19 ++-
 5 files changed, 161 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4c3955/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 2ae68c4..52eb821 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.shaded.io.netty.util.HashedWheelTimer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -232,27 +232,19 @@ class AsyncBatchRpcRetryingCaller<T> {
   }
 
   private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
-      List<CellScannable> cells) throws IOException {
+      List<CellScannable> cells, Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
     ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
     ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
     ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
     for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
-      // TODO: remove the extra for loop as we will iterate it in mutationBuilder.
-      if (!multiRequestBuilder.hasNonceGroup()) {
-        for (Action action : entry.getValue().actions) {
-          if (action.hasNonce()) {
-            multiRequestBuilder.setNonceGroup(conn.getNonceGenerator().getNonceGroup());
-            break;
-          }
-        }
-      }
-      regionActionBuilder.clear();
-      regionActionBuilder.setRegion(
-        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, entry.getKey()));
-      regionActionBuilder = RequestConverter.buildNoDataRegionAction(entry.getKey(),
-        entry.getValue().actions, cells, regionActionBuilder, actionBuilder, mutationBuilder);
-      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+      long nonceGroup = conn.getNonceGenerator().getNonceGroup();
+      // multiRequestBuilder will be populated with region actions.
+      // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+      // action list.
+      RequestConverter.buildNoDataRegionActions(entry.getKey(),
+        entry.getValue().actions, cells, multiRequestBuilder, regionActionBuilder, actionBuilder,
+        mutationBuilder, nonceGroup, rowMutationsIndexMap);
     }
     return multiRequestBuilder.build();
   }
@@ -337,8 +329,12 @@ class AsyncBatchRpcRetryingCaller<T> {
       }
       ClientProtos.MultiRequest req;
       List<CellScannable> cells = new ArrayList<>();
+      // Map from a created RegionAction to the original index for a RowMutations within
+      // the original list of actions. This will be used to process the results when there
+      // is RowMutations in the action list.
+      Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
       try {
-        req = buildReq(serverReq.actionsByRegion, cells);
+        req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
       } catch (IOException e) {
         onError(serverReq.actionsByRegion, tries, e, sn);
         return;
@@ -353,8 +349,8 @@ class AsyncBatchRpcRetryingCaller<T> {
           onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
         } else {
           try {
-            onComplete(serverReq.actionsByRegion, tries, sn,
-              ResponseConverter.getResults(req, resp, controller.cellScanner()));
+            onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
+              rowMutationsIndexMap, resp, controller.cellScanner()));
           } catch (Exception e) {
             onError(serverReq.actionsByRegion, tries, e, sn);
             return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4c3955/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index b3ccb15..fd08aa3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -431,11 +431,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
   }
 
   /**
-   * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of
-   * execution of the actions is not defined. Meaning if you do a Put and a Get in the same
-   * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put
-   * had put.
-   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations. The
+   * ordering of execution of the actions is not defined. Meaning if you do a Put and a Get in the
+   * same {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the
+   * Put had put.
+   * @param actions list of Get, Put, Delete, Increment, Append, and RowMutations objects
    * @return A list of {@link CompletableFuture}s that represent the result for each action.
    */
   <T> List<CompletableFuture<T>> batch(List<? extends Row> actions);
@@ -443,7 +443,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
   /**
    * A simple version of batch. It will fail if there are any failures and you will get the whole
    * result list at once if the operation is succeeded.
-   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @param actions list of Get, Put, Delete, Increment, Append and RowMutations objects
    * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
    */
   default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4c3955/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index ed7e718..4a0ae39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@@ -100,57 +98,29 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
         (this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
 
     long nonceGroup = multiAction.getNonceGroup();
-    if (nonceGroup != HConstants.NO_NONCE) {
-      multiRequestBuilder.setNonceGroup(nonceGroup);
-    }
-    // Index to track RegionAction within the MultiRequest
-    int regionActionIndex = -1;
+
     // Map from a created RegionAction to the original index for a RowMutations within
-    // its original list of actions
+    // the original list of actions. This will be used to process the results when there
+    // is RowMutations in the action list.
     Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
     // The multi object is a list of Actions by region. Iterate by region.
     for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
       final byte [] regionName = e.getKey();
       final List<Action> actions = e.getValue();
-      regionActionBuilder.clear();
-      regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
-          HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
-
-      int rowMutations = 0;
-      for (Action action : actions) {
-        Row row = action.getAction();
-        // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
-        // on the one row. We do separate RegionAction for each RowMutations.
-        // We maintain a map to keep track of this RegionAction and the original Action index.
-        if (row instanceof RowMutations) {
-          RowMutations rms = (RowMutations)row;
-          if (this.cellBlock) {
-            // Build a multi request absent its Cell payload. Send data in cellblocks.
-            regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
-              regionActionBuilder, actionBuilder, mutationBuilder);
-          } else {
-            regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
-          }
-          regionActionBuilder.setAtomic(true);
-          multiRequestBuilder.addRegionAction(regionActionBuilder.build());
-          regionActionIndex++;
-          rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
-          rowMutations++;
-        }
+      if (this.cellBlock) {
+        // Send data in cellblocks.
+        // multiRequestBuilder will be populated with region actions.
+        // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+        // action list.
+        RequestConverter.buildNoDataRegionActions(regionName, actions, cells, multiRequestBuilder,
+          regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
       }
-
-      if (actions.size() > rowMutations) {
-        if (this.cellBlock) {
-          // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
-          // They have already been handled above. Guess at count of cells
-          regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
-            regionActionBuilder, actionBuilder, mutationBuilder);
-        } else {
-          regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
-            regionActionBuilder, actionBuilder, mutationBuilder);
-        }
-        multiRequestBuilder.addRegionAction(regionActionBuilder.build());
-        regionActionIndex++;
+      else {
+        // multiRequestBuilder will be populated with region actions.
+        // rowMutationsIndexMap will be non-empty after the call if there is RowMutations in the
+        // action list.
+        RequestConverter.buildRegionActions(regionName, actions, multiRequestBuilder,
+          regionActionBuilder, actionBuilder, mutationBuilder, nonceGroup, rowMutationsIndexMap);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4c3955/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 4fdc87d..039a5b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -79,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Condition;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
@@ -623,19 +625,32 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer multi request for a list of actions.
-   * Propagates Actions original index.
-   *
-   * @param regionName
-   * @param actions
-   * @return a multi request
+   * Create a protocol buffer multi request for a list of actions. Propagates Actions original
+   * index. The passed in multiRequestBuilder will be populated with region actions.
+   * @param regionName The region name of the actions.
+   * @param actions The actions that are grouped by the same region name.
+   * @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
+   * @param regionActionBuilder regionActionBuilder to be used to build region action.
+   * @param actionBuilder actionBuilder to be used to build action.
+   * @param mutationBuilder mutationBuilder to be used to build mutation.
+   * @param nonceGroup nonceGroup to be applied.
+   * @param rowMutationsIndexMap Map of created RegionAction to the original index for a
+   *          RowMutations within the original list of actions
    * @throws IOException
    */
-  public static RegionAction.Builder buildRegionAction(final byte[] regionName,
-      final List<Action> actions, final RegionAction.Builder regionActionBuilder,
+  public static void buildRegionActions(final byte[] regionName,
+      final List<Action> actions, final MultiRequest.Builder multiRequestBuilder,
+      final RegionAction.Builder regionActionBuilder,
       final ClientProtos.Action.Builder actionBuilder,
-      final MutationProto.Builder mutationBuilder) throws IOException {
+      final MutationProto.Builder mutationBuilder,
+      long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
+    regionActionBuilder.clear();
+    RegionAction.Builder builder = getRegionActionBuilderWithRegion(
+      regionActionBuilder, regionName);
     ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
+    boolean hasNonce = false;
+    List<Action> rowMutationsList = new ArrayList<>();
+
     for (Action action: actions) {
       Row row = action.getAction();
       actionBuilder.clear();
@@ -643,19 +658,21 @@ public final class RequestConverter {
       mutationBuilder.clear();
       if (row instanceof Get) {
         Get g = (Get)row;
-        regionActionBuilder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
+        builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
       } else if (row instanceof Put) {
-        regionActionBuilder.addAction(actionBuilder.
+        builder.addAction(actionBuilder.
           setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row, mutationBuilder)));
       } else if (row instanceof Delete) {
-        regionActionBuilder.addAction(actionBuilder.
+        builder.addAction(actionBuilder.
           setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row, mutationBuilder)));
       } else if (row instanceof Append) {
-        regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
+        builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
             MutationType.APPEND, (Append)row, mutationBuilder, action.getNonce())));
+        hasNonce = true;
       } else if (row instanceof Increment) {
-        regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
+        builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutation(
             MutationType.INCREMENT, (Increment)row, mutationBuilder, action.getNonce())));
+        hasNonce = true;
       } else if (row instanceof RegionCoprocessorServiceExec) {
         RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
         // DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
@@ -667,19 +684,39 @@ public final class RequestConverter {
         } else {
           cpBuilder.clear();
         }
-        regionActionBuilder.addAction(actionBuilder.setServiceCall(
+        builder.addAction(actionBuilder.setServiceCall(
             cpBuilder.setRow(UnsafeByteOperations.unsafeWrap(exec.getRow()))
               .setServiceName(exec.getMethod().getService().getFullName())
               .setMethodName(exec.getMethod().getName())
               .setRequest(value)));
       } else if (row instanceof RowMutations) {
-        // Skip RowMutations, which has been separately converted to RegionAction
-        continue;
+        rowMutationsList.add(action);
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
     }
-    return regionActionBuilder;
+    if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
+      multiRequestBuilder.setNonceGroup(nonceGroup);
+    }
+    multiRequestBuilder.addRegionAction(builder.build());
+
+    // Process RowMutations here. We can not process it in the big loop above because
+    // it will corrupt the sequence order maintained in cells.
+    // RowMutations is a set of Puts and/or Deletes all to be applied atomically
+    // on the one row. We do separate RegionAction for each RowMutations.
+    // We maintain a map to keep track of this RegionAction and the original Action index.
+    for (Action action : rowMutationsList) {
+      RowMutations rms = (RowMutations) action.getAction();
+      RegionAction.Builder rowMutationsRegionActionBuilder =
+          RequestConverter.buildRegionAction(regionName, rms);
+      rowMutationsRegionActionBuilder.setAtomic(true);
+      // Put it in the multiRequestBuilder
+      multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
+      // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
+      // in the overall multiRequest.
+      rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
+        action.getOriginalIndex());
+    }
   }
 
   /**
@@ -689,23 +726,35 @@ public final class RequestConverter {
    * coming along otherwise.  Note that Get is different.  It does not contain 'data' and is always
    * carried by protobuf.  We return references to the data by adding them to the passed in
    * <code>data</code> param.
-   *
-   * <p>Propagates Actions original index.
-   *
-   * @param regionName
-   * @param actions
+   * <p> Propagates Actions original index.
+   * <p> The passed in multiRequestBuilder will be populated with region actions.
+   * @param regionName The region name of the actions.
+   * @param actions The actions that are grouped by the same region name.
    * @param cells Place to stuff references to actual data.
-   * @return a multi request that does not carry any data.
+   * @param multiRequestBuilder The multiRequestBuilder to be populated with region actions.
+   * @param regionActionBuilder regionActionBuilder to be used to build region action.
+   * @param actionBuilder actionBuilder to be used to build action.
+   * @param mutationBuilder mutationBuilder to be used to build mutation.
+   * @param nonceGroup nonceGroup to be applied.
+   * @param rowMutationsIndexMap Map of created RegionAction to the original index for a
+   *          RowMutations within the original list of actions
    * @throws IOException
    */
-  public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
+  public static void buildNoDataRegionActions(final byte[] regionName,
       final Iterable<Action> actions, final List<CellScannable> cells,
+      final MultiRequest.Builder multiRequestBuilder,
       final RegionAction.Builder regionActionBuilder,
       final ClientProtos.Action.Builder actionBuilder,
-      final MutationProto.Builder mutationBuilder) throws IOException {
+      final MutationProto.Builder mutationBuilder,
+      long nonceGroup, final Map<Integer, Integer> rowMutationsIndexMap) throws IOException {
+    regionActionBuilder.clear();
     RegionAction.Builder builder = getRegionActionBuilderWithRegion(
       regionActionBuilder, regionName);
     ClientProtos.CoprocessorServiceCall.Builder cpBuilder = null;
+    RegionAction.Builder rowMutationsRegionActionBuilder = null;
+    boolean hasNonce = false;
+    List<Action> rowMutationsList = new ArrayList<>();
+
     for (Action action: actions) {
       Row row = action.getAction();
       actionBuilder.clear();
@@ -740,11 +789,13 @@ public final class RequestConverter {
         cells.add(a);
         builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
           MutationType.APPEND, a, mutationBuilder, action.getNonce())));
+        hasNonce = true;
       } else if (row instanceof Increment) {
         Increment i = (Increment)row;
         cells.add(i);
         builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
           MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
+        hasNonce = true;
       } else if (row instanceof RegionCoprocessorServiceExec) {
         RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
         // DUMB COPY!!! FIX!!! Done to copy from c.g.p.ByteString to shaded ByteString.
@@ -762,13 +813,40 @@ public final class RequestConverter {
               .setMethodName(exec.getMethod().getName())
               .setRequest(value)));
       } else if (row instanceof RowMutations) {
-        // Skip RowMutations, which has been separately converted to RegionAction
-        continue;
+        rowMutationsList.add(action);
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
     }
-    return builder;
+    if (!multiRequestBuilder.hasNonceGroup() && hasNonce) {
+      multiRequestBuilder.setNonceGroup(nonceGroup);
+    }
+    multiRequestBuilder.addRegionAction(builder.build());
+
+    // Process RowMutations here. We can not process it in the big loop above because
+    // it will corrupt the sequence order maintained in cells.
+    // RowMutations is a set of Puts and/or Deletes all to be applied atomically
+    // on the one row. We do separate RegionAction for each RowMutations.
+    // We maintain a map to keep track of this RegionAction and the original Action index.
+    for (Action action : rowMutationsList) {
+      RowMutations rms = (RowMutations) action.getAction();
+      if (rowMutationsRegionActionBuilder == null) {
+        rowMutationsRegionActionBuilder = ClientProtos.RegionAction.newBuilder();
+      } else {
+        rowMutationsRegionActionBuilder.clear();
+      }
+      rowMutationsRegionActionBuilder.setRegion(
+        RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
+      rowMutationsRegionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms,
+        cells, rowMutationsRegionActionBuilder, actionBuilder, mutationBuilder);
+      rowMutationsRegionActionBuilder.setAtomic(true);
+      // Put it in the multiRequestBuilder
+      multiRequestBuilder.addRegionAction(rowMutationsRegionActionBuilder.build());
+      // This rowMutations region action is at (multiRequestBuilder.getRegionActionCount() - 1)
+      // in the overall multiRequest.
+      rowMutationsIndexMap.put(multiRequestBuilder.getRegionActionCount() - 1,
+        action.getOriginalIndex());
+    }
   }
 
 // End utilities for Client

http://git-wip-us.apache.org/repos/asf/hbase/blob/0c4c3955/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index c80b27b..489ad1d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -71,6 +71,7 @@ public class TestAsyncTableBatch {
   private static byte[] FAMILY = Bytes.toBytes("cf");
 
   private static byte[] CQ = Bytes.toBytes("cq");
+  private static byte[] CQ1 = Bytes.toBytes("cq1");
 
   private static int COUNT = 1000;
 
@@ -178,9 +179,9 @@ public class TestAsyncTableBatch {
   }
 
   @Test
-  public void testMixed() throws InterruptedException, ExecutionException {
+  public void testMixed() throws InterruptedException, ExecutionException, IOException {
     AsyncTable<?> table = tableGetter.apply(TABLE_NAME);
-    table.putAll(IntStream.range(0, 5)
+    table.putAll(IntStream.range(0, 7)
         .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(FAMILY, CQ, Bytes.toBytes((long) i)))
         .collect(Collectors.toList())).get();
     List<Row> actions = new ArrayList<>();
@@ -189,8 +190,14 @@ public class TestAsyncTableBatch {
     actions.add(new Delete(Bytes.toBytes(2)));
     actions.add(new Increment(Bytes.toBytes(3)).addColumn(FAMILY, CQ, 1));
     actions.add(new Append(Bytes.toBytes(4)).addColumn(FAMILY, CQ, Bytes.toBytes(4)));
+    RowMutations rm = new RowMutations(Bytes.toBytes(5));
+    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ, Bytes.toBytes((long) 100)));
+    rm.add(new Put(Bytes.toBytes(5)).addColumn(FAMILY, CQ1, Bytes.toBytes((long) 200)));
+    actions.add(rm);
+    actions.add(new Get(Bytes.toBytes(6)));
+
     List<Object> results = table.batchAll(actions).get();
-    assertEquals(5, results.size());
+    assertEquals(7, results.size());
     Result getResult = (Result) results.get(0);
     assertEquals(0, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
     assertEquals(2, Bytes.toLong(table.get(new Get(Bytes.toBytes(1))).get().getValue(FAMILY, CQ)));
@@ -202,6 +209,12 @@ public class TestAsyncTableBatch {
     assertEquals(12, appendValue.length);
     assertEquals(4, Bytes.toLong(appendValue));
     assertEquals(4, Bytes.toInt(appendValue, 8));
+    assertEquals(100,
+      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ)));
+    assertEquals(200,
+      Bytes.toLong(table.get(new Get(Bytes.toBytes(5))).get().getValue(FAMILY, CQ1)));
+    getResult = (Result) results.get(6);
+    assertEquals(6, Bytes.toLong(getResult.getValue(FAMILY, CQ)));
   }
 
   public static final class ErrorInjectObserver implements RegionCoprocessor, RegionObserver {


Mime
View raw message