hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jya...@apache.org
Subject git commit: HBASE-11048 Support setting custom priority per client RPC
Date Fri, 23 May 2014 06:33:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/0.98 494e0b50e -> 796134e9f


HBASE-11048 Support setting custom priority per client RPC


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/796134e9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/796134e9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/796134e9

Branch: refs/heads/0.98
Commit: 796134e9f5ebc97451d9e59bcc75fcb3d01b4a4d
Parents: 494e0b5
Author: Jesse Yates <jyates@apache.org>
Authored: Thu May 22 23:29:22 2014 -0700
Committer: Jesse Yates <jyates@apache.org>
Committed: Thu May 22 23:29:22 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  11 +-
 .../hadoop/hbase/client/ClientScanner.java      |  28 ++-
 .../client/ClientSmallReversedScanner.java      |   2 +-
 .../hadoop/hbase/client/ClientSmallScanner.java |  19 +-
 .../hadoop/hbase/client/HConnectionManager.java |   9 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  74 ++++---
 .../hbase/client/MultiServerCallable.java       |  10 +-
 .../hbase/client/ReversedClientScanner.java     |   7 +-
 .../hbase/client/ReversedScannerCallable.java   |  17 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  18 +-
 .../DelegatingPayloadCarryingRpcController.java |  58 ++++++
 .../hbase/ipc/RegionCoprocessorRpcChannel.java  |  12 +-
 .../hadoop/hbase/ipc/RpcControllerFactory.java  |  59 ++++++
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |  70 ++++++-
 .../hadoop/hbase/client/TestAsyncProcess.java   |   3 +-
 .../regionserver/wal/WALEditsReplaySink.java    |   3 +
 .../hbase/client/TestRpcControllerFactory.java  | 203 +++++++++++++++++++
 17 files changed, 525 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 52bedf3..bbb2fdf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -46,11 +46,14 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.cloudera.htrace.Trace;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class  allows a continuous flow of requests. It's written to be compatible with a
  * synchronous caller such as HTable.
@@ -127,6 +130,7 @@ class AsyncProcess<CResult> {
   protected int numTries;
   protected int serverTrackerTimeout;
   protected RpcRetryingCallerFactory rpcCallerFactory;
+  private RpcControllerFactory rpcFactory;
 
 
   /**
@@ -198,7 +202,7 @@ class AsyncProcess<CResult> {
 
   public AsyncProcess(HConnection hc, TableName tableName, ExecutorService pool,
       AsyncProcessCallback<CResult> callback, Configuration conf,
-      RpcRetryingCallerFactory rpcCaller) {
+      RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
     if (hc == null){
       throw new IllegalArgumentException("HConnection cannot be null.");
     }
@@ -251,8 +255,9 @@ class AsyncProcess<CResult> {
       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
     }
 
-
     this.rpcCallerFactory = rpcCaller;
+    Preconditions.checkNotNull(rpcFactory);
+    this.rpcFactory = rpcFactory;
   }
 
   /**
@@ -576,7 +581,7 @@ class AsyncProcess<CResult> {
    */
   protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
       final MultiAction<Row> multi) {
-    return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
+    return new MultiServerCallable<Row>(hConnection, tableName, location, this.rpcFactory, multi);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 82471c8..a5edd86 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
@@ -66,6 +67,7 @@ public class ClientScanner extends AbstractClientScanner {
     protected final int scannerTimeout;
     protected boolean scanMetricsPublished = false;
     protected RpcRetryingCaller<Result []> caller;
+    protected RpcControllerFactory rpcControllerFactory;
 
     /**
      * Create a new ClientScanner for the specified table. An HConnection will be
@@ -104,7 +106,8 @@ public class ClientScanner extends AbstractClientScanner {
      */
   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
       HConnection connection) throws IOException {
-    this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
+    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
+        RpcControllerFactory.instantiate(conf));
   }
 
   /**
@@ -113,7 +116,20 @@ public class ClientScanner extends AbstractClientScanner {
   @Deprecated
   public ClientScanner(final Configuration conf, final Scan scan, final byte [] tableName,
       HConnection connection) throws IOException {
-    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf));
+    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
+        RpcControllerFactory.instantiate(conf));
+  }
+
+  /**
+   * @deprecated Use
+   *             {@link #ClientScanner(Configuration, Scan, TableName, HConnection,
+   *             RpcRetryingCallerFactory, RpcControllerFactory)}
+   *             instead
+   */
+  @Deprecated
+  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
+    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
   }
 
   /**
@@ -126,7 +142,8 @@ public class ClientScanner extends AbstractClientScanner {
    * @throws IOException
    */
   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
