hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-16433 Remove AsyncRpcChannel related stuffs
Date Thu, 25 Aug 2016 00:55:26 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 8a692ff18 -> a1f760ff7


HBASE-16433 Remove AsyncRpcChannel related stuffs


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1f760ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1f760ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1f760ff

Branch: refs/heads/master
Commit: a1f760ff763bacbfcfd6eb80d5076ec35e3b27e3
Parents: 8a692ff
Author: zhangduo <zhangduo@apache.org>
Authored: Wed Aug 17 17:03:53 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Aug 25 08:15:46 2016 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Future.java  |  34 -
 .../hbase/client/ResponseFutureListener.java    |  30 -
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java  |  10 +-
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       | 728 +++++++++++++++++-
 .../hadoop/hbase/ipc/AsyncRpcChannelImpl.java   | 770 -------------------
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |  28 +-
 .../hbase/ipc/AsyncServerResponseHandler.java   |   4 +-
 .../org/apache/hadoop/hbase/ipc/Promise.java    |  38 -
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  |  20 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 156 +---
 .../hadoop/hbase/ipc/AbstractTestIPC.java       | 146 +---
 11 files changed, 730 insertions(+), 1234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
deleted file mode 100644
index 99a8baa..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Future.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * Promise for responses
- * @param <V> Value type
- */
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_SAME_SIMPLE_NAME_AS_INTERFACE",
-  justification="Agree that this can be confusing but folks will pull in this and think twice "
-      + "about pulling in netty; incidence of confusion should be rare in this case.")
-public interface Future<V> extends io.netty.util.concurrent.Future<V> {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
deleted file mode 100644
index f23dc8f..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResponseFutureListener.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Specific interface for the Response future listener
- * @param <V> Value type.
- */
-@InterfaceAudience.Private
-public interface ResponseFutureListener<V>
-    extends GenericFutureListener<Future<V>> {
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 89e6ca4..33536df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -19,7 +19,11 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
+
+import io.netty.util.concurrent.DefaultPromise;
+
 import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
@@ -39,12 +43,12 @@ import org.apache.hadoop.ipc.RemoteException;
  * @param <M> Message returned in communication to be converted
  */
 @InterfaceAudience.Private
-public class AsyncCall<M extends Message, T> extends Promise<T> {
+public class AsyncCall<M extends Message, T> extends DefaultPromise<T> {
   private static final Log LOG = LogFactory.getLog(AsyncCall.class.getName());
 
   final int id;
 
-  private final AsyncRpcChannelImpl channel;
+  private final AsyncRpcChannel channel;
 
   final Descriptors.MethodDescriptor method;
   final Message param;
@@ -77,7 +81,7 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
    * @param priority            for this request
    * @param metrics             MetricsConnection to which the metrics are stored for this request
    */
-  public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
+  public AsyncCall(AsyncRpcChannel channel, int connectId, Descriptors.MethodDescriptor
         md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
         messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
       MetricsConnection metrics) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 8cc730f..9550f2a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -20,19 +20,298 @@ package org.apache.hadoop.hbase.ipc;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 
-import io.netty.util.concurrent.EventExecutor;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.GenericFutureListener;
 
+import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SaslClientHandler;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityInfo;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
 
 /**
- * Interface for Async Rpc Channels
+ * Netty RPC channel
  */
 @InterfaceAudience.Private
-public interface AsyncRpcChannel {
+public class AsyncRpcChannel {
+  private static final Log LOG = LogFactory.getLog(AsyncRpcChannel.class.getName());
+
+  private static final int MAX_SASL_RETRIES = 5;
+
+  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
+    = new HashMap<>();
+
+  static {
+    TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
+      new AuthenticationTokenSelector());
+  }
+
+  final AsyncRpcClient client;
+
+  // Contains the channel to work with.
+  // Only exists when connected
+  private Channel channel;
+
+  String name;
+  final User ticket;
+  final String serviceName;
+  final InetSocketAddress address;
+
+  private int failureCounter = 0;
+
+  boolean useSasl;
+  AuthMethod authMethod;
+  private int reloginMaxBackoff;
+  private Token<? extends TokenIdentifier> token;
+  private String serverPrincipal;
+
+  // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
+  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
+  private boolean connected = false;
+  private boolean closed = false;
+
+  private Timeout cleanupTimer;
+
+  private final TimerTask timeoutTask = new TimerTask() {
+    @Override
+    public void run(Timeout timeout) throws Exception {
+      cleanupCalls();
+    }
+  };
+
+  /**
+   * Constructor for netty RPC channel
+   * @param bootstrap to construct channel on
+   * @param client to connect with
+   * @param ticket of user which uses connection
+   * @param serviceName name of service to connect to
+   * @param address to connect to
+   */
+  public AsyncRpcChannel(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
+      String serviceName, InetSocketAddress address) {
+    this.client = client;
+
+    this.ticket = ticket;
+    this.serviceName = serviceName;
+    this.address = address;
+
+    this.channel = connect(bootstrap).channel();
+
+    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
+        + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
+  }
+
+  /**
+   * Connect to channel
+   * @param bootstrap to connect to
+   * @return future of connection
+   */
+  private ChannelFuture connect(final Bootstrap bootstrap) {
+    return bootstrap.remoteAddress(address).connect()
+        .addListener(new GenericFutureListener<ChannelFuture>() {
+          @Override
+          public void operationComplete(final ChannelFuture f) throws Exception {
+            if (!f.isSuccess()) {
+              retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
+              return;
+            }
+            channel = f.channel();
+
+            setupAuthorization();
+
+            ByteBuf b = channel.alloc().directBuffer(6);
+            createPreamble(b, authMethod);
+            channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
+            if (useSasl) {
+              UserGroupInformation ticket = AsyncRpcChannel.this.ticket.getUGI();
+              if (authMethod == AuthMethod.KERBEROS) {
+                if (ticket != null && ticket.getRealUser() != null) {
+                  ticket = ticket.getRealUser();
+                }
+              }
+              SaslClientHandler saslHandler;
+              if (ticket == null) {
+                throw new FatalConnectionException("ticket/user is null");
+              }
+              final UserGroupInformation realTicket = ticket;
+              saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
+                @Override
+                public SaslClientHandler run() throws IOException {
+                  return getSaslHandler(realTicket, bootstrap);
+                }
+              });
+              if (saslHandler != null) {
+                // Sasl connect is successful. Let's set up Sasl channel handler
+                channel.pipeline().addFirst(saslHandler);
+              } else {
+                // fall back to simple auth because server told us so.
+                authMethod = AuthMethod.SIMPLE;
+                useSasl = false;
+              }
+            } else {
+              startHBaseConnection(f.channel());
+            }
+          }
+        });
+  }
+
+  /**
+   * Start HBase connection
+   * @param ch channel to start connection on
+   */
+  private void startHBaseConnection(Channel ch) {
+    ch.pipeline().addLast("frameDecoder",
+      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+    try {
+      writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (!future.isSuccess()) {
+            close(future.cause());
+            return;
+          }
+          List<AsyncCall> callsToWrite;
+          synchronized (pendingCalls) {
+            connected = true;
+            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+          }
+          for (AsyncCall call : callsToWrite) {
+            writeRequest(call);
+          }
+        }
+      });
+    } catch (IOException e) {
+      close(e);
+    }
+  }
+
+  private void startConnectionWithEncryption(Channel ch) {
+    // for rpc encryption, the order of ChannelInboundHandler should be:
+    // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
+    // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
+    // SaslClientHandler will handler this
+    ch.pipeline().addFirst("beforeUnwrapDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
+    ch.pipeline().addLast("afterUnwrapDecoder",
+        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
+    List<AsyncCall> callsToWrite;
+    synchronized (pendingCalls) {
+      connected = true;
+      callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+    }
+    for (AsyncCall call : callsToWrite) {
+      writeRequest(call);
+    }
+  }
+
+  /**
+   * Get SASL handler
+   * @param bootstrap to reconnect to
+   * @return new SASL handler
+   * @throws java.io.IOException if handler failed to create
+   */
+  private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
+      final Bootstrap bootstrap) throws IOException {
+    return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
+        client.fallbackAllowed,
+        client.conf.get("hbase.rpc.protection",
+          SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
+        getChannelHeaderBytes(authMethod),
+        new SaslClientHandler.SaslExceptionHandler() {
+          @Override
+          public void handle(int retryCount, Random random, Throwable cause) {
+            try {
+              // Handle Sasl failure. Try to potentially get new credentials
+              handleSaslConnectionFailure(retryCount, cause, realTicket);
+
+              retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
+                cause);
+            } catch (IOException | InterruptedException e) {
+              close(e);
+            }
+          }
+        }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
+          @Override
+          public void onSuccess(Channel channel) {
+            startHBaseConnection(channel);
+          }
+
+          @Override
+          public void onSaslProtectionSucess(Channel channel) {
+            startConnectionWithEncryption(channel);
+          }
+        });
+  }
+
+  /**
+   * Retry to connect or close
+   * @param bootstrap to connect with
+   * @param failureCount failure count
+   * @param e exception of fail
+   */
+  private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
+      Throwable e) {
+    if (failureCount < client.maxRetries) {
+      client.newTimeout(new TimerTask() {
+        @Override
+        public void run(Timeout timeout) throws Exception {
+          connect(bootstrap);
+        }
+      }, timeout, TimeUnit.MILLISECONDS);
+    } else {
+      client.failedServers.addToFailedServers(address);
+      close(e);
+    }
+  }
 
   /**
    * Calls method on channel
@@ -40,41 +319,450 @@ public interface AsyncRpcChannel {
    * @param request to send
    * @param cellScanner with cells to send
    * @param responsePrototype to construct response with
-   * @param messageConverter for the messages to expected result
-   * @param exceptionConverter for converting exceptions
    * @param rpcTimeout timeout for request
    * @param priority for request
    * @return Promise for the response Message
    */
