hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1529348 [1/3] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/java/org...
Date Fri, 04 Oct 2013 23:38:42 GMT
Author: stack
Date: Fri Oct  4 23:38:41 2013
New Revision: 1529348

URL: http://svn.apache.org/r1529348
Log:
HBASE-9612 Ability to batch edits destined to different regions

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
    hbase/trunk/hbase-protocol/README.txt
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Client.proto
    hbase/trunk/hbase-protocol/src/main/protobuf/RPC.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionBusyWait.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestResourceFilter.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Fri Oct  4 23:38:41 2013
@@ -255,12 +255,14 @@ class AsyncProcess<CResult> {
    * @param atLeastOne true if we should submit at least a subset.
    */
   public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
-    if (rows.isEmpty()){
+    if (rows.isEmpty()) {
       return;
     }
 
+    // This looks like we are keying by region but HRegionLocation has a comparator that compares
+    // on the server portion only (hostname + port) so this Map collects regions by server.
     Map<HRegionLocation, MultiAction<Row>> actionsByServer =
-        new HashMap<HRegionLocation, MultiAction<Row>>();
+      new HashMap<HRegionLocation, MultiAction<Row>>();
     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
 
     do {
@@ -321,10 +323,7 @@ class AsyncProcess<CResult> {
    * @return the destination. Null if we couldn't find it.
    */
   private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
-    if (row == null){
-      throw new IllegalArgumentException("row cannot be null");
-    }
-
+    if (row == null) throw new IllegalArgumentException("row cannot be null");
     HRegionLocation loc = null;
     IOException locationException = null;
     try {
@@ -476,29 +475,29 @@ class AsyncProcess<CResult> {
                               final int numAttempt,
                               final HConnectionManager.ServerErrorTracker errorsByServer) {
     // Send the queries and add them to the inProgress list
+    // This iteration is by server (the HRegionLocation comparator is by server portion only).
     for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
       final HRegionLocation loc = e.getKey();
-      final MultiAction<Row> multi = e.getValue();
-      incTaskCounters(multi.getRegions(), loc.getServerName());
-
+      final MultiAction<Row> multiAction = e.getValue();
+      incTaskCounters(multiAction.getRegions(), loc.getServerName());
       Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
         @Override
         public void run() {
           MultiResponse res;
           try {
-            MultiServerCallable<Row> callable = createCallable(loc, multi);
+            MultiServerCallable<Row> callable = createCallable(loc, multiAction);
             try {
               res = createCaller(callable).callWithoutRetries(callable);
             } catch (IOException e) {
-              LOG.warn("The call to the region server failed, we don't know where we stand, " +
-                  loc.getServerName(), e);
-              resubmitAll(initialActions, multi, loc, numAttempt + 1, e, errorsByServer);
+              LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
+                ", resubmitting all since not sure where we are at", e);
+              resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
               return;
             }
 
-            receiveMultiAction(initialActions, multi, loc, res, numAttempt, errorsByServer);
+            receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
           } finally {
-            decTaskCounters(multi.getRegions(), loc.getServerName());
+            decTaskCounters(multiAction.getRegions(), loc.getServerName());
           }
         }
       });
@@ -508,12 +507,12 @@ class AsyncProcess<CResult> {
       } catch (RejectedExecutionException ree) {
         // This should never happen. But as the pool is provided by the end user, let's secure
         //  this a little.
-        decTaskCounters(multi.getRegions(), loc.getServerName());
+        decTaskCounters(multiAction.getRegions(), loc.getServerName());
         LOG.warn("The task was rejected by the pool. This is unexpected." +
             " Server is " + loc.getServerName(), ree);
         // We're likely to fail again, but this will increment the attempt counter, so it will
         //  finish.
-        resubmitAll(initialActions, multi, loc, numAttempt + 1, ree, errorsByServer);
+        resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
       }
     }
   }
@@ -590,12 +589,11 @@ class AsyncProcess<CResult> {
     // Do not use the exception for updating cache because it might be coming from
     // any of the regions in the MultiAction.
     hConnection.updateCachedLocations(tableName,
-        rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
+      rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
     errorsByServer.reportServerError(location);
-
-    List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
-    for (List<Action<Row>> actions : rsActions.actions.values()) {
-      for (Action<Row> action : actions) {
+    List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
+    for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
+      for (Action<Row> action : e.getValue()) {
         if (manageError(numAttempt, action.getOriginalIndex(), action.getAction(),
             true, t, location)) {
           toReplay.add(action);
@@ -605,7 +603,7 @@ class AsyncProcess<CResult> {
 
     if (toReplay.isEmpty()) {
       LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for all " +
-        initialActions.size() + "ops, NOT resubmitting, " + location.getServerName());
+        initialActions.size() + " ops, NOT resubmitting, " + location.getServerName());
     } else {
       submit(initialActions, toReplay, numAttempt, errorsByServer);
     }
@@ -669,11 +667,11 @@ class AsyncProcess<CResult> {
           }
         } else { // success
           if (callback != null) {
-            Action<Row> correspondingAction = initialActions.get(regionResult.getFirst());
+            int index = regionResult.getFirst();
+            Action<Row> correspondingAction = initialActions.get(index);
             Row row = correspondingAction.getAction();
             //noinspection unchecked
-            this.callback.success(correspondingAction.getOriginalIndex(),
-                resultsForRS.getKey(), row, (CResult) result);
+            this.callback.success(index, resultsForRS.getKey(), row, (CResult) result);
           }
         }
       }
@@ -694,8 +692,7 @@ class AsyncProcess<CResult> {
       try {
         Thread.sleep(backOffTime);
       } catch (InterruptedException e) {
-        LOG.warn("Not sent: " + toReplay.size() +
-            " operations, " + location, e);
+        LOG.warn("Not sent: " + toReplay.size() + " operations, " + location, e);
         Thread.interrupted();
         return;
       }
@@ -705,10 +702,11 @@ class AsyncProcess<CResult> {
       if (failureCount != 0) {
         // We have a failure but nothing to retry. We're done, it's a final failure..
         LOG.warn("Attempt #" + numAttempt + "/" + numTries + " failed for " + failureCount +
-            " ops on " + location.getServerName() + " NOT resubmitting." + location);
+            " ops on " + location.getServerName() + " NOT resubmitting. " + location);
       } else if (numAttempt > START_LOG_ERRORS_CNT + 1 && LOG.isDebugEnabled()) {
         // The operation was successful, but needed several attempts. Let's log this.
-        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " is finally successful.");
+        LOG.debug("Attempt #" + numAttempt + "/" + numTries + " finally suceeded, size=" +
+          toReplay.size());
       }
     }
   }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java Fri Oct  4 23:38:41 2013
@@ -171,6 +171,7 @@ public class ClientSmallScanner extends 
         ScanResponse response = null;
         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
         try {
+          controller.setPriority(getTableName());
           response = getStub().scan(controller, request);
           return ResponseConverter.getResults(controller.cellScanner(),
               response);

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java Fri Oct  4 23:38:41 2013
@@ -32,7 +32,6 @@ import java.util.TreeSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -72,6 +71,8 @@ public class Get extends OperationWithAt
   private int storeOffset = 0;
   private Filter filter = null;
   private TimeRange tr = new TimeRange();
+  private boolean checkExistenceOnly = false;
+  private boolean closestRowBefore = false;
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
 
@@ -87,6 +88,22 @@ public class Get extends OperationWithAt
     this.row = row;
   }
 
+  public boolean isCheckExistenceOnly() {
+    return checkExistenceOnly;
+  }
+
+  public void setCheckExistenceOnly(boolean checkExistenceOnly) {
+    this.checkExistenceOnly = checkExistenceOnly;
+  }
+
+  public boolean isClosestRowBefore() {
+    return closestRowBefore;
+  }
+
+  public void setClosestRowBefore(boolean closestRowBefore) {
+    this.closestRowBefore = closestRowBefore;
+  }
+
   /**
    * Get all columns from the specified family.
    * <p>

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Fri Oct  4 23:38:41 2013
@@ -643,6 +643,7 @@ public class HBaseAdmin implements Abort
             .getServerName());
         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
         try {
+          controller.setPriority(tableName);
           ScanResponse response = server.scan(controller, request);
           values = ResponseConverter.getResults(controller.cellScanner(), response);
         } catch (ServiceException se) {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Oct  4 23:38:41 2013
@@ -2165,8 +2165,7 @@ public class HConnectionManager {
       boolean isStaleDelete = false;
       HRegionLocation oldLocation;
       synchronized (this.cachedRegionLocations) {
-        Map<byte[], HRegionLocation> tableLocations =
-          getTableLocations(hri.getTable());
+        Map<byte[], HRegionLocation> tableLocations = getTableLocations(hri.getTable());
         oldLocation = tableLocations.get(hri.getStartKey());
         if (oldLocation != null) {
            // Do not delete the cache entry if it's not for the same server that gave us the error.
@@ -2363,6 +2362,7 @@ public class HConnectionManager {
       }
     }
 
+
     /*
      * Return the number of cached region for a table. It will only be called
      * from a unit test.

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Oct  4 23:38:41 2013
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -61,11 +60,10 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -85,9 +83,6 @@ import com.google.protobuf.ServiceExcept
  * <p>In case of reads, some fields used by a Scan are shared among all threads.
  * The HTable implementation can either not contract to be safe in case of a Get
  *
- * <p>To access a table in a multi threaded environment, please consider
- * using the {@link HTablePool} class to create your HTable instances.
- *
  * <p>Instances of HTable passed the same {@link Configuration} instance will
  * share connections to servers out on the cluster and to the zookeeper ensemble
  * as well as caches of region locations.  This is usually a *good* thing and it
@@ -959,8 +954,13 @@ public class HTable implements HTableInt
         new RegionServerCallable<Void>(connection, getName(), rm.getRow()) {
       public Void call() throws IOException {
         try {
-          MultiRequest request = RequestConverter.buildMultiRequest(
+          RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
             getLocation().getRegionInfo().getRegionName(), rm);
+          regionMutationBuilder.setAtomic(true);
+          MultiRequest request =
+            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+          PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
+          pcrc.setPriority(tableName);
           getStub().multi(null, request);
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
@@ -987,6 +987,7 @@ public class HTable implements HTableInt
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), append);
             PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+            rpcController.setPriority(getTableName());
             MutateResponse response = getStub().mutate(rpcController, request);
             if (!response.hasResult()) return null;
             return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
@@ -1013,9 +1014,10 @@ public class HTable implements HTableInt
         try {
           MutateRequest request = RequestConverter.buildMutateRequest(
             getLocation().getRegionInfo().getRegionName(), increment);
-            PayloadCarryingRpcController rpcContoller = new PayloadCarryingRpcController();
-            MutateResponse response = getStub().mutate(rpcContoller, request);
-            return ProtobufUtil.toResult(response.getResult(), rpcContoller.cellScanner());
+            PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+            rpcController.setPriority(getTableName());
+            MutateResponse response = getStub().mutate(rpcController, request);
+            return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
           }
@@ -1074,6 +1076,7 @@ public class HTable implements HTableInt
               getLocation().getRegionInfo().getRegionName(), row, family,
               qualifier, amount, durability);
             PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+            rpcController.setPriority(getTableName());
             MutateResponse response = getStub().mutate(rpcController, request);
             Result result =
               ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
@@ -1142,61 +1145,10 @@ public class HTable implements HTableInt
    */
   @Override
   public boolean exists(final Get get) throws IOException {
-    RegionServerCallable<Boolean> callable =
-        new RegionServerCallable<Boolean>(connection, getName(), get.getRow()) {
-      public Boolean call() throws IOException {
-        try {
-          GetRequest request = RequestConverter.buildGetRequest(
-            getLocation().getRegionInfo().getRegionName(), get, true);
-          GetResponse response = getStub().get(null, request);
-          return response.getExists();
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
-      }
-    };
-    return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
-  }
-
-  /**
-   * Goal of this inner class is to keep track of the initial position of a get in a list before
-   * sorting it. This is used to send back results in the same orders we got the Gets before we sort
-   * them.
-   */
-  private static class SortedGet implements Comparable<SortedGet> {
-    protected int initialIndex = -1; // Used to store the get initial index in a list.
-    protected Get get; // Encapsulated Get instance.
-
-    public SortedGet (Get get, int initialIndex) {
-      this.get = get;
-      this.initialIndex = initialIndex;
-    }
-
-    public int getInitialIndex() {
-      return initialIndex;
-    }
-
-    @Override
-    public int compareTo(SortedGet o) {
-      return get.compareTo(o.get);
-    }
-
-    public Get getGet() {
-      return get;
-    }
-
-    @Override
-    public int hashCode() {
-      return get.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof SortedGet)
-        return get.equals(((SortedGet)obj).get);
-      else
-        return false;
-    }
+    get.setCheckExistenceOnly(true);
+    Result r = get(get);
+    assert r.getExists() != null;
+    return r.getExists();
   }
 
   /**
@@ -1204,100 +1156,26 @@ public class HTable implements HTableInt
    */
   @Override
   public Boolean[] exists(final List<Get> gets) throws IOException {
-    // Prepare the sorted list of gets. Take the list of gets received, and encapsulate them into
-    // a list of SortedGet instances. Simple list parsing, so complexity here is O(n)
-    // The list is later used to recreate the response order based on the order the Gets
-    // got received.
-    ArrayList<SortedGet> sortedGetsList = new ArrayList<HTable.SortedGet>();
-    for (int indexGet = 0; indexGet < gets.size(); indexGet++) {
-      sortedGetsList.add(new SortedGet (gets.get(indexGet), indexGet));
-    }
-
-    // Sorting the list to get the Gets ordered based on the key.
-    Collections.sort(sortedGetsList); // O(n log n)
-
-    // step 1: sort the requests by regions to send them bundled.
-    // Map key is startKey index. Map value is the list of Gets related to the region starting
-    // with the startKey.
-    Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
-
-    // Reference map to quickly find back in which region a get belongs.
-    Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
-    Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
-
-    int regionIndex = 0;
-    for (final SortedGet get : sortedGetsList) {
-      // Progress on the regions until we find the one the current get resides in.
-      while ((regionIndex < startEndKeys.getSecond().length) && ((Bytes.compareTo(startEndKeys.getSecond()[regionIndex], get.getGet().getRow()) <= 0))) {
-        regionIndex++;
-      }
-      List<Get> regionGets = getsByRegion.get(regionIndex);
-      if (regionGets == null) {
-        regionGets = new ArrayList<Get>();
-        getsByRegion.put(regionIndex, regionGets);
-      }
-      regionGets.add(get.getGet());
-      getToRegionIndexMap.put(get.getGet(), regionIndex);
-    }
+    if (gets.isEmpty()) return new Boolean[]{};
+    if (gets.size() == 1) return new Boolean[]{exists(gets.get(0))};
 
-    // step 2: make the requests
-    Map<Integer, Future<List<Boolean>>> futures =
-        new HashMap<Integer, Future<List<Boolean>>>(sortedGetsList.size());
-    for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
-      Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
-        public List<Boolean> call() throws Exception {
-          RegionServerCallable<List<Boolean>> callable =
-            new RegionServerCallable<List<Boolean>>(connection, getName(),
-              getsByRegionEntry.getValue().get(0).getRow()) {
-            public List<Boolean> call() throws IOException {
-              try {
-                MultiGetRequest requests = RequestConverter.buildMultiGetRequest(
-                  getLocation().getRegionInfo().getRegionName(), getsByRegionEntry.getValue(),
-                  true, false);
-                MultiGetResponse responses = getStub().multiGet(null, requests);
-                return responses.getExistsList();
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-            }
-          };
-          return rpcCallerFactory.<List<Boolean>> newCaller().callWithRetries(callable,
-              operationTimeout);
-        }
-      };
-      futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
+    for (Get g: gets){
+      g.setCheckExistenceOnly(true);
     }
 
-    // step 3: collect the failures and successes
-    Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
-    for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
-      try {
-        Future<List<Boolean>> future = futures.get(sortedGetEntry.getKey());
-        List<Boolean> resp = future.get();
-
-        if (resp == null) {
-          LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
-        }
-        responses.put(sortedGetEntry.getKey(), resp);
-      } catch (ExecutionException e) {
-        LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
-      } catch (InterruptedException e) {
-        LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
-        Thread.currentThread().interrupt();
-      }
+    Object[] r1;
+    try {
+      r1 = batch(gets);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
     }
-    Boolean[] results = new Boolean[sortedGetsList.size()];
 
-    // step 4: build the response.
-    Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
-    for (int i = 0; i < sortedGetsList.size(); i++) {
-      Integer regionInfoIndex = getToRegionIndexMap.get(sortedGetsList.get(i).getGet());
-      Integer index = indexes.get(regionInfoIndex);
-      if (index == null) {
-        index = 0;
-      }
-      results[sortedGetsList.get(i).getInitialIndex()] = responses.get(regionInfoIndex).get(index);
-      indexes.put(regionInfoIndex, index + 1);
+    // translate.
+    Boolean[] results = new Boolean[r1.length];
+    int i = 0;
+    for (Object o : r1) {
+      // batch ensures if there is a failure we get an exception instead
+      results[i++] = ((Result)o).getExists();
     }
 
     return results;

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Fri Oct  4 23:38:41 2013
@@ -24,14 +24,17 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.ServiceException;
 
@@ -42,91 +45,74 @@ import com.google.protobuf.ServiceExcept
  * @param <R>
  */
 class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
-  private final MultiAction<R> multi;
+  private final MultiAction<R> multiAction;
   private final boolean cellBlock;
 
   MultiServerCallable(final HConnection connection, final TableName tableName,
       final HRegionLocation location, final MultiAction<R> multi) {
     super(connection, tableName, null);
-    this.multi = multi;
+    this.multiAction = multi;
     setLocation(location);
     this.cellBlock = isCellBlock();
   }
 
   MultiAction<R> getMulti() {
-    return this.multi;
+    return this.multiAction;
   }
 
   @Override
   public MultiResponse call() throws IOException {
-    MultiResponse response = new MultiResponse();
-    // The multi object is a list of Actions by region.
-    for (Map.Entry<byte[], List<Action<R>>> e: this.multi.actions.entrySet()) {
-      byte[] regionName = e.getKey();
-      int rowMutations = 0;
-      List<Action<R>> actions = e.getValue();
-      for (Action<R> action : actions) {
-        Row row = action.getAction();
-        // Row Mutations are a set of Puts and/or Deletes all to be applied atomically
-        // on the one row.  We do these a row at a time.
-        if (row instanceof RowMutations) {
-          RowMutations rms = (RowMutations)row;
-          List<CellScannable> cells = null;
-          MultiRequest multiRequest;
-          try {
-            if (this.cellBlock) {
-              // Stick all Cells for all RowMutations in here into 'cells'.  Populated when we call
-              // buildNoDataMultiRequest in the below.
-              cells = new ArrayList<CellScannable>(rms.getMutations().size());
-              // Build a multi request absent its Cell payload (this is the 'nodata' in the below).
-              multiRequest = RequestConverter.buildNoDataMultiRequest(regionName, rms, cells);
-            } else {
-              multiRequest = RequestConverter.buildMultiRequest(regionName, rms);
-            }
-            // Carry the cells if any over the proxy/pb Service interface using the payload
-            // carrying rpc controller.
-            getStub().multi(new PayloadCarryingRpcController(cells), multiRequest);
-            // This multi call does not return results.
-            response.add(regionName, action.getOriginalIndex(), Result.EMPTY_RESULT);
-          } catch (ServiceException se) {
-            response.add(regionName, action.getOriginalIndex(),
-              ProtobufUtil.getRemoteException(se));
-          }
-          rowMutations++;
-        }
-      }
-      // Are there any non-RowMutation actions to send for this region?
-      if (actions.size() > rowMutations) {
-        Exception ex = null;
-        List<Object> results = null;
-        List<CellScannable> cells = null;
-        MultiRequest multiRequest;
-        try {
-          if (isCellBlock()) {
-            // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
-            // They have already been handled above.
-            cells = new ArrayList<CellScannable>(actions.size() - rowMutations);
-            multiRequest = RequestConverter.buildNoDataMultiRequest(regionName, actions, cells);
-          } else {
-            multiRequest = RequestConverter.buildMultiRequest(regionName, actions);
-          }
-          // Controller optionally carries cell data over the proxy/service boundary and also
-          // optionally ferries cell response data back out again.
-          PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
-          ClientProtos.MultiResponse responseProto = getStub().multi(controller, multiRequest);
-          results = ResponseConverter.getResults(responseProto, controller.cellScanner());
-        } catch (ServiceException se) {
-          ex = ProtobufUtil.getRemoteException(se);
-        }
-        for (int i = 0, n = actions.size(); i < n; i++) {
-          int originalIndex = actions.get(i).getOriginalIndex();
-          response.add(regionName, originalIndex, results == null ? ex : results.get(i));
-        }
+    int countOfActions = this.multiAction.size();
+    if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
+    MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
+    List<CellScannable> cells = null;
+    // The multi object is a list of Actions by region.  Iterate by region.
+    for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
+      final byte [] regionName = e.getKey();
+      final List<Action<R>> actions = e.getValue();
+      RegionAction.Builder regionActionBuilder;
+      if (this.cellBlock) {
+        // Presize.  Presume at least a KV per Action.  There are likely more.
+        if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
+        // Send data in cellblocks.  buildNoDataRegionAction builds this region's actions without
+        // their Cell data; the Cells are gathered into 'cells' and carried by the controller.
+        regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
+      } else {
+        regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
       }
+      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
     }
-    return response;
+    // Controller optionally carries cell data over the proxy/service boundary and also
+    // optionally ferries cell response data back out again.
+    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+    controller.setPriority(getTableName());
+    ClientProtos.MultiResponse responseProto;
+    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
+    try {
+      responseProto = getStub().multi(controller, requestProto);
+    } catch (ServiceException e) {
+      return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
+    }
+    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
   }
 
+  /**
+   * @param request
+   * @param t
+   * @return Return a response that has every action in request failed w/ the passed in
+   * exception <code>t</code> -- this will get them all retried after some backoff.
+   */
+  private static MultiResponse createAllFailedResponse(final ClientProtos.MultiRequest request,
+      final Throwable t) {
+    MultiResponse massFailedResponse = new MultiResponse();
+    for (RegionAction rAction: request.getRegionActionList()) {
+      byte [] regionName = rAction.getRegion().getValue().toByteArray();
+      for (ClientProtos.Action action: rAction.getActionList()) {
+        massFailedResponse.add(regionName, new Pair<Integer, Object>(action.getIndex(), t));
+      }
+    }
+    return massFailedResponse;
+  }
 
   /**
    * @return True if we should send data in cellblocks.  This is an expensive call.  Cache the

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java Fri Oct  4 23:38:41 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.Byte
 @InterfaceStability.Stable
 public class Result implements CellScannable {
   private Cell[] cells;
+  private Boolean exists; // if the query was just to check existence.
   // We're not using java serialization.  Transient here is just a marker to say
   // that this is where we cache row if we're ever asked for it.
   private transient byte [] row = null;
@@ -108,7 +109,7 @@ public class Result implements CellScann
   @Deprecated
   public Result(List<KeyValue> kvs) {
     // TODO: Here we presume the passed in Cells are KVs.  One day this won't always be so.
-    this(kvs.toArray(new Cell[kvs.size()]));
+    this(kvs.toArray(new Cell[kvs.size()]), null);
   }
 
   /**
@@ -117,7 +118,14 @@ public class Result implements CellScann
    * @param cells List of cells
    */
   public static Result create(List<Cell> cells) {
-    return new Result(cells.toArray(new Cell[cells.size()]));
+    return new Result(cells.toArray(new Cell[cells.size()]), null);
+  }
+
+  public static Result create(List<Cell> cells, Boolean exists) {
+    if (exists != null){
+      return new Result(null, exists);
+    }
+    return new Result(cells.toArray(new Cell[cells.size()]), exists);
   }
 
   /**
@@ -126,12 +134,13 @@ public class Result implements CellScann
    * @param cells array of cells
    */
   public static Result create(Cell[] cells) {
-    return new Result(cells);
+    return new Result(cells, null);
   }
 
   /** Private ctor. Use {@link #create(Cell[])}. */
-  private Result(Cell[] cells) {
+  private Result(Cell[] cells, Boolean exists) {
     this.cells = cells;
+    this.exists = exists;
   }
 
   /**
@@ -796,4 +805,12 @@ public class Result implements CellScann
   public CellScanner cellScanner() {
     return CellUtil.createCellScanner(this.cells);
   }
+
+  public Boolean getExists() {
+    return exists;
+  }
+
+  public void setExists(Boolean exists) {
+    this.exists = exists;
+  }
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Fri Oct  4 23:38:41 2013
@@ -163,6 +163,7 @@ public class ScannerCallable extends Reg
           ScanResponse response = null;
           PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
           try {
+            controller.setPriority(getTableName());
             response = getStub().scan(controller, request);
             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
             // from client to server will increment this number in both sides. Client passes this

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java Fri Oct  4 23:38:41 2013
@@ -23,7 +23,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -36,6 +37,15 @@ import com.google.protobuf.RpcController
  */
 @InterfaceAudience.Private
 public class PayloadCarryingRpcController implements RpcController, CellScannable {
+  /**
+   * Priority to set on this request.  Set it here in the controller so it is available when
+   * composing the request.  This is the ordained way of setting priorities going forward.  We
+   * will be undoing the old annotation-based mechanism.
+   */
+  // Currently only multi call makes use of this.  Eventually this should be only way to set
+  // priority.
+  private int priority = 0;
+
   // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException
 
   /**
@@ -103,4 +113,26 @@ public class PayloadCarryingRpcControlle
   public void startCancel() {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * @param priority Priority for this request; should fall roughly in the range
+   * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
+   */
+  public void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+  /**
+   * @param tn Set priority based off the table we are going against.
+   */
+  public void setPriority(final TableName tn) {
+    this.priority = tn != null && tn.isSystemTable()? HConstants.HIGH_QOS: HConstants.NORMAL_QOS;
+  }
+
+  /**
+   * @return The priority of this request
+   */
+  public int getPriority() {
+    return priority;
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Fri Oct  4 23:38:41 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.codec.Codec;
@@ -211,7 +212,8 @@ public class RpcClient {
   @SuppressWarnings("serial")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class FailedServerException extends IOException {
+  // Shouldn't this be a DoNotRetryException? St.Ack 10/2/2013
+  public static class FailedServerException extends HBaseIOException {
     public FailedServerException(String s) {
       super(s);
     }
@@ -967,8 +969,12 @@ public class RpcClient {
       }
 
       // close the streams and therefore the socket
-      IOUtils.closeStream(out);
-      this.out = null;
+      if (this.out != null) {
+        synchronized(this.out) {
+          IOUtils.closeStream(out);
+          this.out = null;
+        }
+      }
       IOUtils.closeStream(in);
       this.in = null;
       disposeSasl();
@@ -1002,9 +1008,10 @@ public class RpcClient {
      * Note: this is not called from the Connection thread, but by other
      * threads.
      * @param call
+     * @param priority
      * @see #readResponse()
      */
-    protected void writeRequest(Call call) {
+    protected void writeRequest(Call call, final int priority) {
       if (shouldCloseConnection.get()) return;
       try {
         RequestHeader.Builder builder = RequestHeader.newBuilder();
@@ -1022,6 +1029,8 @@ public class RpcClient {
           cellBlockBuilder.setLength(cellBlock.limit());
           builder.setCellBlockMeta(cellBlockBuilder.build());
         }
+        // Only pass priority if there is one.  Let zero be same as no priority.
+        if (priority != 0) builder.setPriority(priority);
         //noinspection SynchronizeOnNonFinalField
         RequestHeader header = builder.build();
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
@@ -1380,6 +1389,12 @@ public class RpcClient {
     }
   }
 
+  Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
+      Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout)
+  throws InterruptedException, IOException {
+    return call(md, param, cells, returnType, ticket, addr, rpcTimeout, HConstants.NORMAL_QOS);
+  }
+
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
    * with the <code>ticket</code> credentials, returning the value.
@@ -1400,12 +1415,12 @@ public class RpcClient {
    */
   Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
       Message returnType, User ticket, InetSocketAddress addr,
-      int rpcTimeout)
+      int rpcTimeout, int priority)
   throws InterruptedException, IOException {
     Call call = new Call(md, param, cells, returnType);
     Connection connection =
       getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
-    connection.writeRequest(call);                 // send the parameter
+    connection.writeRequest(call, priority);                 // send the parameter
     boolean interrupted = false;
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
@@ -1632,7 +1647,8 @@ public class RpcClient {
     }
     Pair<Message, CellScanner> val = null;
     try {
-      val = call(md, param, cells, returnType, ticket, isa, rpcTimeout);
+      val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
+        pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
       if (pcrc != null) {
         // Shove the results into controller so can be carried across the proxy/pb service void.
         if (val.getSecond() != null) pcrc.setCellScanner(val.getSecond());

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Fri Oct  4 23:38:41 2013
@@ -407,6 +407,12 @@ public final class ProtobufUtil {
         }
       }
     }
+    if (proto.hasExistenceOnly() && proto.getExistenceOnly()){
+      get.setCheckExistenceOnly(true);
+    }
+    if (proto.hasClosestRowBefore() && proto.getClosestRowBefore()){
+      get.setClosestRowBefore(true);
+    }
     return get;
   }
 
@@ -922,6 +928,12 @@ public final class ProtobufUtil {
     if (get.getRowOffsetPerColumnFamily() > 0) {
       builder.setStoreOffset(get.getRowOffsetPerColumnFamily());
     }
+    if (get.isCheckExistenceOnly()){
+      builder.setExistenceOnly(true);
+    }
+    if (get.isClosestRowBefore()){
+      builder.setClosestRowBefore(true);
+    }
     return builder.build();
   }
 
@@ -1053,6 +1065,21 @@ public final class ProtobufUtil {
         builder.addCell(toCell(c));
       }
     }
+    if (result.getExists() != null){
+      builder.setExists(result.getExists());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Convert a row-existence flag to a protocol buffer Result that carries only that flag
+   *
+   * @param existence the client existence to send
+   * @return the converted protocol buffer Result
+   */
+  public static ClientProtos.Result toResult(final boolean existence) {
+    ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
+    builder.setExists(existence);
     return builder.build();
   }
 
@@ -1066,6 +1093,9 @@ public final class ProtobufUtil {
   public static ClientProtos.Result toResultNoData(final Result result) {
     ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
     builder.setAssociatedCellCount(result.size());
+    if (result.getExists() != null){
+      builder.setExists(result.getExists());
+    }
     return builder.build();
   }
 
@@ -1076,12 +1106,16 @@ public final class ProtobufUtil {
    * @return the converted client Result
    */
   public static Result toResult(final ClientProtos.Result proto) {
+    if (proto.hasExists()) {
+      return Result.create(null, proto.getExists());
+    }
+
     List<CellProtos.Cell> values = proto.getCellList();
     List<Cell> cells = new ArrayList<Cell>(values.size());
-    for (CellProtos.Cell c: values) {
+    for (CellProtos.Cell c : values) {
       cells.add(toCell(c));
     }
-    return Result.create(cells);
+    return Result.create(cells, null);
   }
 
   /**
@@ -1094,6 +1128,10 @@ public final class ProtobufUtil {
    */
   public static Result toResult(final ClientProtos.Result proto, final CellScanner scanner)
   throws IOException {
+    if (proto.hasExists()){
+      return Result.create(null, proto.getExists());
+    }
+
     // TODO: Unit test that has some Cells in scanner and some in the proto.
     List<Cell> cells = null;
     if (proto.hasAssociatedCellCount()) {
@@ -1109,7 +1147,7 @@ public final class ProtobufUtil {
     for (CellProtos.Cell c: values) {
       cells.add(toCell(c));
     }
-    return Result.create(cells);
+    return Result.create(cells, null);
   }
 
   /**
@@ -2257,11 +2295,15 @@ public final class ProtobufUtil {
           ", row=" + getStringForByteString(r.getGet().getRow());
     } else if (m instanceof ClientProtos.MultiRequest) {
       ClientProtos.MultiRequest r = (ClientProtos.MultiRequest) m;
-      ClientProtos.MultiAction action = r.getActionList().get(0);
-      return "region= " + getStringForByteString(r.getRegion().getValue()) +
-          ", for " + r.getActionCount() +
-          " actions and 1st row key=" + getStringForByteString(action.hasMutation() ?
-          action.getMutation().getRow() : action.getGet().getRow());
+      // Get first set of Actions.
+      ClientProtos.RegionAction actions = r.getRegionActionList().get(0);
+      String row = actions.getActionCount() <= 0? "":
+        getStringForByteString(actions.getAction(0).hasGet()?
+          actions.getAction(0).getGet().getRow():
+          actions.getAction(0).getMutation().getRow());
+      return "region= " + getStringForByteString(actions.getRegion().getValue()) +
+          ", for " + r.getRegionActionCount() +
+          " actions and 1st row key=" + row;
     } else if (m instanceof ClientProtos.MutateRequest) {
       ClientProtos.MutateRequest r = (ClientProtos.MutateRequest) m;
       return "region= " + getStringForByteString(r.getRegion().getValue()) +

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Fri Oct  4 23:38:41 2013
@@ -63,14 +63,12 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -101,6 +99,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Triple;
+import org.mortbay.log.Log;
 
 import com.google.protobuf.ByteString;
 
@@ -131,7 +130,6 @@ public final class RequestConverter {
     GetRequest.Builder builder = GetRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
-    builder.setClosestRowBefore(true);
     builder.setRegion(region);
 
     Column.Builder columnBuilder = Column.newBuilder();
@@ -140,63 +138,30 @@ public final class RequestConverter {
       ClientProtos.Get.newBuilder();
     getBuilder.setRow(ByteString.copyFrom(row));
     getBuilder.addColumn(columnBuilder.build());
+    getBuilder.setClosestRowBefore(true);
     builder.setGet(getBuilder.build());
     return builder.build();
   }
 
-  /**
-   * Create a protocol buffer GetRequest for a client Get
-   *
-   * @param regionName the name of the region to get
-   * @param get the client Get
-   * @return a protocol buffer GetReuqest
-   */
-  public static GetRequest buildGetRequest(final byte[] regionName,
-      final Get get) throws IOException {
-    return buildGetRequest(regionName, get, false);
-  }
 
   /**
    * Create a protocol buffer GetRequest for a client Get
    *
    * @param regionName the name of the region to get
    * @param get the client Get
-   * @param existenceOnly indicate if check row existence only
    * @return a protocol buffer GetRequest
    */
   public static GetRequest buildGetRequest(final byte[] regionName,
-      final Get get, final boolean existenceOnly) throws IOException {
+      final Get get) throws IOException {
     GetRequest.Builder builder = GetRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
-    builder.setExistenceOnly(existenceOnly);
     builder.setRegion(region);
     builder.setGet(ProtobufUtil.toGet(get));
     return builder.build();
   }
 
   /**
-   * Create a protocol buffer MultiGetRequest for client Gets All gets are going to be run against
-   * the same region.
-   * @param regionName the name of the region to get from
-   * @param gets the client Gets
-   * @param existenceOnly indicate if check rows existence only
-   * @return a protocol buffer MultiGetRequest
-   */
-  public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
-      final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
-    MultiGetRequest.Builder builder = MultiGetRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
-    builder.setExistenceOnly(existenceOnly);
-    builder.setClosestRowBefore(closestRowBefore);
-    builder.setRegion(region);
-    for (Get get : gets) {
-      builder.addGet(ProtobufUtil.toGet(get));
-    }
-    return builder.build();
-  }
-
-  /**
    * Create a protocol buffer MutateRequest for a client increment
    *
    * @param regionName
@@ -358,17 +323,18 @@ public final class RequestConverter {
   }
 
   /**
-   * Create a protocol buffer MultiRequest for a row mutations
-   *
+   * Create a protocol buffer RegionAction.Builder for row mutations.
+   * Does not propagate Action absolute position.  Does not set atomic action on the created
+   * RegionAction.  Caller should do that if wanted.
    * @param regionName
    * @param rowMutations
-   * @return a multi request
+   * @return a data-laden RegionAction.Builder
    * @throws IOException
    */
-  public static MultiRequest buildMultiRequest(final byte[] regionName,
+  public static RegionAction.Builder buildRegionAction(final byte [] regionName,
       final RowMutations rowMutations)
   throws IOException {
-    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
+    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
     for (Mutation mutation: rowMutations.getMutations()) {
       MutationType mutateType = null;
       if (mutation instanceof Put) {
@@ -380,25 +346,26 @@ public final class RequestConverter {
           mutation.getClass().getName());
       }
       MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
-      builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
+      builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
     }
-    return builder.build();
+    return builder;
   }
 
   /**
    * Create a protocol buffer MultiRequest for row mutations that does not hold data.  Data/Cells
-   * are carried outside of protobuf.  Return references to the Cells in <code>cells</code> param
-   *
+   * are carried outside of protobuf.  Return references to the Cells in <code>cells</code> param.
+   * Does not propagate Action absolute position.  Does not set atomic action on the created
+   * RegionAction.  Caller should do that if wanted.
    * @param regionName
    * @param rowMutations
    * @param cells Return in here a list of Cells as CellIterable.
-   * @return a multi request minus data
+   * @return a RegionAction.Builder minus data
    * @throws IOException
    */
-  public static MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+  public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
       final RowMutations rowMutations, final List<CellScannable> cells)
   throws IOException {
-    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
+    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
     for (Mutation mutation: rowMutations.getMutations()) {
       MutationType type = null;
       if (mutation instanceof Put) {
@@ -411,17 +378,16 @@ public final class RequestConverter {
       }
       MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
       cells.add(mutation);
-      builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
+      builder.addAction(ClientProtos.Action.newBuilder().setMutation(mp).build());
     }
-    return builder.build();
+    return builder;
   }
 
-  private static MultiRequest.Builder getMultiRequestBuilderWithRegionAndAtomicSet(final byte [] regionName,
-      final boolean atomic) {
-    MultiRequest.Builder builder = MultiRequest.newBuilder();
+  private static RegionAction.Builder getRegionActionBuilderWithRegion(final byte [] regionName) {
+    RegionAction.Builder builder = RegionAction.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
     builder.setRegion(region);
-    return builder.setAtomic(atomic);
+    return builder;
   }
 
   /**
@@ -510,39 +476,43 @@ public final class RequestConverter {
 
   /**
    * Create a protocol buffer multi request for a list of actions.
-   * RowMutations in the list (if any) will be ignored.
+   * Propagates each Action's original index.
    *
    * @param regionName
    * @param actions
    * @return a multi request
    * @throws IOException
    */
-  public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
+  public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
       final List<Action<R>> actions)
   throws IOException {
-    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
+    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
     for (Action<R> action: actions) {
-      MultiAction.Builder protoAction = MultiAction.newBuilder();
       Row row = action.getAction();
+      ClientProtos.Action.Builder actionBuilder =
+          ClientProtos.Action.newBuilder().setIndex(action.getOriginalIndex());
       if (row instanceof Get) {
-        protoAction.setGet(ProtobufUtil.toGet((Get)row));
+        Get g = (Get)row;
+        builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
       } else if (row instanceof Put) {
-        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row)));
       } else if (row instanceof Delete) {
-        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row)));
       } else if (row instanceof Append) {
-        protoAction.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row)));
       } else if (row instanceof Increment) {
-        protoAction.setMutation(ProtobufUtil.toMutation((Increment)row));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutation((Increment)row)));
       } else if (row instanceof RowMutations) {
-        continue; // ignore RowMutations
+        throw new UnsupportedOperationException("No RowMutations in multi calls; use mutateRow");
       } else {
-        throw new DoNotRetryIOException(
-          "multi doesn't support " + row.getClass().getName());
+        throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
-      builder.addAction(protoAction.build());
     }
-    return builder.build();
+    return builder;
   }
 
   /**
@@ -553,7 +523,7 @@ public final class RequestConverter {
    * carried by protobuf.  We return references to the data by adding them to the passed in
    * <code>data</code> param.
    *
-   * RowMutations in the list (if any) will be ignored.
+   * <p>Propagates each Action's original index.
    *
    * @param regionName
    * @param actions
@@ -561,20 +531,22 @@ public final class RequestConverter {
    * @return a multi request that does not carry any data.
    * @throws IOException
    */
-  public static <R> MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+  public static <R> RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
       final List<Action<R>> actions, final List<CellScannable> cells)
   throws IOException {
-    MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
+    RegionAction.Builder builder = getRegionActionBuilderWithRegion(regionName);
     for (Action<R> action: actions) {
-      MultiAction.Builder protoAction = MultiAction.newBuilder();
       Row row = action.getAction();
+      ClientProtos.Action.Builder actionBuilder =
+        ClientProtos.Action.newBuilder().setIndex(action.getOriginalIndex());
       if (row instanceof Get) {
-        // Gets are carried by protobufs.
-        protoAction.setGet(ProtobufUtil.toGet((Get)row));
+        Get g = (Get)row;
+        builder.addAction(actionBuilder.setGet(ProtobufUtil.toGet(g)));
       } else if (row instanceof Put) {
         Put p = (Put)row;
         cells.add(p);
-        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p)));
       } else if (row instanceof Delete) {
         Delete d = (Delete)row;
         int size = d.size();
@@ -585,26 +557,29 @@ public final class RequestConverter {
         // metadata only in the pb and then send the kv along the side in cells.
         if (size > 0) {
           cells.add(d);
-          protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d));
+          builder.addAction(actionBuilder.
+            setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d)));
         } else {
-          protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d));
+          builder.addAction(actionBuilder.
+            setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d)));
         }
       } else if (row instanceof Append) {
         Append a = (Append)row;
         cells.add(a);
-        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a)));
       } else if (row instanceof Increment) {
         Increment i = (Increment)row;
         cells.add(i);
-        protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i));
+        builder.addAction(actionBuilder.
+          setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i)));
       } else if (row instanceof RowMutations) {
         continue; // ignore RowMutations
       } else {
         throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
       }
