hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/4] hbase git commit: Revert "HBASE-16308 Contain protobuf references Gather up the pb references into a few locations only rather than have pb references distributed all about the code base."
Date Fri, 05 Aug 2016 22:18:59 GMT
Repository: hbase
Updated Branches:
  refs/heads/master ed87a81b4 -> 0206dc67d


http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
new file mode 100644
index 0000000..cf08ea9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/TimeLimitedRpcController.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TimeLimitedRpcController implements RpcController {
+
+  /**
+   * The time, in ms before the call should expire.
+   */
+  protected volatile Integer callTimeout;
+  protected volatile boolean cancelled = false;
+  protected final AtomicReference<RpcCallback<Object>> cancellationCb =
+      new AtomicReference<>(null);
+
+  protected final AtomicReference<RpcCallback<IOException>> failureCb =
+      new AtomicReference<>(null);
+
+  private IOException exception;
+
+  public int getCallTimeout() {
+    if (callTimeout != null) {
+      return callTimeout;
+    } else {
+      return 0;
+    }
+  }
+
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  public boolean hasCallTimeout(){
+    return callTimeout != null;
+  }
+
+  @Override
+  public String errorText() {
+    if (exception != null) {
+      return exception.getMessage();
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * For use in async rpc clients
+   * @return true if failed
+   */
+  @Override
+  public boolean failed() {
+    return this.exception != null;
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
+    this.cancellationCb.set(cancellationCb);
+    if (this.cancelled) {
+      cancellationCb.run(null);
+    }
+  }
+
+  /**
+   * Notify a callback on error.
+   * For use in async rpc clients
+   *
+   * @param failureCb the callback to call on error
+   */
+  public void notifyOnFail(RpcCallback<IOException> failureCb) {
+    this.failureCb.set(failureCb);
+    if (this.exception != null) {
+      failureCb.run(this.exception);
+    }
+  }
+
+  @Override
+  public void reset() {
+    exception = null;
+    cancelled = false;
+    failureCb.set(null);
+    cancellationCb.set(null);
+    callTimeout = null;
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    this.exception = new IOException(reason);
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  /**
+   * Set failed with an exception to pass on.
+   * For use in async rpc clients
+   *
+   * @param e exception to set with
+   */
+  public void setFailed(IOException e) {
+    this.exception = e;
+    if (this.failureCb.get() != null) {
+      this.failureCb.get().run(this.exception);
+    }
+  }
+
+  @Override
+  public void startCancel() {
+    cancelled = true;
+    if (cancellationCb.get() != null) {
+      cancellationCb.get().run(null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 623acd5..5ba0572 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hbase.protobuf;
 
-import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
-.RegionSpecifierType.REGION_NAME;
-
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,11 +38,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier
+.RegionSpecifierType.REGION_NAME;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -125,8 +124,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionInTransition;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos.HBaseVersionFileContent;
+import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
 import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
@@ -172,9 +171,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Methods;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -333,32 +334,17 @@ public final class ProtobufUtil {
    *   a new IOException that wraps the unexpected ServiceException.
    */
   public static IOException getRemoteException(ServiceException se) {
-    return makeIOExceptionOfException(se);
-  }
-
-  /**
-   * Like {@link #getRemoteException(ServiceException)} but more generic, able to handle more than
-   * just {@link ServiceException}. Prefer this method to
-   * {@link #getRemoteException(ServiceException)} because trying to
-   * contain direct protobuf references.
-   * @param e
-   */
-  public static IOException handleRemoteException(Exception e) {
-    return makeIOExceptionOfException(e);
-  }
-
-  private static IOException makeIOExceptionOfException(Exception e) {
-    Throwable t = e;
-    if (e instanceof ServiceException) {
-      t = e.getCause();
+    Throwable e = se.getCause();
+    if (e == null) {
+      return new IOException(se);
     }
-    if (ExceptionUtil.isInterrupt(t)) {
-      return ExceptionUtil.asInterrupt(t);
+    if (ExceptionUtil.isInterrupt(e)) {
+      return ExceptionUtil.asInterrupt(e);
     }
-    if (t instanceof RemoteException) {
-      t = ((RemoteException)t).unwrapRemoteException();
+    if (e instanceof RemoteException) {
+      e = ((RemoteException) e).unwrapRemoteException();
     }
-    return t instanceof IOException? (IOException)t: new HBaseIOException(t);
+    return e instanceof IOException ? (IOException) e : new IOException(se);
   }
 
   /**
@@ -1266,6 +1252,7 @@ public final class ProtobufUtil {
     return toMutation(type, mutation, builder, HConstants.NO_NONCE);
   }
 
+  @SuppressWarnings("deprecation")
   public static MutationProto toMutation(final MutationType type, final Mutation mutation,
       MutationProto.Builder builder, long nonce)
   throws IOException {
@@ -2671,11 +2658,13 @@ public final class ProtobufUtil {
     }
   }
 
+  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] family,
       List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     return toCompactionDescriptor(info, null, family, inputPaths, outputPaths, storeDir);
   }
 
+  @SuppressWarnings("deprecation")
   public static CompactionDescriptor toCompactionDescriptor(HRegionInfo info, byte[] regionName,
       byte[] family, List<Path> inputPaths, List<Path> outputPaths, Path storeDir) {
     // compaction descriptor contains relative paths.
@@ -3674,28 +3663,4 @@ public final class ProtobufUtil {
     return new RegionLoadStats(stats.getMemstoreLoad(), stats.getHeapOccupancy(),
         stats.getCompactionPressure());
   }
-
-  /**
-   * @param msg
-   * @return A String version of the passed in <code>msg</code>
-   */
-  public static String toText(Message msg) {
-    return TextFormat.shortDebugString(msg);
-  }
-
-  public static byte [] toBytes(ByteString bs) {
-    return bs.toByteArray();
-  }
-
-  /**
-   * Contain ServiceException inside here. Take a callable that is doing our pb rpc and run it.
-   * @throws IOException
-   */
-  public static <T> T call(Callable<T> callable) throws IOException {
-    try {
-      return callable.call();
-    } catch (Exception e) {
-      throw ProtobufUtil.handleRemoteException(e);
-    }
-  }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index fd2a393..f083001 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -65,6 +65,7 @@ public class TestClientScanner {
   RpcControllerFactory controllerFactory;
 
   @Before
+  @SuppressWarnings("deprecation")
   public void setup() throws IOException {
     clusterConn = Mockito.mock(ClusterConnection.class);
     rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
index edcbdc5..9c3367e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseIOException.java
@@ -45,5 +45,4 @@ public class HBaseIOException extends IOException {
 
   public HBaseIOException(Throwable cause) {
       super(cause);
-  }
-}
\ No newline at end of file
+  }}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
index 7e6c5d6..688b51a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
@@ -73,4 +73,4 @@ public class ExceptionUtil {
 
   private ExceptionUtil() {
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ec28315..73226aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -75,17 +75,20 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -96,6 +99,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.AuthMethod;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 8ddbe18..09dedec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -87,8 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -104,6 +102,7 @@ import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
+ * @see #usage()
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -131,13 +130,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
-  private RpcControllerFactory rpcControllerFactory;
 
   private LoadIncrementalHFiles() {}
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
-    this.rpcControllerFactory = new RpcControllerFactory(conf);
     initialize();
   }
 
@@ -325,7 +322,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table.getConfiguration(), table);
+    SecureBulkLoadClient secureClient =  new SecureBulkLoadClient(table);
 
     try {
       /*
@@ -476,11 +473,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-   * <ol>
-   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
-   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
-   * </li>
-   * </ol>
+   * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} 2.
+   * {@link
+   * LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
    * @param table Table to which these hfiles should be loaded to
    * @param conn Connection to use
    * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
@@ -781,23 +776,27 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
-    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
+    final List<Pair<byte[], String>> famPaths =
+      new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
-    final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
-        rpcControllerFactory, tableName, first) {
+
+    final RegionServerCallable<Boolean> svrCallable =
+        new RegionServerCallable<Boolean>(conn, tableName, first) {
       @Override
-      protected Boolean call(PayloadCarryingRpcController controller) throws Exception {
+      public Boolean call(int callTimeout) throws Exception {
         SecureBulkLoadClient secureClient = null;
         boolean success = false;
+
         try {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(getConf(), table);
-            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+            secureClient = new SecureBulkLoadClient(table);
+            success =
+                secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
           }
           return success;
@@ -1079,7 +1078,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   /**
    * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
-   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
+   * used only when {@link SecureBulkLoadEndpoint} is configured in hbase.coprocessor.region.classes
    * property. This directory is used as a temporary directory where all files are initially
    * copied/moved from user given directory, set all the required file permissions and then from
    * their it is finally loaded into a table. This should be set only when, one would like to manage
@@ -1089,4 +1088,5 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
index 3261bd6..a21edcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ExpiredMobFileCleanerChore.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hbase.mob.ExpiredMobFileCleaner;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobUtils;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * The Class ExpiredMobFileCleanerChore for running cleaner regularly to remove the expired
  * mob files.
@@ -84,6 +86,10 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
             } catch (LockTimeoutException e) {
               LOG.info("Fail to acquire the lock because of timeout, maybe a"
                 + " MobCompactor is running", e);
+            } catch (ServiceException e) {
+              LOG.error(
+                "Fail to clean the expired mob files for the column " + hcd.getNameAsString()
+                  + " in the table " + htd.getNameAsString(), e);
             } catch (IOException e) {
               LOG.error(
                 "Fail to clean the expired mob files for the column " + hcd.getNameAsString()

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index d7ba4f3..531883a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@@ -454,7 +454,8 @@ public class ServerManager {
   /**
    * Adds the onlineServers list. onlineServers should be locked.
    * @param serverName The remote servers name.
-   * @param s
+   * @param sl
+   * @return Server load from the removed server, if any.
    */
   @VisibleForTesting
   void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
index d4a54bb..3c965cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/ExpiredMobFileCleaner.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * The cleaner to delete the expired MOB files.
  */
@@ -58,8 +60,11 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
    * directory.
    * @param tableName The current table name.
    * @param family The current family.
+   * @throws ServiceException
+   * @throws IOException
    */
-  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family) throws IOException {
+  public void cleanExpiredMobFiles(String tableName, HColumnDescriptor family)
+      throws ServiceException, IOException {
     Configuration conf = getConf();
     TableName tn = TableName.valueOf(tableName);
     FileSystem fs = FileSystem.get(conf);
@@ -94,7 +99,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
     String tableName = args[0];
     String familyName = args[1];
     TableName tn = TableName.valueOf(tableName);
-    HBaseAdmin.available(getConf());
+    HBaseAdmin.checkHBaseAvailable(getConf());
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {
@@ -122,4 +127,5 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
       }
     }
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
index c27e8ae..8547c8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/mapreduce/Sweeper.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * The sweep tool. It deletes the mob files that are not used and merges the small mob files to
  * bigger ones. Each run of this sweep tool only handles one column family. The runs on
@@ -62,10 +64,10 @@ public class Sweeper extends Configured implements Tool {
    * @throws ServiceException
    */
   int sweepFamily(String tableName, String familyName) throws IOException, InterruptedException,
-      ClassNotFoundException, KeeperException {
+      ClassNotFoundException, KeeperException, ServiceException {
     Configuration conf = getConf();
     // make sure the target HBase exists.
-    HBaseAdmin.available(conf);
+    HBaseAdmin.checkHBaseAvailable(conf);
     Connection connection = ConnectionFactory.createConnection(getConf());
     Admin admin = connection.getAdmin();
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fb9a605..d87ada4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.TimeLimitedRpcController;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -2764,15 +2765,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                     timeLimitDelta =
                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
                   }
-                  if (controller instanceof PayloadCarryingRpcController) {
-                    PayloadCarryingRpcController pRpcController =
-                        (PayloadCarryingRpcController)controller;
-                    if (pRpcController.getCallTimeout() > 0) {
-                      timeLimitDelta = Math.min(timeLimitDelta, pRpcController.getCallTimeout());
+                  if (controller instanceof TimeLimitedRpcController) {
+                    TimeLimitedRpcController timeLimitedRpcController =
+                        (TimeLimitedRpcController)controller;
+                    if (timeLimitedRpcController.getCallTimeout() > 0) {
+                      timeLimitDelta = Math.min(timeLimitDelta,
+                          timeLimitedRpcController.getCallTimeout());
                     }
-                  } else {
-                    throw new UnsupportedOperationException("We only do " +
-                      "PayloadCarryingRpcControllers! FIX IF A PROBLEM");
                   }
                   // Use half of whichever timeout value was more restrictive... But don't allow
                   // the time limit to be less than the allowable minimum (could cause an

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index c71153d..3eb85bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -58,8 +61,10 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class WALEditsReplaySink {
+
   private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
   private static final int MAX_BATCH_SIZE = 1024;
+
   private final Configuration conf;
   private final ClusterConnection conn;
   private final TableName tableName;
@@ -161,8 +166,8 @@ public class WALEditsReplaySink {
     try {
       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
       ReplayServerCallable<ReplicateWALEntryResponse> callable =
-          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.rpcControllerFactory,
-              this.tableName, regionLoc, entries);
+          new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
+              regionInfo, entries);
       factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
     } catch (IOException ie) {
       if (skipErrors) {
@@ -179,19 +184,31 @@ public class WALEditsReplaySink {
    * @param <R>
    */
   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
+    private HRegionInfo regionInfo;
     private List<Entry> entries;
 
-    ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
-        final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
-      super(connection, rpcControllerFactory, tableName, null);
+    ReplayServerCallable(final Connection connection, final TableName tableName,
+        final HRegionLocation regionLoc, final HRegionInfo regionInfo,
+        final List<Entry> entries) {
+      super(connection, tableName, null);
       this.entries = entries;
+      this.regionInfo = regionInfo;
       setLocation(regionLoc);
     }
 
     @Override
-    protected ReplicateWALEntryResponse call(PayloadCarryingRpcController controller)
-    throws Exception {
-      if (entries.isEmpty()) return null;
+    public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
+      try {
+        replayToServer(this.regionInfo, this.entries);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+      return null;
+    }
+
+    private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
+        throws IOException, ServiceException {
+      if (entries.isEmpty()) return;
 
       Entry[] entriesArray = new Entry[entries.size()];
       entriesArray = entries.toArray(entriesArray);
@@ -199,8 +216,12 @@ public class WALEditsReplaySink {
 
       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
-      controller.setCellScanner(p.getSecond());
-      return remoteSvr.replay(controller, p.getFirst());
+      PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
+      try {
+        remoteSvr.replay(controller, p.getFirst());
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
     }
 
     @Override
@@ -224,4 +245,4 @@ public class WALEditsReplaySink {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index c756294..b0fd176 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
@@ -45,21 +46,27 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RetryingCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.hbase.replication.BaseWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -67,17 +74,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
@@ -609,8 +611,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
    * the entry if the region boundaries have changed or the region is gone.
    */
-  static class RegionReplicaReplayCallable extends
-      RegionAdminServiceCallable<ReplicateWALEntryResponse> {
+  static class RegionReplicaReplayCallable
+    extends RegionAdminServiceCallable<ReplicateWALEntryResponse> {
+
     private final List<Entry> entries;
     private final byte[] initialEncodedRegionName;
     private final AtomicLong skippedEntries;
@@ -625,25 +628,38 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
     }
 
-    public ReplicateWALEntryResponse call(PayloadCarryingRpcController controller) throws Exception {
-      // Check whether we should still replay this entry. If the regions are changed, or the
+    @Override
+    public ReplicateWALEntryResponse call(int timeout) throws IOException {
+      return replayToServer(this.entries, timeout);
+    }
+
+    private ReplicateWALEntryResponse replayToServer(List<Entry> entries, int timeout)
+        throws IOException {
+      // check whether we should still replay this entry. If the regions are changed, or the
       // entry is not coming form the primary region, filter it out because we do not need it.
       // Regions can change because of (1) region split (2) region merge (3) table recreated
       boolean skip = false;
+
       if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
-          initialEncodedRegionName)) {
+        initialEncodedRegionName)) {
         skip = true;
       }
-      if (!this.entries.isEmpty() && !skip) {
-        Entry[] entriesArray = new Entry[this.entries.size()];
-        entriesArray = this.entries.toArray(entriesArray);
+      if (!entries.isEmpty() && !skip) {
+        Entry[] entriesArray = new Entry[entries.size()];
+        entriesArray = entries.toArray(entriesArray);
 
         // set the region name for the target region replica
         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
             ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
                 .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
-        controller.setCellScanner(p.getSecond());
-        return stub.replay(controller, p.getFirst());
+        try {
+          PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
+          controller.setCallTimeout(timeout);
+          controller.setPriority(tableName);
+          return stub.replay(controller, p.getFirst());
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
       }
 
       if (skip) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
index 3c81cfe..d708edc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/Merge.java
@@ -23,18 +23,19 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -79,11 +80,13 @@ public class Merge extends Configured implements Tool {
     // Verify HBase is down
     LOG.info("Verifying that HBase is not running...");
     try {
-      HBaseAdmin.available(getConf());
+      HBaseAdmin.checkHBaseAvailable(getConf());
       LOG.fatal("HBase cluster must be off-line, and is not. Aborting.");
       return -1;
     } catch (ZooKeeperConnectionException zkce) {
       // If no zk, presume no master.
+    } catch (MasterNotRunningException e) {
+      // Expected. Ignore.
     }
 
     // Initialize MetaUtils and and get the root of the HBase installation

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
index 2dca6b1..d778fa9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
@@ -60,6 +60,7 @@ public class TestNamespace {
   private static ZKNamespaceManager zkNamespaceManager;
   private String prefix = "TestNamespace";
 
+
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
@@ -300,8 +301,7 @@ public class TestNamespace {
     runWithExpectedException(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        HTableDescriptor htd =
-            new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
+        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("non_existing_namespace", "table1"));
         htd.addFamily(new HColumnDescriptor("family1"));
         admin.createTable(htd);
         return null;
@@ -387,4 +387,5 @@ public class TestNamespace {
     }
     fail("Should have thrown exception " + exceptionClass);
   }
-}
\ No newline at end of file
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 1716622..d088fc4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ProcedureInfo;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -65,6 +67,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.protobuf.ServiceException;
+
 
 /**
  * Class to test HBaseAdmin.
@@ -639,9 +643,11 @@ public class TestAdmin2 {
 
     long start = System.currentTimeMillis();
     try {
-      HBaseAdmin.available(conf);
+      HBaseAdmin.checkHBaseAvailable(conf);
       assertTrue(false);
+    } catch (MasterNotRunningException ignored) {
     } catch (ZooKeeperConnectionException ignored) {
+    } catch (ServiceException ignored) {
     } catch (IOException ignored) {
     }
     long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index f49c558..679d9c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -28,10 +28,13 @@ import java.net.UnknownHostException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
@@ -53,6 +56,7 @@ import com.google.protobuf.ServiceException;
 
 @Category({MediumTests.class, ClientTests.class})
 public class TestClientTimeouts {
+  private static final Log LOG = LogFactory.getLog(TestClientTimeouts.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   protected static int SLAVES = 1;
 
@@ -83,6 +87,7 @@ public class TestClientTimeouts {
    */
   @Test
   public void testAdminTimeout() throws Exception {
+    Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
     RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
@@ -100,7 +105,7 @@ public class TestClientTimeouts {
           connection = ConnectionFactory.createConnection(conf);
           admin = connection.getAdmin();
           // run some admin commands
-          HBaseAdmin.available(conf);
+          HBaseAdmin.checkHBaseAvailable(conf);
           admin.setBalancerRunning(false, false);
         } catch (ZooKeeperConnectionException ex) {
           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 33af5de..1b20b76 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
@@ -104,6 +103,8 @@ public class TestHCM {
       TableName.valueOf("test2");
   private static final TableName TABLE_NAME3 =
       TableName.valueOf("test3");
+  private static final TableName TABLE_NAME4 =
+      TableName.valueOf("test4");
   private static final byte[] FAM_NAM = Bytes.toBytes("f");
   private static final byte[] ROW = Bytes.toBytes("bbb");
   private static final byte[] ROW_X = Bytes.toBytes("xxx");
@@ -406,11 +407,10 @@ public class TestHCM {
     long pauseTime;
     long baseTime = 100;
     TableName tableName = TableName.valueOf("HCM-testCallableSleep");
+    Table table = TEST_UTIL.createTable(tableName, FAM_NAM);
     RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
-        TEST_UTIL.getConnection(), new RpcControllerFactory(TEST_UTIL.getConfiguration()),
-        tableName, ROW) {
-      @Override
-      protected Object call(PayloadCarryingRpcController controller) throws Exception {
+        TEST_UTIL.getConnection(), tableName, ROW) {
+      public Object call(int timeout) throws IOException {
         return null;
       }
     };
@@ -424,10 +424,9 @@ public class TestHCM {
 
     RegionAdminServiceCallable<Object> regionAdminServiceCallable =
         new RegionAdminServiceCallable<Object>(
-        (ClusterConnection) TEST_UTIL.getConnection(),
-          new RpcControllerFactory(TEST_UTIL.getConfiguration()), tableName, ROW) {
-      @Override
-      public Object call(PayloadCarryingRpcController controller) throws Exception {
+        (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
+            TEST_UTIL.getConfiguration()), tableName, ROW) {
+      public Object call(int timeout) throws IOException {
         return null;
       }
     };
@@ -439,21 +438,16 @@ public class TestHCM {
       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
     }
 
-    MasterCallable<Object> masterCallable = new MasterCallable<Object>(TEST_UTIL.getConnection(),
-        new RpcControllerFactory(TEST_UTIL.getConfiguration())) {
-      @Override
-      protected Object call(PayloadCarryingRpcController rpcController) throws Exception {
+    MasterCallable masterCallable = new MasterCallable(TEST_UTIL.getConnection()) {
+      public Object call(int timeout) throws IOException {
         return null;
       }
     };
-    try {
-      for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
-        pauseTime = masterCallable.sleep(baseTime, i);
-        assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
-        assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
-      }
-    } finally {
-      masterCallable.close();
+
+    for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
+      pauseTime = masterCallable.sleep(baseTime, i);
+      assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
+      assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
     }
   }
 
@@ -1155,6 +1149,7 @@ public class TestHCM {
     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge(timeMachine);
     try {
+      long timeBase = timeMachine.currentTime();
       long largeAmountOfTime = ANY_PAUSE * 1000;
       ConnectionImplementation.ServerErrorTracker tracker =
           new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index d99d2ee..354f0a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -334,27 +332,26 @@ public class TestReplicaWithCluster {
 
     // bulk load HFiles
     LOG.debug("Loading test data");
+    @SuppressWarnings("deprecation")
     final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
     table = conn.getTable(hdt.getTableName());
-    final String bulkToken =
-        new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
-    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
-        new RpcControllerFactory(HTU.getConfiguration()), hdt.getTableName(),
-        TestHRegionServerBulkLoad.rowkey(0)) {
-      @Override
-      protected Void call(PayloadCarryingRpcController controller) throws Exception {
-        LOG.debug("Going to connect to server " + getLocation() + " for row "
+    final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
+    RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
+      conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
+        @Override
+        public Void call(int timeout) throws Exception {
+          LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
-        SecureBulkLoadClient secureClient = null;
-        byte[] regionName = getLocation().getRegionInfo().getRegionName();
-        try (Table table = conn.getTable(getTableName())) {
-          secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
-          secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-              true, null, bulkToken);
+          SecureBulkLoadClient secureClient = null;
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(table);
+            secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+                  true, null, bulkToken);
+          }
+          return null;
         }
-        return null;
-      }
-    };
+      };
     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
     RpcRetryingCaller<Void> caller = factory.newCaller();
     caller.callWithRetries(callable, 10000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 30805c0..6e68201 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -62,8 +62,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -200,20 +198,19 @@ public class TestHRegionServerBulkLoad {
       }
 
       // bulk load HFiles
-      final ClusterConnection conn = (ClusterConnection)UTIL.getConnection();
+      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
       Table table = conn.getTable(tableName);
-      final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).
-          prepareBulkLoad(conn);
-      RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn,
-          new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
+      final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn);
+      RegionServerCallable<Void> callable =
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
         @Override
-        public Void call(PayloadCarryingRpcController controller) throws Exception {
+        public Void call(int callTimeout) throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
               + Bytes.toStringBinary(getRow()));
           SecureBulkLoadClient secureClient = null;
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
+            secureClient = new SecureBulkLoadClient(table);
             secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
                   true, null, bulkToken);
           }
@@ -227,15 +224,15 @@ public class TestHRegionServerBulkLoad {
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn,
-            new RpcControllerFactory(UTIL.getConfiguration()), tableName, Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
           @Override
-          protected Void call(PayloadCarryingRpcController controller) throws Exception {
+          public Void call(int callTimeout) throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =
               conn.getAdmin(getLocation().getServerName());
-            CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
+            CompactRegionRequest request =
+              RequestConverter.buildCompactRegionRequest(
                 getLocation().getRegionInfo().getRegionName(), true, null);
             server.compactRegion(null, request);
             numCompactions.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index 7560a41..d55adef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -91,12 +89,10 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
 
       // bulk load HFiles
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
-      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-              Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
         @Override
-        protected Void call(PayloadCarryingRpcController controller) throws Exception {
+        public Void call(int callTimeout) throws Exception {
           LOG.info("Non-secure old client");
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
               BulkLoadHFileRequest request =
@@ -113,10 +109,9 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-            Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
           @Override
-          protected Void call(PayloadCarryingRpcController controller) throws Exception {
+          public Void call(int callTimeout) throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 0bc9498..6de6261 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -33,13 +33,13 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,8 +62,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
     super(duration);
   }
 
-  private static final Log LOG =
-      LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
+  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
 
   @BeforeClass
   public static void setUpBeforeClass() throws IOException {
@@ -104,17 +103,16 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
       Table table = conn.getTable(tableName);
       final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
-      RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       RegionServerCallable<Void> callable =
-          new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-              Bytes.toBytes("aaa")) {
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
             @Override
-            protected Void call(PayloadCarryingRpcController controller) throws Exception {
-              LOG.debug("Going to connect to server " + getLocation() + " for row " +
-                  Bytes.toStringBinary(getRow()));
+            public Void call(int callTimeout) throws Exception {
+              LOG.debug("Going to connect to server " + getLocation() + " for row "
+                  + Bytes.toStringBinary(getRow()));
               try (Table table = conn.getTable(getTableName())) {
-                boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
-                    null, bulkToken, getLocation().getRegionInfo().getStartKey());
+                boolean loaded =
+                    new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
+                      bulkToken, getLocation().getRegionInfo().getStartKey());
               }
               return null;
             }
@@ -126,10 +124,9 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       // Periodically do compaction to reduce the number of open file handles.
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
-        callable = new RegionServerCallable<Void>(conn, rpcControllerFactory, tableName,
-            Bytes.toBytes("aaa")) {
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
           @Override
-          protected Void call(PayloadCarryingRpcController controller) throws Exception {
+          public Void call(int callTimeout) throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "
                 + Bytes.toStringBinary(getRow()));
             AdminProtos.AdminService.BlockingInterface server =

http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
index 3e90fe1..fa66d69 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/SparkSQLPushDownFilter.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hbase.spark;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
@@ -35,8 +37,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ByteString;
 
 /**
  * This filter will push down all qualifier logic given to us


Mime
View raw message