-  <R extends Message, O> Future<O> callMethod(
+  public <R extends Message, O> io.netty.util.concurrent.Promise<O> callMethod(
       final Descriptors.MethodDescriptor method,
-      final Message request, final CellScanner cellScanner,
-      R responsePrototype, MessageConverter<R, O> messageConverter,
-      IOExceptionConverter exceptionConverter, long rpcTimeout, int priority);
+      final Message request,final CellScanner cellScanner,
+      R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
+      exceptionConverter, long rpcTimeout, int priority) {
+    final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
+        method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
+        rpcTimeout, priority, client.metrics);
+
+    synchronized (pendingCalls) {
+      if (closed) {
+        call.setFailure(new ConnectException());
+        return call;
+      }
+      pendingCalls.put(call.id, call);
+      // Add timeout for cleanup if none is present
+      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
+        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
+      }
+      if (!connected) {
+        return call;
+      }
+    }
+    writeRequest(call);
+    return call;
+  }
+
+  public EventLoop getEventExecutor() {
+    return this.channel.eventLoop();
+  }
+
+  AsyncCall removePendingCall(int id) {
+    synchronized (pendingCalls) {
+      return pendingCalls.remove(id);
+    }
+  }
+
+  /**
+   * Write the channel header
+   * @param channel to write to
+   * @return future of write
+   * @throws java.io.IOException on failure to write
+   */
+  private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
+    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
+    ByteBuf b = channel.alloc().directBuffer(totalSize);
+
+    b.writeInt(header.getSerializedSize());
+    b.writeBytes(header.toByteArray());
 
+    return channel.writeAndFlush(b);
+  }
+
+  private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
+    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
+    ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
+    b.putInt(header.getSerializedSize());
+    b.put(header.toByteArray());
+    return b.array();
+  }
+
+  private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
+    RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
+        .setServiceName(serviceName);
+
+    RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
+    if (userInfoPB != null) {
+      headerBuilder.setUserInfo(userInfoPB);
+    }
+
+    if (client.codec != null) {
+      headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
+    }
+    if (client.compressor != null) {
+      headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
+    }
+
+    headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
+    return headerBuilder.build();
+  }
+
+  /**
+   * Write request to channel
+   * @param call to write
+   */
+  private void writeRequest(final AsyncCall call) {
+    try {
+      final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
+          .newBuilder();
+      requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
+          .setRequestParam(call.param != null);
+
+      if (Trace.isTracing()) {
+        Span s = Trace.currentSpan();
+        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
+            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+      }
+
+      ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
+      if (cellBlock != null) {
+        final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
+            .newBuilder();
+        cellBlockBuilder.setLength(cellBlock.limit());
+        requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
+      }
+      // Only pass priority if there one. Let zero be same as no priority.
+      if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
+        requestHeaderBuilder.setPriority(call.getPriority());
+      }
+      requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
+          Integer.MAX_VALUE : (int)call.rpcTimeout);
+
+      RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
+
+      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
+      if (cellBlock != null) {
+        totalSize += cellBlock.remaining();
+      }
+
+      ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
+      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
+        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
+      }
+
+      channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
+    } catch (IOException e) {
+      close(e);
+    }
+  }
+
+  /**
+   * Set up server authorization
+   * @throws java.io.IOException if auth setup failed
+   */
+  private void setupAuthorization() throws IOException {
+    SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
+    this.useSasl = client.userProvider.isHBaseSecurityEnabled();
+
+    this.token = null;
+    if (useSasl && securityInfo != null) {
+      AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
+      if (tokenKind != null) {
+        TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
+        if (tokenSelector != null) {
+          token = tokenSelector.selectToken(new Text(client.clusterId),
+            ticket.getUGI().getTokens());
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("No token selector found for type " + tokenKind);
+        }
+      }
+      String serverKey = securityInfo.getServerPrincipal();
+      if (serverKey == null) {
+        throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
+      }
+      this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
+        address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
+            + serverPrincipal);
+      }
+    }
+
+    if (!useSasl) {
+      authMethod = AuthMethod.SIMPLE;
+    } else if (token != null) {
+      authMethod = AuthMethod.DIGEST;
+    } else {
+      authMethod = AuthMethod.KERBEROS;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
+    }
+    reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
+  }
+
+  /**
+   * Build the user information
+   * @param ugi User Group Information
+   * @param authMethod Authorization method
+   * @return UserInformation protobuf
+   */
+  private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
+    if (ugi == null || authMethod == AuthMethod.DIGEST) {
+      // Don't send user for token auth
+      return null;
+    }
+    RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
+    if (authMethod == AuthMethod.KERBEROS) {
+      // Send effective user for Kerberos auth
+      userInfoPB.setEffectiveUser(ugi.getUserName());
+    } else if (authMethod == AuthMethod.SIMPLE) {
+      // Send both effective user and real user for simple auth
+      userInfoPB.setEffectiveUser(ugi.getUserName());
+      if (ugi.getRealUser() != null) {
+        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
+      }
+    }
+    return userInfoPB.build();
+  }
 
   /**
-   * Get the EventLoop on which this channel operated
-   * @return EventLoop
+   * Create connection preamble
+   * @param byteBuf to write to
+   * @param authMethod to write
    */