-      builder.addAction(protoAction.build());
     }
-    return builder.build();
+    return builder;
   }
 
 // End utilities for Client

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Fri Oct  4 23:38:41 2013
@@ -39,14 +39,19 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
 import org.apache.hadoop.hbase.security.access.UserPermission;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.protobuf.ByteString;
@@ -68,27 +73,58 @@ public final class ResponseConverter {
   /**
    * Get the results from a protocol buffer MultiResponse
    *
-   * @param proto the protocol buffer MultiResponse to convert
+   * @param request the protocol buffer MultiRequest that <code>response</code> answers
    * @param cells Cells to go with the passed in <code>proto</code>.  Can be null.
    * @return the results that were in the MultiResponse (a Result or an Exception).
    * @throws IOException
    */
-  public static List<Object> getResults(final ClientProtos.MultiResponse proto,
-      final CellScanner cells)
+  public static org.apache.hadoop.hbase.client.MultiResponse getResults(final MultiRequest request,
+      final MultiResponse response, final CellScanner cells)
   throws IOException {
-    List<Object> results = new ArrayList<Object>();
-    List<ActionResult> resultList = proto.getResultList();
-    for (int i = 0, n = resultList.size(); i < n; i++) {
-      ActionResult result = resultList.get(i);
-      if (result.hasException()) {
-        results.add(ProtobufUtil.toException(result.getException()));
-      } else if (result.hasValue()) {
-        ClientProtos.Result value = result.getValue();
-        results.add(ProtobufUtil.toResult(value, cells));
-      } else {
-        results.add(new Result());
+    int requestRegionActionCount = request.getRegionActionCount();
+    int responseRegionActionResultCount = response.getRegionActionResultCount();
+    if (requestRegionActionCount != responseRegionActionResultCount) {
+      throw new IllegalStateException("Request mutation count=" + responseRegionActionResultCount +
+          " does not match response mutation result count=" + responseRegionActionResultCount);
+    }
+
+    org.apache.hadoop.hbase.client.MultiResponse results =
+      new org.apache.hadoop.hbase.client.MultiResponse();
+
+    for (int i = 0; i < responseRegionActionResultCount; i++) {
+      RegionAction actions = request.getRegionAction(i);
+      RegionActionResult actionResult = response.getRegionActionResult(i);
+      byte[] regionName = actions.getRegion().toByteArray();
+
+      if (actionResult.hasException()){
+        Throwable regionException =  ProtobufUtil.toException(actionResult.getException());
+        for (ClientProtos.Action a : actions.getActionList()){
+          results.add(regionName, new Pair<Integer, Object>(a.getIndex(), regionException));
+        }
+        continue;
+      }
+
+      if (actions.getActionCount() != actionResult.getResultOrExceptionCount()) {
+        throw new IllegalStateException("actions.getActionCount=" + actions.getActionCount() +
+            ", actionResult.getResultOrExceptionCount=" +
+            actionResult.getResultOrExceptionCount() + " for region " + actions.getRegion());
+      }
+
+      for (ResultOrException roe : actionResult.getResultOrExceptionList()) {
+        if (roe.hasException()) {
+          results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
+              ProtobufUtil.toException(roe.getException())));
+        } else if (roe.hasResult()) {
+          results.add(regionName, new Pair<Integer, Object>(roe.getIndex(),
+              ProtobufUtil.toResult(roe.getResult(), cells)));
+        } else {
+          // no result & no exception. Unexpected.
+          throw new IllegalStateException("No result & no exception roe=" + roe +
+              " for region " + actions.getRegion());
+        }
       }
     }
+
     return results;
   }
 
@@ -96,16 +132,36 @@ public final class ResponseConverter {
    * Wrap a throwable to an action result.
    *
    * @param t
-   * @return an action result
+   * @return an action result builder
    */
-  public static ActionResult buildActionResult(final Throwable t) {
-    ActionResult.Builder builder = ActionResult.newBuilder();
+  public static ResultOrException.Builder buildActionResult(final Throwable t) {
+    ResultOrException.Builder builder = ResultOrException.newBuilder();
+    if (t != null) builder.setException(buildException(t));
+    return builder;
+  }
+
+  /**
+   * Wrap a result to an action result.
+   *
+   * @param r
+   * @return an action result builder
+   */
+  public static ResultOrException.Builder buildActionResult(final ClientProtos.Result r) {
+    ResultOrException.Builder builder = ResultOrException.newBuilder();
+    if (r != null) builder.setResult(r);
+    return builder;
+  }
+
+  /**
+   * @param t
+   * @return NameBytesPair of the exception name to stringified version of the exception.
+   */
+  public static NameBytesPair buildException(final Throwable t) {
     NameBytesPair.Builder parameterBuilder = NameBytesPair.newBuilder();
     parameterBuilder.setName(t.getClass().getName());
     parameterBuilder.setValue(
       ByteString.copyFromUtf8(StringUtils.stringifyException(t)));
-    builder.setException(parameterBuilder.build());
-    return builder.build();
+    return parameterBuilder.build();
   }
 
   /**

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java Fri Oct  4 23:38:41 2013
@@ -172,7 +172,7 @@ public class TestAsyncProcess {
    */
   static class MyConnectionImpl2 extends MyConnectionImpl {
     List<HRegionLocation> hrl;
-    boolean usedRegions[];
+    final boolean usedRegions[];
 
     protected MyConnectionImpl2(List<HRegionLocation> hrl) {
       super(c);
@@ -186,7 +186,7 @@ public class TestAsyncProcess {
       int i = 0;
       for (HRegionLocation hr:hrl){
         if (Arrays.equals(row, hr.getRegionInfo().getStartKey())){
-          usedRegions[i] = true;
+            usedRegions[i] = true;
           return hr;
         }
         i++;
@@ -475,9 +475,9 @@ public class TestAsyncProcess {
 
 
   private class MyCB implements AsyncProcess.AsyncProcessCallback<Object> {
-    private AtomicInteger successCalled = new AtomicInteger(0);
-    private AtomicInteger failureCalled = new AtomicInteger(0);
-    private AtomicInteger retriableFailure = new AtomicInteger(0);
+    private final AtomicInteger successCalled = new AtomicInteger(0);
+    private final AtomicInteger failureCalled = new AtomicInteger(0);
+    private final AtomicInteger retriableFailure = new AtomicInteger(0);
 
 
     @Override
@@ -705,7 +705,7 @@ public class TestAsyncProcess {
    */
   @Test
   public void testThreadCreation() throws Exception {
-    final int NB_REGS = 10000;
+    final int NB_REGS = 100;
     List<HRegionLocation> hrls = new ArrayList<HRegionLocation>(NB_REGS);
     List<Get> gets = new ArrayList<Get>(NB_REGS);
     for (int i = 0; i < NB_REGS; i++) {
@@ -721,11 +721,13 @@ public class TestAsyncProcess {
     HTable ht = new HTable();
     MyConnectionImpl2 con = new MyConnectionImpl2(hrls);
     ht.connection = con;
-    ht.batch(gets);
+
+      ht.batch(gets);
+
 
     Assert.assertEquals(con.ap.nbActions.get(), NB_REGS);
-    Assert.assertEquals(con.ap.nbMultiResponse.get(), 2); // 1 multi response per server
-    Assert.assertEquals(con.nbThreads.get(), 2);  // 1 thread per server
+    Assert.assertEquals("1 multi response per server", 2, con.ap.nbMultiResponse.get());
+    Assert.assertEquals("1 thread per server", 2, con.nbThreads.get());
 
     int nbReg = 0;
     for (int i =0; i<NB_REGS; i++){

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java Fri Oct  4 23:38:41 2013
@@ -301,4 +301,4 @@ public class TestClientNoCluster {
       return this.stub;
     }
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java Fri Oct  4 23:38:41 2013
@@ -244,11 +244,13 @@ public final class CellUtil {
 
       @Override
       public Cell current() {
+        if (cells == null) return null;
         return (index < 0)? null: this.cells[index];
       }
 
       @Override
       public boolean advance() {
+        if (cells == null) return false;
         return ++index < this.cells.length;
       }
     };

Modified: hbase/trunk/hbase-protocol/README.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/README.txt?rev=1529348&r1=1529347&r2=1529348&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/README.txt (original)
+++ hbase/trunk/hbase-protocol/README.txt Fri Oct  4 23:38:41 2013
@@ -25,7 +25,6 @@ terminal and hit return -- the protoc co
   do
     protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE
   done
-  ll $JAVA_DIR/org/apache/hadoop/hbase/protobuf/generated
 
 After you've done the above, check it in and then check it in (or post a patch
 on a JIRA with your definition file changes and the generated files).



Mime
View raw message