hbase-commits mailing list archives

From: zhang...@apache.org
Subject: [2/6] hbase git commit: HBASE-16445 Refactor and reimplement RpcClient
Date: Thu, 08 Sep 2016 12:33:18 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
index ce32ed9..3f43f7f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
@@ -25,17 +25,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Map;
 
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
 import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
 
 import org.apache.commons.logging.Log;
@@ -48,105 +39,23 @@ import org.apache.hadoop.security.SaslOutputStream;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
- * A utility class that encapsulates SASL logic for RPC client.
- * Copied from <code>org.apache.hadoop.security</code>
+ * A utility class that encapsulates SASL logic for RPC client. Copied from
+ * <code>org.apache.hadoop.security</code>
  */
 @InterfaceAudience.Private
-public class HBaseSaslRpcClient {
-  private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
+public class HBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
 
-  private final SaslClient saslClient;
-  private final boolean fallbackAllowed;
-  protected final Map<String, String> saslProps;
-  /**
-   * Create a HBaseSaslRpcClient for an authentication method
-   *
-   * @param method
-   *          the requested authentication method
-   * @param token
-   *          token to use if needed by the authentication method
-   * @param serverPrincipal
-   *          the server principal that we are trying to set the connection up to
-   * @param fallbackAllowed
-   *          does the client allow fallback to simple authentication
-   * @throws IOException
-   */
-  public HBaseSaslRpcClient(AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed)
-      throws IOException {
-    this(method, token, serverPrincipal, fallbackAllowed, "authentication");
-  }
-  /**
-   * Create a HBaseSaslRpcClient for an authentication method
-   *
-   * @param method
-   *          the requested authentication method
-   * @param token
-   *          token to use if needed by the authentication method
-   * @param serverPrincipal
-   *          the server principal that we are trying to set the connection up to
-   * @param fallbackAllowed
-   *          does the client allow fallback to simple authentication
-   * @param rpcProtection
-   *          the protection level ("authentication", "integrity" or "privacy")
-   * @throws IOException
-   */
-  public HBaseSaslRpcClient(AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
-      String rpcProtection) throws IOException {
-    this.fallbackAllowed = fallbackAllowed;
-    saslProps = SaslUtil.initSaslProperties(rpcProtection);
-    switch (method) {
-    case DIGEST:
-      if (LOG.isDebugEnabled())
-        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
-            + " client to authenticate to service at " + token.getService());
-      saslClient = createDigestSaslClient(
-          new String[] { AuthMethod.DIGEST.getMechanismName() },
-          SaslUtil.SASL_DEFAULT_REALM, new SaslClientCallbackHandler(token));
-      break;
-    case KERBEROS:
-      if (LOG.isDebugEnabled()) {
-        LOG
-            .debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
-                + " client. Server's Kerberos principal name is "
-                + serverPrincipal);
-      }
-      if (serverPrincipal == null || serverPrincipal.length() == 0) {
-        throw new IOException(
-            "Failed to specify server's Kerberos principal name");
-      }
-      String[] names = SaslUtil.splitKerberosName(serverPrincipal);
-      if (names.length != 3) {
-        throw new IOException(
-          "Kerberos principal does not have the expected format: "
-                + serverPrincipal);
-      }
-      saslClient = createKerberosSaslClient(
-          new String[] { AuthMethod.KERBEROS.getMechanismName() },
-          names[0], names[1]);
-      break;
-    default:
-      throw new IOException("Unknown authentication method " + method);
-    }
-    if (saslClient == null)
-      throw new IOException("Unable to find SASL client implementation");
-  }
+  private static final Log LOG = LogFactory.getLog(HBaseSaslRpcClient.class);
 
-  protected SaslClient createDigestSaslClient(String[] mechanismNames,
-      String saslDefaultRealm, CallbackHandler saslClientCallbackHandler)
-      throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm,
-        saslProps, saslClientCallbackHandler);
+  public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
+      String serverPrincipal, boolean fallbackAllowed) throws IOException {
+    super(method, token, serverPrincipal, fallbackAllowed);
   }
 
-  protected SaslClient createKerberosSaslClient(String[] mechanismNames,
-      String userFirstPart, String userSecondPart) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, userFirstPart,
-        userSecondPart, saslProps, null);
+  public HBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
+      String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
+    super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
   }
 
   private static void readStatus(DataInputStream inStream) throws IOException {
@@ -158,72 +67,65 @@ public class HBaseSaslRpcClient {
   }
 
   /**
-   * Do client side SASL authentication with server via the given InputStream
-   * and OutputStream
-   *
-   * @param inS
-   *          InputStream to use
-   * @param outS
-   *          OutputStream to use
-   * @return true if connection is set up, or false if needs to switch
-   *             to simple Auth.
+   * Do client side SASL authentication with server via the given InputStream and OutputStream
+   * @param inS InputStream to use
+   * @param outS OutputStream to use
+   * @return true if connection is set up, or false if it needs to switch to simple auth.
    * @throws IOException
    */
-  public boolean saslConnect(InputStream inS, OutputStream outS)
-      throws IOException {
+  public boolean saslConnect(InputStream inS, OutputStream outS) throws IOException {
     DataInputStream inStream = new DataInputStream(new BufferedInputStream(inS));
-    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(
-        outS));
+    DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(outS));
 
     try {
-      byte[] saslToken = new byte[0];
-      if (saslClient.hasInitialResponse())
-        saslToken = saslClient.evaluateChallenge(saslToken);
+      byte[] saslToken = getInitialResponse();
       if (saslToken != null) {
         outStream.writeInt(saslToken.length);
         outStream.write(saslToken, 0, saslToken.length);
         outStream.flush();
-        if (LOG.isDebugEnabled())
-          LOG.debug("Have sent token of size " + saslToken.length
-              + " from initSASLContext.");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext.");
+        }
       }
-      if (!saslClient.isComplete()) {
+      if (!isComplete()) {
         readStatus(inStream);
         int len = inStream.readInt();
         if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
           if (!fallbackAllowed) {
-            throw new IOException("Server asks us to fall back to SIMPLE auth, " +
-                "but this client is configured to only allow secure connections.");
+            throw new IOException("Server asks us to fall back to SIMPLE auth, "
+                + "but this client is configured to only allow secure connections.");
           }
           if (LOG.isDebugEnabled()) {
             LOG.debug("Server asks us to fall back to simple auth.");
           }
-          saslClient.dispose();
+          dispose();
           return false;
         }
         saslToken = new byte[len];
-        if (LOG.isDebugEnabled())
+        if (LOG.isDebugEnabled()) {
           LOG.debug("Will read input token of size " + saslToken.length
               + " for processing by initSASLContext");
+        }
         inStream.readFully(saslToken);
       }
 
-      while (!saslClient.isComplete()) {
-        saslToken = saslClient.evaluateChallenge(saslToken);
+      while (!isComplete()) {
+        saslToken = evaluateChallenge(saslToken);
         if (saslToken != null) {
-          if (LOG.isDebugEnabled())
-            LOG.debug("Will send token of size " + saslToken.length
-                + " from initSASLContext.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext.");
+          }
           outStream.writeInt(saslToken.length);
           outStream.write(saslToken, 0, saslToken.length);
           outStream.flush();
         }
-        if (!saslClient.isComplete()) {
+        if (!isComplete()) {
           readStatus(inStream);
           saslToken = new byte[inStream.readInt()];
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
             LOG.debug("Will read input token of size " + saslToken.length
                 + " for processing by initSASLContext");
+          }
           inStream.readFully(saslToken);
         }
       }
@@ -243,11 +145,8 @@ public class HBaseSaslRpcClient {
   }
 
   /**
-   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has
-   * been called.
-   *
-   * @param in
-   *          the InputStream to wrap
+   * Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called.
+   * @param in the InputStream to wrap
    * @return a SASL wrapped InputStream
    * @throws IOException
    */
@@ -259,11 +158,8 @@ public class HBaseSaslRpcClient {
   }
 
   /**
-   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has
-   * been called.
-   *
-   * @param out
-   *          the OutputStream to wrap
+   * Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called.
+   * @param out the OutputStream to wrap
    * @return a SASL wrapped OutputStream
    * @throws IOException
    */
@@ -273,58 +169,4 @@ public class HBaseSaslRpcClient {
     }
     return new SaslOutputStream(out, saslClient);
   }
-
-  /** Release resources used by wrapped saslClient */
-  public void dispose() throws SaslException {
-    saslClient.dispose();
-  }
-
-  @VisibleForTesting
-  static class SaslClientCallbackHandler implements CallbackHandler {
-    private final String userName;
-    private final char[] userPassword;
-
-    public SaslClientCallbackHandler(Token<? extends TokenIdentifier> token) {
-      this.userName = SaslUtil.encodeIdentifier(token.getIdentifier());
-      this.userPassword = SaslUtil.encodePassword(token.getPassword());
-    }
-
-    @Override
-    public void handle(Callback[] callbacks)
-        throws UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL client callback");
-        }
-      }
-      if (nc != null) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("SASL client callback: setting username: " + userName);
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("SASL client callback: setting userPassword");
-        pc.setPassword(userPassword);
-      }
-      if (rc != null) {
-        if (LOG.isDebugEnabled())
-          LOG.debug("SASL client callback: setting realm: "
-              + rc.getDefaultText());
-        rc.setText(rc.getDefaultText());
-      }
-    }
-  }
 }

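Note on the hunk above: the SaslClient construction and the DIGEST callback handling moved into the new AbstractHBaseSaslRpcClient base class, so this file now keeps only the blocking, stream-based negotiation. A minimal usage sketch of the refactored client follows; the host, port, principal and QOP value are hypothetical placeholders, and error handling is elided:

  // Hedged sketch, not part of the patch; placeholder host/port/principal.
  Socket socket = new Socket("rs.example.com", 16020);
  HBaseSaslRpcClient client = new HBaseSaslRpcClient(AuthMethod.KERBEROS, null,
      "hbase/rs.example.com@EXAMPLE.COM", /* fallbackAllowed */ true, "privacy");
  if (client.saslConnect(socket.getInputStream(), socket.getOutputStream())) {
    // Negotiation done: all further traffic goes through the wrapped streams.
    InputStream in = client.getInputStream(socket.getInputStream());
    OutputStream out = client.getOutputStream(socket.getOutputStream());
  } else {
    // Server chose SIMPLE auth and fallback was allowed; use the raw streams.
  }
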
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
new file mode 100644
index 0000000..d818097
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslChallengeDecoder.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * Decode the sasl challenge sent by RpcServer.
+ */
+@InterfaceAudience.Private
+public class SaslChallengeDecoder extends ByteToMessageDecoder {
+
+  private static final int MAX_CHALLENGE_SIZE = 1024 * 1024; // 1M
+
+  private ByteBuf tryDecodeChallenge(ByteBuf in, int offset, int readableBytes) throws IOException {
+    if (readableBytes < 4) {
+      return null;
+    }
+    int len = in.getInt(offset);
+    if (len <= 0) {
+      // fall back to simple
+      in.readerIndex(offset + 4);
+      return in.retainedSlice(offset, 4);
+    }
+    if (len > MAX_CHALLENGE_SIZE) {
+      throw new IOException(
+          "Sasl challenge too large(" + len + "), max allowed is " + MAX_CHALLENGE_SIZE);
+    }
+    int totalLen = 4 + len;
+    if (readableBytes < totalLen) {
+      return null;
+    }
+    in.readerIndex(offset + totalLen);
+    return in.retainedSlice(offset, totalLen);
+  }
+
+  // will throw a RemoteException out if data is enough, so do not need to return anything.
+  private void tryDecodeError(ByteBuf in, int offset, int readableBytes) throws IOException {
+    if (readableBytes < 4) {
+      return;
+    }
+    int classLen = in.getInt(offset);
+    if (classLen <= 0) {
+      throw new IOException("Invalid exception class name length " + classLen);
+    }
+    if (classLen > MAX_CHALLENGE_SIZE) {
+      throw new IOException("Exception class name length too large(" + classLen
+          + "), max allowed is " + MAX_CHALLENGE_SIZE);
+    }
+    if (readableBytes < 4 + classLen + 4) {
+      return;
+    }
+    int msgLen = in.getInt(offset + 4 + classLen);
+    if (msgLen <= 0) {
+      throw new IOException("Invalid exception message length " + msgLen);
+    }
+    if (msgLen > MAX_CHALLENGE_SIZE) {
+      throw new IOException("Exception message length too large(" + msgLen + "), max allowed is "
+          + MAX_CHALLENGE_SIZE);
+    }
+    int totalLen = classLen + msgLen + 8;
+    if (readableBytes < totalLen) {
+      return;
+    }
+    String className = in.toString(offset + 4, classLen, HConstants.UTF8_CHARSET);
+    String msg = in.toString(offset + classLen + 8, msgLen, HConstants.UTF8_CHARSET);
+    in.readerIndex(offset + totalLen);
+    throw new RemoteException(className, msg);
+  }
+
+  @Override
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+    int readableBytes = in.readableBytes();
+    if (readableBytes < 4) {
+      return;
+    }
+    int offset = in.readerIndex();
+    int status = in.getInt(offset);
+    if (status == SaslStatus.SUCCESS.state) {
+      ByteBuf challenge = tryDecodeChallenge(in, offset + 4, readableBytes - 4);
+      if (challenge != null) {
+        out.add(challenge);
+      }
+    } else {
+      tryDecodeError(in, offset + 4, readableBytes - 4);
+    }
+  }
+}

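The decoder above frames every server message as a 4-byte status int followed, on success, by a length-prefixed challenge (a non-positive length is the switch-to-simple-auth signal), or, on failure, by a length-prefixed exception class name and message that surface as a RemoteException. For illustration, a sketch of the success framing from the sending side, reusing the SaslStatus constant referenced in decode():

  // Builds the kind of frame tryDecodeChallenge() consumes (sketch only).
  static ByteBuf successFrame(byte[] challenge) {
    ByteBuf frame = Unpooled.buffer(8 + challenge.length);
    frame.writeInt(SaslStatus.SUCCESS.state); // 4-byte status
    frame.writeInt(challenge.length);         // 4-byte challenge length
    frame.writeBytes(challenge);              // challenge payload
    return frame;
  }
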
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
deleted file mode 100644
index d583e20..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslClientHandler.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Handles Sasl connections
- */
-@InterfaceAudience.Private
-public class SaslClientHandler extends ChannelDuplexHandler {
-  private static final Log LOG = LogFactory.getLog(SaslClientHandler.class);
-
-  private final boolean fallbackAllowed;
-
-  private final UserGroupInformation ticket;
-
-  /**
-   * Used for client or server's token to send or receive from each other.
-   */
-  private final SaslClient saslClient;
-  private final Map<String, String> saslProps;
-  private final SaslExceptionHandler exceptionHandler;
-  private final SaslSuccessfulConnectHandler successfulConnectHandler;
-  private byte[] saslToken;
-  private byte[] connectionHeader;
-  private boolean firstRead = true;
-
-  private int retryCount = 0;
-  private Random random;
-
-  /**
-   * @param ticket                   the ugi
-   * @param method                   auth method
-   * @param token                    for Sasl
-   * @param serverPrincipal          Server's Kerberos principal name
-   * @param fallbackAllowed          True if server may also fall back to less secure connection
-   * @param rpcProtection            Quality of protection. Can be 'authentication', 'integrity' or
-   *                                 'privacy'.
-   * @throws java.io.IOException if handler could not be created
-   */
-  public SaslClientHandler(UserGroupInformation ticket, AuthMethod method,
-      Token<? extends TokenIdentifier> token, String serverPrincipal, boolean fallbackAllowed,
-      String rpcProtection, byte[] connectionHeader, SaslExceptionHandler exceptionHandler,
-      SaslSuccessfulConnectHandler successfulConnectHandler) throws IOException {
-    this.ticket = ticket;
-    this.fallbackAllowed = fallbackAllowed;
-    this.connectionHeader = connectionHeader;
-
-    this.exceptionHandler = exceptionHandler;
-    this.successfulConnectHandler = successfulConnectHandler;
-
-    saslProps = SaslUtil.initSaslProperties(rpcProtection);
-    switch (method) {
-    case DIGEST:
-      if (LOG.isDebugEnabled())
-        LOG.debug("Creating SASL " + AuthMethod.DIGEST.getMechanismName()
-            + " client to authenticate to service at " + token.getService());
-      saslClient = createDigestSaslClient(new String[] { AuthMethod.DIGEST.getMechanismName() },
-          SaslUtil.SASL_DEFAULT_REALM, new HBaseSaslRpcClient.SaslClientCallbackHandler(token));
-      break;
-    case KERBEROS:
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating SASL " + AuthMethod.KERBEROS.getMechanismName()
-            + " client. Server's Kerberos principal name is " + serverPrincipal);
-      }
-      if (serverPrincipal == null || serverPrincipal.isEmpty()) {
-        throw new IOException("Failed to specify server's Kerberos principal name");
-      }
-      String[] names = SaslUtil.splitKerberosName(serverPrincipal);
-      if (names.length != 3) {
-        throw new IOException(
-            "Kerberos principal does not have the expected format: " + serverPrincipal);
-      }
-      saslClient = createKerberosSaslClient(new String[] { AuthMethod.KERBEROS.getMechanismName() },
-          names[0], names[1]);
-      break;
-    default:
-      throw new IOException("Unknown authentication method " + method);
-    }
-    if (saslClient == null) {
-      throw new IOException("Unable to find SASL client implementation");
-    }
-  }
-
-  /**
-   * Create a Digest Sasl client
-   */
-  protected SaslClient createDigestSaslClient(String[] mechanismNames, String saslDefaultRealm,
-      CallbackHandler saslClientCallbackHandler) throws IOException {
-    return Sasl.createSaslClient(mechanismNames, null, null, saslDefaultRealm, saslProps,
-        saslClientCallbackHandler);
-  }
-
-  /**
-   * Create Kerberos client
-   *
-   * @param userFirstPart  first part of username
-   * @param userSecondPart second part of username
-   */
-  protected SaslClient createKerberosSaslClient(String[] mechanismNames, String userFirstPart,
-      String userSecondPart) throws IOException {
-    return Sasl
-        .createSaslClient(mechanismNames, null, userFirstPart, userSecondPart, saslProps,
-            null);
-  }
-
-  @Override
-  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
-    saslClient.dispose();
-  }
-
-  private byte[] evaluateChallenge(final byte[] challenge) throws Exception {
-    return ticket.doAs(new PrivilegedExceptionAction<byte[]>() {
-
-      @Override
-      public byte[] run() throws Exception {
-        return saslClient.evaluateChallenge(challenge);
-      }
-    });
-  }
-
-  @Override
-  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
-    saslToken = new byte[0];
-    if (saslClient.hasInitialResponse()) {
-      saslToken = evaluateChallenge(saslToken);
-    }
-    if (saslToken != null) {
-      writeSaslToken(ctx, saslToken);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Have sent token of size " + saslToken.length + " from initSASLContext.");
-      }
-    }
-  }
-
-  @Override
-  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-    ByteBuf in = (ByteBuf) msg;
-
-    // If not complete, try to negotiate
-    if (!saslClient.isComplete()) {
-      while (!saslClient.isComplete() && in.isReadable()) {
-        readStatus(in);
-        int len = in.readInt();
-        if (firstRead) {
-          firstRead = false;
-          if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
-            if (!fallbackAllowed) {
-              throw new IOException("Server asks us to fall back to SIMPLE auth, " + "but this "
-                  + "client is configured to only allow secure connections.");
-            }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Server asks us to fall back to simple auth.");
-            }
-            saslClient.dispose();
-
-            ctx.pipeline().remove(this);
-            successfulConnectHandler.onSuccess(ctx.channel());
-            return;
-          }
-        }
-        saslToken = new byte[len];
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Will read input token of size " + saslToken.length
-              + " for processing by initSASLContext");
-        }
-        in.readBytes(saslToken);
-
-        saslToken = evaluateChallenge(saslToken);
-        if (saslToken != null) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Will send token of size " + saslToken.length + " from initSASLContext.");
-          }
-          writeSaslToken(ctx, saslToken);
-        }
-      }
-      // release the memory
-      in.release();
-
-      if (saslClient.isComplete()) {
-        String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("SASL client context established. Negotiated QoP: " + qop);
-        }
-
-        boolean useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
-
-        if (!useWrap) {
-          ctx.pipeline().remove(this);
-          successfulConnectHandler.onSuccess(ctx.channel());
-        } else {
-          byte[] wrappedCH = saslClient.wrap(connectionHeader, 0, connectionHeader.length);
-          // write connection header
-          writeSaslToken(ctx, wrappedCH);
-          successfulConnectHandler.onSaslProtectionSucess(ctx.channel());
-        }
-      }
-    }
-    // Normal wrapped reading
-    else {
-      try {
-        int length = in.readInt();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Actual length is " + length);
-        }
-        saslToken = new byte[length];
-        in.readBytes(saslToken);
-        // release the memory
-        in.release();
-      } catch (IndexOutOfBoundsException e) {
-        return;
-      }
-      try {
-        ByteBuf b = ctx.channel().alloc().buffer(saslToken.length);
-
-        b.writeBytes(saslClient.unwrap(saslToken, 0, saslToken.length));
-        ctx.fireChannelRead(b);
-
-      } catch (SaslException se) {
-        try {
-          saslClient.dispose();
-        } catch (SaslException ignored) {
-          LOG.debug("Ignoring SASL exception", ignored);
-        }
-        throw se;
-      }
-    }
-  }
-
-  private void writeSaslToken(final ChannelHandlerContext ctx, byte[] saslToken) {
-    ByteBuf b = ctx.alloc().buffer(4 + saslToken.length);
-    b.writeInt(saslToken.length);
-    b.writeBytes(saslToken, 0, saslToken.length);
-    ctx.writeAndFlush(b).addListener(new ChannelFutureListener() {
-      @Override
-      public void operationComplete(ChannelFuture future) throws Exception {
-        if (!future.isSuccess()) {
-          exceptionCaught(ctx, future.cause());
-        }
-      }
-    });
-  }
-
-  /**
-   * Get the read status
-   */
-  private static void readStatus(ByteBuf inStream) throws RemoteException {
-    int status = inStream.readInt(); // read status
-    if (status != SaslStatus.SUCCESS.state) {
-      throw new RemoteException(inStream.toString(Charset.forName("UTF-8")),
-          inStream.toString(Charset.forName("UTF-8")));
-    }
-  }
-
-  @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-      throws Exception {
-    saslClient.dispose();
-
-    ctx.close();
-
-    if (this.random == null) {
-      this.random = new Random();
-    }
-    exceptionHandler.handle(this.retryCount++, this.random, cause);
-  }
-
-  @Override
-  public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
-      throws Exception {
-    // If not complete, try to negotiate
-    if (!saslClient.isComplete()) {
-      super.write(ctx, msg, promise);
-    } else {
-      ByteBuf in = (ByteBuf) msg;
-      byte[] unwrapped = new byte[in.readableBytes()];
-      in.readBytes(unwrapped);
-      // release the memory
-      in.release();
-
-      try {
-        saslToken = saslClient.wrap(unwrapped, 0, unwrapped.length);
-      } catch (SaslException se) {
-        try {
-          saslClient.dispose();
-        } catch (SaslException ignored) {
-          LOG.debug("Ignoring SASL exception", ignored);
-        }
-        promise.setFailure(se);
-      }
-      if (saslToken != null) {
-        ByteBuf out = ctx.channel().alloc().buffer(4 + saslToken.length);
-        out.writeInt(saslToken.length);
-        out.writeBytes(saslToken, 0, saslToken.length);
-
-        ctx.write(out).addListener(new ChannelFutureListener() {
-          @Override public void operationComplete(ChannelFuture future) throws Exception {
-            if (!future.isSuccess()) {
-              exceptionCaught(ctx, future.cause());
-            }
-          }
-        });
-
-        saslToken = null;
-      }
-    }
-  }
-
-  /**
-   * Handler for exceptions during Sasl connection
-   */
-  public interface SaslExceptionHandler {
-    /**
-     * Handle the exception
-     *
-     * @param retryCount current retry count
-     * @param random     to create new backoff with
-     */
-    public void handle(int retryCount, Random random, Throwable cause);
-  }
-
-  /**
-   * Handler for successful connects
-   */
-  public interface SaslSuccessfulConnectHandler {
-    /**
-     * Runs on success
-     *
-     * @param channel which is successfully authenticated
-     */
-    public void onSuccess(Channel channel);
-
-    /**
-     * Runs on success if data protection used in Sasl
-     *
-     * @param channel which is successfully authenticated
-     */
-    public void onSaslProtectionSucess(Channel channel);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
new file mode 100644
index 0000000..c2faf91
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Unwrap sasl messages. Should be placed after a
+ * {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder}
+ */
+@InterfaceAudience.Private
+public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+  private final SaslClient saslClient;
+
+  public SaslUnwrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    SaslUtil.safeDispose(saslClient);
+  }
+
+  @Override
+  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+    byte[] bytes = new byte[msg.readableBytes()];
+    msg.readBytes(bytes);
+    ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(bytes, 0, bytes.length)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
index b505fc0..aaa9d7a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUtil.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
@@ -30,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class SaslUtil {
-  private static final Log log = LogFactory.getLog(SaslUtil.class);
+  private static final Log LOG = LogFactory.getLog(SaslUtil.class);
   public static final String SASL_DEFAULT_REALM = "default";
   public static final int SWITCH_TO_SIMPLE_AUTH = -88;
 
@@ -51,7 +53,7 @@ public class SaslUtil {
 
     public boolean matches(String stringQop) {
       if (saslQop.equals(stringQop)) {
-        log.warn("Use authentication/integrity/privacy as value for rpc protection "
+        LOG.warn("Use authentication/integrity/privacy as value for rpc protection "
             + "configurations instead of auth/auth-int/auth-conf.");
         return true;
       }
@@ -113,4 +115,12 @@ public class SaslUtil {
     saslProps.put(Sasl.SERVER_AUTH, "true");
     return saslProps;
   }
+
+  static void safeDispose(SaslClient saslClient) {
+    try {
+      saslClient.dispose();
+    } catch (SaslException e) {
+      LOG.error("Error disposing of SASL client", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
new file mode 100644
index 0000000..ddb4ae9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.CoalescingBufferQueue;
+import io.netty.util.concurrent.PromiseCombiner;
+
+import javax.security.sasl.SaslClient;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Wrap sasl messages.
+ */
+@InterfaceAudience.Private
+public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+  private final SaslClient saslClient;
+
+  private CoalescingBufferQueue queue;
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    queue = new CoalescingBufferQueue(ctx.channel());
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+      throws Exception {
+    if (msg instanceof ByteBuf) {
+      queue.add((ByteBuf) msg, promise);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  public SaslWrapHandler(SaslClient saslClient) {
+    this.saslClient = saslClient;
+  }
+
+  @Override
+  public void flush(ChannelHandlerContext ctx) throws Exception {
+    if (!queue.isEmpty()) {
+      ChannelPromise promise = ctx.newPromise();
+      int readableBytes = queue.readableBytes();
+      ByteBuf buf = queue.remove(readableBytes, promise);
+      byte[] bytes = new byte[readableBytes];
+      buf.readBytes(bytes);
+      byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+      ChannelPromise lenPromise = ctx.newPromise();
+      ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
+      ChannelPromise contentPromise = ctx.newPromise();
+      ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+      PromiseCombiner combiner = new PromiseCombiner();
+      combiner.addAll(lenPromise, contentPromise);
+      combiner.finish(promise);
+    }
+    ctx.flush();
+  }
+}

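SaslUnwrapHandler and SaslWrapHandler together take over the wrap/unwrap duties of the deleted SaslClientHandler, and the unwrap handler's javadoc fixes its position relative to a frame decoder. A hedged sketch of installing the pair once negotiation has completed with a wrapping QOP; the handler names and frame-decoder parameters below are assumptions, not taken from this patch:

  ChannelPipeline p = channel.pipeline();
  // Inbound: reassemble one length-prefixed frame, strip the 4-byte length,
  // then hand the wrapped payload to the unwrap handler.
  p.addLast("frameDecoder",
      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
  p.addLast("saslUnwrap", new SaslUnwrapHandler(saslClient));
  // Outbound: queue writes until flush, wrap them in one call, length-prefix.
  p.addLast("saslWrap", new SaslWrapHandler(saslClient));

Buffering outbound writes in a CoalescingBufferQueue and wrapping only on flush means one saslClient.wrap() call can cover a whole batch of pending buffers instead of paying one wrap per write.
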
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
index 60ef357..ccabe66 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -27,7 +27,6 @@ import org.apache.commons.lang.time.StopWatch;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
@@ -53,11 +52,11 @@ public class TestCellBlockBuilder {
 
   private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class);
 
-  CellBlockBuilder builder;
+  private CellBlockBuilder builder;
 
   @Before
   public void before() {
-    this.builder = new CellBlockBuilder(new Configuration());
+    this.builder = new CellBlockBuilder(HBaseConfiguration.create());
   }
 
   @Test
@@ -164,9 +163,9 @@ public class TestCellBlockBuilder {
         + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
   }
 
-  private static void timerTest(final CellBlockBuilder builder, final StopWatch timer, final int count,
-      final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
-      throws IOException {
+  private static void timerTest(final CellBlockBuilder builder, final StopWatch timer,
+      final int count, final int size, final Codec codec, final CompressionCodec compressor,
+      final boolean sized) throws IOException {
     doBuildCellBlockUndoCellBlock(builder, codec, compressor, count, size, sized);
   }
 
@@ -187,10 +186,10 @@ public class TestCellBlockBuilder {
         usage(1);
       }
     }
-    CellBlockBuilder buildr = new CellBlockBuilder(HBaseConfiguration.create());
+    CellBlockBuilder builder = new CellBlockBuilder(HBaseConfiguration.create());
     ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL);
-    timerTests(buildr, count, size, new KeyValueCodec(), null);
-    timerTests(buildr, count, size, new KeyValueCodec(), new DefaultCodec());
-    timerTests(buildr, count, size, new KeyValueCodec(), new GzipCodec());
+    timerTests(builder, count, size, new KeyValueCodec(), null);
+    timerTests(builder, count, size, new KeyValueCodec(), new DefaultCodec());
+    timerTests(builder, count, size, new KeyValueCodec(), new GzipCodec());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java
new file mode 100644
index 0000000..7ac5c2e
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientDeprecatedNameMapping.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestRpcClientDeprecatedNameMapping {
+
+  @Test
+  public void test() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, BlockingRpcClient.class.getName());
+    try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) {
+      assertThat(client, instanceOf(BlockingRpcClient.class));
+    }
+    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
+      "org.apache.hadoop.hbase.ipc.RpcClientImpl");
+    try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) {
+      assertThat(client, instanceOf(BlockingRpcClient.class));
+    }
+    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, NettyRpcClient.class.getName());
+    try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) {
+      assertThat(client, instanceOf(NettyRpcClient.class));
+    }
+    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
+      "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
+    try (RpcClient client = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT)) {
+      assertThat(client, instanceOf(NettyRpcClient.class));
+    }
+  }
+}

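The new test pins down backwards compatibility: configurations that still name the removed RpcClientImpl and AsyncRpcClient classes must resolve to BlockingRpcClient and NettyRpcClient. The factory-side lookup is not part of this mail, so the following is only a sketch of the shape such a mapping could take inside RpcClientFactory:

  // Hypothetical sketch; the field and method names are assumptions.
  private static final Map<String, String> DEPRECATED_NAME_MAPPING = new HashMap<>();
  static {
    DEPRECATED_NAME_MAPPING.put("org.apache.hadoop.hbase.ipc.RpcClientImpl",
        BlockingRpcClient.class.getName());
    DEPRECATED_NAME_MAPPING.put("org.apache.hadoop.hbase.ipc.AsyncRpcClient",
        NettyRpcClient.class.getName());
  }

  private static String getRpcClientClass(Configuration conf) {
    String clazz = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
        NettyRpcClient.class.getName()); // default value is an assumption here
    String mapped = DEPRECATED_NAME_MAPPING.get(clazz);
    return mapped != null ? mapped : clazz;
  }
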
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
index 0e3aeab..12b3661 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/security/TestHBaseSaslRpcClient.java
@@ -27,6 +27,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Strings;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -37,14 +39,14 @@ import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.TextOutputCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.Sasl;
 import javax.security.sasl.RealmCallback;
 import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 
+import org.apache.hadoop.hbase.security.AbstractHBaseSaslRpcClient.SaslClientCallbackHandler;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcClient.SaslClientCallbackHandler;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.token.Token;
@@ -58,8 +60,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
-import com.google.common.base.Strings;
-
 @Category({SecurityTests.class, SmallTests.class})
 public class TestHBaseSaslRpcClient {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index 28c19ad..f41efc7 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -47,7 +47,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -93,15 +92,13 @@ public class IntegrationTestRpcClient {
     }
   }
 
-  protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) {
-    return isSyncClient ?
-        new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) :
-          new AsyncRpcClient(conf) {
-          @Override
-          Codec getCodec() {
-            return null;
-          }
-        };
+  protected AbstractRpcClient<?> createRpcClient(Configuration conf, boolean isSyncClient) {
+    return isSyncClient ? new BlockingRpcClient(conf) : new NettyRpcClient(conf) {
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+    };
   }
 
   static String BIG_PAYLOAD;
@@ -258,7 +255,7 @@ public class IntegrationTestRpcClient {
   }
 
   static class SimpleClient extends Thread {
-    AbstractRpcClient rpcClient;
+    AbstractRpcClient<?> rpcClient;
     AtomicBoolean running = new  AtomicBoolean(true);
     AtomicBoolean sending = new AtomicBoolean(false);
     AtomicReference<Throwable> exception = new AtomicReference<>(null);
@@ -267,7 +264,7 @@ public class IntegrationTestRpcClient {
     long numCalls = 0;
     Random random = new Random();
 
-    public SimpleClient(Cluster cluster, AbstractRpcClient rpcClient, String id) {
+    public SimpleClient(Cluster cluster, AbstractRpcClient<?> rpcClient, String id) {
       this.cluster = cluster;
       this.rpcClient = rpcClient;
       this.id = id;
@@ -327,7 +324,7 @@ public class IntegrationTestRpcClient {
     cluster.startServer();
     conf.setBoolean(SPECIFIC_WRITE_THREAD, true);
     for(int i = 0; i <1000; i++) {
-      AbstractRpcClient rpcClient = createRpcClient(conf, true);
+      AbstractRpcClient<?> rpcClient = createRpcClient(conf, true);
       SimpleClient client = new SimpleClient(cluster, rpcClient, "Client1");
       client.start();
       while(!client.isSending()) {
@@ -419,7 +416,7 @@ public class IntegrationTestRpcClient {
     ArrayList<SimpleClient> clients = new ArrayList<>();
 
     // all threads should share the same rpc client
-    AbstractRpcClient rpcClient = createRpcClient(conf, isSyncClient);
+    AbstractRpcClient<?> rpcClient = createRpcClient(conf, isSyncClient);
 
     for (int i = 0; i < 30; i++) {
       String clientId = "client_" + i + "_";

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f611796..0df5097 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -157,7 +157,7 @@ import com.google.protobuf.TextFormat;
  * CallRunner#run executes the call.  When done, asks the included Call to put itself on new
  * queue for Responder to pull from and return result to client.
  *
- * @see RpcClientImpl
+ * @see BlockingRpcClient
  */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 663535b..f97dfb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -863,6 +863,7 @@ public class ServerManager {
     }
     long expiration = timeout + System.currentTimeMillis();
     while (System.currentTimeMillis() < expiration) {
+      controller.reset();
       try {
         HRegionInfo rsRegion =
           ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());

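The one-line ServerManager change matters because the same controller instance is reused across iterations of this retry loop; without a reset, failed or cancelled state recorded by one attempt would leak into the next getRegionInfo call. The resulting loop shape, sketched with the names visible in the hunk (exception handling compressed):

  while (System.currentTimeMillis() < expiration) {
    controller.reset(); // clear state left over from the previous attempt
    try {
      HRegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
      // ... inspect rsRegion and return once the region is closed (elided)
    } catch (IOException ioe) {
      // swallow and retry until the deadline passes (sketch)
    }
  }
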
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
index 1688874..5934e07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
 
 /**
  * Provides ability to create multiple Connection instances and allows to process a batch of

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index f49c558..0349ca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
@@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
-import org.apache.hadoop.hbase.ipc.RpcClientImpl;
+import org.apache.hadoop.hbase.ipc.BlockingRpcClient;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -130,7 +131,7 @@ public class TestClientTimeouts {
   /**
    * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
    */
-  public static class RandomTimeoutRpcClient extends RpcClientImpl {
+  public static class RandomTimeoutRpcClient extends BlockingRpcClient {
     public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
         MetricsConnection metrics) {
       super(conf, clusterId, localAddr, metrics);
@@ -153,9 +154,9 @@ public class TestClientTimeouts {
     public static final double CHANCE_OF_TIMEOUT = 0.3;
     private static AtomicInteger invokations = new AtomicInteger();
 
-    RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn,
-        final User ticket, final int rpcTimeout) throws UnknownHostException {
-      super(rpcClient, sn, ticket, rpcTimeout);
+    RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient, final ServerName sn,
+        final User ticket, final int rpcTimeout) {
+      super(rpcClient, new InetSocketAddress(sn.getHostname(), sn.getPort()), ticket, rpcTimeout);
     }
 
     @Override

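The RandomTimeoutRpcClient above exercises timeout handling by failing a fixed fraction of calls before they ever reach the wire. A minimal sketch of that probabilistic fault-injection pattern in plain Java; the Call interface and method names below are hypothetical illustrations, not HBase API:

    import java.net.SocketTimeoutException;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RandomTimeoutSketch {
      static final double CHANCE_OF_TIMEOUT = 0.3;
      static final AtomicInteger invocations = new AtomicInteger();

      interface Call<T> { T run() throws Exception; }

      // Wraps a call and, with probability CHANCE_OF_TIMEOUT, throws before
      // delegating -- mimicking a network timeout without touching a socket.
      static <T> T withRandomTimeout(Call<T> call) throws Exception {
        invocations.getAndIncrement();
        if (ThreadLocalRandom.current().nextDouble() < CHANCE_OF_TIMEOUT) {
          throw new SocketTimeoutException("Injected random timeout");
        }
        return call.run();
      }
    }

Because the failure is injected client-side, the test can assert both that at least one call timed out and that retries still succeed, regardless of server behavior.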
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index c1c9b1e..aac020d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -170,7 +170,7 @@ public class TestRpcControllerFactory {
     ResultScanner scan = table.getScanner(fam1);
     scan.next();
     scan.close();
-    counter = verifyCount(counter);
+    counter = verifyCount(counter + 2);
 
     Get g2 = new Get(row);
     table.get(Lists.newArrayList(g, g2));
@@ -189,7 +189,7 @@ public class TestRpcControllerFactory {
 
     // reversed, regular
     scanInfo.setSmall(false);
-    counter = doScan(table, scanInfo, counter);
+    counter = doScan(table, scanInfo, counter + 2);
 
     table.close();
     connection.close();

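The verifyCount arguments grow by two here, presumably because the refactored client now routes two additional RPCs per scan through the controller factory. A hedged sketch of the counting-factory idea the assertions rely on; CountingFactorySketch and newController are stand-ins, not the real RpcControllerFactory API:

    import java.util.concurrent.atomic.AtomicInteger;

    public class CountingFactorySketch {
      // Bumped once per controller handed out, so a test can assert exactly
      // how many RPCs a client operation issued.
      static final AtomicInteger COUNT = new AtomicInteger();

      static Object newController() {
        COUNT.incrementAndGet();
        return new Object(); // stand-in for a real per-call RPC controller
      }

      public static void main(String[] args) {
        int before = COUNT.get();
        newController(); // e.g. one extra scanner RPC
        newController(); // e.g. another extra scanner RPC
        System.out.println(COUNT.get() == before + 2); // prints true
      }
    }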
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 2dbe6b0..a8ea4ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.ipc;
 
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
 import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -50,9 +52,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -93,7 +98,7 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration conf);
+  protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
 
   /**
    * Ensure we do not HAVE TO HAVE a codec.
@@ -102,7 +107,7 @@ public abstract class AbstractTestIPC {
   public void testNoCodec() throws IOException, ServiceException {
     Configuration conf = HBaseConfiguration.create();
     TestRpcServer rpcServer = new TestRpcServer();
-    try (AbstractRpcClient client = createRpcClientNoCodec(conf)) {
+    try (AbstractRpcClient<?> client = createRpcClientNoCodec(conf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       HBaseRpcController pcrc = new HBaseRpcControllerImpl();
@@ -115,7 +120,7 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  protected abstract AbstractRpcClient createRpcClient(Configuration conf);
+  protected abstract AbstractRpcClient<?> createRpcClient(Configuration conf);
 
   /**
   * It is hard to verify that compression is actually happening under the hood. Hope that if
@@ -132,7 +137,7 @@ public abstract class AbstractTestIPC {
       cells.add(CELL);
     }
     TestRpcServer rpcServer = new TestRpcServer();
-    try (AbstractRpcClient client = createRpcClient(conf)) {
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
@@ -152,14 +157,14 @@ public abstract class AbstractTestIPC {
     }
   }
 
-  protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
-      throws IOException;
+  protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(
+      Configuration conf) throws IOException;
 
   @Test
   public void testRTEDuringConnectionSetup() throws Exception {
     Configuration conf = HBaseConfiguration.create();
     TestRpcServer rpcServer = new TestRpcServer();
-    try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) {
+    try (AbstractRpcClient<?> client = createRpcClientRTEDuringConnectionSetup(conf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       stub.ping(null, EmptyRequestProto.getDefaultInstance());
@@ -180,7 +185,7 @@ public abstract class AbstractTestIPC {
     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
     RpcServer rpcServer = new TestRpcServer(scheduler, CONF);
     verify(scheduler).init((RpcScheduler.Context) anyObject());
-    try (AbstractRpcClient client = createRpcClient(CONF)) {
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       verify(scheduler).start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
@@ -201,7 +206,7 @@ public abstract class AbstractTestIPC {
     Configuration conf = new Configuration(CONF);
     conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100);
     RpcServer rpcServer = new TestRpcServer(conf);
-    try (AbstractRpcClient client = createRpcClient(conf)) {
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       StringBuilder message = new StringBuilder(120);
@@ -232,7 +237,7 @@ public abstract class AbstractTestIPC {
       throws IOException, ServiceException {
     TestRpcServer rpcServer = new TestRpcServer();
     InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);
-    try (AbstractRpcClient client = createRpcClient(CONF)) {
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       assertEquals(localAddr.getAddress().getHostAddress(),
@@ -245,12 +250,12 @@ public abstract class AbstractTestIPC {
   @Test
   public void testRemoteError() throws IOException, ServiceException {
     TestRpcServer rpcServer = new TestRpcServer();
-    try (AbstractRpcClient client = createRpcClient(CONF)) {
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       stub.error(null, EmptyRequestProto.getDefaultInstance());
     } catch (ServiceException e) {
-      LOG.info("Caught expected exception: " + e.getMessage());
+      LOG.info("Caught expected exception: " + e);
       IOException ioe = ProtobufUtil.handleRemoteException(e);
       assertTrue(ioe instanceof DoNotRetryIOException);
       assertTrue(ioe.getMessage().contains("server error!"));
@@ -262,7 +267,7 @@ public abstract class AbstractTestIPC {
   @Test
   public void testTimeout() throws IOException {
     TestRpcServer rpcServer = new TestRpcServer();
-    try (AbstractRpcClient client = createRpcClient(CONF)) {
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       HBaseRpcController pcrc = new HBaseRpcControllerImpl();
@@ -277,7 +282,7 @@ public abstract class AbstractTestIPC {
         } catch (ServiceException e) {
           long waitTime = (System.nanoTime() - startTime) / 1000000;
           // expected
-          LOG.info("Caught expected exception: " + e.getMessage());
+          LOG.info("Caught expected exception: " + e);
           IOException ioe = ProtobufUtil.handleRemoteException(e);
           assertTrue(ioe.getCause() instanceof CallTimeoutException);
           // confirm that we got exception before the actual pause.
@@ -327,7 +332,7 @@ public abstract class AbstractTestIPC {
   public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
     Configuration conf = new Configuration(CONF);
     RpcServer rpcServer = new TestFailingRpcServer(conf);
-    try (AbstractRpcClient client = createRpcClient(conf)) {
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
       rpcServer.start();
       BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
@@ -339,4 +344,90 @@ public abstract class AbstractTestIPC {
       rpcServer.stop();
     }
   }
+
+  @Test
+  public void testAsyncEcho() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      rpcServer.start();
+      Interface stub = newStub(client, rpcServer.getListenerAddress());
+      int num = 10;
+      List<HBaseRpcController> pcrcList = new ArrayList<>();
+      List<BlockingRpcCallback<EchoResponseProto>> callbackList = new ArrayList<>();
+      for (int i = 0; i < num; i++) {
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
+        stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
+        pcrcList.add(pcrc);
+        callbackList.add(done);
+      }
+      for (int i = 0; i < num; i++) {
+        HBaseRpcController pcrc = pcrcList.get(i);
+        assertFalse(pcrc.failed());
+        assertNull(pcrc.cellScanner());
+        assertEquals("hello-" + i, callbackList.get(i).get().getMessage());
+      }
+    } finally {
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testAsyncRemoteError() throws IOException {
+    AbstractRpcClient<?> client = createRpcClient(CONF);
+    TestRpcServer rpcServer = new TestRpcServer();
+    try {
+      rpcServer.start();
+      Interface stub = newStub(client, rpcServer.getListenerAddress());
+      BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+      stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);
+      assertNull(callback.get());
+      assertTrue(pcrc.failed());
+      LOG.info("Caught expected exception: " + pcrc.getFailed());
+      IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
+      assertTrue(ioe instanceof DoNotRetryIOException);
+      assertTrue(ioe.getMessage().contains("server error!"));
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testAsyncTimeout() throws IOException {
+    TestRpcServer rpcServer = new TestRpcServer();
+    try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+      rpcServer.start();
+      Interface stub = newStub(client, rpcServer.getListenerAddress());
+      List<HBaseRpcController> pcrcList = new ArrayList<>();
+      List<BlockingRpcCallback<EmptyResponseProto>> callbackList = new ArrayList<>();
+      int ms = 1000;
+      int timeout = 100;
+      long startTime = System.nanoTime();
+      for (int i = 0; i < 10; i++) {
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        pcrc.setCallTimeout(timeout);
+        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build(), callback);
+        pcrcList.add(pcrc);
+        callbackList.add(callback);
+      }
+      for (BlockingRpcCallback<?> callback : callbackList) {
+        assertNull(callback.get());
+      }
+      long waitTime = (System.nanoTime() - startTime) / 1000000;
+      for (HBaseRpcController pcrc : pcrcList) {
+        assertTrue(pcrc.failed());
+        LOG.info("Caught expected exception: " + pcrc.getFailed());
+        IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
+        assertTrue(ioe.getCause() instanceof CallTimeoutException);
+      }
+      // confirm that we got exception before the actual pause.
+      assertTrue(waitTime < ms);
+    } finally {
+      rpcServer.stop();
+    }
+  }
 }

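The new async tests drive the non-blocking stub and then park on BlockingRpcCallback to turn the callback-style API back into something assertable. A minimal sketch of that idea against protobuf's RpcCallback interface, assuming only com.google.protobuf; this is an illustration, not HBase's BlockingRpcCallback:

    import java.io.IOException;
    import java.io.InterruptedIOException;

    import com.google.protobuf.RpcCallback;

    public class BlockingCallbackSketch<R> implements RpcCallback<R> {
      private R result;
      private boolean done = false;

      @Override
      public synchronized void run(R parameter) {
        result = parameter; // null here signals a failed call
        done = true;
        notifyAll();        // wake any thread parked in get()
      }

      public synchronized R get() throws IOException {
        while (!done) {
          try {
            wait();
          } catch (InterruptedException e) {
            throw (IOException) new InterruptedIOException().initCause(e);
          }
        }
        return result;
      }
    }

This is why testAsyncEcho can fire all ten echoes first and only then assert on the responses: the stub calls return immediately, and get() blocks per call until its response arrives.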
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
deleted file mode 100644
index 565f5bf..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-@Category({ RPCTests.class, SmallTests.class })
-public class TestAsyncIPC extends AbstractTestIPC {
-
-  @Parameters
-  public static Collection<Object[]> parameters() {
-    List<Object[]> paramList = new ArrayList<>();
-    paramList.add(new Object[] { false, false });
-    paramList.add(new Object[] { false, true });
-    paramList.add(new Object[] { true, false });
-    paramList.add(new Object[] { true, true });
-    return paramList;
-  }
-
-  private final boolean useNativeTransport;
-
-  private final boolean useGlobalEventLoopGroup;
-
-  public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
-    this.useNativeTransport = useNativeTransport;
-    this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
-  }
-
-  private void setConf(Configuration conf) {
-    conf.setBoolean(AsyncRpcClient.USE_NATIVE_TRANSPORT, useNativeTransport);
-    conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, useGlobalEventLoopGroup);
-    if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) {
-      if (useNativeTransport
-          && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup)
-          || (!useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP
-              .getFirst() instanceof NioEventLoopGroup))) {
-        AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
-        AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
-      }
-    }
-  }
-
-  @Override
-  protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
-    setConf(conf);
-    return new AsyncRpcClient(conf) {
-
-      @Override
-      Codec getCodec() {
-        return null;
-      }
-
-    };
-  }
-
-  @Override
-  protected AsyncRpcClient createRpcClient(Configuration conf) {
-    setConf(conf);
-    return new AsyncRpcClient(conf);
-  }
-
-  @Override
-  protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
-    setConf(conf);
-    return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
-      @Override
-      protected void initChannel(SocketChannel ch) throws Exception {
-        ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
-          @Override
-          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
-              throws Exception {
-            promise.setFailure(new RuntimeException("Injected fault"));
-          }
-        });
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
new file mode 100644
index 0000000..98efcfb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -0,0 +1,58 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.experimental.categories.Category;
+
+@Category({ RPCTests.class, SmallTests.class })
+public class TestBlockingIPC extends AbstractTestIPC {
+
+  @Override
+  protected BlockingRpcClient createRpcClientNoCodec(Configuration conf) {
+    return new BlockingRpcClient(conf) {
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  protected BlockingRpcClient createRpcClient(Configuration conf) {
+    return new BlockingRpcClient(conf);
+  }
+
+  @Override
+  protected BlockingRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf)
+      throws IOException {
+    return new BlockingRpcClient(conf) {
+
+      @Override
+      boolean isTcpNoDelay() {
+        throw new RuntimeException("Injected fault");
+      }
+    };
+  }
+}

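TestBlockingIPC injects its connection-setup fault by overriding one small hook (isTcpNoDelay) in an anonymous subclass, a much lighter technique than the Mockito socket-factory spying in the TestIPC it replaces. A self-contained sketch of the override-a-hook pattern; the Client class and its methods are made up for the example:

    public class HookInjectionSketch {
      static class Client {
        boolean isTcpNoDelay() { return true; }  // the hook
        void connect() {
          boolean noDelay = isTcpNoDelay();      // consulted during setup
          System.out.println("connected, tcpNoDelay=" + noDelay);
        }
      }

      public static void main(String[] args) {
        Client faulty = new Client() {
          @Override
          boolean isTcpNoDelay() {
            // Fails at a precise, repeatable point in connection setup.
            throw new RuntimeException("Injected fault");
          }
        };
        try {
          faulty.connect();
        } catch (RuntimeException e) {
          System.out.println("Caught expected: " + e.getMessage());
        }
      }
    }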
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
deleted file mode 100644
index 12bc35c..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ RPCTests.class, SmallTests.class })
-public class TestGlobalEventLoopGroup {
-
-  @Test
-  public void test() {
-    Configuration conf = HBaseConfiguration.create();
-    conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true);
-    AsyncRpcClient client = new AsyncRpcClient(conf);
-    assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP);
-    AsyncRpcClient client1 = new AsyncRpcClient(conf);
-    assertSame(client.bootstrap.group(), client1.bootstrap.group());
-    client1.close();
-    assertFalse(client.bootstrap.group().isShuttingDown());
-
-    conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false);
-    AsyncRpcClient client2 = new AsyncRpcClient(conf);
-    assertNotSame(client.bootstrap.group(), client2.bootstrap.group());
-    client2.close();
-
-    client.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
deleted file mode 100644
index b88cb7a..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import javax.net.SocketFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.net.NetUtils;
-import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-@Category({ RPCTests.class, SmallTests.class })
-public class TestIPC extends AbstractTestIPC {
-
-  @Override
-  protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
-    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
-      @Override
-      Codec getCodec() {
-        return null;
-      }
-    };
-  }
-
-  @Override
-  protected RpcClientImpl createRpcClient(Configuration conf) {
-    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
-  }
-
-  @Override
-  protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
-      throws IOException {
-    SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
-    Mockito.doAnswer(new Answer<Socket>() {
-      @Override
-      public Socket answer(InvocationOnMock invocation) throws Throwable {
-        Socket s = spy((Socket) invocation.callRealMethod());
-        doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
-        return s;
-      }
-    }).when(spyFactory).createSocket();
-
-    return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
new file mode 100644
index 0000000..3b32383
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.JVM;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ RPCTests.class, SmallTests.class })
+public class TestNettyIPC extends AbstractTestIPC {
+
+  @Parameters(name = "{index}: EventLoop={0}")
+  public static Collection<Object[]> parameters() {
+    List<Object[]> params = new ArrayList<>();
+    params.add(new Object[] { "nio" });
+    params.add(new Object[] { "perClientNio" });
+    if (JVM.isLinux() && JVM.isAmd64()) {
+      params.add(new Object[] { "epoll" });
+    }
+    return params;
+  }
+
+  @Parameter
+  public String eventLoopType;
+
+  private static NioEventLoopGroup NIO;
+
+  private static EpollEventLoopGroup EPOLL;
+
+  @BeforeClass
+  public static void setUpBeforeClass() {
+    NIO = new NioEventLoopGroup();
+    if (JVM.isLinux() && JVM.isAmd64()) {
+      EPOLL = new EpollEventLoopGroup();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    if (NIO != null) {
+      NIO.shutdownGracefully();
+    }
+    if (EPOLL != null) {
+      EPOLL.shutdownGracefully();
+    }
+  }
+
+  private void setConf(Configuration conf) {
+    switch (eventLoopType) {
+      case "nio":
+        NettyRpcClientConfigHelper.setEventLoopConfig(conf, NIO, NioSocketChannel.class);
+        break;
+      case "epoll":
+        NettyRpcClientConfigHelper.setEventLoopConfig(conf, EPOLL, EpollSocketChannel.class);
+        break;
+      case "perClientNio":
+        NettyRpcClientConfigHelper.createEventLoopPerClient(conf);
+        break;
+      default:
+        break;
+    }
+  }
+
+  @Override
+  protected NettyRpcClient createRpcClientNoCodec(Configuration conf) {
+    setConf(conf);
+    return new NettyRpcClient(conf) {
+
+      @Override
+      Codec getCodec() {
+        return null;
+      }
+
+    };
+  }
+
+  @Override
+  protected NettyRpcClient createRpcClient(Configuration conf) {
+    setConf(conf);
+    return new NettyRpcClient(conf);
+  }
+
+  @Override
+  protected NettyRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
+    setConf(conf);
+    return new NettyRpcClient(conf) {
+
+      @Override
+      boolean isTcpNoDelay() {
+        throw new RuntimeException("Injected fault");
+      }
+    };
+  }
+}

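TestNettyIPC parameterizes over where the event loop comes from: a shared NIO group, a shared epoll group on Linux/amd64, or a group per client. A sketch of why sharing matters, using only stock Netty API; how NettyRpcClientConfigHelper stores the group internally is not shown in this hunk:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public class SharedEventLoopSketch {
      public static void main(String[] args) {
        NioEventLoopGroup shared = new NioEventLoopGroup(); // one set of IO threads
        try {
          Bootstrap clientA = new Bootstrap().group(shared)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {}
              });
          Bootstrap clientB = new Bootstrap().group(shared)
              .channel(NioSocketChannel.class)
              .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) {}
              });
          // clientA and clientB now multiplex onto the same threads; closing
          // one logical client must not shut the shared group down.
        } finally {
          shared.shutdownGracefully(); // only the owner tears the group down
        }
      }
    }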
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
index ae658a3..6354123 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -62,7 +62,7 @@ public class TestProtobufRpcServiceImpl implements BlockingInterface {
   }
 
   public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException {
-    return TestProtobufRpcProto.newStub(client.createProtobufRpcChannel(
+    return TestProtobufRpcProto.newStub(client.createRpcChannel(
       ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()),
       User.getCurrent(), 0));
   }

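Whichever client implementation sits underneath, createRpcChannel still hands the generated stub a protobuf RpcChannel, and that interface is the whole contract. A loopback sketch of it for illustration; LoopbackChannelSketch simply echoes the response prototype instead of going over the wire:

    import com.google.protobuf.Descriptors;
    import com.google.protobuf.Message;
    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcChannel;
    import com.google.protobuf.RpcController;

    public class LoopbackChannelSketch implements RpcChannel {
      @Override
      public void callMethod(Descriptors.MethodDescriptor method, RpcController controller,
          Message request, Message responsePrototype, RpcCallback<Message> done) {
        // A real channel would serialize `request`, dispatch it to `method`'s
        // service on the server, and invoke `done` with the decoded response.
        done.run(responsePrototype.getDefaultInstanceForType());
      }
    }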