-  EventExecutor getEventExecutor();
+  private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
+    byteBuf.writeBytes(HConstants.RPC_HEADER);
+    byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
+    byteBuf.writeByte(authMethod.code);
+  }
+
+  private void close0(Throwable e) {
+    List<AsyncCall> toCleanup;
+    synchronized (pendingCalls) {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
+      pendingCalls.clear();
+    }
+    IOException closeException = null;
+    if (e != null) {
+      if (e instanceof IOException) {
+        closeException = (IOException) e;
+      } else {
+        closeException = new IOException(e);
+      }
+    }
+    // log the info
+    if (LOG.isDebugEnabled() && closeException != null) {
+      LOG.debug(name + ": closing ipc connection to " + address, closeException);
+    }
+    if (cleanupTimer != null) {
+      cleanupTimer.cancel();
+      cleanupTimer = null;
+    }
+    for (AsyncCall call : toCleanup) {
+      call.setFailed(closeException != null ? closeException
+          : new ConnectionClosingException(
+              "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
+    }
+    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(name + ": closed");
+    }
+  }
 
   /**
    * Close connection
-   * @param cause of closure.
+   * @param e exception on close
    */
-  void close(Throwable cause);
+  public void close(final Throwable e) {
+    client.removeConnection(this);
+
+    // Move closing from the requesting thread to the channel thread
+    if (channel.eventLoop().inEventLoop()) {
+      close0(e);
+    } else {
+      channel.eventLoop().execute(new Runnable() {
+        @Override
+        public void run() {
+          close0(e);
+        }
+      });
+    }
+  }
+
+  /**
+   * Clean up calls.
+   */
+  private void cleanupCalls() {
+    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    long nextCleanupTaskDelay = -1L;
+    synchronized (pendingCalls) {
+      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
+        AsyncCall call = iter.next();
+        long timeout = call.getRpcTimeout();
+        if (timeout > 0) {
+          if (currentTime - call.getStartTime() >= timeout) {
+            iter.remove();
+            toCleanup.add(call);
+          } else {
+            if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
+              nextCleanupTaskDelay = timeout;
+            }
+          }
+        }
+      }
+      if (nextCleanupTaskDelay > 0) {
+        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
+      } else {
+        cleanupTimer = null;
+      }
+    }
+    for (AsyncCall call : toCleanup) {
+      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
+          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
+    }
+  }
 
   /**
    * Check if the connection is alive
-   *
    * @return true if alive
    */
-  boolean isAlive();
+  public boolean isAlive() {
+    return channel.isOpen();
+  }
+
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  /**
+   * Check if user should authenticate over Kerberos
+   * @return true if should be authenticated over Kerberos
+   * @throws java.io.IOException on failure of check
+   */
+  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    UserGroupInformation realUser = currentUser.getRealUser();
+    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
+      // Make sure user logged in using Kerberos either keytab or TGT
+      loginUser.hasKerberosCredentials() &&
+      // relogin only in case it is the login user (e.g. JT)
+      // or superuser (like oozie).
+      (loginUser.equals(currentUser) || loginUser.equals(realUser));
+  }
 
   /**
-   * Get the address on which this channel operates
-   * @return InetSocketAddress
+   * If multiple clients with the same principal try to connect to the same server at the same time,
+   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
+   * work around this, what is done is that the client backs off randomly and tries to initiate the
+   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
+   * attempted.
+   * <p>
+   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
+   * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
+   * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
+   * underlying authentication implementation, so there is no retry from other high level (for eg,
+   * HCM or HBaseAdmin).
+   * </p>
+   * @param currRetries retry count
+   * @param ex exception describing fail
+   * @param user which is trying to connect
+   * @throws java.io.IOException if IO fail
+   * @throws InterruptedException if thread is interrupted
    */