+      HConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory) throws IOException {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Scan table=" + tableName
             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -159,7 +176,8 @@ public class ClientScanner extends AbstractClientScanner {
             HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
       }
 
-    this.caller = rpcFactory.<Result[]> newCaller();
+      this.caller = rpcFactory.<Result[]> newCaller();
+      this.rpcControllerFactory = controllerFactory;
 
       initializeScannerInConstruction();
     }
@@ -278,7 +296,7 @@ public class ClientScanner extends AbstractClientScanner {
         int nbRows) {
       scan.setStartRow(localStartKey);
       ScannerCallable s = new ScannerCallable(getConnection(),
-        getTable(), scan, this.scanMetrics);
+        getTable(), scan, this.scanMetrics, rpcControllerFactory.newController());
       s.setCaching(nbRows);
       return s;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index 17b1110..c707e45 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -109,7 +109,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
     }
 
     smallScanCallable = ClientSmallScanner.getSmallScanCallable(
-        scan, getConnection(), getTable(), localStartKey, cacheNum);
+        scan, getConnection(), getTable(), localStartKey, cacheNum, this.rpcControllerFactory);
 
     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
       this.scanMetrics.countOfRegions.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index a67b09f..24229b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -82,7 +83,8 @@ public class ClientSmallScanner extends ClientScanner {
    */
   public ClientSmallScanner(final Configuration conf, final Scan scan,
       final TableName tableName, HConnection connection) throws IOException {
-    this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
+    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
+        RpcControllerFactory.instantiate(conf));
   }
 
   /**
@@ -94,12 +96,13 @@ public class ClientSmallScanner extends ClientScanner {
    * @param tableName The table that we wish to rangeGet
    * @param connection Connection identifying the cluster
    * @param rpcFactory
+   * @param controllerFactory 
    * @throws IOException
    */
-  public ClientSmallScanner(final Configuration conf, final Scan scan,
-      final TableName tableName, HConnection connection,
-      RpcRetryingCallerFactory rpcFactory) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory);
+  public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory) throws IOException {
+    super(conf, scan, tableName, connection, rpcFactory, controllerFactory);
   }
 
   @Override
@@ -151,7 +154,7 @@ public class ClientSmallScanner extends ClientScanner {
           + Bytes.toStringBinary(localStartKey) + "'");
     }
     smallScanCallable = getSmallScanCallable(
-        scan, getConnection(), getTable(), localStartKey, cacheNum);
+        scan, getConnection(), getTable(), localStartKey, cacheNum, rpcControllerFactory);
     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
       this.scanMetrics.countOfRegions.incrementAndGet();
     }
