hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [4/6] hbase git commit: HBASE-16445 Refactor and reimplement RpcClient
Date Thu, 08 Sep 2016 12:33:20 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index c18bd7e..f66350b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -30,15 +30,19 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.TracingProtos.RPCTInfo;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Utility to help ipc'ing.
  */
 @InterfaceAudience.Private
-public class IPCUtil {
+class IPCUtil {
 
   /**
    * Write out header, param, and cell block if there is one.
@@ -93,18 +97,32 @@ public class IPCUtil {
     return totalSize;
   }
 
-  /**
-   * @return True if the exception is a fatal connection exception.
-   */
-  public static boolean isFatalConnectionException(final ExceptionResponse e) {
-    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
+  static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
+    RequestHeader.Builder builder = RequestHeader.newBuilder();
+    builder.setCallId(call.id);
+    if (call.span != null) {
+      builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
+          .setTraceId(call.span.getTraceId()));
+    }
+    builder.setMethodName(call.md.getName());
+    builder.setRequestParam(call.param != null);
+    if (cellBlockMeta != null) {
+      builder.setCellBlockMeta(cellBlockMeta);
+    }
+    // Only pass priority if there is one set.
+    if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+      builder.setPriority(call.priority);
+    }
+    builder.setTimeout(call.timeout);
+
+    return builder.build();
   }
 
   /**
    * @param e exception to be wrapped
    * @return RemoteException made from passed <code>e</code>
    */