-  InetSocketAddress getAddress();
+  private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
+      final UserGroupInformation user) throws IOException, InterruptedException {
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws IOException, InterruptedException {
+        if (shouldAuthenticateOverKrb()) {
+          if (currRetries < MAX_SASL_RETRIES) {
+            LOG.debug("Exception encountered while connecting to the server : " + ex);
+            // try re-login
+            if (UserGroupInformation.isLoginKeytabBased()) {
+              UserGroupInformation.getLoginUser().reloginFromKeytab();
+            } else {
+              UserGroupInformation.getLoginUser().reloginFromTicketCache();
+            }
+
+            // Should reconnect
+            return null;
+          } else {
+            String msg = "Couldn't setup connection for "
+                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
+            LOG.warn(msg, ex);
+            throw (IOException) new IOException(msg).initCause(ex);
+          }
+        } else {
+          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
+        }
+        if (ex instanceof RemoteException) {
+          throw (RemoteException) ex;
+        }
+        if (ex instanceof SaslException) {
+          String msg = "SASL authentication failed."
+              + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
+          LOG.fatal(msg, ex);
+          throw new RuntimeException(msg, ex);
+        }
+        throw new IOException(ex);
+      }
+    });
+  }
+
+  public int getConnectionHashCode() {
+    return ConnectionId.hashCode(ticket, serviceName, address);
+  }
+
+  @Override
+  public int hashCode() {
+    return getConnectionHashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof AsyncRpcChannel) {
+      AsyncRpcChannel channel = (AsyncRpcChannel) obj;
+      return channel.hashCode() == obj.hashCode();
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
+  }
+
+  /**
+   * Listens to call writes and fails if write failed
+   */
+  private static final class CallWriteListener implements ChannelFutureListener {
+    private final AsyncRpcChannel rpcChannel;
+    private final int id;
+
+    public CallWriteListener(AsyncRpcChannel asyncRpcChannelImpl, int id) {
+      this.rpcChannel = asyncRpcChannelImpl;
+      this.id = id;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!future.isSuccess()) {
+        AsyncCall call = rpcChannel.removePendingCall(id);
+        if (call != null) {
+          if (future.cause() instanceof IOException) {
+            call.setFailed((IOException) future.cause());
+          } else {
+            call.setFailed(new IOException(future.cause()));
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
deleted file mode 100644
index 6b7dc5b..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannelImpl.java
+++ /dev/null
@@ -1,770 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.EventLoop;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.GenericFutureListener;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Locale;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.sasl.SaslException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.protobuf.generated.TracingProtos;
-import org.apache.hadoop.hbase.security.AuthMethod;
-import org.apache.hadoop.hbase.security.SaslClientHandler;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityInfo;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.htrace.Span;
-import org.apache.htrace.Trace;
-
-/**
- * Netty RPC channel
- */
-@InterfaceAudience.Private
-public class AsyncRpcChannelImpl implements AsyncRpcChannel {
-  private static final Log LOG = LogFactory.getLog(AsyncRpcChannelImpl.class.getName());
-
-  private static final int MAX_SASL_RETRIES = 5;
-
-  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDDLERS
-    = new HashMap<>();
-
-  static {
-    TOKEN_HANDDLERS.put(AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN,
-      new AuthenticationTokenSelector());
-  }
-
-  final AsyncRpcClient client;
-
-  // Contains the channel to work with.
-  // Only exists when connected
-  private Channel channel;
-
-  String name;
-  final User ticket;
-  final String serviceName;
-  final InetSocketAddress address;
-
-  private int failureCounter = 0;
-
-  boolean useSasl;
-  AuthMethod authMethod;
-  private int reloginMaxBackoff;
-  private Token<? extends TokenIdentifier> token;
-  private String serverPrincipal;
-
-  // NOTE: closed and connected flags below are only changed when a lock on pendingCalls
-  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
-  private boolean connected = false;
-  private boolean closed = false;
-
-  private Timeout cleanupTimer;
-
-  private final TimerTask timeoutTask = new TimerTask() {
-    @Override
-    public void run(Timeout timeout) throws Exception {
-      cleanupCalls();
-    }
-  };
-
-  /**
-   * Constructor for netty RPC channel
-   * @param bootstrap to construct channel on
-   * @param client to connect with
-   * @param ticket of user which uses connection
-   * @param serviceName name of service to connect to
-   * @param address to connect to
-   */
-  public AsyncRpcChannelImpl(Bootstrap bootstrap, final AsyncRpcClient client, User ticket,
-      String serviceName, InetSocketAddress address) {
-    this.client = client;
-
-    this.ticket = ticket;
-    this.serviceName = serviceName;
-    this.address = address;
-
-    this.channel = connect(bootstrap).channel();
-
-    name = ("IPC Client (" + channel.hashCode() + ") to " + address.toString()
-        + ((ticket == null) ? " from unknown user" : (" from " + ticket.getName())));
-  }
-
-  /**
-   * Connect to channel
-   * @param bootstrap to connect to
-   * @return future of connection
-   */
-  private ChannelFuture connect(final Bootstrap bootstrap) {
-    return bootstrap.remoteAddress(address).connect()
-        .addListener(new GenericFutureListener<ChannelFuture>() {
-          @Override
-          public void operationComplete(final ChannelFuture f) throws Exception {
-            if (!f.isSuccess()) {
-              retryOrClose(bootstrap, failureCounter++, client.failureSleep, f.cause());
-              return;
-            }
-            channel = f.channel();
-
-            setupAuthorization();
-
-            ByteBuf b = channel.alloc().directBuffer(6);
-            createPreamble(b, authMethod);
-            channel.writeAndFlush(b).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-            if (useSasl) {
-              UserGroupInformation ticket = AsyncRpcChannelImpl.this.ticket.getUGI();
-              if (authMethod == AuthMethod.KERBEROS) {
-                if (ticket != null && ticket.getRealUser() != null) {
-                  ticket = ticket.getRealUser();
-                }
-              }
-              SaslClientHandler saslHandler;
-              if (ticket == null) {
-                throw new FatalConnectionException("ticket/user is null");
-              }
-              final UserGroupInformation realTicket = ticket;
-              saslHandler = ticket.doAs(new PrivilegedExceptionAction<SaslClientHandler>() {
-                @Override
-                public SaslClientHandler run() throws IOException {
-                  return getSaslHandler(realTicket, bootstrap);
-                }
-              });
-              if (saslHandler != null) {
-                // Sasl connect is successful. Let's set up Sasl channel handler
-                channel.pipeline().addFirst(saslHandler);
-              } else {
-                // fall back to simple auth because server told us so.
-                authMethod = AuthMethod.SIMPLE;
-                useSasl = false;
-              }
-            } else {
-              startHBaseConnection(f.channel());
-            }
-          }
-        });
-  }
-
-  /**
-   * Start HBase connection
-   * @param ch channel to start connection on
-   */
-  private void startHBaseConnection(Channel ch) {
-    ch.pipeline().addLast("frameDecoder",
-      new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    try {
-      writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            close(future.cause());
-            return;
-          }
-          List<AsyncCall> callsToWrite;
-          synchronized (pendingCalls) {
-            connected = true;
-            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-          }
-          for (AsyncCall call : callsToWrite) {
-            writeRequest(call);
-          }
-        }
-      });
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  private void startConnectionWithEncryption(Channel ch) {
-    // for rpc encryption, the order of ChannelInboundHandler should be:
-    // LengthFieldBasedFrameDecoder->SaslClientHandler->LengthFieldBasedFrameDecoder
-    // Don't skip the first 4 bytes for length in beforeUnwrapDecoder,
-    // SaslClientHandler will handler this
-    ch.pipeline().addFirst("beforeUnwrapDecoder",
-        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 0));
-    ch.pipeline().addLast("afterUnwrapDecoder",
-        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
-    ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-    List<AsyncCall> callsToWrite;
-    synchronized (pendingCalls) {
-      connected = true;
-      callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
-    }
-    for (AsyncCall call : callsToWrite) {
-      writeRequest(call);
-    }
-  }
-
-  /**
-   * Get SASL handler
-   * @param bootstrap to reconnect to
-   * @return new SASL handler
-   * @throws java.io.IOException if handler failed to create
-   */
-  private SaslClientHandler getSaslHandler(final UserGroupInformation realTicket,
-      final Bootstrap bootstrap) throws IOException {
-    return new SaslClientHandler(realTicket, authMethod, token, serverPrincipal,
-        client.fallbackAllowed,
-        client.conf.get("hbase.rpc.protection",
-          SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
-        getChannelHeaderBytes(authMethod),
-        new SaslClientHandler.SaslExceptionHandler() {
-          @Override
-          public void handle(int retryCount, Random random, Throwable cause) {
-            try {
-              // Handle Sasl failure. Try to potentially get new credentials
-              handleSaslConnectionFailure(retryCount, cause, realTicket);
-
-              retryOrClose(bootstrap, failureCounter++, random.nextInt(reloginMaxBackoff) + 1,
-                cause);
-            } catch (IOException | InterruptedException e) {
-              close(e);
-            }
-          }
-        }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
-          @Override
-          public void onSuccess(Channel channel) {
-            startHBaseConnection(channel);
-          }
-
-          @Override
-          public void onSaslProtectionSucess(Channel channel) {
-            startConnectionWithEncryption(channel);
-          }
-        });
-  }
-
-  /**
-   * Retry to connect or close
-   * @param bootstrap to connect with
-   * @param failureCount failure count
-   * @param e exception of fail
-   */
-  private void retryOrClose(final Bootstrap bootstrap, int failureCount, long timeout,
-      Throwable e) {
-    if (failureCount < client.maxRetries) {
-      client.newTimeout(new TimerTask() {
-        @Override
-        public void run(Timeout timeout) throws Exception {
-          connect(bootstrap);
-        }
-      }, timeout, TimeUnit.MILLISECONDS);
-    } else {
-      client.failedServers.addToFailedServers(address);
-      close(e);
-    }
-  }
-
-  /**
-   * Calls method on channel
-   * @param method to call
-   * @param request to send
-   * @param cellScanner with cells to send
-   * @param responsePrototype to construct response with
-   * @param rpcTimeout timeout for request
-   * @param priority for request
-   * @return Promise for the response Message
-   */
-  @Override
-  public <R extends Message, O> Future<O> callMethod(
-      final Descriptors.MethodDescriptor method,
-      final Message request,final CellScanner cellScanner,
-      R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
-      exceptionConverter, long rpcTimeout, int priority) {
-    final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
-        method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
-        rpcTimeout, priority, client.metrics);
-
-    synchronized (pendingCalls) {
-      if (closed) {
-        call.setFailure(new ConnectException());
-        return call;
-      }
-      pendingCalls.put(call.id, call);
-      // Add timeout for cleanup if none is present
-      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, call.getRpcTimeout(), TimeUnit.MILLISECONDS);
-      }
-      if (!connected) {
-        return call;
-      }
-    }
-    writeRequest(call);
-    return call;
-  }
-
-  @Override
-  public EventLoop getEventExecutor() {
-    return this.channel.eventLoop();
-  }
-
-  AsyncCall removePendingCall(int id) {
-    synchronized (pendingCalls) {
-      return pendingCalls.remove(id);
-    }
-  }
-
-  /**
-   * Write the channel header
-   * @param channel to write to
-   * @return future of write
-   * @throws java.io.IOException on failure to write
-   */
-  private ChannelFuture writeChannelHeader(Channel channel) throws IOException {
-    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
-    int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header);
-    ByteBuf b = channel.alloc().directBuffer(totalSize);
-
-    b.writeInt(header.getSerializedSize());
-    b.writeBytes(header.toByteArray());
-
-    return channel.writeAndFlush(b);
-  }
-
-  private byte[] getChannelHeaderBytes(AuthMethod authMethod) {
-    RPCProtos.ConnectionHeader header = getChannelHeader(authMethod);
-    ByteBuffer b = ByteBuffer.allocate(header.getSerializedSize() + 4);
-    b.putInt(header.getSerializedSize());
-    b.put(header.toByteArray());
-    return b.array();
-  }
-
-  private RPCProtos.ConnectionHeader getChannelHeader(AuthMethod authMethod) {
-    RPCProtos.ConnectionHeader.Builder headerBuilder = RPCProtos.ConnectionHeader.newBuilder()
-        .setServiceName(serviceName);
-
-    RPCProtos.UserInformation userInfoPB = buildUserInfo(ticket.getUGI(), authMethod);
-    if (userInfoPB != null) {
-      headerBuilder.setUserInfo(userInfoPB);
-    }
-
-    if (client.codec != null) {
-      headerBuilder.setCellBlockCodecClass(client.codec.getClass().getCanonicalName());
-    }
-    if (client.compressor != null) {
-      headerBuilder.setCellBlockCompressorClass(client.compressor.getClass().getCanonicalName());
-    }
-
-    headerBuilder.setVersionInfo(ProtobufUtil.getVersionInfo());
-    return headerBuilder.build();
-  }
-
-  /**
-   * Write request to channel
-   * @param call to write
-   */
-  private void writeRequest(final AsyncCall call) {
-    try {
-      final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
-          .newBuilder();
-      requestHeaderBuilder.setCallId(call.id).setMethodName(call.method.getName())
-          .setRequestParam(call.param != null);
-
-      if (Trace.isTracing()) {
-        Span s = Trace.currentSpan();
-        requestHeaderBuilder.setTraceInfo(TracingProtos.RPCTInfo.newBuilder()
-            .setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
-      }
-
-      ByteBuffer cellBlock = client.buildCellBlock(call.cellScanner());
-      if (cellBlock != null) {
-        final RPCProtos.CellBlockMeta.Builder cellBlockBuilder = RPCProtos.CellBlockMeta
-            .newBuilder();
-        cellBlockBuilder.setLength(cellBlock.limit());
-        requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
-      }
-      // Only pass priority if there one. Let zero be same as no priority.
-      if (call.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
-        requestHeaderBuilder.setPriority(call.getPriority());
-      }
-      requestHeaderBuilder.setTimeout(call.rpcTimeout > Integer.MAX_VALUE ?
-          Integer.MAX_VALUE : (int)call.rpcTimeout);
-
-      RPCProtos.RequestHeader rh = requestHeaderBuilder.build();
-
-      int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(rh, call.param);
-      if (cellBlock != null) {
-        totalSize += cellBlock.remaining();
-      }
-
-      ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
-      try (ByteBufOutputStream out = new ByteBufOutputStream(b)) {
-        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
-      }
-
-      channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
-    } catch (IOException e) {
-      close(e);
-    }
-  }
-
-  /**
-   * Set up server authorization
-   * @throws java.io.IOException if auth setup failed
-   */
-  private void setupAuthorization() throws IOException {
-    SecurityInfo securityInfo = SecurityInfo.getInfo(serviceName);
-    this.useSasl = client.userProvider.isHBaseSecurityEnabled();
-
-    this.token = null;
-    if (useSasl && securityInfo != null) {
-      AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();
-      if (tokenKind != null) {
-        TokenSelector<? extends TokenIdentifier> tokenSelector = TOKEN_HANDDLERS.get(tokenKind);
-        if (tokenSelector != null) {
-          token = tokenSelector.selectToken(new Text(client.clusterId),
-            ticket.getUGI().getTokens());
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("No token selector found for type " + tokenKind);
-        }
-      }
-      String serverKey = securityInfo.getServerPrincipal();
-      if (serverKey == null) {
-        throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");
-      }
-      this.serverPrincipal = SecurityUtil.getServerPrincipal(client.conf.get(serverKey),
-        address.getAddress().getCanonicalHostName().toLowerCase(Locale.ROOT));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RPC Server Kerberos principal name for service=" + serviceName + " is "
-            + serverPrincipal);
-      }
-    }
-
-    if (!useSasl) {
-      authMethod = AuthMethod.SIMPLE;
-    } else if (token != null) {
-      authMethod = AuthMethod.DIGEST;
-    } else {
-      authMethod = AuthMethod.KERBEROS;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "Use " + authMethod + " authentication for service " + serviceName + ", sasl=" + useSasl);
-    }
-    reloginMaxBackoff = client.conf.getInt("hbase.security.relogin.maxbackoff", 5000);
-  }
-
-  /**
-   * Build the user information
-   * @param ugi User Group Information
-   * @param authMethod Authorization method
-   * @return UserInformation protobuf
-   */
-  private RPCProtos.UserInformation buildUserInfo(UserGroupInformation ugi, AuthMethod authMethod) {
-    if (ugi == null || authMethod == AuthMethod.DIGEST) {
-      // Don't send user for token auth
-      return null;
-    }
-    RPCProtos.UserInformation.Builder userInfoPB = RPCProtos.UserInformation.newBuilder();
-    if (authMethod == AuthMethod.KERBEROS) {
-      // Send effective user for Kerberos auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-    } else if (authMethod == AuthMethod.SIMPLE) {
-      // Send both effective user and real user for simple auth
-      userInfoPB.setEffectiveUser(ugi.getUserName());
-      if (ugi.getRealUser() != null) {
-        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
-      }
-    }
-    return userInfoPB.build();
-  }
-
-  /**
-   * Create connection preamble
-   * @param byteBuf to write to
-   * @param authMethod to write
-   */
-  private void createPreamble(ByteBuf byteBuf, AuthMethod authMethod) {
-    byteBuf.writeBytes(HConstants.RPC_HEADER);
-    byteBuf.writeByte(HConstants.RPC_CURRENT_VERSION);
-    byteBuf.writeByte(authMethod.code);
-  }
-
-  private void close0(Throwable e) {
-    List<AsyncCall> toCleanup;
-    synchronized (pendingCalls) {
-      if (closed) {
-        return;
-      }
-      closed = true;
-      toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
-      pendingCalls.clear();
-    }
-    IOException closeException = null;
-    if (e != null) {
-      if (e instanceof IOException) {
-        closeException = (IOException) e;
-      } else {
-        closeException = new IOException(e);
-      }
-    }
-    // log the info
-    if (LOG.isDebugEnabled() && closeException != null) {
-      LOG.debug(name + ": closing ipc connection to " + address, closeException);
-    }
-    if (cleanupTimer != null) {
-      cleanupTimer.cancel();
-      cleanupTimer = null;
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(closeException != null ? closeException
-          : new ConnectionClosingException(
-              "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
-    }
-    channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(name + ": closed");
-    }
-  }
-
-  /**
-   * Close connection
-   * @param e exception on close
-   */
-  public void close(final Throwable e) {
-    client.removeConnection(this);
-
-    // Move closing from the requesting thread to the channel thread
-    if (channel.eventLoop().inEventLoop()) {
-      close0(e);
-    } else {
-      channel.eventLoop().execute(new Runnable() {
-        @Override
-        public void run() {
-          close0(e);
-        }
-      });
-    }
-  }
-
-  /**
-   * Clean up calls.
-   */
-  private void cleanupCalls() {
-    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
-    long currentTime = EnvironmentEdgeManager.currentTime();
-    long nextCleanupTaskDelay = -1L;
-    synchronized (pendingCalls) {
-      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
-        AsyncCall call = iter.next();
-        long timeout = call.getRpcTimeout();
-        if (timeout > 0) {
-          if (currentTime - call.getStartTime() >= timeout) {
-            iter.remove();
-            toCleanup.add(call);
-          } else {
-            if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
-              nextCleanupTaskDelay = timeout;
-            }
-          }
-        }
-      }
-      if (nextCleanupTaskDelay > 0) {
-        cleanupTimer = client.newTimeout(timeoutTask, nextCleanupTaskDelay, TimeUnit.MILLISECONDS);
-      } else {
-        cleanupTimer = null;
-      }
-    }
-    for (AsyncCall call : toCleanup) {
-      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
-          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
-    }
-  }
-
-  /**
-   * Check if the connection is alive
-   * @return true if alive
-   */
-  public boolean isAlive() {
-    return channel.isOpen();
-  }
-
-  @Override
-  public InetSocketAddress getAddress() {
-    return this.address;
-  }
-
-  /**
-   * Check if user should authenticate over Kerberos
-   * @return true if should be authenticated over Kerberos
-   * @throws java.io.IOException on failure of check
-   */
-  private synchronized boolean shouldAuthenticateOverKrb() throws IOException {
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-    UserGroupInformation realUser = currentUser.getRealUser();
-    return authMethod == AuthMethod.KERBEROS && loginUser != null &&
-      // Make sure user logged in using Kerberos either keytab or TGT
-      loginUser.hasKerberosCredentials() &&
-      // relogin only in case it is the login user (e.g. JT)
-      // or superuser (like oozie).
-      (loginUser.equals(currentUser) || loginUser.equals(realUser));
-  }
-
-  /**
-   * If multiple clients with the same principal try to connect to the same server at the same time,
-   * the server assumes a replay attack is in progress. This is a feature of kerberos. In order to
-   * work around this, what is done is that the client backs off randomly and tries to initiate the
-   * connection again. The other problem is to do with ticket expiry. To handle that, a relogin is
-   * attempted.
-   * <p>
-   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. In case when the
-   * user doesn't have valid credentials, we don't need to retry (from cache or ticket). In such
-   * cases, it is prudent to throw a runtime exception when we receive a SaslException from the
-   * underlying authentication implementation, so there is no retry from other high level (for eg,
-   * HCM or HBaseAdmin).
-   * </p>
-   * @param currRetries retry count
-   * @param ex exception describing fail
-   * @param user which is trying to connect
-   * @throws java.io.IOException if IO fail
-   * @throws InterruptedException if thread is interrupted
-   */
-  private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
-      final UserGroupInformation user) throws IOException, InterruptedException {
-    user.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws IOException, InterruptedException {
-        if (shouldAuthenticateOverKrb()) {
-          if (currRetries < MAX_SASL_RETRIES) {
-            LOG.debug("Exception encountered while connecting to the server : " + ex);
-            // try re-login
-            if (UserGroupInformation.isLoginKeytabBased()) {
-              UserGroupInformation.getLoginUser().reloginFromKeytab();
-            } else {
-              UserGroupInformation.getLoginUser().reloginFromTicketCache();
-            }
-
-            // Should reconnect
-            return null;
-          } else {
-            String msg = "Couldn't setup connection for "
-                + UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
-            LOG.warn(msg, ex);
-            throw (IOException) new IOException(msg).initCause(ex);
-          }
-        } else {
-          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
-        }
-        if (ex instanceof RemoteException) {
-          throw (RemoteException) ex;
-        }
-        if (ex instanceof SaslException) {
-          String msg = "SASL authentication failed."
-              + " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
-          LOG.fatal(msg, ex);
-          throw new RuntimeException(msg, ex);
-        }
-        throw new IOException(ex);
-      }
-    });
-  }
-
-  public int getConnectionHashCode() {
-    return ConnectionId.hashCode(ticket, serviceName, address);
-  }
-
-  @Override
-  public int hashCode() {
-    return getConnectionHashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof AsyncRpcChannelImpl) {
-      AsyncRpcChannelImpl channel = (AsyncRpcChannelImpl) obj;
-      return channel.hashCode() == obj.hashCode();
-    }
-    return false;
-  }
-
-  @Override
-  public String toString() {
-    return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
-  }
-
-  /**
-   * Listens to call writes and fails if write failed
-   */
-  private static final class CallWriteListener implements ChannelFutureListener {
-    private final AsyncRpcChannelImpl rpcChannel;
-    private final int id;
-
-    public CallWriteListener(AsyncRpcChannelImpl asyncRpcChannelImpl, int id) {
-      this.rpcChannel = asyncRpcChannelImpl;
-      this.id = id;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (!future.isSuccess()) {
-        AsyncCall call = rpcChannel.removePendingCall(id);
-        if (call != null) {
-          if (future.cause() instanceof IOException) {
-            call.setFailed((IOException) future.cause());
-          } else {
-            call.setFailed(new IOException(future.cause()));
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 723a234..3d343b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -23,11 +23,11 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcChannel;
 import com.google.protobuf.RpcController;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
@@ -37,6 +37,9 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -49,16 +52,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.client.ResponseFutureListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.JVM;
 import org.apache.hadoop.hbase.util.Pair;
@@ -240,7 +240,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
     final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
 
-    final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
+    final Promise<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
         getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
         pcrc.getPriority());
 
@@ -290,8 +290,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
     try {
       connection = createRpcChannel(md.getService().getName(), addr, ticket);
 
-      ResponseFutureListener<Message> listener =
-        new ResponseFutureListener<Message>() {
+      FutureListener<Message> listener =
+        new FutureListener<Message>() {
           @Override
           public void operationComplete(Future<Message> future) throws Exception {
             if (!future.isSuccess()) {
@@ -351,11 +351,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
   }
 
-  @Override
-  public EventLoop getEventExecutor() {
-    return this.bootstrap.config().group().next();
-  }
-
   /**
    * Create a cell scanner
    *
@@ -378,13 +373,6 @@ public class AsyncRpcClient extends AbstractRpcClient {
     return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
   }
 
-  @Override
-  public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
-      throws StoppedRpcClientException, FailedServerException {
-    return this.createRpcChannel(serviceName,
-        new InetSocketAddress(sn.getHostname(), sn.getPort()), user);
-  }
-
   /**
    * Creates an RPC client
    *
@@ -420,7 +408,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
         connections.remove(hashCode);
       }
       if (rpcChannel == null) {
-        rpcChannel = new AsyncRpcChannelImpl(this.bootstrap, this, ticket, serviceName, location);
+        rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
         connections.put(hashCode, rpcChannel);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index 6fcca34..7a2802f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -37,13 +37,13 @@ import org.apache.hadoop.ipc.RemoteException;
  */
 @InterfaceAudience.Private
 public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
-  private final AsyncRpcChannelImpl channel;
+  private final AsyncRpcChannel channel;
 
   /**
    * Constructor
    * @param channel on which this response handler operates
    */
-  public AsyncServerResponseHandler(AsyncRpcChannelImpl channel) {
+  public AsyncServerResponseHandler(AsyncRpcChannel channel) {
     this.channel = channel;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
deleted file mode 100644
index 0d05db8..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Promise.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import io.netty.util.concurrent.DefaultPromise;
-import io.netty.util.concurrent.EventExecutor;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
-
-/**
- * Abstract response promise
- * @param <T> Type of result contained in Promise
- */
-@InterfaceAudience.Private
-public class Promise<T> extends DefaultPromise<T> implements Future<T> {
-  /**
-   * Constructor
-   * @param eventLoop to handle events on
-   */
-  public Promise(EventExecutor eventLoop) {
-    super(eventLoop);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index 9d05c21..a8ec628 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.RpcChannel;
-import io.netty.util.concurrent.EventExecutor;
+
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -70,18 +70,6 @@ import org.apache.hadoop.hbase.security.User;
       throws IOException;
 
   /**
-   * Create or fetch AsyncRpcChannel
-   * @param serviceName to connect to
-   * @param sn ServerName of the channel to create
-   * @param user for the service
-   * @return An async RPC channel fitting given parameters
-   * @throws FailedServerException if server failed
-   * @throws StoppedRpcClientException if the RPC client has stopped
-   */
-  AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
-      throws StoppedRpcClientException, FailedServerException;
-
-  /**
    * Creates a "channel" that can be used by a protobuf service.  Useful setting up
    * protobuf stubs.
    *
@@ -116,10 +104,4 @@ import org.apache.hadoop.hbase.security.User;
    *         supports cell blocks.
    */
   boolean hasCellBlockSupport();
-
-  /**
-   * Get an event loop to operate on
-   * @return EventLoop
-   */
-  EventExecutor getEventExecutor();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index dc05af1..37b9afd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -23,8 +23,6 @@ import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
-import io.netty.util.concurrent.EventExecutor;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -55,6 +53,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.net.SocketFactory;
 import javax.security.sasl.SaslException;
 
@@ -66,7 +65,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
@@ -1219,11 +1217,6 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
   }
 
-  @Override
-  public EventExecutor getEventExecutor() {
-    return AsyncRpcClient.getGlobalEventLoopGroup(this.conf).getFirst().next();
-  }
-
   /**
    * Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code> which is servicing the <code>protocol</code> protocol,
@@ -1336,14 +1329,8 @@ public class RpcClientImpl extends AbstractRpcClient {
   }
 
   @Override
-  public org.apache.hadoop.hbase.ipc.AsyncRpcChannel createRpcChannel(String serviceName,
-      ServerName sn, User user) throws StoppedRpcClientException, FailedServerException {
-    return new AsyncRpcChannel(sn, user);
-  }
-
-  @Override
   public RpcChannel createProtobufRpcChannel(ServerName sn, User user, int rpcTimeout) {
-    return new RpcChannelImplementation(sn, user, rpcTimeout);
+    throw new UnsupportedOperationException();
   }
 
   /**
@@ -1392,143 +1379,4 @@ public class RpcClientImpl extends AbstractRpcClient {
 
     return connection;
   }
-
-  /**
-   * Simulated async call
-   */
-  private class RpcChannelImplementation implements RpcChannel {
-    private final InetSocketAddress isa;
-    private final User ticket;
-    private final int channelOperationTimeout;
-    private final EventExecutor executor;
-
-    /**
-     * @param channelOperationTimeout - the default timeout when no timeout is given
-     */
-    protected RpcChannelImplementation(
-        final ServerName sn, final User ticket, int channelOperationTimeout) {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      this.ticket = ticket;
-      this.channelOperationTimeout = channelOperationTimeout;
-
-      this.executor = RpcClientImpl.this.getEventExecutor();
-    }
-
-    @Override
-    public void callMethod(final MethodDescriptor method, RpcController controller,
-        final Message request, final Message responsePrototype, final RpcCallback<Message> done) {
-      final PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
-          controller,
-          channelOperationTimeout);
-
-      executor.execute(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-            cs.setStartTime(EnvironmentEdgeManager.currentTime());
-            Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
-            cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
-            if (metrics != null) {
-              metrics.updateRpc(method, request, cs);
-            }
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
-            }
-
-            done.run(call.response);
-          } catch (IOException e) {
-            pcrc.setFailed(e);
-          } catch (InterruptedException e) {
-            pcrc.startCancel();
-          }
-        }
-      });
-    }
-  }
-
-  /**
-   * Wraps the call in an async channel.
-   */
-  private class AsyncRpcChannel implements org.apache.hadoop.hbase.ipc.AsyncRpcChannel {
-    private final EventExecutor executor;
-    private final InetSocketAddress isa;
-
-    private final User ticket;
-
-    /**
-     * Constructor
-     * @param sn servername to connect to
-     * @param user to connect with
-     */
-    public AsyncRpcChannel(ServerName sn, User user) {
-      this.isa = new InetSocketAddress(sn.getHostname(), sn.getPort());
-      this.executor = RpcClientImpl.this.getEventExecutor();
-      this.ticket = user;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <R extends Message, O> Future<O> callMethod(final MethodDescriptor method,
-        final Message request, CellScanner cellScanner, final R responsePrototype,
-        final MessageConverter<R, O> messageConverter,
-        final IOExceptionConverter exceptionConverter, long rpcTimeout, int priority) {
-      final PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(cellScanner);
-      pcrc.setPriority(priority);
-      pcrc.setCallTimeout((int) rpcTimeout);
-
-      final Promise<O> promise = new Promise<>(executor);
-
-      executor.execute(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
-            cs.setStartTime(EnvironmentEdgeManager.currentTime());
-            Call call = call(method, request, responsePrototype, pcrc, ticket, isa, cs);
-            cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
-            if (metrics != null) {
-              metrics.updateRpc(method, request, cs);
-            }
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Call: " + method.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
-            }
-
-            promise.setSuccess(
-                messageConverter.convert((R) call.response, call.cells)
-            );
-          } catch (InterruptedException e) {
-            promise.cancel(true);
-          } catch (IOException e) {
-            if(exceptionConverter != null) {
-              e = exceptionConverter.convert(e);
-            }
-            promise.setFailure(e);
-          }
-        }
-      });
-
-      return promise;
-    }
-
-    @Override
-    public EventExecutor getEventExecutor() {
-      return this.executor;
-    }
-
-    @Override
-    public void close(Throwable cause) {
-      this.executor.shutdownGracefully();
-    }
-
-    @Override
-    public boolean isAlive() {
-      return !this.executor.isShuttingDown() && !this.executor.isShutdown();
-    }
-
-    @Override
-    public InetSocketAddress getAddress() {
-      return isa;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f760ff/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 45cec78..4cfa25c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.ipc;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyObject;
@@ -32,9 +31,9 @@ import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetAddress;
@@ -42,9 +41,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -56,8 +53,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Future;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@@ -414,141 +409,4 @@ public abstract class AbstractTestIPC {
         .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
         .getCause() instanceof CallTimeoutException);
   }
-
-  @Test
-  public void testAsyncProtobufConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    try (RpcClient client = createRpcClient(CONF)) {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
-      RpcChannel channel = client.createProtobufRpcChannel(
-          ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
-          User.getCurrent(), 0);
-
-      final AtomicBoolean done = new AtomicBoolean(false);
-
-      channel
-          .callMethod(md, new PayloadCarryingRpcController(), param, md.getOutputType().toProto(),
-              new com.google.protobuf.RpcCallback<Message>() {
-                @Override
-                public void run(Message parameter) {
-                  done.set(true);
-                }
-              });
-
-      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return done.get();
-        }
-      });
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  @Test
-  public void testRTEDuringAsyncProtobufConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
-      rpcServer.start();
-      InetSocketAddress address = rpcServer.getListenerAddress();
-      if (address == null) {
-        throw new IOException("Listener channel is closed");
-      }
-      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
-      RpcChannel channel = client.createProtobufRpcChannel(
-          ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
-          User.getCurrent(), 0);
-
-      final AtomicBoolean done = new AtomicBoolean(false);
-
-      PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
-      controller.notifyOnFail(new com.google.protobuf.RpcCallback<IOException>() {
-        @Override
-        public void run(IOException e) {
-          done.set(true);
-          LOG.info("Caught expected exception: " + e.toString());
-          assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
-        }
-      });
-
-      channel.callMethod(md, controller, param, md.getOutputType().toProto(),
-          new com.google.protobuf.RpcCallback<Message>() {
-            @Override
-            public void run(Message parameter) {
-              done.set(true);
-              fail("Expected an exception to have been thrown!");
-            }
-          });
-
-      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return done.get();
-        }
-      });
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  @Test
-  public void testAsyncConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    try (RpcClient client = createRpcClient(CONF)) {
-      rpcServer.start();
-      Message msg = setupAsyncConnection(rpcServer, client);
-
-      assertNotNull(msg);
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  @Test
-  public void testRTEDuringAsyncConnectionSetup() throws Exception {
-    TestRpcServer rpcServer = new TestRpcServer();
-    try (RpcClient client = createRpcClientRTEDuringConnectionSetup(CONF)) {
-      rpcServer.start();
-      setupAsyncConnection(rpcServer, client);
-
-      fail("Expected an exception to have been thrown!");
-    } catch (ExecutionException e) {
-      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
-    } finally {
-      rpcServer.stop();
-    }
-  }
-
-  private Message setupAsyncConnection(TestRpcServer rpcServer, RpcClient client)
-      throws IOException, InterruptedException, ExecutionException,
-      java.util.concurrent.TimeoutException {
-    InetSocketAddress address = rpcServer.getListenerAddress();
-    if (address == null) {
-      throw new IOException("Listener channel is closed");
-    }
-    MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
-    EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-
-    ServerName serverName =
-        ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis());
-
-    AsyncRpcChannel channel =
-        client.createRpcChannel(md.getService().getName(), serverName, User.getCurrent());
-
-    final Future<Message> f = channel
-        .callMethod(md, param, null, md.getOutputType().toProto(), MessageConverter.NO_CONVERTER,
-            null, 1000, HConstants.NORMAL_QOS);
-
-    return f.get(1, TimeUnit.SECONDS);
-  }
 }


Mime
View raw message