hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [38/50] [abbrv] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. w
Date Wed, 17 Aug 2016 18:34:44 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 882e21b..d2423b3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,12 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
@@ -43,7 +37,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -74,6 +67,15 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 
+import com.google.common.annotations.VisibleForTesting;
+// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
+// Internally, we use shaded protobuf. The imports below are part of our public API.
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+// SEE ABOVE NOTE!
+
 /**
  * An implementation of {@link Table}. Used to communicate with a single HBase table.
  * Lightweight. Get as needed and just close when done.
@@ -416,23 +418,16 @@ public class HTable implements Table {
 
     if (get.getConsistency() == Consistency.STRONG) {
       // Good old call.
-      final Get getReq = get;
+      final Get configuredGet = get;
       RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-          getName(), get.getRow()) {
+          this.rpcControllerFactory, getName(), get.getRow()) {
         @Override
-        public Result call(int callTimeout) throws IOException {
-          ClientProtos.GetRequest request =
-            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            ClientProtos.GetResponse response = getStub().get(controller, request);
-            if (response == null) return null;
-            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
+        protected Result rpcCall() throws Exception {
+          ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
+              getLocation().getRegionInfo().getRegionName(), configuredGet);
+          ClientProtos.GetResponse response = getStub().get(getRpcController(), request);
+          if (response == null) return null;
+          return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner());
         }
       };
       return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
@@ -448,7 +443,6 @@ public class HTable implements Table {
     return callable.call(operationTimeout);
   }
 
-
   /**
    * {@inheritDoc}
    */
@@ -459,16 +453,14 @@ public class HTable implements Table {
     }
     try {
       Object[] r1 = new Object[gets.size()];
-      batch((List) gets, r1);
-
-      // translate.
+      batch((List<? extends Row>)gets, r1);
+      // Translate.
       Result [] results = new Result[r1.length];
-      int i=0;
-      for (Object o : r1) {
-        // batch ensures if there is a failure we get an exception instead
-        results[i++] = (Result) o;
+      int i = 0;
+      for (Object obj: r1) {
+        // Batch ensures if there is a failure we get an exception instead
+        results[i++] = (Result)obj;
       }
-
       return results;
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
@@ -516,21 +508,13 @@ public class HTable implements Table {
   public void delete(final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
-        tableName, delete.getRow()) {
+        this.rpcControllerFactory, getName(), delete.getRow()) {
       @Override
-      public Boolean call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(tableName);
-        controller.setCallTimeout(callTimeout);
-
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), delete);
-          MutateResponse response = getStub().mutate(controller, request);
-          return Boolean.valueOf(response.getProcessed());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Boolean rpcCall() throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), delete);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
       }
     };
     rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
@@ -586,41 +570,28 @@ public class HTable implements Table {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
-    PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+    CancellableRegionServerCallable<MultiResponse> callable =
+      new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
           rpcControllerFactory) {
-        @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
-                getLocation().getRegionInfo().getRegionName(), rm);
-            regionMutationBuilder.setAtomic(true);
-            MultiRequest request =
-                MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-            ClientProtos.MultiResponse response = getStub().multi(controller, request);
-            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if (ex instanceof IOException) {
-                throw (IOException) ex;
-              }
-              throw new IOException("Failed to mutate row: " +
-                  Bytes.toStringBinary(rm.getRow()), ex);
-            }
-            return ResponseConverter.getResults(request, response, controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+      @Override
+      protected MultiResponse rpcCall() throws Exception {
+        RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+            getLocation().getRegionInfo().getRegionName(), rm);
+        regionMutationBuilder.setAtomic(true);
+        MultiRequest request =
+            MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+        ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
+        ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+        if (res.hasException()) {
+          Throwable ex = ProtobufUtil.toException(res.getException());
+          if (ex instanceof IOException) {
+            throw (IOException) ex;
           }
+          throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
         }
-      };
+        return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
+      }
+    };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
         null, null, callable, operationTimeout);
     ars.waitUntilDone();
@@ -629,38 +600,32 @@ public class HTable implements Table {
     }
   }
 