-  public static RemoteException createRemoteException(final ExceptionResponse e) {
+  static RemoteException createRemoteException(final ExceptionResponse e) {
     String innerExceptionClassName = e.getExceptionClassName();
     boolean doNotRetry = e.getDoNotRetry();
     return e.hasHostname() ?
@@ -115,6 +133,21 @@ public class IPCUtil {
   }
 
   /**
+   * @return True if the exception is a fatal connection exception.
+   */
+  static boolean isFatalConnectionException(final ExceptionResponse e) {
+    return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
+  }
+
+  static IOException toIOE(Throwable t) {
+    if (t instanceof IOException) {
+      return (IOException) t;
+    } else {
+      return new IOException(t);
+    }
+  }
+
+  /**
   * Takes an Exception and the address we were trying to connect to and return an IOException with
   * the input exception as the cause. The new exception provides the stack trace of the place where
   * the exception is thrown and some extra diagnostics information. If the exception is
@@ -124,7 +157,7 @@ public class IPCUtil {
    * @param exception the relevant exception
    * @return an exception to throw
    */
-  public static IOException wrapException(InetSocketAddress addr, Exception exception) {
+  static IOException wrapException(InetSocketAddress addr, Exception exception) {
     if (exception instanceof ConnectException) {
       // connection refused; include the host:port in the error
       return (ConnectException) new ConnectException(
@@ -140,4 +173,10 @@ public class IPCUtil {
           "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
     }
   }
+
+  static void setCancelled(Call call) {
+    call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime="
+        + (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout="
+        + call.timeout));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
deleted file mode 100644
index a85225a..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MessageConverter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import com.google.protobuf.Message;
-import java.io.IOException;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Interface to convert Messages to specific types
- * @param <M> Message Type to convert
- * @param <O> Output Type
- */
-@InterfaceAudience.Private
-public interface MessageConverter<M,O> {
-  /**
-   * Converts Message to Output
-   * @param msg to convert
-   * @param cellScanner to use for conversion
-   * @return Output
-   * @throws IOException if message could not be converted to response
-   */
-  O convert(M msg, CellScanner cellScanner) throws IOException;
-
-  MessageConverter<Message,Message> NO_CONVERTER = new MessageConverter<Message,
Message>() {
-    @Override
-    public Message convert(Message msg, CellScanner cellScanner) throws IOException {
-      return msg;
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
new file mode 100644
index 0000000..8c568af
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Netty client for the requests and responses.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
+
+  final EventLoopGroup group;
+
+  final Class<? extends Channel> channelClass;
+
+  private final boolean shutdownGroupWhenClose;
+
+  public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
+      MetricsConnection metrics) {
+    super(configuration, clusterId, localAddress, metrics);
+    Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass = NettyRpcClientConfigHelper
+        .getEventLoopConfig(conf);
+    if (groupAndChannelClass == null) {
+      // Use our own EventLoopGroup.
+      this.group = new NioEventLoopGroup();
+      this.channelClass = NioSocketChannel.class;
+      this.shutdownGroupWhenClose = true;
+    } else {
+      this.group = groupAndChannelClass.getFirst();
+      this.channelClass = groupAndChannelClass.getSecond();
+      this.shutdownGroupWhenClose = false;
+    }
+  }
+
+  /** Used in test only. */
+  NettyRpcClient(Configuration configuration) {
+    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
+  }
+
+  @Override
+  protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {
+    return new NettyRpcConnection(this, remoteId);
+  }
+
+  @Override
+  protected void closeInternal() {
+    if (shutdownGroupWhenClose) {
+      group.shutdownGracefully();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
new file mode 100644
index 0000000..a8af69c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClientConfigHelper.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper class for passing config to {@link NettyRpcClient}.
+ * <p>
+ * As hadoop Configuration can not pass an Object directly, we need to find a way to pass the
+ * EventLoopGroup to {@code AsyncRpcClient} if we want to use a single {@code EventLoopGroup} for
+ * the whole process.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class NettyRpcClientConfigHelper {
+
+  public static final String EVENT_LOOP_CONFIG = "hbase.rpc.client.event-loop.config";
+
+  private static final String CONFIG_NAME = "global-event-loop";
+
+  private static final Map<String, Pair<EventLoopGroup, Class<? extends Channel>>>
+    EVENT_LOOP_CONFIG_MAP = new HashMap<>();
+
+  /**
+   * Set the EventLoopGroup and channel class for {@code AsyncRpcClient}.
+   */
+  public static void setEventLoopConfig(Configuration conf, EventLoopGroup group,
+      Class<? extends Channel> channelClass) {
+    Preconditions.checkNotNull(group, "group is null");
+    Preconditions.checkNotNull(channelClass, "channel class is null");
+    conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME);
+    EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME,
+      Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, channelClass));
+  }
+
+  /**
+   * The {@code AsyncRpcClient} will create its own {@code NioEventLoopGroup}.
+   */
+  public static void createEventLoopPerClient(Configuration conf) {
+    conf.set(EVENT_LOOP_CONFIG, "");
+    EVENT_LOOP_CONFIG_MAP.clear();
+  }
+
+  static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) {
+    String name = conf.get(EVENT_LOOP_CONFIG);
+    if (name == null) {
+      return DefaultNettyEventLoopConfig.GROUP_AND_CHANNEL_CLASS;
+    }
+    if (StringUtils.isBlank(name)) {
+      return null;
+    }
+    return EVENT_LOOP_CONFIG_MAP.get(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
new file mode 100644
index 0000000..5f22dfd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.CallEvent.Type.CANCELLED;
+import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
+
+import com.google.protobuf.RpcCallback;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.security.AsyncHBaseSaslRpcClientHandler;
+import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
+import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * RPC connection implementation based on netty.
+ * <p>
+ * Most operations are executed in handlers. Netty handler is always executed in the same
+ * thread(EventLoop) so no lock is needed.
+ */
+@InterfaceAudience.Private
+class NettyRpcConnection extends RpcConnection {
+
+  private static final Log LOG = LogFactory.getLog(NettyRpcConnection.class);
+
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+
+  private final NettyRpcClient rpcClient;
+
+  private ByteBuf connectionHeaderPreamble;
+
+  private ByteBuf connectionHeaderWithLength;
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "connect is also under lock as notifyOnCancel will call our action directly")
+  private Channel channel;
+
+  NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {
+    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+    this.rpcClient = rpcClient;
+    byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
+    this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length)
+        .writeBytes(connectionHeaderPreamble);
+    ConnectionHeader header = getConnectionHeader();
+    this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
+    this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
+    header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
+  }
+
+  @Override
+  protected synchronized void callTimeout(Call call) {
+    if (channel != null) {
+      channel.pipeline().fireUserEventTriggered(new CallEvent(TIMEOUT, call));
+    }
+  }
+
+  @Override
+  public synchronized boolean isActive() {
+    return channel != null;
+  }
+
+  private void shutdown0() {
+    if (channel != null) {
+      channel.close();
+      channel = null;
+    }
+  }
+
+  @Override
+  public synchronized void shutdown() {
+    shutdown0();
+  }
+
+  private void established(Channel ch) {
+    ch.write(connectionHeaderWithLength.retainedDuplicate());
+    ChannelPipeline p = ch.pipeline();
+    String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
+    p.addBefore(addBeforeHandler, null,
+      new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
+    p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
+    p.addBefore(addBeforeHandler, null,
+      new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
+    p.fireUserEventTriggered(BufferCallEvent.success());
+  }
+
+  private boolean reloginInProgress;
+
+  private void scheduleRelogin(Throwable error) {
+    if (error instanceof FallbackDisallowedException) {
+      return;
+    }
+    synchronized (this) {
+      if (reloginInProgress) {
+        return;
+      }
+      reloginInProgress = true;
+      RELOGIN_EXECUTOR.schedule(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            if (shouldAuthenticateOverKrb()) {
+              relogin();
+            }
+          } catch (IOException e) {
+            LOG.warn("relogin failed", e);
+          }
+          synchronized (this) {
+            reloginInProgress = false;
+          }
+        }
+      }, ThreadLocalRandom.current().nextInt(reloginMaxBackoff), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  private void failInit(Channel ch, IOException e) {
+    synchronized (this) {
+      // fail all pending calls
+      ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
+      shutdown0();
+      return;
+    }
+  }
+
+  private void saslNegotiate(final Channel ch) {
+    UserGroupInformation ticket = getUGI();
+    if (ticket == null) {
+      failInit(ch, new FatalConnectionException("ticket/user is null"));
+      return;
+    }
+    Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
+    ChannelHandler saslHandler;
+    try {
+      saslHandler = new AsyncHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
+          serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
+            "hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
+    } catch (IOException e) {
+      failInit(ch, e);
+      return;
+    }
+    ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
+    saslPromise.addListener(new FutureListener<Boolean>() {
+
+      @Override
+      public void operationComplete(Future<Boolean> future) throws Exception {
+        if (future.isSuccess()) {
+          ChannelPipeline p = ch.pipeline();
+          p.remove(SaslChallengeDecoder.class);
+          p.remove(AsyncHBaseSaslRpcClientHandler.class);
+          established(ch);
+        } else {
+          final Throwable error = future.cause();
+          scheduleRelogin(error);
+          failInit(ch, toIOE(error));
+        }
+      }
+    });
+  }
+
+  private void connect() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to " + remoteId.address);
+    }
+
+    this.channel = new Bootstrap().group(rpcClient.group).channel(rpcClient.channelClass)
+        .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
+        .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
+        .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
+        .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
+
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            Channel ch = future.channel();
+            if (!future.isSuccess()) {
+              failInit(ch, toIOE(future.cause()));
+              rpcClient.failedServers.addToFailedServers(remoteId.address);
+              return;
+            }
+            ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
+            if (useSasl) {
+              saslNegotiate(ch);
+            } else {
+              established(ch);
+            }
+          }
+        }).channel();
+  }
+
+  @Override
+  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
+      throws IOException {
+    if (reloginInProgress) {
+      throw new IOException("Can not send request because relogin is in progress.");
+    }
+    pcrc.notifyOnCancel(new RpcCallback<Object>() {
+
+      @Override
+      public void run(Object parameter) {
+        setCancelled(call);
+        synchronized (this) {
+          if (channel != null) {
+            channel.pipeline().fireUserEventTriggered(new CallEvent(CANCELLED, call));
+          }
+        }
+      }
+    }, new CancellationCallback() {
+
+      @Override
+      public void run(boolean cancelled) throws IOException {
+        if (cancelled) {
+          setCancelled(call);
+        } else {
+          if (channel == null) {
+            connect();
+          }
+          scheduleTimeoutTask(call);
+          channel.writeAndFlush(call).addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+              // Fail the call if we failed to write it out. This usually because the channel is
+              // closed. This is needed because we may shutdown the channel inside event loop and
+              // there may still be some pending calls in the event loop queue after us.
+              if (!future.isSuccess()) {
+                call.setException(toIOE(future.cause()));
+              }
+            }
+          });
+        }
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
new file mode 100644
index 0000000..1cd89d8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.TextFormat;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.PromiseCombiner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * The netty rpc handler.
+ */
+@InterfaceAudience.Private
+class NettyRpcDuplexHandler extends ChannelDuplexHandler {
+
+  private static final Log LOG = LogFactory.getLog(NettyRpcDuplexHandler.class);
+
+  private final NettyRpcConnection conn;
+
+  private final CellBlockBuilder cellBlockBuilder;
+
+  private final Codec codec;
+
+  private final CompressionCodec compressor;
+
+  private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();
+
+  public NettyRpcDuplexHandler(NettyRpcConnection conn, CellBlockBuilder cellBlockBuilder,
+      Codec codec, CompressionCodec compressor) {
+    this.conn = conn;
+    this.cellBlockBuilder = cellBlockBuilder;
+    this.codec = codec;
+    this.compressor = compressor;
+
+  }
+
+  private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise promise)
+      throws IOException {
+    id2Call.put(call.id, call);
+    ByteBuf cellBlock = cellBlockBuilder.buildCellBlock(codec, compressor, call.cells, ctx.alloc());
+    CellBlockMeta cellBlockMeta;
+    if (cellBlock != null) {
+      CellBlockMeta.Builder cellBlockMetaBuilder = CellBlockMeta.newBuilder();
+      cellBlockMetaBuilder.setLength(cellBlock.writerIndex());
+      cellBlockMeta = cellBlockMetaBuilder.build();
+    } else {
+      cellBlockMeta = null;
+    }
+    RequestHeader requestHeader = IPCUtil.buildRequestHeader(call, cellBlockMeta);
+    int sizeWithoutCellBlock = IPCUtil.getTotalSizeWhenWrittenDelimited(requestHeader, call.param);
+    int totalSize = cellBlock != null ? sizeWithoutCellBlock + cellBlock.writerIndex()
+        : sizeWithoutCellBlock;
+    ByteBuf buf = ctx.alloc().buffer(sizeWithoutCellBlock + 4);
+    buf.writeInt(totalSize);
+    ByteBufOutputStream bbos = new ByteBufOutputStream(buf);
+    requestHeader.writeDelimitedTo(bbos);
+    if (call.param != null) {
+      call.param.writeDelimitedTo(bbos);
+    }
+    if (cellBlock != null) {
+      ChannelPromise withoutCellBlockPromise = ctx.newPromise();
+      ctx.write(buf, withoutCellBlockPromise);
+      ChannelPromise cellBlockPromise = ctx.newPromise();
+      ctx.write(cellBlock, cellBlockPromise);
+      PromiseCombiner combiner = new PromiseCombiner();
+      combiner.addAll(withoutCellBlockPromise, cellBlockPromise);
+      combiner.finish(promise);
+    } else {
+      ctx.write(buf, promise);
+    }
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+      throws Exception {
+    if (msg instanceof Call) {
+      writeRequest(ctx, (Call) msg, promise);
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException {
+    int totalSize = buf.readInt();
+    ByteBufInputStream in = new ByteBufInputStream(buf);
+    ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+    int id = responseHeader.getCallId();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader)
+          + ", totalSize: " + totalSize + " bytes");
+    }
+    RemoteException remoteExc;
+    if (responseHeader.hasException()) {
+      ExceptionResponse exceptionResponse = responseHeader.getException();
+      remoteExc = IPCUtil.createRemoteException(exceptionResponse);
+      if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
+        // Here we will cleanup all calls so do not need to fall back, just return.
+        exceptionCaught(ctx, remoteExc);
+        return;
+      }
+    } else {
+      remoteExc = null;
+    }
+    Call call = id2Call.remove(id);
+    if (call == null) {
+      // So we got a response for which we have no corresponding 'call' here on the client-side.
+      // We probably timed out waiting, cleaned up all references, and now the server decides
+      // to return a response. There is nothing we can do w/ the response at this stage. Clean
+      // out the wire of the response so its out of the way and we can get other responses on
+      // this connection.
+      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
+      int whatIsLeftToRead = totalSize - readSoFar;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead
+            + " bytes");
+      }
+      return;
+    }
+    if (remoteExc != null) {
+      call.setException(remoteExc);
+      return;
+    }
+    Message value;
+    if (call.responseDefaultType != null) {
+      Builder builder = call.responseDefaultType.newBuilderForType();
+      builder.mergeDelimitedFrom(in);
+      value = builder.build();
+    } else {
+      value = null;
+    }
+    CellScanner cellBlockScanner;
+    if (responseHeader.hasCellBlockMeta()) {
+      int size = responseHeader.getCellBlockMeta().getLength();
+      // Maybe we could read directly from the ByteBuf.
+      // The problem here is that we do not know when to release it.
+      byte[] cellBlock = new byte[size];
+      buf.readBytes(cellBlock);
+      cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor,
cellBlock);
+    } else {
+      cellBlockScanner = null;
+    }
+    call.setResponse(value, cellBlockScanner);
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    if (msg instanceof ByteBuf) {
+      ByteBuf buf = (ByteBuf) msg;
+      try {
+        readResponse(ctx, buf);
+      } finally {
+        buf.release();
+      }
+    } else {
+      super.channelRead(ctx, msg);
+    }
+  }
+
+  private void cleanupCalls(ChannelHandlerContext ctx, IOException error) {
+    for (Call call : id2Call.values()) {
+      call.setException(error);
+    }
+    id2Call.clear();
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    cleanupCalls(ctx, new IOException("Connection closed"));
+    conn.shutdown();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    cleanupCalls(ctx, IPCUtil.toIOE(cause));
+    conn.shutdown();
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
{
+    if (evt instanceof IdleStateEvent) {
+      IdleStateEvent idleEvt = (IdleStateEvent) evt;
+      switch (idleEvt.state()) {
+        case WRITER_IDLE:
+          if (id2Call.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("shutdown connection to " + conn.remoteId().address
+                  + " because idle for a long time");
+            }
+            // It may happen that there are still some pending calls in the event loop queue
and
+            // they will get a closed channel exception. But this is not a big deal as it
rarely
+            // rarely happens and the upper layer could retry immediately.
+            conn.shutdown();
+          }
+          break;
+        default:
+          LOG.warn("Unrecognized idle state " + idleEvt.state());
+          break;
+      }
+    } else if (evt instanceof CallEvent) {
+      // just remove the call for now until we add other call event other than timeout and
cancel.
+      id2Call.remove(((CallEvent) evt).call.id);
+    } else {
+      ctx.fireUserEventTriggered(evt);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index a8ec628..26a5739 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -30,7 +30,8 @@ import org.apache.hadoop.hbase.security.User;
 /**
  * Interface for RpcClient implementations so ConnectionManager can handle it.
  */
-@InterfaceAudience.Private public interface RpcClient extends Closeable {
+@InterfaceAudience.Private
+public interface RpcClient extends Closeable {
   String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
   int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
   String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose";
@@ -79,7 +80,8 @@ import org.apache.hadoop.hbase.security.User;
    *
    * @return A rpc channel that goes via this rpc client instance.
    */
-  RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout);
+  RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout)
+      throws IOException;
 
   /**
    * Interrupt the connections to the given server. This should be called if the server

http://git-wip-us.apache.org/repos/asf/hbase/blob/c04b3891/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 07254e9..8cdfb03 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 
 import java.net.SocketAddress;
 
@@ -34,6 +35,10 @@ public final class RpcClientFactory {
 
   public static final String CUSTOM_RPC_CLIENT_IMPL_CONF_KEY = "hbase.rpc.client.impl";
 
+  private static final ImmutableMap<String, String> DEPRECATED_NAME_MAPPING = ImmutableMap.of(
+    "org.apache.hadoop.hbase.ipc.RpcClientImpl", BlockingRpcClient.class.getName(),
+    "org.apache.hadoop.hbase.ipc.AsyncRpcClient", NettyRpcClient.class.getName());
+
   /**
    * Private Constructor
    */
@@ -59,6 +64,15 @@ public final class RpcClientFactory {
     return createClient(conf, clusterId, null, metrics);
   }
 
+  private static String getRpcClientClass(Configuration conf) {
+    String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY);
+    if (rpcClientClass == null) {
+      return NettyRpcClient.class.getName();
+    }
+    String mappedName = DEPRECATED_NAME_MAPPING.get(rpcClientClass);
+    return mappedName == null ? rpcClientClass : mappedName;
+  }
+
   /**
    * Creates a new RpcClient by the class defined in the configuration or falls back to
    * RpcClientImpl
@@ -70,13 +84,9 @@ public final class RpcClientFactory {
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
       SocketAddress localAddr, MetricsConnection metrics) {
-    String rpcClientClass =
-        conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, AsyncRpcClient.class.getName());
-    return ReflectionUtils.instantiateWithCustomCtor(
-        rpcClientClass,
-        new Class[] { Configuration.class, String.class, SocketAddress.class,
-          MetricsConnection.class },
-        new Object[] { conf, clusterId, localAddr, metrics }
-    );
+    String rpcClientClass = getRpcClientClass(conf);
+    return ReflectionUtils.instantiateWithCustomCtor(rpcClientClass, new Class[] {
+        Configuration.class, String.class, SocketAddress.class, MetricsConnection.class },
+      new Object[] { conf, clusterId, localAddr, metrics });
   }
 }
\ No newline at end of file


Mime
View raw message