hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From syuanji...@apache.org
Subject [37/50] [abbrv] hbase git commit: REVERT of revert of "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base." This is a revert of a revert; i.e. w
Date Wed, 17 Aug 2016 18:34:43 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7b1547d..f460bdb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@@ -38,41 +40,35 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.token.Token;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * Client proxy for SecureBulkLoadProtocol
  */
 @InterfaceAudience.Private
 public class SecureBulkLoadClient {
   private Table table;
+  private final RpcControllerFactory rpcControllerFactory;
 
-  public SecureBulkLoadClient(Table table) {
+  public SecureBulkLoadClient(final Configuration conf, Table table) {
     this.table = table;
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
   }
 
   public String prepareBulkLoad(final Connection conn) throws IOException {
     try {
-      RegionServerCallable<String> callable =
-          new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
-            @Override
-            public String call(int callTimeout) throws IOException {
-              byte[] regionName = getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region =
-                  RequestConverter
-                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                PrepareBulkLoadRequest request =
-                    PrepareBulkLoadRequest.newBuilder()
-                        .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
-                        .setRegion(region).build();
-                PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
-                return response.getBulkToken();
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-            }
-          };
+      RegionServerCallable<String> callable = new RegionServerCallable<String>(conn,
+          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+        @Override
+        protected String rpcCall() throws Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region =
+              RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+          PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
+              .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+              .setRegion(region).build();
+          PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
+          return response.getBulkToken();
+        }
+      };
       return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -82,24 +78,19 @@ public class SecureBulkLoadClient {
 
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
     try {
-      RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
-            @Override
-            public Void call(int callTimeout) throws IOException {
-              byte[] regionName = getLocation().getRegionInfo().getRegionName();
-              RegionSpecifier region = RequestConverter.buildRegionSpecifier(
-                RegionSpecifierType.REGION_NAME, regionName);
-              try {
-                CleanupBulkLoadRequest request =
-                    CleanupBulkLoadRequest.newBuilder().setRegion(region)
-                        .setBulkToken(bulkToken).build();
-                getStub().cleanupBulkLoad(null, request);
-              } catch (ServiceException se) {
-                throw ProtobufUtil.getRemoteException(se);
-              }
-              return null;
-            }
-          };
+      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+          this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) {
+        @Override
+        protected Void rpcCall() throws Exception {
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+              RegionSpecifierType.REGION_NAME, regionName);
+          CleanupBulkLoadRequest request =
+              CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
+          getStub().cleanupBulkLoad(null, request);
+          return null;
+        }
+      };
       RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
           .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
     } catch (Throwable throwable) {
@@ -130,12 +121,12 @@ public class SecureBulkLoadClient {
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
       return response.getLoaded();
-    } catch (ServiceException se) {
-      throw ProtobufUtil.getRemoteException(se);
+    } catch (Exception se) {
+      throw ProtobufUtil.handleRemoteException(se);
     }
   }
 
   public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
     return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
index 6fae5cb..a6384e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java
@@ -77,5 +77,4 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
     }
     return response;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index f4f18b3..d9877dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
@@ -26,15 +28,26 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
 /**
  * Optionally carries Cells across the proxy/service interface down into ipc. On its
- * way out it optionally carries a set of result Cell data.  We stick the Cells here when we want
- * to avoid having to protobuf them.  This class is used ferrying data across the proxy/protobuf
- * service chasm.  Used by client and server ipc'ing.
+ * way out it optionally carries a set of result Cell data. We stick the Cells here when we want
+ * to avoid having to protobuf them (for performance reasons). This class is used ferrying data
+ * across the proxy/protobuf service chasm. Also does call timeout. Used by client and server
+ * ipc'ing.
  */
 @InterfaceAudience.Private