+  private static void checkHasFamilies(final Mutation mutation) throws IOException {
+    if (mutation.numFamilies() == 0) {
+      throw new IOException("Invalid arguments to " + mutation + ", zero columns specified");
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
   @Override
   public Result append(final Append append) throws IOException {
-    if (append.numFamilies() == 0) {
-      throw new IOException(
-          "Invalid arguments to append, no columns specified");
-    }
-
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Result> callable =
-      new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) {
-        @Override
-        public Result call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            if (!response.hasResult()) return null;
-            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    checkHasFamilies(append);
+    NoncedRegionServerCallable<Result> callable =
+        new NoncedRegionServerCallable<Result>(this.connection,
+        this.rpcControllerFactory, getName(), append.getRow()) {
+      @Override
+      protected Result call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        if (!response.hasResult()) return null;
+        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+      }
+    };
+    return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -668,27 +633,17 @@ public class HTable implements Table {
    */
   @Override
   public Result increment(final Increment increment) throws IOException {
-    if (!increment.hasFamilies()) {
-      throw new IOException(
-          "Invalid arguments to increment, no columns specified");
-    }
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-        getName(), increment.getRow()) {
+    checkHasFamilies(increment);
+    NoncedRegionServerCallable<Result> callable =
+        new NoncedRegionServerCallable<Result>(this.connection,
+        this.rpcControllerFactory, getName(), increment.getRow()) {
       @Override
-      public Result call(int callTimeout) throws IOException {
-        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-        controller.setPriority(getTableName());
-        controller.setCallTimeout(callTimeout);
-        try {
-          MutateRequest request = RequestConverter.buildMutateRequest(
-            getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
-          MutateResponse response = getStub().mutate(controller, request);
-          return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+      protected Result call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        // Should this check response.hasResult() and return null like append does?
+        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
       }
     };
     return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
@@ -725,30 +680,21 @@ public class HTable implements Table {
           "Invalid arguments to incrementColumnValue", npe);
     }
 
-    NonceGenerator ng = this.connection.getNonceGenerator();
-    final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
-    RegionServerCallable<Long> callable =
-      new RegionServerCallable<Long>(connection, getName(), row) {
-        @Override
-        public Long call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildIncrementRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family,
-              qualifier, amount, durability, nonceGroup, nonce);
-            MutateResponse response = getStub().mutate(controller, request);
-            Result result =
-              ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-            return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+    NoncedRegionServerCallable<Long> callable =
+        new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(),
+            row) {
+      @Override
+      protected Long call(PayloadCarryingRpcController controller) throws Exception {
+        MutateRequest request = RequestConverter.buildIncrementRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family,
+          qualifier, amount, durability, getNonceGroup(), getNonce());
+        MutateResponse response = getStub().mutate(controller, request);
+        Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+        return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+      }
+    };
+    return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -760,25 +706,19 @@ public class HTable implements Table {
       final Put put)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-                getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, put);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -790,56 +730,43 @@ public class HTable implements Table {
       final Put put)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), compareType, put);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), compareType, put);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean checkAndDelete(final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
-      final Delete delete)
+  public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
+      final byte [] value, final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), CompareType.EQUAL, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
-        this.operationTimeout);
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), CompareType.EQUAL, delete);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).
+        callWithRetries(callable, this.operationTimeout);
   }
 
   /**
@@ -851,25 +778,19 @@ public class HTable implements Table {
       final Delete delete)
   throws IOException {
     RegionServerCallable<Boolean> callable =
-      new RegionServerCallable<Boolean>(connection, getName(), row) {
-        @Override
-        public Boolean call(int callTimeout) throws IOException {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
-          controller.setPriority(tableName);
-          controller.setCallTimeout(callTimeout);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MutateRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-                new BinaryComparator(value), compareType, delete);
-            MutateResponse response = getStub().mutate(controller, request);
-            return Boolean.valueOf(response.getProcessed());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
-          }
-        }
-      };
-    return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
+        new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory,
+            getName(), row) {
+      @Override
+      protected Boolean rpcCall() throws Exception {
+        CompareType compareType = CompareType.valueOf(compareOp.name());
+        MutateRequest request = RequestConverter.buildMutateRequest(
+          getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+          new BinaryComparator(value), compareType, delete);
+        MutateResponse response = getStub().mutate(getRpcController(), request);
+        return Boolean.valueOf(response.getProcessed());
+      }
+    };
+    return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable,
         this.operationTimeout);
   }
 
@@ -880,40 +801,29 @@ public class HTable implements Table {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
-    PayloadCarryingServerCallable<MultiResponse> callable =
-      new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
+    CancellableRegionServerCallable<MultiResponse> callable =
+      new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
         rpcControllerFactory) {
         @Override
-        public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
-          controller.setPriority(tableName);
-          int remainingTime = tracker.getRemainingTime(callTimeout);
-          if (remainingTime == 0) {
-            throw new DoNotRetryIOException("Timeout for mutate row");
-          }
-          controller.setCallTimeout(remainingTime);
-          try {
-            CompareType compareType = CompareType.valueOf(compareOp.name());
-            MultiRequest request = RequestConverter.buildMutateRequest(
-              getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
-              new BinaryComparator(value), compareType, rm);
-            ClientProtos.MultiResponse response = getStub().multi(controller, request);
-            ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
-            if (res.hasException()) {
-              Throwable ex = ProtobufUtil.toException(res.getException());
-              if(ex instanceof IOException) {
-                throw (IOException)ex;
-              }
-              throw new IOException("Failed to checkAndMutate row: "+
-                                    Bytes.toStringBinary(rm.getRow()), ex);
+        protected MultiResponse rpcCall() throws Exception {
+          CompareType compareType = CompareType.valueOf(compareOp.name());
+          MultiRequest request = RequestConverter.buildMutateRequest(
+            getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+            new BinaryComparator(value), compareType, rm);
+          ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request);
+          ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
+          if (res.hasException()) {
+            Throwable ex = ProtobufUtil.toException(res.getException());
+            if (ex instanceof IOException) {
+              throw (IOException)ex;
             }
-            return ResponseConverter.getResults(request, response, controller.cellScanner());
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+            throw new IOException("Failed to checkAndMutate row: "+
+              Bytes.toStringBinary(rm.getRow()), ex);
           }
+          return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
         }
       };
+
     /**
      *  Currently, we use one array to store 'processed' flag which is returned by server.
      *  It is excessive to send such a large array, but that is required by the framework right now
@@ -973,7 +883,6 @@ public class HTable implements Table {
   }
 
   /**
-   * {@inheritDoc}
    * @throws IOException
    */
   void flushCommits() throws IOException {
@@ -1150,19 +1059,18 @@ public class HTable implements Table {
     for (final byte[] r : keys) {
       final RegionCoprocessorRpcChannel channel =
           new RegionCoprocessorRpcChannel(connection, tableName, r);
-      Future<R> future = pool.submit(
-          new Callable<R>() {
-            @Override
-            public R call() throws Exception {
-              T instance = ProtobufUtil.newServiceStub(service, channel);
-              R result = callable.call(instance);
-              byte[] region = channel.getLastRegion();
-              if (callback != null) {
-                callback.update(region, r, result);
-              }
-              return result;
-            }
-          });
+      Future<R> future = pool.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          T instance = ProtobufUtil.newServiceStub(service, channel);
+          R result = callable.call(instance);
+          byte[] region = channel.getLastRegion();
+          if (callback != null) {
+            callback.update(region, r, result);
+          }
+          return result;
+        }
+      });
       futures.put(r, future);
     }
     for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