@@ -160,7 +163,7 @@ public class ClientSmallScanner extends ClientScanner {
 
   static RegionServerCallable<Result[]> getSmallScanCallable(
       final Scan sc, HConnection connection, TableName table, byte[] localStartKey,
-      final int cacheNum) throws IOException { 
+      final int cacheNum, final RpcControllerFactory rpcControllerFactory) throws IOException { 
     sc.setStartRow(localStartKey);
     RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
         connection, table, sc.getStartRow()) {
@@ -168,7 +171,7 @@ public class ClientSmallScanner extends ClientScanner {
         ScanRequest request = RequestConverter.buildScanRequest(getLocation()
           .getRegionInfo().getRegionName(), sc, cacheNum, true);
         ScanResponse response = null;
-        PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
         try {
           controller.setPriority(getTableName());
           response = getStub().scan(controller, request);

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index e4adf8c..003bc8e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -1233,9 +1234,9 @@ public class HConnectionManager {
           }
 
           // Query the meta region for the location of the meta region
-          regionInfoRow = ProtobufUtil.getRowOrBefore(service,
-              metaLocation.getRegionInfo().getRegionName(), metaKey,
-              HConstants.CATALOG_FAMILY);
+          regionInfoRow =
+              ProtobufUtil.getRowOrBefore(service, metaLocation.getRegionInfo().getRegionName(),
+                metaKey, HConstants.CATALOG_FAMILY);
 
           if (regionInfoRow == null) {
             throw new TableNotFoundException(tableName);
@@ -2376,7 +2377,7 @@ public class HConnectionManager {
     protected <R> AsyncProcess createAsyncProcess(TableName tableName, ExecutorService pool,
            AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
       return new AsyncProcess<R>(this, tableName, pool, callback, conf,
-          RpcRetryingCallerFactory.instantiate(conf));
+          RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf));
     }
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 04ee336..3a4a715 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 
 import com.google.protobuf.Descriptors;
+import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.Message;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
@@ -140,6 +142,7 @@ public class HTable implements HTableInterface {
   /** The Async process for puts with autoflush set to false or multiputs */
   protected AsyncProcess<Object> ap;
   private RpcRetryingCallerFactory rpcCallerFactory;
+  private RpcControllerFactory rpcControllerFactory;
 
   /**
    * Creates an object to access a HBase table.
@@ -344,8 +347,9 @@ public class HTable implements HTableInterface {
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
 
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
+    this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
     ap = new AsyncProcess<Object>(connection, tableName, pool, null,
-        configuration, rpcCallerFactory);
+        configuration, rpcCallerFactory, rpcControllerFactory);
 
     this.maxKeyValueSize = this.configuration.getInt(
         "hbase.client.keyvalue.maxsize", -1);
@@ -703,9 +707,9 @@ public class HTable implements HTableInterface {
      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
          tableName, row) {
        public Result call() throws IOException {
-         return ProtobufUtil.getRowOrBefore(getStub(),
-           getLocation().getRegionInfo().getRegionName(), row, family);
-       }
+            return ProtobufUtil.getRowOrBefore(getStub(), getLocation().getRegionInfo()
+                .getRegionName(), row, family, rpcControllerFactory.newController());
+          }
      };
     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
   }
@@ -724,17 +728,14 @@ public class HTable implements HTableInterface {
         return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
             this.connection);
       } else {
-        return new ReversedClientScanner(getConfiguration(), scan, getName(),
-            this.connection);
+        return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection);
       }
     }
 
     if (scan.isSmall()) {
-      return new ClientSmallScanner(getConfiguration(), scan, getName(),
-          this.connection, this.rpcCallerFactory);
+      return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
     } else {
-      return new ClientScanner(getConfiguration(), scan,
-          getName(), this.connection);
+      return new ClientScanner(getConfiguration(), scan, getName(), this.connection);
     }
   }
 
@@ -764,12 +765,18 @@ public class HTable implements HTableInterface {
    */
   @Override
   public Result get(final Get get) throws IOException {
-    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-        getName(), get.getRow()) {
-      public Result call() throws IOException {
-        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
-      }
-    };
+    // have to instantiate this and set the priority here since in protobuf util we don't pass in
+    // the tablename... an unfortunate side-effect of public interfaces :-/ In 0.99+ we put all the
+    // logic back into HTable
+    final PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+    controller.setPriority(tableName);
+    RegionServerCallable<Result> callable =
+        new RegionServerCallable<Result>(this.connection, getName(), get.getRow()) {
+          public Result call() throws IOException {
+            return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get,
+              controller);
+          }
+        };
     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
@@ -856,7 +863,9 @@ public class HTable implements HTableInterface {
         try {
           MutateRequest request = RequestConverter.buildMutateRequest(
             getLocation().getRegionInfo().getRegionName(), delete);
-          MutateResponse response = getStub().mutate(null, request);
+              PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+              controller.setPriority(tableName);
+              MutateResponse response = getStub().mutate(controller, request);
           return Boolean.valueOf(response.getProcessed());
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
@@ -1000,9 +1009,9 @@ public class HTable implements HTableInterface {
           regionMutationBuilder.setAtomic(true);
           MultiRequest request =
             MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
-          PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController();
+          PayloadCarryingRpcController pcrc = rpcControllerFactory.newController();
           pcrc.setPriority(tableName);
-          getStub().multi(null, request);
+          getStub().multi(pcrc, request);
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
         }
@@ -1030,7 +1039,7 @@ public class HTable implements HTableInterface {
           try {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
-            PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+            PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
             rpcController.setPriority(getTableName());
             MutateResponse response = getStub().mutate(rpcController, request);
             if (!response.hasResult()) return null;
@@ -1060,15 +1069,15 @@ public class HTable implements HTableInterface {
         try {
           MutateRequest request = RequestConverter.buildMutateRequest(
             getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
-          PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+          PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
           rpcController.setPriority(getTableName());
           MutateResponse response = getStub().mutate(rpcController, request);
           return ProtobufUtil.toResult(response.getResult(), rpcController.cellScanner());
         } catch (ServiceException se) {
           throw ProtobufUtil.getRemoteException(se);
         }
-        }
-      };
+      }
+    };
     return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
   }
 
@@ -1123,7 +1132,7 @@ public class HTable implements HTableInterface {
             MutateRequest request = RequestConverter.buildIncrementRequest(
               getLocation().getRegionInfo().getRegionName(), row, family,
               qualifier, amount, durability, nonceGroup, nonce);
-            PayloadCarryingRpcController rpcController = new PayloadCarryingRpcController();
+            PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
             rpcController.setPriority(getTableName());
             MutateResponse response = getStub().mutate(rpcController, request);
             Result result =
@@ -1152,7 +1161,9 @@ public class HTable implements HTableInterface {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), CompareType.EQUAL, put);
-            MutateResponse response = getStub().mutate(null, request);
+            PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
+            rpcController.setPriority(getTableName());
+            MutateResponse response = getStub().mutate(rpcController, request);
             return Boolean.valueOf(response.getProcessed());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
@@ -1178,7 +1189,9 @@ public class HTable implements HTableInterface {
             MutateRequest request = RequestConverter.buildMutateRequest(
               getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
                 new BinaryComparator(value), CompareType.EQUAL, delete);
-            MutateResponse response = getStub().mutate(null, request);
+            PayloadCarryingRpcController rpcController = rpcControllerFactory.newController();
+            rpcController.setPriority(getTableName());
+            MutateResponse response = getStub().mutate(rpcController, request);
             return Boolean.valueOf(response.getProcessed());
           } catch (ServiceException se) {
             throw ProtobufUtil.getRemoteException(se);
@@ -1482,7 +1495,8 @@ public class HTable implements HTableInterface {
    * {@inheritDoc}
    */
   public CoprocessorRpcChannel coprocessorService(byte[] row) {
-    return new RegionCoprocessorRpcChannel(connection, tableName, row);
+    return new RegionCoprocessorRpcChannel(connection, tableName, row, rpcCallerFactory,
+        rpcControllerFactory);
   }
 
   /**
@@ -1519,7 +1533,8 @@ public class HTable implements HTableInterface {
         new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
     for (final byte[] r : keys) {
       final RegionCoprocessorRpcChannel channel =
-          new RegionCoprocessorRpcChannel(connection, tableName, r);
+          new RegionCoprocessorRpcChannel(connection, tableName, r, rpcCallerFactory,
+              rpcControllerFactory);
       Future<R> future = pool.submit(
           new Callable<R>() {
             public R call() throws Exception {
@@ -1692,8 +1707,7 @@ public class HTable implements HTableInterface {
             return !(exception instanceof DoNotRetryIOException);
           }
         },
-        configuration,
-        RpcRetryingCallerFactory.instantiate(configuration));
+        configuration, rpcCallerFactory, rpcControllerFactory);
 
     asyncProcess.submitAll(execs);
     asyncProcess.waitUntilDone();

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index bb9d406..57ea476 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -30,14 +30,15 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 
 import com.google.protobuf.ServiceException;
 
@@ -50,11 +51,14 @@ import com.google.protobuf.ServiceException;
 class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
+  private RpcControllerFactory rpcFactory;
 
   MultiServerCallable(final HConnection connection, final TableName tableName,
-      final HRegionLocation location, final MultiAction<R> multi) {
+      final HRegionLocation location, final RpcControllerFactory rpcFactory,
+      final MultiAction<R> multi) {
     super(connection, tableName, null);
     this.multiAction = multi;
+    this.rpcFactory = rpcFactory;
     setLocation(location);
     this.cellBlock = isCellBlock();
   }
@@ -101,7 +105,7 @@ class MultiServerCallable<R> extends RegionServerCallable<MultiResponse> {
 
     // Controller optionally carries cell data over the proxy/service boundary and also
     // optionally ferries cell response data back out again.
-    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+    PayloadCarryingRpcController controller = rpcFactory.newController(cells);
     controller.setPriority(getTableName());
     ClientProtos.MultiResponse responseProto;
     ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 639fe5a..a0299df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 
@@ -41,6 +42,7 @@ public class ReversedClientScanner extends ClientScanner {
   // A byte array in which all elements are the max byte, and it is used to
   // construct closest front row
   static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
+
   /**
    * Create a new ReversibleClientScanner for the specified table Note that the
    * passed {@link Scan}'s start row maybe changed.
@@ -125,8 +127,9 @@ public class ReversedClientScanner extends ClientScanner {
   protected ScannerCallable getScannerCallable(byte[] localStartKey,
       int nbRows, byte[] locateStartRow) {
     scan.setStartRow(localStartKey);
-    ScannerCallable s = new ReversedScannerCallable(getConnection(),
-        getTable(), scan, this.scanMetrics, locateStartRow);
+    ScannerCallable s =
+        new ReversedScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
+            locateStartRow, rpcControllerFactory.newController());
     s.setCaching(nbRows);
     return s;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
index a974b01..d641f30 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -44,17 +46,24 @@ public class ReversedScannerCallable extends ScannerCallable {
    */
   protected final byte[] locateStartRow;
 
+  @Deprecated
+  public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan,
+      ScanMetrics scanMetrics, byte[] locateStartRow) {
+    this(connection, tableName, scan, scanMetrics, locateStartRow, RpcControllerFactory
+        .instantiate(connection.getConfiguration()).newController());
+  }
+
   /**
-   * 
    * @param connection
    * @param tableName
    * @param scan
    * @param scanMetrics
    * @param locateStartRow The start row for locating regions
+   * @param rpcFactory the {@link PayloadCarryingRpcController} to use when writing the rpc
    */
-  public ReversedScannerCallable(HConnection connection, TableName tableName,
-      Scan scan, ScanMetrics scanMetrics, byte[] locateStartRow) {
-    super(connection, tableName, scan, scanMetrics);
+  public ReversedScannerCallable(HConnection connection, TableName tableName, Scan scan,
+      ScanMetrics scanMetrics, byte[] locateStartRow, PayloadCarryingRpcController rpcFactory) {
+    super(connection, tableName, scan, scanMetrics, rpcFactory);
     this.locateStartRow = locateStartRow;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 1a3d7a7..e306c5f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
+import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import com.google.protobuf.TextFormat;
 
@@ -81,31 +83,36 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
   // indicate if it is a remote server call
   protected boolean isRegionServerRemote = true;
   private long nextCallSeq = 0;
+  protected final PayloadCarryingRpcController controller;
   
   /**
    * @param connection which connection
    * @param tableName table callable is on
    * @param scan the scan to execute
-   * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable
-   * won't collect metrics
+   * @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
+   *          metrics
+   * @param controller to use when writing the rpc
    */
   public ScannerCallable (HConnection connection, TableName tableName, Scan scan,
-    ScanMetrics scanMetrics) {
+      ScanMetrics scanMetrics, PayloadCarryingRpcController controller) {
     super(connection, tableName, scan.getStartRow());
     this.scan = scan;
     this.scanMetrics = scanMetrics;
     Configuration conf = connection.getConfiguration();
     logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
     logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
+    this.controller = controller;
   }
 
   /**
-   * @deprecated Use {@link #ScannerCallable(HConnection, TableName, Scan, ScanMetrics)}
+   * @deprecated Use {@link #ScannerCallable(HConnection, TableName, Scan, 
+   *  ScanMetrics, PayloadCarryingRpcController)}
    */
   @Deprecated
   public ScannerCallable (HConnection connection, final byte [] tableName, Scan scan,
       ScanMetrics scanMetrics) {
-    this(connection, TableName.valueOf(tableName), scan, scanMetrics);
+    this(connection, TableName.valueOf(tableName), scan, scanMetrics, RpcControllerFactory
+        .instantiate(connection.getConfiguration()).newController());
   }
 
   /**
@@ -162,7 +169,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           incRPCcallsMetrics();
           request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
           ScanResponse response = null;
-          PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
           try {
             controller.setPriority(getTableName());
             response = getStub().scan(controller, request);

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
new file mode 100644
index 0000000..9f23770
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Simple delegating controller for use with the {@link RpcControllerFactory} to help override
+ * standard behavior of a {@link PayloadCarryingRpcController}.
+ */
+public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
+  private PayloadCarryingRpcController delegate;
+
+  public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public CellScanner cellScanner() {
+    return delegate.cellScanner();
+  }
+
+  @Override
+  public void setCellScanner(final CellScanner cellScanner) {
+    delegate.setCellScanner(cellScanner);
+  }
+
+  @Override
+  public void setPriority(int priority) {
+    delegate.setPriority(priority);
+  }
+
+  @Override
+  public void setPriority(final TableName tn) {
+    delegate.setPriority(tn);
+  }
+
+  @Override
+  public int getPriority() {
+    return delegate.getPriority();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 3238348..de1259d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -55,11 +55,15 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
 
   private RpcRetryingCallerFactory rpcFactory;
 
-  public RegionCoprocessorRpcChannel(HConnection conn, TableName table, byte[] row) {
+  private RpcControllerFactory rpcController;
+
+  public RegionCoprocessorRpcChannel(HConnection conn, TableName table, byte[] row,
+      RpcRetryingCallerFactory rpcFactory, RpcControllerFactory rpcControllerFactory) {
     this.connection = conn;
     this.table = table;
     this.row = row;
-    this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
+    this.rpcFactory = rpcFactory;
+    this.rpcController = rpcControllerFactory;
   }
 
   @Override
@@ -80,11 +84,13 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
             .setServiceName(method.getService().getFullName())
             .setMethodName(method.getName())
             .setRequest(request.toByteString()).build();
+    final PayloadCarryingRpcController controller = rpcController.newController();
+    controller.setPriority(table);
     RegionServerCallable<CoprocessorServiceResponse> callable =
         new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
           public CoprocessorServiceResponse call() throws Exception {
             byte[] regionName = getLocation().getRegionInfo().getRegionName();
-            return ProtobufUtil.execService(getStub(), call, regionName);
+            return ProtobufUtil.execService(getStub(), call, regionName, controller);
           }
         };
     CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
new file mode 100644
index 0000000..2ffab8d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Factory to create a {@link PayloadCarryingRpcController}
+ */
+public class RpcControllerFactory {
+
+  public static final String CUSTOM_CONTROLLER_CONF_KEY = "hbase.rpc.controllerfactory.class";
+  protected final Configuration conf;
+
+  public RpcControllerFactory(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public PayloadCarryingRpcController newController() {
+    return new PayloadCarryingRpcController();
+  }
+  
+  public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
+    return new PayloadCarryingRpcController(cellScanner);
+  }
+
+  public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
+    return new PayloadCarryingRpcController(cellIterables);
+  }
+  
+
+  public static RpcControllerFactory instantiate(Configuration configuration) {
+    String rpcControllerFactoryClazz =
+        configuration.get(CUSTOM_CONTROLLER_CONF_KEY,
+          RpcControllerFactory.class.getName());
+    return ReflectionUtils.instantiateWithCustomCtor(rpcControllerFactoryClazz,
+      new Class[] { Configuration.class }, new Object[] { configuration });
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 3e00637..0f393d9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -1429,7 +1430,7 @@ public final class ProtobufUtil {
 // Start helpers for Client
 
   /**
-   * A helper to invoke a Get using client protocol.
+   * A helper to invoke a Get using client protocol. Uses the standard (i.e. not configured)
+   * {@link PayloadCarryingRpcController} semantics.
    *
    * @param client
    * @param regionName
@@ -1439,10 +1440,25 @@ public final class ProtobufUtil {
    */
   public static Result get(final ClientService.BlockingInterface client,
       final byte[] regionName, final Get get) throws IOException {
+    return get(client, regionName, get, null);
+  }
+  
+  /**
+   * A helper to invoke a Get using client protocol.
+   *
+   * @param client
+   * @param regionName
+   * @param get
+   * @param controller to use when writing the rpc
+   * @return the result of the Get
+   * @throws IOException
+   */
+  public static Result get(final ClientService.BlockingInterface client, final byte[] regionName,
+      final Get get, PayloadCarryingRpcController controller) throws IOException {
     GetRequest request =
       RequestConverter.buildGetRequest(regionName, get);
     try {
-      GetResponse response = client.get(null, request);
+      GetResponse response = client.get(controller, request);
       if (response == null) return null;
       return toResult(response.getResult());
     } catch (ServiceException se) {
@@ -1451,23 +1467,38 @@ public final class ProtobufUtil {
   }
 
   /**
+   * A helper to get a row or the closest one before it using client protocol without setting any
+   * special (i.e. configured) {@link PayloadCarryingRpcController}
+   * @param client
+   * @param regionName
+   * @param row
+   * @param family
+   * @return the row or the closestRowBefore if it doesn't exist
+   * @throws IOException
+   */
+  public static Result getRowOrBefore(final ClientService.BlockingInterface client,
+      final byte[] regionName, final byte[] row, final byte[] family) throws IOException {
+    return getRowOrBefore(client, regionName, row, family, null);
+  }
+  
+  /**
    * A helper to get a row or the closest one before it using client protocol.
-   *
    * @param client
    * @param regionName
    * @param row
    * @param family
+   * @param payloadCarryingRpcController
    * @return the row or the closestRowBefore if it doesn't exist
    * @throws IOException
    */
   public static Result getRowOrBefore(final ClientService.BlockingInterface client,
-      final byte[] regionName, final byte[] row,
-      final byte[] family) throws IOException {
+      final byte[] regionName, final byte[] row, final byte[] family,
+      PayloadCarryingRpcController payloadCarryingRpcController) throws IOException {
     GetRequest request =
       RequestConverter.buildGetRowOrBeforeRequest(
         regionName, row, family);
     try {
-      GetResponse response = client.get(null, request);
+      GetResponse response = client.get(payloadCarryingRpcController, request);
       if (!response.hasResult()) return null;
       return toResult(response.getResult());
     } catch (ServiceException se) {
@@ -1488,11 +1519,28 @@ public final class ProtobufUtil {
   public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum) throws IOException {
+    return bulkLoadHFile(client, familyPaths, regionName, assignSeqNum, null);
+  }
+    
+  /**
+   * A helper to bulk load a list of HFiles using client protocol.
+   *
+   * @param client
+   * @param familyPaths
+   * @param regionName
+   * @param assignSeqNum
+   * @param controller
+   * @return true if all are loaded
+   * @throws IOException
+   */
+  public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
+      final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+      PayloadCarryingRpcController controller) throws IOException {
     BulkLoadHFileRequest request =
       RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
     try {
       BulkLoadHFileResponse response =
-        client.bulkLoadHFile(null, request);
+        client.bulkLoadHFile(controller, request);
       return response.getLoaded();
     } catch (ServiceException se) {
       throw getRemoteException(se);
@@ -1501,12 +1549,18 @@ public final class ProtobufUtil {
 
   public static CoprocessorServiceResponse execService(final ClientService.BlockingInterface client,
       final CoprocessorServiceCall call, final byte[] regionName) throws IOException {
+    return execService(client, call, regionName, null);
+  }
+
+  public static CoprocessorServiceResponse execService(
+      final ClientService.BlockingInterface client, final CoprocessorServiceCall call,
+      final byte[] regionName, PayloadCarryingRpcController controller) throws IOException {
     CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
         .setCall(call).setRegion(
             RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build();
     try {
       CoprocessorServiceResponse response =
-          client.execService(null, request);
+          client.execService(controller, request);
       return response;
     } catch (ServiceException se) {
       throw getRemoteException(se);

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 362b183..4c93f44 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Assert;
@@ -99,7 +100,7 @@ public class TestAsyncProcess {
                           AtomicInteger nbThreads) {
       super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
-          callback, conf, new RpcRetryingCallerFactory(conf));
+          callback, conf, new RpcRetryingCallerFactory(conf), new RpcControllerFactory(conf));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 917a7c9..3845d6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
@@ -68,6 +69,7 @@ public class WALEditsReplaySink {
   private final AtomicLong totalReplayedEdits = new AtomicLong();
   private final boolean skipErrors;
   private final int replayTimeout;
+  private RpcControllerFactory rpcControllerFactory;
 
   /**
    * Create a sink for WAL log entries replay
@@ -86,6 +88,7 @@ public class WALEditsReplaySink {
       HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
     // a single replay operation time out and default is 60 seconds
     this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
+    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/796134e9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
new file mode 100644
index 0000000..02c2ef8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
+import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(MediumTests.class)
+public class TestRpcControllerFactory {
+
+  public static class StaticRpcControllerFactory extends RpcControllerFactory {
+
+    public StaticRpcControllerFactory(Configuration conf) {
+      super(conf);
+    }
+
+    public PayloadCarryingRpcController newController() {
+      return new CountingRpcController(super.newController());
+    }
+
+    public PayloadCarryingRpcController newController(final CellScanner cellScanner) {
+      return new CountingRpcController(super.newController(cellScanner));
+    }
+
+    public PayloadCarryingRpcController newController(final List<CellScannable> cellIterables) {
+      return new CountingRpcController(super.newController(cellIterables));
+    }
+  }
+
+  public static class CountingRpcController extends DelegatingPayloadCarryingRpcController {
+
+    private static AtomicInteger INT_PRIORITY = new AtomicInteger();
+    private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
+
+    public CountingRpcController(PayloadCarryingRpcController delegate) {
+      super(delegate);
+    }
+
+    @Override
+    public void setPriority(int priority) {
+      super.setPriority(priority);
+      INT_PRIORITY.incrementAndGet();
+    }
+
+    @Override
+    public void setPriority(TableName tn) {
+      super.setPriority(tn);
+      // ignore counts for system tables - it could change and we really only want to check on what
+      // the client should change
+      if (!tn.isSystemTable()) {
+        TABLE_PRIORITY.incrementAndGet();
+      }
+
+    }
+  }
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    // load an endpoint so we have an endpoint to test - it doesn't matter which one, but
+    // this is already in tests, so we can just use it.
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      ProtobufCoprocessorService.class.getName());
+    
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Check some of the methods and make sure we are incrementing each time. It's a bit tedious to
+   * cover all methods here and really is a bit brittle since we can always add new methods but
+   * won't be sure to add them here. So we just cover the major ones.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testCountController() throws Exception {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    // setup our custom controller
+    conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+      StaticRpcControllerFactory.class.getName());
+
+    TableName name = TableName.valueOf("testcustomcontroller");
+    UTIL.createTable(name, fam1).close();
+
+    // change one of the connection properties so we get a new HConnection with our configuration
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
+
+    HTable table = new HTable(conf, name);
+    table.setAutoFlushTo(false);
+    byte[] row = Bytes.toBytes("row");
+    Put p = new Put(row);
+    p.add(fam1, fam1, Bytes.toBytes("val0"));
+    table.put(p);
+    table.flushCommits();
+    Integer counter = 1;
+    counter = verifyCount(counter);
+
+    Delete d = new Delete(row);
+    d.deleteColumn(fam1, fam1);
+    table.delete(d);
+    counter = verifyCount(counter);
+
+    Put p2 = new Put(row);
+    p2.add(fam1, Bytes.toBytes("qual"), Bytes.toBytes("val1"));
+    table.batch(Lists.newArrayList(p, p2), new Object[2]);
+    // this only goes to a single server, so we don't need to change the count here
+    counter = verifyCount(counter);
+
+    Append append = new Append(row);
+    append.add(fam1, fam1, Bytes.toBytes("val2"));
+    table.append(append);
+    counter = verifyCount(counter);
+
+    // and check the major lookup calls as well
+    Get g = new Get(row);
+    table.get(g);
+    counter = verifyCount(counter);
+
+    ResultScanner scan = table.getScanner(fam1);
+    scan.next();
+    scan.close();
+    counter = verifyCount(counter);
+
+    Get g2 = new Get(row);
+    table.get(Lists.newArrayList(g, g2));
+    // same server, so same as above for not changing count
+    counter = verifyCount(counter);
+
+    // make sure all the scanner types are covered
+    Scan scanInfo = new Scan(row);
+    // regular small
+    scanInfo.setSmall(true);
+    counter = doScan(table, scanInfo, counter);
+
+    // reversed, small
+    scanInfo.setReversed(true);
+    counter = doScan(table, scanInfo, counter);
+
+    // reversed, regular
+    scanInfo.setSmall(false);
+    counter = doScan(table, scanInfo, counter);
+
+    table.close();
+  }
+
+  int doScan(HTable table, Scan scan, int expectedCount) throws IOException {
+    ResultScanner results = table.getScanner(scan);
+    results.next();
+    results.close();
+    return verifyCount(expectedCount);
+  }
+
+  int verifyCount(Integer counter) {
+    assertEquals(counter.intValue(), CountingRpcController.TABLE_PRIORITY.get());
+    assertEquals(0, CountingRpcController.INT_PRIORITY.get());
+    return counter + 1;
+  }
+}
\ No newline at end of file


Mime
View raw message