hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jerry...@apache.org
Subject hbase git commit: HBASE-18522 Add RowMutations support to Batch
Date Mon, 14 Aug 2017 16:20:50 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.4 043211760 -> c6f57e0f3


HBASE-18522 Add RowMutations support to Batch


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6f57e0f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6f57e0f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6f57e0f

Branch: refs/heads/branch-1.4
Commit: c6f57e0f382e9dcef48f05da087d12eb0e47e9ad
Parents: 0432117
Author: Jerry He <jerryjch@apache.org>
Authored: Sun Aug 13 18:23:49 2017 -0700
Committer: Jerry He <jerryjch@apache.org>
Committed: Mon Aug 14 09:18:41 2017 -0700

----------------------------------------------------------------------
 .../hbase/client/MultiServerCallable.java       | 66 +++++++++++++++-----
 .../org/apache/hadoop/hbase/client/Table.java   |  6 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |  6 +-
 .../hbase/protobuf/ResponseConverter.java       | 35 ++++++++++-
 .../hbase/client/TestFromClientSide3.java       | 46 ++++++++++++++
 .../hadoop/hbase/client/TestMultiParallel.java  | 49 ++++++++++++---
 6 files changed, 178 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 42c63eb..b2ea941 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -97,12 +98,21 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
     RegionAction.Builder regionActionBuilder = RegionAction.newBuilder();
     ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
     MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
-    List<CellScannable> cells = null;
-    // The multi object is a list of Actions by region.  Iterate by region.
+
+    // Pre-size. Presume at least a KV per Action. There are likely more.
+    List<CellScannable> cells =
+        (this.cellBlock ? new ArrayList<CellScannable>(countOfActions) : null);
+
     long nonceGroup = multiAction.getNonceGroup();
     if (nonceGroup != HConstants.NO_NONCE) {
       multiRequestBuilder.setNonceGroup(nonceGroup);
     }
+    // Index to track RegionAction within the MultiRequest
+    int regionActionIndex = -1;
+    // Map from a created RegionAction for a RowMutations to the original index within
+    // its original list of actions
+    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+    // The multi object is a list of Actions by region.  Iterate by region.
     for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
       final byte [] regionName = e.getKey();
       final List<Action<R>> actions = e.getValue();
@@ -110,19 +120,46 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
         HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
 
+      int rowMutations = 0;
+      for (Action<R> action : actions) {
+        Row row = action.getAction();
+        // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
+        // on the one row. We do separate RegionAction for each RowMutations.
+        // We maintain a map to keep track of this RegionAction and the original Action index.
+        if (row instanceof RowMutations) {
+          RowMutations rms = (RowMutations)row;
+          if (this.cellBlock) {
+            // Build a multi request absent its Cell payload. Send data in cellblocks.
+            regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, rms, cells,
+              regionActionBuilder, actionBuilder, mutationBuilder);
+          } else {
+            regionActionBuilder = RequestConverter.buildRegionAction(regionName, rms);
+          }
+          regionActionBuilder.setAtomic(true);
+          multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+          regionActionIndex++;
+          rowMutationsIndexMap.put(regionActionIndex, action.getOriginalIndex());
+          rowMutations++;
+
+          regionActionBuilder.clear();
+          regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
+            HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName) );
+        }
+      }
 
-      if (this.cellBlock) {
-        // Presize.  Presume at least a KV per Action.  There are likely more.
-        if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
-        // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
-        // They have already been handled above. Guess at count of cells
-        regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
-          regionActionBuilder, actionBuilder, mutationBuilder);
-      } else {
-        regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
-          regionActionBuilder, actionBuilder, mutationBuilder);
+      if (actions.size() > rowMutations) {
+        if (this.cellBlock) {
+          // Send data in cellblocks. The call to buildNoDataRegionAction will skip RowMutations.
+          // They have already been handled above. Guess at count of cells
+          regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells,
+            regionActionBuilder, actionBuilder, mutationBuilder);
+        } else {
+          regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions,
+            regionActionBuilder, actionBuilder, mutationBuilder);
+        }
+        multiRequestBuilder.addRegionAction(regionActionBuilder.build());
+        regionActionIndex++;
       }
-      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
     }
 
     // Controller optionally carries cell data over the proxy/service boundary and also