@@ -1236,9 +1144,6 @@ public class HTable implements Table {
     return tableName + ";" + connection;
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <R extends Message> Map<byte[], R> batchCoprocessorService(
       Descriptors.MethodDescriptor methodDescriptor, Message request,
@@ -1247,14 +1152,13 @@ public class HTable implements Table {
         Bytes.BYTES_COMPARATOR));
     batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
         new Callback<R>() {
-
-          @Override
-          public void update(byte[] region, byte[] row, R result) {
-            if (region != null) {
-              results.put(region, result);
-            }
-          }
-        });
+      @Override
+      public void update(byte[] region, byte[] row, R result) {
+        if (region != null) {
+          results.put(region, result);
+        }
+      }
+    });
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
index 66d3c21..8c4da68 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java
@@ -21,16 +21,34 @@ package org.apache.hadoop.hbase.client;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
 /**
- * A RetryingCallable for master operations.
+ * A RetryingCallable for Master RPC operations.
+ * Implement the #rpcCall method. It will be retried on error. See its javadoc and the javadoc of
+ * #call(int). See {@link HBaseAdmin} for examples of how this is used. To get at the
+ * rpcController that has been created and configured to make this rpc call, use getRpcController().
+ * We are trying to contain all protobuf references including references to rpcController so we
+ * don't pollute codebase with protobuf references; keep the protobuf references contained and only
+ * present in a few classes rather than all about the code base.
+ * <p>Like {@link RegionServerCallable}, only in here the rpcController can safely always be a
+ * PayloadCarryingRpcController. This is not possible in the similar {@link RegionServerCallable}
+ * because it has to deal with Coprocessor Endpoints.
  * @param <V> return type
  */
 abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
-  protected ClusterConnection connection;
+  protected final ClusterConnection connection;
   protected MasterKeepAliveConnection master;
+  private final PayloadCarryingRpcController rpcController;
 
-  public MasterCallable(final Connection connection) {
+  MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) {
     this.connection = (ClusterConnection) connection;
+    this.rpcController = rpcConnectionFactory.newController();
   }
 
   @Override
@@ -43,6 +61,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
     // The above prepare could fail but this would still be called though masterAdmin is null
     if (this.master != null) {
       this.master.close();
+      this.master = null;
     }
   }
 
@@ -59,4 +78,65 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable {
   public long sleep(long pause, int tries) {
     return ConnectionUtils.getPauseTime(pause, tries);
   }
+
+  /**
+   * Override that changes the {@link Callable#call()} Exception from {@link Exception} to
+   * {@link IOException}. It also does setup of an rpcController and calls through to the rpcCall()
+   * method which callers are expected to implement. If the rpcController is non-null, we set
+   * the call timeout on it.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf. We can't set priority on the rpcController as
+  // we do in RegionServerCallable because we don't always have a Table when we call.
+  public V call(int callTimeout) throws IOException {
+    try {
+      if (this.rpcController != null) {
+        this.rpcController.setCallTimeout(callTimeout);
+      }
+      return rpcCall();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run the RPC call. Implement this method. To get at the rpcController that has been created
+   * and configured to make this rpc call, use getRpcController(). We are trying to contain
+   * rpcController references so we don't pollute codebase with protobuf references; keep the
+   * protobuf references contained and only present in a few classes rather than all about the
+   * code base.
+   * @throws Exception
+   */
+  protected abstract V rpcCall() throws Exception;
+
+  PayloadCarryingRpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  void setPriority(final int priority) {
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(priority);
+    }
+  }
+
+  void setPriority(final TableName tableName) {
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(tableName);
+    }
+  }
+
+  /**
+   * @param regionName RegionName. If hbase:meta, we'll set high priority.
+   */
+  void setPriority(final byte [] regionName) {
+    if (isMetaRegion(regionName)) {
+      setPriority(TableName.META_TABLE_NAME);
+    }
+  }
+
+  private static boolean isMetaRegion(final byte[] regionName) {
+    return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName())
+        || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
index e445b78..47693f4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
  * against the master on the MasterProtos.MasterService.BlockingInterface; but not by
  * final user code. Hence it's package protected.
  */