-public class PayloadCarryingRpcController
-    extends TimeLimitedRpcController implements CellScannable {
+public class PayloadCarryingRpcController implements RpcController, CellScannable {
+  /**
+   * The time, in ms before the call should expire.
+   */
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null);
+  protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null);
+  private IOException exception;
 
   public static final int PRIORITY_UNSET = -1;
   /**
@@ -88,8 +101,8 @@ public class PayloadCarryingRpcController
    * @param tn Set priority based off the table we are going against.
    */
   public void setPriority(final TableName tn) {
-    this.priority =
-        (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS;
+    setPriority(tn != null && tn.isSystemTable()? HConstants.SYSTEMTABLE_QOS:
+      HConstants.NORMAL_QOS);
   }
 
   /**
@@ -99,9 +112,103 @@ public class PayloadCarryingRpcController
     return priority;
   }
 
-  @Override public void reset() {
-    super.reset();
+  @Override
+  public void reset() {
     priority = 0;
     cellScanner = null;
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
+  }
+
+  public int getCallTimeout() {
+    if (callTimeout != null) {
+      return callTimeout;
+    } else {
+      return 0;
+    }
+  }
+
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  public boolean hasCallTimeout(){
+    return callTimeout != null;
+  }
+
+  @Override
+  public String errorText() {
+    if (exception != null) {
+      return exception.getMessage();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
+  @Override
+  public boolean failed() {
+    return this.exception != null;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  @Override
+  public void startCancel() {
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
index 55d6375..209deed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
@@ -36,11 +36,12 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
- * Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
+ * Provides clients with an RPC connection to call Coprocessor Endpoint
+ * {@link com.google.protobuf.Service}s
  * against a given table region.  An instance of this class may be obtained
  * by calling {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])},
- * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to call the endpoint
- * methods.
+ * but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to
+ * call the endpoint methods.
  * @see org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])
  */
 @InterfaceAudience.Private
@@ -76,30 +77,21 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
       Descriptors.MethodDescriptor method, Message request, Message responsePrototype)
           throws IOException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Call: "+method.getName()+", "+request.toString());
+      LOG.trace("Call: " + method.getName() + ", " + request.toString());
     }
-
     if (row == null) {
       throw new IllegalArgumentException("Missing row property for remote region location");
     }
-
-    final RpcController rpcController = controller == null
-        ? rpcControllerFactory.newController() : controller;
-
     final ClientProtos.CoprocessorServiceCall call =
         CoprocessorRpcUtils.buildServiceCall(row, method, request);
     RegionServerCallable<CoprocessorServiceResponse> callable =
-        new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) {
+        new RegionServerCallable<CoprocessorServiceResponse>(connection,
+          controller == null? this.rpcControllerFactory.newController(): controller,
+          table, row) {
       @Override
-      public CoprocessorServiceResponse call(int callTimeout) throws Exception {
-        if (rpcController instanceof PayloadCarryingRpcController) {
-          ((PayloadCarryingRpcController) rpcController).setPriority(tableName);
-        }
-        if (rpcController instanceof TimeLimitedRpcController) {
-          ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout);
-        }
+      protected CoprocessorServiceResponse rpcCall() throws Exception {
         byte[] regionName = getLocation().getRegionInfo().getRegionName();
-        return ProtobufUtil.execService(rpcController, getStub(), call, regionName);
+        return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName);
       }
     };
     CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()