@@ -140,7 +177,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       throw ProtobufUtil.getRemoteException(e);
     }
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
+    return ResponseConverter.getResults(requestProto, rowMutationsIndexMap,
+      responseProto, controller.cellScanner());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 1a6572b..ac9cfbd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -104,12 +104,12 @@ public interface Table extends Closeable {
   boolean[] existsAll(List<Get> gets) throws IOException;
 
   /**
-   * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends.
+   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations.
    * The ordering of execution of the actions is not defined. Meaning if you do a Put and a
    * Get in the same {@link #batch} call, you will not necessarily be
    * guaranteed that the Get returns what the Put had put.
    *
-   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations
    * @param results Empty Object[], same size as actions. Provides access to partial
    *                results, in case an exception is thrown. A null in the result array means that
    *                the call for that action failed, even after retries
@@ -123,7 +123,7 @@ public interface Table extends Closeable {
    * Same as {@link #batch(List, Object[])}, but returns an array of
    * results instead of using a results parameter reference.
    *
-   * @param actions list of Get, Put, Delete, Increment, Append objects
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations
    * @return the results from the actions. A null in the return array means that
    *         the call for that action failed, even after retries
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 8163130..32bdcb9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -621,7 +621,8 @@ public final class RequestConverter {
               .setMethodName(exec.getMethod().getName())
               .setRequest(exec.getRequest().toByteString())));
       } else if (row instanceof RowMutations) {
-        throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
+        // Skip RowMutations, which has been separately converted to RegionAction
+        continue;
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
@@ -699,7 +700,8 @@ public final class RequestConverter {
             .setMethodName(exec.getMethod().getName())
             .setRequest(exec.getRequest().toByteString())));
       } else if (row instanceof RowMutations) {
-        throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
+        // Skip RowMutations, which has been separately converted to RegionAction
+        continue;
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
index ba7041e..5b8498d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
@@ -79,7 +79,8 @@ public final class ResponseConverter {
   /**
    * Get the results from a protocol buffer MultiResponse
    *
-   * @param request the protocol buffer MultiResponse to convert
+   * @param request the original protocol buffer MultiRequest
+   * @param response the protocol buffer MultiResponse to convert
    * @param cells Cells to go with the passed in <code>proto</code>.  Can be null.
    * @return the results that were in the MultiResponse (a Result or an Exception).
    * @throws IOException
@@ -87,6 +88,22 @@ public final class ResponseConverter {
   public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
       final MultiResponse response, final CellScanner cells)
   throws IOException {
+    return getResults(request, null, response, cells);
+  }
+
+  /**
+   * Get the results from a protocol buffer MultiResponse
+   *
+   * @param request the original protocol buffer MultiRequest
+   * @param rowMutationsIndexMap RegionAction index to original action index; may be null
+   * @param response the protocol buffer MultiResponse to convert
+   * @param cells Cells to go with the passed in <code>proto</code>.  Can be null.
+   * @return the results that were in the MultiResponse (a Result or an Exception).
+   * @throws IOException
+   */
+  public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
+      Map<Integer, Integer> rowMutationsIndexMap, final MultiResponse response, final CellScanner cells)
+  throws IOException {
     int requestRegionActionCount = request.getRegionActionCount();
     int responseRegionActionResultCount = response.getRegionActionResultCount();
     if (requestRegionActionCount != responseRegionActionResultCount) {
@@ -120,8 +137,22 @@ public final class ResponseConverter {
             actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion());
       }
 
+      Object responseValue;
+
+      Integer rowMutationsIndex =
+          (rowMutationsIndexMap == null ? null : rowMutationsIndexMap.get(i));
+      if (rowMutationsIndex != null) {
+        // This RegionAction is from a RowMutations in a batch.
+        // If there is an exception from the server, the exception is set at
+        // the RegionActionResult level, which has been handled above.
+        responseValue = response.getProcessed() ?
+            ProtobufUtil.EMPTY_RESULT_EXISTS_TRUE :
+            ProtobufUtil.EMPTY_RESULT_EXISTS_FALSE;
+        results.add(regionName, rowMutationsIndex, responseValue);
+        continue;
+      }
+
       for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
-        Object responseValue;
         if (roe.hasException()) {
           responseValue = ProtobufUtil.toException(roe.getException());
         } else if (roe.hasResult()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 08ccc42..5d7c853 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -314,6 +314,52 @@ public class TestFromClientSide3 {
   }
 
   @Test
+  public void testBatchWithRowMutation() throws Exception {
+    LOG.info("Starting testBatchWithRowMutation");
+    final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation");
+    try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
+      byte [][] QUALIFIERS = new byte [][] {
+        Bytes.toBytes("a"), Bytes.toBytes("b")
+      };
+      RowMutations arm = new RowMutations(ROW);
+      Put p = new Put(ROW);
+      p.addColumn(FAMILY, QUALIFIERS[0], VALUE);
+      arm.add(p);
+      Object[] batchResult = new Object[1];
+      t.batch(Arrays.asList(arm), batchResult);
+
+      Get g = new Get(ROW);
+      Result r = t.get(g);
+      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
+
+      arm = new RowMutations(ROW);
+      p = new Put(ROW);
+      p.addColumn(FAMILY, QUALIFIERS[1], VALUE);
+      arm.add(p);
+      Delete d = new Delete(ROW);
+      d.addColumns(FAMILY, QUALIFIERS[0]);
+      arm.add(d);
+      t.batch(Arrays.asList(arm), batchResult);
+      r = t.get(g);
+      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
+      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
+
+      // Test that we get the correct remote exception for RowMutations from batch()
+      try {
+        arm = new RowMutations(ROW);
+        p = new Put(ROW);
+        p.addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE);
+        arm.add(p);
+        t.batch(Arrays.asList(arm), batchResult);
+        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
+      } catch (RetriesExhaustedWithDetailsException e) {
+        String msg = e.getMessage();
+        assertTrue(msg.contains("NoSuchColumnFamilyException"));
+      }
+    }
+  }
+
+  @Test
   public void testHTableExistsMethodSingleRegionSingleGet() throws Exception {
 
     // Test with a single region table.

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6f57e0f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 484bc0e..d18f560 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -576,10 +576,12 @@ public class TestMultiParallel {
   @Test(timeout=300000)
   public void testBatchWithMixedActions() throws Exception {
     LOG.info("test=testBatchWithMixedActions");
-    Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
+    Table table = UTIL.getConnection().getTable(TEST_TABLE);
 
     // Load some data to start
-    Object[] results = table.batch(constructPutRequests());
+    List<Put> puts = constructPutRequests();
+    Object[] results = new Object[puts.size()];
+    table.batch(puts, results);
     validateSizeAndEmpty(results, KEYS.length);
 
     // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
@@ -601,12 +603,12 @@ public class TestMultiParallel {
 
     // 2 put of new column
     Put put = new Put(KEYS[10]);
-    put.add(BYTES_FAMILY, qual2, val2);
+    put.addColumn(BYTES_FAMILY, qual2, val2);
     actions.add(put);
 
     // 3 delete
     Delete delete = new Delete(KEYS[20]);
-    delete.deleteFamily(BYTES_FAMILY);
+    delete.addFamily(BYTES_FAMILY);
     actions.add(delete);
 
     // 4 get
@@ -620,19 +622,38 @@ public class TestMultiParallel {
 
     // 5 put of new column
     put = new Put(KEYS[40]);
-    put.add(BYTES_FAMILY, qual2, val2);
+    put.addColumn(BYTES_FAMILY, qual2, val2);
     actions.add(put);
 
-    results = table.batch(actions);
+    // 6 RowMutations
+    RowMutations rm = new RowMutations(KEYS[50]);
+    put = new Put(KEYS[50]);
+    put.addColumn(BYTES_FAMILY, qual2, val2);
+    rm.add(put);
+    byte[] qual3 = Bytes.toBytes("qual3");
+    byte[] val3 = Bytes.toBytes("putvalue3");
+    put = new Put(KEYS[50]);
+    put.addColumn(BYTES_FAMILY, qual3, val3);
+    rm.add(put);
+    actions.add(rm);
+
+    // 7 Add another Get to the mixed sequence after RowMutations
+    get = new Get(KEYS[10]);
+    get.addColumn(BYTES_FAMILY, QUALIFIER);
+    actions.add(get);
+
+    results = new Object[actions.size()];
+    table.batch(actions, results);
 
     // Validation
 
     validateResult(results[0]);
     validateResult(results[1]);
-    validateEmpty(results[2]);
     validateEmpty(results[3]);
     validateResult(results[4]);
     validateEmpty(results[5]);
+    validateEmpty(results[6]);
+    validateResult(results[7]);
 
     // validate last put, externally from the batch
     get = new Get(KEYS[40]);
@@ -640,6 +661,17 @@ public class TestMultiParallel {
     Result r = table.get(get);
     validateResult(r, qual2, val2);
 
+    // validate last RowMutations, externally from the batch
+    get = new Get(KEYS[50]);
+    get.addColumn(BYTES_FAMILY, qual2);
+    r = table.get(get);
+    validateResult(r, qual2, val2);
+
+    get = new Get(KEYS[50]);
+    get.addColumn(BYTES_FAMILY, qual3);
+    r = table.get(get);
+    validateResult(r, qual3, val3);
+
     table.close();
   }
 
@@ -716,8 +748,7 @@ public class TestMultiParallel {
   private void validateEmpty(Object r1) {
     Result result = (Result)r1;
     Assert.assertTrue(result != null);
-    Assert.assertTrue(result.getRow() == null);
-    Assert.assertEquals(0, result.rawCells().length);
+    Assert.assertTrue(result.isEmpty());
   }
 
   private void validateSizeAndEmpty(Object[] results, int expectedSize) {


Mime
View raw message