-interface MasterKeepAliveConnection
-extends MasterProtos.MasterService.BlockingInterface {
+interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface {
   // Do this instead of implement Closeable because closeable returning IOE is PITA.
   void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index e764ceb..1ce4aab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -41,15 +42,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 
 /**
  * Callable that handles the <code>multi</code> method call going against a single
- * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a
- * {@link RegionServerCallable} that goes against multiple regions.
+ * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a
+ * RegionServerCallable that goes against multiple regions).
  * @param <R>
  */
-class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> {
+@InterfaceAudience.Private
+class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
 
@@ -79,7 +80,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  public MultiResponse call(int callTimeout) throws IOException {
+  protected MultiResponse rpcCall() throws Exception {
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
@@ -98,10 +99,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       regionActionBuilder.clear();
       regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
           HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));
-
-
       if (this.cellBlock) {
-        // Presize.  Presume at least a KV per Action.  There are likely more.
+        // Pre-size. Presume at least a KV per Action.  There are likely more.
         if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
         // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
         // They have already been handled above. Guess at count of cells
@@ -114,20 +113,13 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
       multiRequestBuilder.addRegionAction(regionActionBuilder.build());
     }
 
-    // Controller optionally carries cell data over the proxy/service boundary and also
-    // optionally ferries cell response data back out again.
-    if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells));
-    controller.setPriority(getTableName());
-    controller.setCallTimeout(callTimeout);
-    ClientProtos.MultiResponse responseProto;
-    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
-    try {
-      responseProto = getStub().multi(controller, requestProto);
-    } catch (ServiceException e) {
-      throw ProtobufUtil.getRemoteException(e);
+    if (cells != null) {
+      setRpcControllerCellScanner(CellUtil.createCellScanner(cells));
     }
+    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
+    ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto);
     if (responseProto == null) return null; // Occurs on cancel
-    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
+    return ResponseConverter.getResults(requestProto, responseProto, getRpcControllerCellScanner());
   }
 
   /**
@@ -151,4 +143,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse
   ServerName getServerName() {
     return location.getServerName();
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
new file mode 100644
index 0000000..21e77bd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+
+/**
+ * Implementations make an rpc call against a RegionService via a protobuf Service.
+ * Implement #call(PayloadCarryingRpcController) and then call {@link #call(int)} to
+ * trigger the rpc. The {@link #call(int)} eventually invokes your
+ * #call(PayloadCarryingRpcController) meanwhile saving you having to write a bunch of
+ * boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are
+ * retried on fail.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
+ *       the regioninfo part of location when building requests. The only reason it works for
+ *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
+ *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
+ *       RegionCallable and actual RegionServerCallable with ServerName).
+ * @param <T> the class that the ServerCallable handles
+ */
+@InterfaceAudience.Private
+public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> {
+  private ClientService.BlockingInterface stub;
+  private final PayloadCarryingRpcController rpcController;
+  private final long nonce;
+
+  /**
+   * @param connection Connection to use.
+   * @param tableName Table name to which <code>row</code> belongs.
+   * @param row The row we want in <code>tableName</code>.
+   */
+  public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+      TableName tableName, byte [] row) {
+    this(connection, rpcControllerFactory.newController(), tableName, row);
+  }
+
+  public NoncedRegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController,
+      TableName tableName, byte [] row) {
+    super(connection, tableName, row);
+    this.rpcController = rpcController;
+    if (this.rpcController != null) {
+      this.rpcController.setPriority(tableName);
+    }
+    this.nonce = getConnection().getNonceGenerator().newNonce();
+  }
+
+  void setClientByServiceName(ServerName service) throws IOException {
+    this.setStub(getConnection().getClient(service));
+  }
+
+  /**
+   * @return Client Rpc protobuf communication stub
+   */
+  protected ClientService.BlockingInterface getStub() {
+    return this.stub;
+  }
+
+  /**
+   * Set the client protobuf communication stub
+   * @param stub to set
+   */
+  void setStub(final ClientService.BlockingInterface stub) {
+    this.stub = stub;
+  }
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    if (this.rpcController != null) {
+      this.rpcController.setCallTimeout(callTimeout);
+    }
+    try {
+      return call(this.rpcController);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+
+  public PayloadCarryingRpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  long getNonceGroup() {
+    return getConnection().getNonceGenerator().getNonceGroup();
+  }
+
+  long getNonce() {
+    return this.nonce;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
deleted file mode 100644
index d94f069..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-
-/**
- * This class is used to unify HTable calls with AsyncProcess Framework.
- * HTable can use AsyncProcess directly though this class.
- */
-@InterfaceAudience.Private
-public abstract class PayloadCarryingServerCallable<T>
-    extends RegionServerCallable<T> implements Cancellable {
-  protected PayloadCarryingRpcController controller;
-
-  public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row,
-    RpcControllerFactory rpcControllerFactory) {
-    super(connection, tableName, row);
-    this.controller = rpcControllerFactory.newController();
-  }
-
-  @Override
-  public void cancel() {
-    controller.startCancel();
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return controller.isCanceled();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
index 54c93a0..4e347dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java
@@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable
+ * Similar to RegionServerCallable but for the AdminService interface. This service callable
  * assumes a Table and row and thus does region locating similar to RegionServerCallable.
+ * Works against Admin stub rather than Client stub.
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD",
   justification="stub used by ipc")
 @InterfaceAudience.Private
 public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> {
-
-  protected final ClusterConnection connection;
-
-  protected final RpcControllerFactory rpcControllerFactory;
-
   protected AdminService.BlockingInterface stub;
+  protected final RpcControllerFactory rpcControllerFactory;
+  private PayloadCarryingRpcController controller = null;
 
+  protected final ClusterConnection connection;
   protected HRegionLocation location;
-
   protected final TableName tableName;
   protected final byte[] row;
   protected final int replicaId;
-
   protected final static int MIN_WAIT_DEAD_SERVER = 10000;
 
   public RegionAdminServiceCallable(ClusterConnection connection,
@@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
     if (reload || location == null) {
       location = getLocation(!reload);
     }
-
     if (location == null) {
       // With this exception, there will be a retry.
       throw new HBaseIOException(getExceptionMessage());
     }
-
     this.setStub(connection.getAdmin(location.getServerName()));
   }
 
@@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
     }
-
     return rl;
   }
-}
+
+  /**
+   * Override that changes Exception from {@link Exception} to {@link IOException}. It also does
+   * setup of an rpcController and calls through to the unimplemented
+   * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation.
+   */
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf. A fresh rpcController is created on every call;
+  // its priority is set from this callable's tableName and its timeout from callTimeout.
+  public T call(int callTimeout) throws IOException {
+    this.controller = rpcControllerFactory.newController();
+    this.controller.setPriority(this.tableName);
+    this.controller.setCallTimeout(callTimeout);
+    try {
+      return call(this.controller);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() {
+    return this.controller;
+  }
+
+  /**
+   * Run RPC call.
+   * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a
+   * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this
+   * class.
+   * @throws Exception
+   */
+  protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index d878bae..3771c50 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,34 +20,62 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 
+import com.google.protobuf.RpcController;
+
 /**
- * Implementations call a RegionServer and implement {@link #call(int)}.
- * Passed to a {@link RpcRetryingCaller} so we retry on fail.
- * TODO: this class is actually tied to one region, because most of the paths make use of
+ * Implementations make an rpc call against a RegionService via a protobuf Service.
+ * Implement rpcCall(). Be sure to make use of the RpcController that this instance is carrying
+ * via {@link #getRpcController()}.
+ *
+ * <p>TODO: this class is actually tied to one region, because most of the paths make use of
  *       the regioninfo part of location when building requests. The only reason it works for
  *       multi-region requests (e.g. batch) is that they happen to not use the region parts.
  *       This could be done cleaner (e.g. having a generic parameter and 2 derived classes,
  *       RegionCallable and actual RegionServerCallable with ServerName.
+ *
  * @param <T> the class that the ServerCallable handles
  */
 @InterfaceAudience.Private
-public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements
-    RetryingCallable<T> {
-
+public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> {
   private ClientService.BlockingInterface stub;
 
+  /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is
+   * also used doing Coprocessor Endpoints and in this case, it is a ServerRpcControllable which is
+   * not a PayloadCarryingRpcController. Too hard to untangle it all at this stage since
+   * downstreamers are using RegionServerCallable invoking CPEPs so just do ugly instanceof
+   * checks in the below.
+   */
+  private final RpcController rpcController;
+
   /**
    * @param connection Connection to use.
    * @param tableName Table name to which <code>row</code> belongs.
    * @param row The row we want in <code>tableName</code>.
    */
-  public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+  public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory,
+      TableName tableName, byte [] row) {
+    this(connection, rpcControllerFactory.newController(), tableName, row);
+  }
+
+  public RegionServerCallable(Connection connection, RpcController rpcController,
+      TableName tableName, byte [] row) {
     super(connection, tableName, row);
+    this.rpcController = rpcController;
+    // If it is an instance of PayloadCarryingRpcController, we can set priority on the
+    // controller based off the tableName. RpcController may be null in tests when mocking so allow
+    // for null controller.
+    if (this.rpcController != null && this.rpcController instanceof PayloadCarryingRpcController) {
+      ((PayloadCarryingRpcController)this.rpcController).setPriority(tableName);
+    }
   }
 
   void setClientByServiceName(ServerName service) throws IOException {
@@ -69,4 +96,55 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab
   void setStub(final ClientService.BlockingInterface stub) {
     this.stub = stub;
   }
-}
+
+  /**
+   * Override that changes call Exception from {@link Exception} to {@link IOException}. It also
+   * does setup of an rpcController and calls through to the unimplemented
+   * rpcCall() method. If rpcController is an instance of PayloadCarryingRpcController,
+   * we will set a timeout on it.
+   */
+  @Override
+  public T call(int callTimeout) throws IOException {
+    try {
+      if (this.rpcController != null &&
+          this.rpcController instanceof PayloadCarryingRpcController) {
+        ((PayloadCarryingRpcController)this.rpcController).setCallTimeout(callTimeout);
+        // Do a reset of the CellScanner in case we are carrying any Cells since last time through.
+        setRpcControllerCellScanner(null);
+      }
+      return rpcCall();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  /**
+   * Run the RPC call. Implement this method. To get at the rpcController that has been created
+   * and configured to make this rpc call, use getRpcController(). We are trying to contain
+   * rpcController references so we don't pollute codebase with protobuf references; keep the
+   * protobuf references contained and only present in a few classes rather than all about the
+   * code base.
+   * @throws Exception
+   */
+  protected abstract T rpcCall() throws Exception;
+
+  protected RpcController getRpcController() {
+    return this.rpcController;
+  }
+
+  /**
+   * Get the RpcController CellScanner.
+   * If the RpcController is a PayloadCarryingRpcController, which it is in all cases except
+   * when we are processing Coprocessor Endpoint, then this method returns a reference to the
+   * CellScanner that the PayloadCarryingRpcController is carrying. Do it up here in this Callable
+   * so we don't have to scatter ugly instanceof tests around the codebase. Will fail if called in
+   * a Coprocessor Endpoint context. Should never happen.
+   */
+  protected CellScanner getRpcControllerCellScanner() {
+    return ((PayloadCarryingRpcController)this.rpcController).cellScanner();
+  }
+
+  protected void setRpcControllerCellScanner(CellScanner cellScanner) {
+    ((PayloadCarryingRpcController)this.rpcController).setCellScanner(cellScanner);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
index 2377a0d..afbcc9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java
@@ -36,4 +36,4 @@ public interface RetryingCallable<T> extends RetryingCallableBase {
    * @throws Exception if unable to compute a result
    */
   T call(int callTimeout) throws Exception;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..b9438e6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * Tracks the amount of time remaining for an operation.
  */
 class RetryingTimeTracker {
-
   private long globalStartTime = -1;
 
   public void start() {
@@ -38,16 +37,19 @@ class RetryingTimeTracker {
       if (callTimeout == Integer.MAX_VALUE) {
         return Integer.MAX_VALUE;
       }
-      int remainingTime = (int) (
-        callTimeout -
-        (EnvironmentEdgeManager.currentTime() - this.globalStartTime));
+      long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime;
+      long remainingTime = callTimeout - remaining;
       if (remainingTime < 1) {
         // If there is no time left, we're trying anyway. It's too late.
         // 0 means no timeout, and it's not the intent here. So we secure both cases by
         // resetting to the minimum.
         remainingTime = 1;
       }
-      return remainingTime;
+      if (remainingTime > Integer.MAX_VALUE) {
+        throw new RuntimeException("remainingTime=" + remainingTime +
+            " which is > Integer.MAX_VALUE");
+      }
+      return (int)remainingTime;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index 0c2d345..a5bebd0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable {
 
   @Override
   public ScannerCallable getScannerCallableForReplica(int id) {
-    ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName,
-        this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id);
+    ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName,
+        this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id);
     r.setCaching(this.getCaching());
     return r;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
new file mode 100644
index 0000000..68a4aa2
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+
+/**
+ * A RetryingCallable for RPC connection operations.
+ * @param <V> return type
+ */
+abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable {
+  @Override
+  public void prepare(boolean reload) throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public void throwable(Throwable t, boolean retrying) {
+  }
+
+  @Override
+  public String getExceptionMessageAdditionalDetail() {
+    return "";
+  }
+
+  @Override
+  public long sleep(long pause, int tries) {
+    return ConnectionUtils.getPauseTime(pause, tries);
+  }
+
+  @Override
+  // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate
+  // and so we contain references to protobuf.
+  public V call(int callTimeout) throws IOException {
+    try {
+      return rpcCall(callTimeout);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+
+  protected abstract V rpcCall(int callTimeout) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index b4cd2ef..2b2e4c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import java.io.IOException;
 
-/**
- *
- */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RpcRetryingCaller<T> {
@@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> {
    */
   T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
   throws IOException, RuntimeException;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 1c723c5..f92aeae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory {
   private final int rpcTimeout;
   private final RetryingCallerInterceptor interceptor;
   private final int startLogErrorsCnt;
+  /* The data members below are UNUSED!!! */
   private final boolean enableBackPressure;
   private ServerStatisticTracker stats;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 65dbb10..8d63295 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -29,8 +29,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
@@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException;
  */
 @InterfaceAudience.Private
 public class RpcRetryingCallerWithReadReplicas {
-  private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
-
   protected final ExecutorService pool;
   protected final ClusterConnection cConnection;
   protected final Configuration conf;
@@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas {
     private final PayloadCarryingRpcController controller;
 
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
-      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
       this.id = id;
       this.location = location;
@@ -141,28 +135,22 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     @Override
-    public Result call(int callTimeout) throws Exception {
+    protected Result rpcCall() throws Exception {
       if (controller.isCanceled()) return null;
-
       if (Thread.interrupted()) {
         throw new InterruptedIOException();
       }
-
       byte[] reg = location.getRegionInfo().getRegionName();
-
       ClientProtos.GetRequest request =
           RequestConverter.buildGetRequest(reg, get);
-      controller.setCallTimeout(callTimeout);
-
-      try {
-        ClientProtos.GetResponse response = getStub().get(controller, request);
-        if (response == null) {
-          return null;
-        }
-        return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+      // Presumption that we are passed a PayloadCarryingRpcController here!
+      PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
+      pcrc.setCallTimeout(callTimeout);
+      ClientProtos.GetResponse response = getStub().get(controller, request);
+      if (response == null) {
+        return null;
       }
+      return ProtobufUtil.toResult(response.getResult(), pcrc.cellScanner());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 72d69ec..0ee54d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -52,9 +51,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
-
 /**
  * Scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
@@ -74,7 +70,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected boolean renew = false;
   private Scan scan;
   private int caching = 1;
-  protected final ClusterConnection cConnection;
   protected ScanMetrics scanMetrics;
   private boolean logScannerActivity = false;
   private int logCutOffLatency = 1000;
@@ -99,8 +94,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   // indicate if it is a remote server call
   protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
-  protected RpcControllerFactory controllerFactory;
-  protected PayloadCarryingRpcController controller;
+  protected final RpcControllerFactory rpcControllerFactory;
 
   /**
    * @param connection which connection
@@ -125,19 +119,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
-    super(connection, tableName, scan.getStartRow());
+    super(connection, rpcControllerFactory, tableName, scan.getStartRow());
     this.id = id;
-    this.cConnection = connection;
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     Configuration conf = connection.getConfiguration();
     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
-    this.controllerFactory = rpcControllerFactory;
-  }
-
-  PayloadCarryingRpcController getController() {
-    return controller;
+    this.rpcControllerFactory = rpcControllerFactory;
   }
 
   /**
@@ -185,25 +174,16 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
-
-  @Override
-  public Result [] call(int callTimeout) throws IOException {
+  protected Result [] rpcCall() throws Exception {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
-
-    if (controller == null) {
-      controller = controllerFactory.newController();
-      controller.setPriority(getTableName());
-      controller.setCallTimeout(callTimeout);
-    }
-
-    if (closed) {
-      if (scannerId != -1) {
+    if (this.closed) {
+      if (this.scannerId != -1) {
         close();
       }
     } else {
-      if (scannerId == -1L) {
+      if (this.scannerId == -1L) {
         this.scannerId = openScanner();
       } else {
         Result [] rrs = null;
@@ -212,61 +192,54 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
         setHeartbeatMessage(false);
         try {
           incRPCcallsMetrics();
-          request =
-              RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
+          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
                 this.scanMetrics != null, renew);
           ScanResponse response = null;
-          try {
-            response = getStub().scan(controller, request);
-            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
-            // from client to server will increment this number in both sides. Client passes this
-            // number along with the request and at RS side both the incoming nextCallSeq and its
-            // nextCallSeq will be matched. In case of a timeout this increment at the client side
-            // should not happen. If at the server side fetching of next batch of data was over,
-            // there will be mismatch in the nextCallSeq number. Server will throw
-            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
-            // as the last successfully retrieved row.
-            // See HBASE-5974
-            nextCallSeq++;
-            long timestamp = System.currentTimeMillis();
-            setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
-            // Results are returned via controller
-            CellScanner cellScanner = controller.cellScanner();
-            rrs = ResponseConverter.getResults(cellScanner, response);
-            if (logScannerActivity) {
-              long now = System.currentTimeMillis();
-              if (now - timestamp > logCutOffLatency) {
-                int rows = rrs == null ? 0 : rrs.length;
-                LOG.info("Took " + (now-timestamp) + "ms to fetch "
+          response = getStub().scan(getRpcController(), request);
+          // Client and RS maintain a nextCallSeq number during the scan. Every next() call
+          // from client to server will increment this number in both sides. Client passes this
+          // number along with the request and at RS side both the incoming nextCallSeq and its
+          // nextCallSeq will be matched. In case of a timeout this increment at the client side
+          // should not happen. If at the server side fetching of next batch of data was over,
+          // there will be mismatch in the nextCallSeq number. Server will throw
+          // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
+          // as the last successfully retrieved row.
+          // See HBASE-5974
+          nextCallSeq++;
+          long timestamp = System.currentTimeMillis();
+          setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
+          rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
+          if (logScannerActivity) {
+            long now = System.currentTimeMillis();
+            if (now - timestamp > logCutOffLatency) {
+              int rows = rrs == null ? 0 : rrs.length;
+              LOG.info("Took " + (now-timestamp) + "ms to fetch "
                   + rows + " rows from scanner=" + scannerId);
-              }
             }
-            updateServerSideMetrics(response);
-            // moreResults is only used for the case where a filter exhausts all elements
-            if (response.hasMoreResults() && !response.getMoreResults()) {
-              scannerId = -1L;
-              closed = true;
-              // Implied that no results were returned back, either.
-              return null;
-            }
-            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
-            // to size or quantity of results in the response.
-            if (response.hasMoreResultsInRegion()) {
-              // Set what the RS said
-              setHasMoreResultsContext(true);
-              setServerHasMoreResults(response.getMoreResultsInRegion());
-            } else {
-              // Server didn't respond whether it has more results or not.
-              setHasMoreResultsContext(false);
-            }
-          } catch (ServiceException se) {
-            throw ProtobufUtil.getRemoteException(se);
+          }
+          updateServerSideMetrics(response);
+          // moreResults is only used for the case where a filter exhausts all elements
+          if (response.hasMoreResults() && !response.getMoreResults()) {
+            this.scannerId = -1L;
+            this.closed = true;
+            // Implied that no results were returned back, either.
+            return null;
+          }
+          // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
+          // to size or quantity of results in the response.
+          if (response.hasMoreResultsInRegion()) {
+            // Set what the RS said
+            setHasMoreResultsContext(true);
+            setServerHasMoreResults(response.getMoreResultsInRegion());
+          } else {
+            // Server didn't respond whether it has more results or not.
+            setHasMoreResultsContext(false);
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
           if (logScannerActivity) {
-            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
-              + " to " + getLocation(), e);
+            LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " +
+                getLocation(), e);
           }
           IOException ioe = e;
           if (e instanceof RemoteException) {
@@ -275,9 +248,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
             try {
               HRegionLocation location =
-                getConnection().relocateRegion(getTableName(), scan.getStartRow());
-              LOG.info("Scanner=" + scannerId
-                + " expired, current region location is " + location.toString());
+                  getConnection().relocateRegion(getTableName(), scan.getStartRow());
+              LOG.info("Scanner=" + scannerId + " expired, current region location is " +
+                  location.toString());
             } catch (Throwable t) {
               LOG.info("Failed to relocate region", t);
             }
@@ -375,9 +348,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       ScanRequest request =
           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
-        getStub().scan(controller, request);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
+        getStub().scan(getRpcController(), request);
+      } catch (Exception e) {
+        throw ProtobufUtil.handleRemoteException(e);
       }
     } catch (IOException e) {
       LOG.warn("Ignore, probably already closed", e);
@@ -387,20 +360,18 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 
   protected long openScanner() throws IOException {
     incRPCcallsMetrics();
-    ScanRequest request =
-      RequestConverter.buildScanRequest(
-        getLocation().getRegionInfo().getRegionName(),
-        this.scan, 0, false);
+    ScanRequest request = RequestConverter.buildScanRequest(
+        getLocation().getRegionInfo().getRegionName(), this.scan, 0, false);
     try {
-      ScanResponse response = getStub().scan(controller, request);
+      ScanResponse response = getStub().scan(getRpcController(), request);
       long id = response.getScannerId();
       if (logScannerActivity) {
         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
           + " on region " + getLocation().toString());
       }
       return id;
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
     }
   }
 
@@ -443,11 +414,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     return caching;
   }
 
-  @Override
-  public ClusterConnection getConnection() {
-    return cConnection;
-  }
-
   /**
    * Set the number of rows that will be fetched on next
    * @param caching the number of rows for caching
@@ -458,7 +424,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
 
   public ScannerCallable getScannerCallableForReplica(int id) {
     ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
-        this.getScan(), this.scanMetrics, controllerFactory, id);
+        this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
     s.setCaching(this.caching);
     return s;
   }
@@ -488,4 +454,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
     this.serverHasMoreResultsContext = serverHasMoreResultsContext;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index c3a3834..096841b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -267,7 +267,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   /**
    * When a scanner switches in the middle of scanning (the 'next' call fails
    * for example), the upper layer {@link ClientScanner} needs to know
-   * @return
    */
   public boolean switchedToADifferentReplica() {
     return replicaSwitched.get();
@@ -398,8 +397,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     public void cancel() {
       cancelled = true;
       caller.cancel();
-      if (callable.getController() != null) {
-        callable.getController().startCancel();
+      if (callable.getRpcController() != null) {
+        callable.getRpcController().startCancel();
       }
       someRPCcancelled = true;
     }


Mime
View raw message