@@ -126,4 +118,4 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
   public byte[] getLastRegion() {
     return lastRegion;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
index faeca8d..4b84df1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcControllerFactory.java
@@ -47,6 +47,7 @@ public class RpcControllerFactory {
   }
 
   public PayloadCarryingRpcController newController() {
+    // TODO: Set HConstants default rpc timeout here rather than nothing?
     return new PayloadCarryingRpcController();
   }
 
@@ -80,4 +81,4 @@ public class RpcControllerFactory {
       return new RpcControllerFactory(configuration);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
deleted file mode 100644
index cf08ea9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.Private
-public class TimeLimitedRpcController implements RpcController {
-
-  /**
-   * The time, in ms before the call should expire.
-   */
-  protected volatile Integer callTimeout;
-  protected volatile boolean cancelled = false;
-  protected final AtomicReference<RpcCallback<Object>> cancellationCb =
-      new AtomicReference<>(null);
-
-  protected final AtomicReference<RpcCallback<IOException>> failureCb =
-      new AtomicReference<>(null);
-
-  private IOException exception;
-
-  public int getCallTimeout() {
-    if (callTimeout != null) {
-      return callTimeout;
-    } else {
-      return 0;
-    }
-  }
-
-  public void setCallTimeout(int callTimeout) {
-    this.callTimeout = callTimeout;
-  }
-
-  public boolean hasCallTimeout(){
-    return callTimeout != null;
-  }
-
-  @Override
-  public String errorText() {
-    if (exception != null) {
-      return exception.getMessage();
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * For use in async rpc clients
-   * @return true if failed
-   */
-  @Override
-  public boolean failed() {
-    return this.exception != null;
-  }
-
-  @Override
-  public boolean isCanceled() {
-    return cancelled;
-  }
-
-  @Override
-  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
-    this.cancellationCb.set(cancellationCb);
-    if (this.cancelled) {
-      cancellationCb.run(null);
-    }
-  }
-
-  /**
-   * Notify a callback on error.
-   * For use in async rpc clients
-   *
-   * @param failureCb the callback to call on error
-   */
-  public void notifyOnFail(RpcCallback<IOException> failureCb) {
-    this.failureCb.set(failureCb);
-    if (this.exception != null) {
-      failureCb.run(this.exception);
-    }
-  }
-
-  @Override
-  public void reset() {
-    exception = null;
-    cancelled = false;
-    failureCb.set(null);
-    cancellationCb.set(null);
-    callTimeout = null;
-  }
-
-  @Override
-  public void setFailed(String reason) {
-    this.exception = new IOException(reason);
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  /**
-   * Set failed with an exception to pass on.
-   * For use in async rpc clients
-   *
-   * @param e exception to set with
-   */
-  public void setFailed(IOException e) {
-    this.exception = e;
-    if (this.failureCb.get() != null) {
-      this.failureCb.get().run(this.exception);
-    }
-  }
-
-  @Override
-  public void startCancel() {
-    cancelled = true;
-    if (cancellationCb.get() != null) {
-      cancellationCb.get().run(null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 5ba0572..623acd5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
+.RegionSpecifierType.REGION_NAME;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,14 +41,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
-.RegionSpecifierType.REGION_NAME;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -124,8 +125,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
 import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
+import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -171,11 +172,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -334,17 +333,32 @@ public final class ProtobufUtil {
    *   a new IOException that wraps the unexpected ServiceException.
    */
   public static IOException getRemoteException(ServiceException se) {
-    Throwable e = se.getCause();
-    if (e == null) {
-      return new IOException(se);
+    return makeIOExceptionOfException(se);
+  }
+
+  /**
+   * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
+   * just {@link ServiceException}. Prefer this method to
+   * {@link #getRemoteException(ServiceException)} because trying to
+   * contain direct protobuf references.
+   * @param e
+   */
+  public static IOException handleRemoteException(Exception e) {
+    return makeIOExceptionOfException(e);
+  }
+
+  private static IOException makeIOExceptionOfException(Exception e) {
+    Throwable t = e;
+    if (e instanceof ServiceException) {
+      t = e.getCause();
     }
-    if (ExceptionUtil.isInterrupt(e)) {
-      return ExceptionUtil.asInterrupt(e);
+    if (ExceptionUtil.isInterrupt(t)) {
+      return ExceptionUtil.asInterrupt(t);
     }
-    if (e instanceof RemoteException) {
-      e = ((RemoteException) e).unwrapRemoteException();
+    if (t instanceof RemoteException) {
+      t = ((RemoteException)t).unwrapRemoteException();
     }
-    return e instanceof IOException ? (IOException) e : new IOException(se);
+    return t instanceof IOException? (IOException)t: new HBaseIOException(t);
   }
 
   /**
@@ -1252,7 +1266,6 @@ public final class ProtobufUtil {
     return toMutation(type, mutation, builder, HConstants.NO_NONCE);
   }
 
-  @SuppressWarnings("deprecation")
   public static MutationProto toMutation(final MutationType type, final Mutation mutation,
       MutationProto.Builder builder, long nonce)
   throws IOException {
@@ -2658,13 +2671,11 @@ public final class ProtobufUtil {
     }
   }
 
-  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
       List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
   }
 
-  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
       byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     // compaction descriptor contains relative paths.
@@ -3663,4 +3674,28 @@ public final class ProtobufUtil {
     return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(),
         stats.getCompactionPressure());
   }
-}
+
+  /**
+   * @param msg
+   * @return A String version of the passed in <code>msg</code>
+   */
+  public static String toText(Message msg) {
+    return TextFormat.shortDebugString(msg);
+  }
+
+  public static byte [] toBytes(ByteString bs) {
+    return bs.toByteArray();
+  }
+
+  /**
+   * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it.
+   * @throws IOException
+   */
+  public static <T> T call(Callable<T> callable) throws IOException {
+    try {
+      return callable.call();
+    } catch (Exception e) {
+      throw ProtobufUtil.handleRemoteException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 0aa9704..5959078 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -190,7 +190,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable callable) {
+        CancellableRegionServerCallable callable) {
       callsCt.incrementAndGet();
       MultiServerCallable callable1 = (MultiServerCallable) callable;
       final MultiResponse mr = createMultiResponse(
@@ -253,7 +253,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-      PayloadCarryingServerCallable callable) {
+      CancellableRegionServerCallable callable) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
     }
@@ -290,7 +290,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable payloadCallable) {
+        CancellableRegionServerCallable payloadCallable) {
       MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index f083001..fd2a393 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -65,7 +65,6 @@ public class TestClientScanner {
   RpcControllerFactory controllerFactory;
 
   @Before
-  @SuppressWarnings("deprecation")
   public void setup() throws IOException {
     clusterConn = Mockito.mock(ClusterConnection.class);
     rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
index 9c3367e..edcbdc5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
@@ -45,4 +45,5 @@ public class HBaseIOException extends IOException {
 
   public HBaseIOException(Throwable cause) {
       super(cause);
-  }}
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
index 688b51a..7e6c5d6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
@@ -73,4 +73,4 @@ public class ExceptionUtil {
 
   private ExceptionUtil() {
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 5b2aab1..4b27924 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 09dedec..a34dc0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,11 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static java.lang.String.format;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -87,12 +82,12 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -100,9 +95,13 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
- * @see #usage()
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -130,11 +129,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
+  private RpcControllerFactory rpcControllerFactory;
 
   private LoadIncrementalHFiles() {}
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.rpcControllerFactory = new RpcControllerFactory(conf);
     initialize();
   }
 
@@ -322,7 +323,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table);
+    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table.getConfiguration(), table);
 
     try {
       /*
@@ -473,9 +474,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-   * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
-   * {@link
-   * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
    * @param table Table to which these hfiles should be loaded to
    * @param conn Connection to use
    * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
@@ -776,27 +779,23 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
-    final List<Pair<byte[], String>> famPaths =
-      new ArrayList<>(lqis.size());
+    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
-
-    final RegionServerCallable<Boolean> svrCallable =
-        new RegionServerCallable<Boolean>(conn, tableName, first) {
+    final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
+        rpcControllerFactory, tableName, first) {
       @Override
-      public Boolean call(int callTimeout) throws Exception {
+      protected Boolean rpcCall() throws Exception {
         SecureBulkLoadClient secureClient = null;
         boolean success = false;
-
         try {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
-            success =
-                secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+            secureClient = new SecureBulkLoadClient(getConf(), table);
+            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
           }
           return success;
@@ -1078,7 +1077,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
-   * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
+   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
    * property. This directory is used as a temporary directory where all files are initially
    * copied/moved from user given directory, set all the required file permissions and then from
    * their it is finally loaded into a table. This should be set only when, one would like to manage
@@ -1088,5 +1087,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index a21edcc..3261bd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
  * mob files.
@@ -86,10 +84,6 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
             } catch (LockTimeoutException e) {
               LOG.info("Fail to acquire the lock because of timeout, maybe a"
                 + " MobCompactor is running", e);
-            } catch (ServiceException e) {
-              LOG.error(
-                "Fail to clean the expired mob files for the column " + hcd.getNameAsString()
-                  + " in the table " + htd.getNameAsString(), e);
             } catch (IOException e) {
               LOG.error(
                 "Fail to clean the expired mob files for the column " + hcd.getNameAsString()

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index ad1a3ca..326aa00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -18,14 +18,6 @@
  */
 package org.apache.hadoop.hbase.master;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -92,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessController;
 import org.apache.hadoop.hbase.security.visibility.VisibilityController;
@@ -103,6 +94,14 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 /**
  * Implements the master RPC services.
  */

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 531883a..d7ba4f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@@ -454,8 +454,7 @@ public class ServerManager {
   /**
    * Adds the onlineServers list. onlineServers should be locked.
    * @param serverName The remote servers name.
-   * @param sl
-   * @return Server load from the removed server, if any.
+   * @param s
    */
   @VisibleForTesting
   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
index 1499788..96ea036 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/TableStateManager.java
@@ -188,7 +188,6 @@ public class TableStateManager {
     return MetaTableAccessor.getTableState(master.getConnection(), tableName);
   }
 
-  @SuppressWarnings("deprecation")
   public void start() throws IOException {
     TableDescriptors tableDescriptors = master.getTableDescriptors();
     Connection connection = master.getConnection();
@@ -220,4 +219,4 @@ public class TableStateManager {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
index 3c965cb..d4a54bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The cleaner to delete the expired MOB files.
  */
@@ -60,11 +58,8 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
    * directory.
    * @param tableName The current table name.
    * @param family The current family.
-   * @throws ServiceException
-   * @throws IOException
    */
-  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
-      throws ServiceException, IOException {
+  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException {
     Configuration conf = getConf();
     TableName tn = TableName.valueOf(tableName);
     FileSystem fs = FileSystem.get(conf);
@@ -99,7 +94,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
     String tableName = args[0];
     String familyName = args[1];
     TableName tn = TableName.valueOf(tableName);
-    HBaseAdmin.checkHBaseAvailable(getConf());
+    HBaseAdmin.available(getConf());
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {
@@ -127,5 +122,4 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
       }
     }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
index 8547c8c..c27e8ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -38,8 +38,6 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
-import com.google.protobuf.ServiceException;
-
 /**
  * The sweep tool. It deletes the mob files that are not used and merges the small mob files to
  * bigger ones. Each run of this sweep tool only handles one column family. The runs on
@@ -64,10 +62,10 @@ public class Sweeper extends Configured implements Tool {
    * @throws ServiceException
    */
   int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
-      ClassNotFoundException, KeeperException, ServiceException {
+      ClassNotFoundException, KeeperException {
     Configuration conf = getConf();
     // make sure the target HBase exists.
-    HBaseAdmin.checkHBaseAvailable(conf);
+    HBaseAdmin.available(conf);
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0b4ae75..89bfbf3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -18,17 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -106,7 +95,6 @@ import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.http.HttpServer;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -198,6 +186,17 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -206,7 +205,7 @@ import sun.misc.SignalHandler;
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-@SuppressWarnings("deprecation")
+@SuppressWarnings({ "deprecation", "restriction" })
 public class HRegionServer extends HasThread implements
     RegionServerServices, LastSequenceId, ConfigurationObserver {
 
@@ -818,9 +817,8 @@ public class HRegionServer extends HasThread implements
     // when ready.
     blockAndCheckIfStopped(this.clusterStatusTracker);
 
-    if (this.initLatch != null) {
-      this.initLatch.await(20, TimeUnit.SECONDS);
-    }
+    doLatch(this.initLatch);
+
     // Retrieve clusterId
     // Since cluster status is now up
     // ID should have already been set by HMaster
@@ -855,6 +853,16 @@ public class HRegionServer extends HasThread implements
     this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RV_RETURN_VALUE_IGNORED",
+      justification="We don't care about the return")
+  private void doLatch(final CountDownLatch latch) throws InterruptedException {
+    if (latch != null) {
+      // Result is ignored intentionally but if I remove the below, findbugs complains (the
+      // above justification on this method doesn't seem to suppress it).
+      boolean result = latch.await(20, TimeUnit.SECONDS);
+    }
+  }
+
   /**
    * Utilty method to wait indefinitely on a znode availability while checking
    * if the region server is shut down

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 681b1dc..3859d18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -1381,8 +1380,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         family = request.getFamily().toByteArray();
         store = region.getStore(family);
         if (store == null) {
-          throw new ServiceException(new IOException("column family " + Bytes.toString(family)
-            + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
+          throw new ServiceException(new DoNotRetryIOException("column family " +
+              Bytes.toString(family) + " does not exist in region " +
+              region.getRegionInfo().getRegionNameAsString()));
         }
       }
       if (request.hasMajor()) {
@@ -2767,12 +2767,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                     timeLimitDelta =
                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
                   }
-                  if (controller instanceof TimeLimitedRpcController) {
-                    TimeLimitedRpcController timeLimitedRpcController =
-                        (TimeLimitedRpcController)controller;
-                    if (timeLimitedRpcController.getCallTimeout() > 0) {
-                      timeLimitDelta = Math.min(timeLimitDelta,
-                          timeLimitedRpcController.getCallTimeout());
+                  if (controller != null) {
+                    if (controller instanceof PayloadCarryingRpcController) {
+                      PayloadCarryingRpcController pRpcController =
+                          (PayloadCarryingRpcController)controller;
+                      if (pRpcController.getCallTimeout() > 0) {
+                        timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
+                      }
+                    } else {
+                      throw new UnsupportedOperationException("We only do " +
+                        "PayloadCarryingRpcControllers! FIX IF A PROBLEM: " + controller);
                     }
                   }
                   // Use half of whichever timeout value was more restrictive... But don't allow

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index 3eb85bd..004581d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -42,9 +40,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -61,10 +57,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class WALEditsReplaySink {
-
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
   private static final int MAX_BATCH_SIZE = 1024;
-
   private final Configuration conf;
   private final ClusterConnection conn;
   private final TableName tableName;
@@ -166,8 +160,8 @@ public class WALEditsReplaySink {
     try {
       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
       ReplayServerCallable<ReplicateWALEntryResponse> callable =
-          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
-              regionInfo, entries);
+          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory,
+              this.tableName, regionLoc, entries);
       factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
     } catch (IOException ie) {
       if (skipErrors) {
@@ -184,31 +178,18 @@ public class WALEditsReplaySink {
    * @param <R>
    */
   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
-    private HRegionInfo regionInfo;
     private List<Entry> entries;
 
-    ReplayServerCallable(final Connection connection, final TableName tableName,
-        final HRegionLocation regionLoc, final HRegionInfo regionInfo,
-        final List<Entry> entries) {
-      super(connection, tableName, null);
+    ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
+        final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
+      super(connection, rpcControllerFactory, tableName, null);
       this.entries = entries;
-      this.regionInfo = regionInfo;
       setLocation(regionLoc);
     }
 
     @Override
-    public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
-      try {
-        replayToServer(this.regionInfo, this.entries);
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
-      return null;
-    }
-
-    private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
-        throws IOException, ServiceException {
-      if (entries.isEmpty()) return;
+    protected ReplicateWALEntryResponse rpcCall() throws Exception {
+      if (entries.isEmpty()) return null;
 
       Entry[] entriesArray = new Entry[entries.size()];
       entriesArray = entries.toArray(entriesArray);
@@ -216,12 +197,8 @@ public class WALEditsReplaySink {
 
       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
-      PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
-      try {
-        remoteSvr.replay(controller, p.getFirst());
-      } catch (ServiceException se) {
-        throw ProtobufUtil.getRemoteException(se);
-      }
+      setRpcControllerCellScanner(p.getSecond());
+      return remoteSvr.replay(getRpcController(), p.getFirst());
     }
 
     @Override
@@ -245,4 +222,4 @@ public class WALEditsReplaySink {
       }
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b0fd176..c756294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -46,27 +45,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -74,12 +67,17 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
@@ -611,9 +609,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
    * the entry if the region boundaries have changed or the region is gone.
    */
-  static class RegionReplicaReplayCallable
-    extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
-
+  static class RegionReplicaReplayCallable extends
+      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
     private final List<Entry> entries;
     private final byte[] initialEncodedRegionName;
     private final AtomicLong skippedEntries;
@@ -628,38 +625,25 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
     }
 
-    @Override
-    public ReplicateWALEntryResponse call(int timeout) throws IOException {
-      return replayToServer(this.entries, timeout);
-    }
-
-    private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
-        throws IOException {
-      // check whether we should still replay this entry. If the regions are changed, or the
+    public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception {
+      // Check whether we should still replay this entry. If the regions are changed, or the
       // entry is not coming form the primary region, filter it out because we do not need it.
       // Regions can change because of (1) region split (2) region merge (3) table recreated
       boolean skip = false;
-
       if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
-        initialEncodedRegionName)) {
+          initialEncodedRegionName)) {
         skip = true;
       }
-      if (!entries.isEmpty() && !skip) {
-        Entry[] entriesArray = new Entry[entries.size()];
-        entriesArray = entries.toArray(entriesArray);
+      if (!this.entries.isEmpty() && !skip) {
+        Entry[] entriesArray = new Entry[this.entries.size()];
+        entriesArray = this.entries.toArray(entriesArray);
 
         // set the region name for the target region replica
         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
             ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
                 .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
-        try {
-          PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
-          controller.setCallTimeout(timeout);
-          controller.setPriority(tableName);
-          return stub.replay(controller, p.getFirst());
-        } catch (ServiceException se) {
-          throw ProtobufUtil.getRemoteException(se);
-        }
+        controller.setCellScanner(p.getSecond());
+        return stub.replay(controller, p.getFirst());
       }
 
       if (skip) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
index 2e7cf7f..bbf858d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java
@@ -1145,8 +1145,11 @@ public final class Canary implements Tool {
     }
     List<RegionTask> tasks = new ArrayList<RegionTask>();
     try {
-      for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {        
-        tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
+      List<HRegionInfo> hris = admin.getTableRegions(tableDesc.getTableName());
+      if (hris != null) {
+        for (HRegionInfo region : hris) {
+          tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
+        }
       }
     } finally {
       table.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
index d708edc..3c81cfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
@@ -23,19 +23,18 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -80,13 +79,11 @@ public class Merge extends Configured implements Tool {
     // Verify HBase is down
     LOG.info("Verifying that HBase is not running...");
     try {
-      HBaseAdmin.checkHBaseAvailable(getConf());
+      HBaseAdmin.available(getConf());
       LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
       return -1;
     } catch (ZooKeeperConnectionException zkce) {
       // If no zk, presume no master.
-    } catch (MasterNotRunningException e) {
-      // Expected. Ignore.
     }
 
     // Initialize MetaUtils and and get the root of the HBase installation

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
index d778fa9..2dca6b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
@@ -60,7 +60,6 @@ public class TestNamespace {
   private static ZKNamespaceManager zkNamespaceManager;
   private String prefix = "TestNamespace";
 
-
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
@@ -301,7 +300,8 @@ public class TestNamespace {
     runWithExpectedException(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
+        HTableDescriptor htd =
+            new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
         htd.addFamily(new HColumnDescriptor("family1"));
         admin.createTable(htd);
         return null;
@@ -387,5 +387,4 @@ public class TestNamespace {
     }
     fail("Should have thrown exception " + exceptionClass);
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index d088fc4..3203636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
@@ -59,7 +58,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -67,8 +65,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import com.google.protobuf.ServiceException;
-
 
 /**
  * Class to test HBaseAdmin.
@@ -335,7 +331,8 @@ public class TestAdmin2 {
 
   @Test (timeout=300000)
   public void testCloseRegionIfInvalidRegionNameIsPassed() throws Exception {
-    byte[] TABLENAME = Bytes.toBytes("TestHBACloseRegion1");
+    final String name = "TestHBACloseRegion1";
+    byte[] TABLENAME = Bytes.toBytes(name);
     createTableWithDefaultConf(TABLENAME);
 
     HRegionInfo info = null;
@@ -343,7 +340,7 @@ public class TestAdmin2 {
     List<HRegionInfo> onlineRegions = ProtobufUtil.getOnlineRegions(rs.getRSRpcServices());
     for (HRegionInfo regionInfo : onlineRegions) {
       if (!regionInfo.isMetaTable()) {
-        if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion1")) {
+        if (regionInfo.getRegionNameAsString().contains(name)) {
           info = regionInfo;
           try {
             admin.closeRegionWithEncodedRegionName("sample", rs.getServerName()
@@ -643,11 +640,9 @@ public class TestAdmin2 {
 
     long start = System.currentTimeMillis();
     try {
-      HBaseAdmin.checkHBaseAvailable(conf);
+      HBaseAdmin.available(conf);
       assertTrue(false);
-    } catch (MasterNotRunningException ignored) {
     } catch (ZooKeeperConnectionException ignored) {
-    } catch (ServiceException ignored) {
     } catch (IOException ignored) {
     }
     long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 679d9c9..f49c558 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -28,13 +28,10 @@ import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
@@ -56,7 +53,6 @@ import com.google.protobuf.ServiceException;
 
 @Category({MediumTests.class, ClientTests.class})
 public class TestClientTimeouts {
-  private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   protected static int SLAVES = 1;
 
@@ -87,7 +83,6 @@ public class TestClientTimeouts {
    */
   @Test
   public void testAdminTimeout() throws Exception {
-    Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
     RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@@ -105,7 +100,7 @@ public class TestClientTimeouts {
           connection = ConnectionFactory.createConnection(conf);
           admin = connection.getAdmin();
           // run some admin commands
-          HBaseAdmin.checkHBaseAvailable(conf);
+          HBaseAdmin.available(conf);
           admin.setBalancerRunning(false, false);
         } catch (ZooKeeperConnectionException ex) {
           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index bfd16a7..bda80de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -84,6 +85,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Lists;
+import com.google.protobuf.RpcController;
 
 /**
  * This class is for testing HBaseConnectionManager features
@@ -104,8 +107,6 @@ public class TestHCM {
       TableName.valueOf("test2");
   private static final TableName TABLE_NAME3 =
       TableName.valueOf("test3");
-  private static final TableName TABLE_NAME4 =
-      TableName.valueOf("test4");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
   private static final byte[] ROW_X = Bytes.toBytes("xxx");
@@ -525,10 +526,12 @@ public class TestHCM {
     long pauseTime;
     long baseTime = 100;
     TableName tableName = TableName.valueOf("HCM-testCallableSleep");
-    Table table = TEST_UTIL.createTable(tableName, FAM_NAM);
+    TEST_UTIL.createTable(tableName, FAM_NAM);
     RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
-        TEST_UTIL.getConnection(), tableName, ROW) {
-      public Object call(int timeout) throws IOException {
+        TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()),
+        tableName, ROW) {
+      @Override
+      protected Object rpcCall() throws Exception {
         return null;
       }
     };
@@ -542,9 +545,10 @@ public class TestHCM {
 
     RegionAdminServiceCallable<Object> regionAdminServiceCallable =
         new RegionAdminServiceCallable<Object>(
-        (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
-            TEST_UTIL.getConfiguration()), tableName, ROW) {
-      public Object call(int timeout) throws IOException {
+        (ClusterConnection) TEST_UTIL.getConnection(),
+          new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
+      @Override
+      public Object call(PayloadCarryingRpcController controller) throws Exception {
         return null;
       }
     };
@@ -556,16 +560,21 @@ public class TestHCM {
       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
     }
 
-    MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
-      public Object call(int timeout) throws IOException {
+    MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
+        new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
+      @Override
+      protected Object rpcCall() throws Exception {
         return null;
       }
     };
-
-    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
-      pauseTime = masterCallable.sleep(baseTime, i);
-      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
-      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+    try {
+      for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+        pauseTime = masterCallable.sleep(baseTime, i);
+        assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+        assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
+      }
+    } finally {
+      masterCallable.close();
     }
   }
 
@@ -1267,7 +1276,6 @@ public class TestHCM {
     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge(timeMachine);
     try {
-      long timeBase = timeMachine.currentTime();
       long largeAmountOfTime = ANY_PAUSE * 1000;
       ConnectionImplementation.ServerErrorTracker tracker =
           new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 354f0a8..9b4e9f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -20,6 +20,15 @@
 
 package org.apache.hadoop.hbase.client;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +44,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -49,15 +59,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
 @Category({MediumTests.class, ClientTests.class})
 public class TestReplicaWithCluster {
   private static final Log LOG = LogFactory.getLog(TestReplicaWithCluster.class);
@@ -332,26 +333,27 @@ public class TestReplicaWithCluster {
 
     // bulk load HFiles
     LOG.debug("Loading test data");
-    @SuppressWarnings("deprecation")
     final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
     table = conn.getTable(hdt.getTableName());
-    final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
-    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
-      conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
-        @Override
-        public Void call(int timeout) throws Exception {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
+    final String bulkToken =
+        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
+    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
+        new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(),
+        TestHRegionServerBulkLoad.rowkey(0)) {
+      @Override
+      protected Void rpcCall() throws Exception {
+        LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
-          SecureBulkLoadClient secureClient = null;
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(table);
-            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-                  true, null, bulkToken);
-          }
-          return null;
+        SecureBulkLoadClient secureClient = null;
+        byte[] regionName = getLocation().getRegionInfo().getRegionName();
+        try (Table table = conn.getTable(getTableName())) {
+          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
+          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+              true, null, bulkToken);
         }
-      };
+        return null;
+      }
+    };
     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
     RpcRetryingCaller<Void> caller = factory.newCaller();
     caller.callWithRetries(callable, 10000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index b3cbd33..ffe3e82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -573,11 +573,11 @@ public class TestReplicasClient {
         Assert.assertTrue(((Result)r).isStale());
         Assert.assertTrue(((Result)r).getExists());
       }
-      Set<PayloadCarryingServerCallable> set =
+      Set<CancellableRegionServerCallable> set =
           ((AsyncRequestFutureImpl<?>)reqs).getCallsInProgress();
       // verify we did cancel unneeded calls
       Assert.assertTrue(!set.isEmpty());
-      for (PayloadCarryingServerCallable m : set) {
+      for (CancellableRegionServerCallable m : set) {
         Assert.assertTrue(m.isCancelled());
       }
     } finally {


Mime
View raw message