tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [1/2] tajo git commit: TAJO-1571: Merge TAJO-1497 and TAJO-1569 to 0.10.1. (jinho)
Date Mon, 20 Apr 2015 05:13:03 GMT
Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.1 5e1fa93b5 -> 47008c58e


http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 4ec5718..61e2f04 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,80 +18,57 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.protobuf.*;
+import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors.MethodDescriptor;
-
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import io.netty.channel.*;
-import io.netty.util.concurrent.*;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
-import io.netty.util.ReferenceCountUtil;
-
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.*;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 
 public class BlockingRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(RpcProtos.class);
 
-  private final ChannelInitializer<Channel> initializer;
-  private final ProxyRpcChannel rpcChannel;
-
-  private final AtomicInteger sequence = new AtomicInteger(0);
   private final Map<Integer, ProtoCallFuture> requests =
       new ConcurrentHashMap<Integer, ProtoCallFuture>();
 
-  private final Class<?> protocol;
   private final Method stubMethod;
-
-  private RpcConnectionKey key;
+  private final ProxyRpcChannel rpcChannel;
+  private final ChannelInboundHandlerAdapter inboundHandler;
 
   /**
    * Intentionally make this method package-private, avoiding user directly
    * new an instance through this constructor.
    */
-  BlockingRpcClient(final Class<?> protocol,
-                           final InetSocketAddress addr, int retries)
-      throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
-
-    this.protocol = protocol;
-    String serviceClassName = protocol.getName() + "$"
-        + protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    stubMethod = serviceClass.getMethod("newBlockingStub",
-        BlockingRpcChannel.class);
-
-    initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance());
-    super.init(addr, initializer, retries);
-    rpcChannel = new ProxyRpcChannel();
-
-    this.key = new RpcConnectionKey(addr, protocol, false);
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+      throws NoSuchMethodException, ClassNotFoundException {
+    this(rpcConnectionKey, retries, 0);
   }
 
-  @Override
-  public RpcConnectionKey getKey() {
-    return key;
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds)
+      throws ClassNotFoundException, NoSuchMethodException {
+    super(rpcConnectionKey, retries);
+    stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
+    rpcChannel = new ProxyRpcChannel();
+    inboundHandler = new ClientChannelInboundHandler();
+    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds));
   }
 
   @Override
   public <T> T getStub() {
-    try {
-      return (T) stubMethod.invoke(null, rpcChannel);
-    } catch (Exception e) {
-      throw new RuntimeException(e.getMessage(), e);
-    }
-  }
-
-  public BlockingRpcChannel getBlockingRpcChannel() {
-    return this.rpcChannel;
+    return getStub(stubMethod, rpcChannel);
   }
 
   @Override
@@ -100,25 +77,12 @@ public class BlockingRpcClient extends NettyClientBase {
       callback.setFailed("BlockingRpcClient terminates all the connections",
           new ServiceException("BlockingRpcClient terminates all the connections"));
     }
-
+    requests.clear();
     super.close();
   }
 
   private class ProxyRpcChannel implements BlockingRpcChannel {
 
-    private final ClientChannelInboundHandler handler;
-
-    public ProxyRpcChannel() {
-
-      this.handler = getChannel().pipeline().
-          get(ClientChannelInboundHandler.class);
-
-      if (handler == null) {
-        throw new IllegalArgumentException("Channel does not have " +
-            "proper handler");
-      }
-    }
-
     @Override
     public Message callBlockingMethod(final MethodDescriptor method,
                                       final RpcController controller,
@@ -139,7 +103,7 @@ public class BlockingRpcClient extends NettyClientBase {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
           if (!future.isSuccess()) {
-            handler.exceptionCaught(null, new ServiceException(future.cause()));
+            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
           }
         }
       });
@@ -174,7 +138,7 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   private String getErrorMessage(String message) {
-    if(protocol != null && getChannel() != null) {
+    if(getChannel() != null) {
       return protocol.getName() +
           "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
           getChannel().remoteAddress()) + "): " + message;
@@ -184,7 +148,7 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
-    if(protocol != null && getChannel() != null) {
+    if(getChannel() != null) {
       return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
           RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
     } else {
@@ -193,39 +157,29 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
+  private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> {
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      if (msg instanceof RpcResponse) {
-        try {
-          RpcResponse rpcResponse = (RpcResponse) msg;
-          ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {
+      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
 
-          if (callback == null) {
-            LOG.warn("Dangling rpc call");
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+      } else {
+        if (rpcResponse.hasErrorMessage()) {
+          callback.setFailed(rpcResponse.getErrorMessage(),
+              makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+        } else {
+          Message responseMessage;
+
+          if (!rpcResponse.hasResponseMessage()) {
+            responseMessage = null;
           } else {
-            if (rpcResponse.hasErrorMessage()) {
-              callback.setFailed(rpcResponse.getErrorMessage(),
-                  makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
-              throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-            } else {
-              Message responseMessage;
-
-              if (!rpcResponse.hasResponseMessage()) {
-                responseMessage = null;
-              } else {
-                responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
-                    .build();
-              }
-
-              callback.setResponse(responseMessage);
-            }
+            responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
+                .build();
           }
-        } finally {
-          ReferenceCountUtil.release(msg);
+
+          callback.setResponse(responseMessage);
         }
       }
     }
@@ -233,22 +187,39 @@ public class BlockingRpcClient extends NettyClientBase {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
+      /* Fail all pending requests */
       for(ProtoCallFuture callback: requests.values()) {
         callback.setFailed(cause.getMessage(), cause);
       }
-      
+      requests.clear();
+
       if(LOG.isDebugEnabled()) {
         LOG.error("" + cause.getMessage(), cause);
       } else {
         LOG.error("RPC Exception:" + cause.getMessage());
       }
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+      super.channelActive(ctx);
+      LOG.info("Connection established successfully : " + ctx.channel().remoteAddress());
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent) {
+        IdleStateEvent e = (IdleStateEvent) evt;
+        /* If all requests are done when the idle event is triggered, the channel will be closed. */
+        if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) {
+          ctx.close();
+          LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress());
+        }
       }
     }
   }
 
- static class ProtoCallFuture implements Future<Message> {
+  static class ProtoCallFuture implements Future<Message> {
     private Semaphore sem = new Semaphore(0);
     private Message response = null;
     private Message returnType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 0ce359f..bb31367 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -22,15 +22,12 @@ import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
-
 import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
-import io.netty.util.ReferenceCountUtil;
-
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
@@ -62,7 +59,7 @@ public class BlockingRpcServer extends NettyServerBase {
   }
 
   @ChannelHandler.Sharable
-  private class ServerHandler extends ChannelInboundHandlerAdapter {
+  private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
 
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@@ -83,52 +80,43 @@ public class BlockingRpcServer extends NettyServerBase {
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
+    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
 
-      if (msg instanceof RpcRequest) {
+      String methodName = request.getMethodName();
+      MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+      if (methodDescriptor == null) {
+        throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+      }
+      Message paramProto = null;
+      if (request.hasRequestMessage()) {
         try {
-          final RpcRequest request = (RpcRequest) msg;
-
-          String methodName = request.getMethodName();
-          MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
-
-          if (methodDescriptor == null) {
-            throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
-          }
-          Message paramProto = null;
-          if (request.hasRequestMessage()) {
-            try {
-              paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
-                  .mergeFrom(request.getRequestMessage()).build();
-
-            } catch (Throwable t) {
-              throw new RemoteCallException(request.getId(), methodDescriptor, t);
-            }
-          }
-          Message returnValue;
-          RpcController controller = new NettyRpcController();
-
-          try {
-            returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
-          } catch (Throwable t) {
-            throw new RemoteCallException(request.getId(), methodDescriptor, t);
-          }
-
-          RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
-
-          if (returnValue != null) {
-            builder.setResponseMessage(returnValue.toByteString());
-          }
-
-          if (controller.failed()) {
-            builder.setErrorMessage(controller.errorText());
-          }
-          ctx.writeAndFlush(builder.build());
-        } finally {
-          ReferenceCountUtil.release(msg);
+          paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+              .mergeFrom(request.getRequestMessage()).build();
+
+        } catch (Throwable t) {
+          throw new RemoteCallException(request.getId(), methodDescriptor, t);
         }
       }
+      Message returnValue;
+      RpcController controller = new NettyRpcController();
+
+      try {
+        returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
+      } catch (Throwable t) {
+        throw new RemoteCallException(request.getId(), methodDescriptor, t);
+      }
+
+      RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+      if (returnValue != null) {
+        builder.setResponseMessage(returnValue.toByteString());
+      }
+
+      if (controller.failed()) {
+        builder.setErrorMessage(controller.errorText());
+      }
+      ctx.writeAndFlush(builder.build());
     }
 
     @Override
@@ -137,11 +125,6 @@ public class BlockingRpcServer extends NettyServerBase {
         RemoteCallException callException = (RemoteCallException) cause;
         ctx.writeAndFlush(callException.getResponse());
       }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
     }
-    
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
new file mode 100644
index 0000000..29c9772
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+public class ConnectionCloseFutureListener implements GenericFutureListener {
+  private RpcClientManager.RpcConnectionKey key;
+
+  public ConnectionCloseFutureListener(RpcClientManager.RpcConnectionKey key) {
+    this.key = key;
+  }
+
+  @Override
+  public void operationComplete(Future future) throws Exception {
+    RpcClientManager.remove(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 7b52178..a75148b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,156 +18,150 @@
 
 package org.apache.tajo.rpc;
 
-import io.netty.channel.*;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.*;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 
 import java.io.Closeable;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.net.SocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class NettyClientBase implements Closeable {
-  private static Log LOG = LogFactory.getLog(NettyClientBase.class);
-  private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60;
+  private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
+  private static final int CONNECTION_TIMEOUT = 60000;  // 60 sec
   private static final long PAUSE = 1000; // 1 sec
-  private int numRetries;
 
-  protected Bootstrap bootstrap;
-  private ChannelFuture channelFuture;
+  private final int numRetries;
 
-  public NettyClientBase() {
-  }
+  private Bootstrap bootstrap;
+  private volatile ChannelFuture channelFuture;
 
-  public abstract <T> T getStub();
-  public abstract RpcConnectionPool.RpcConnectionKey getKey();
-  
-  public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer, 
-      int numRetries) throws ConnectTimeoutException {
+  protected final Class<?> protocol;
+  protected final AtomicInteger sequence = new AtomicInteger(0);
+
+  private final RpcConnectionKey key;
+
+  public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    this.key = rpcConnectionKey;
+    this.protocol = rpcConnectionKey.protocolClass;
     this.numRetries = numRetries;
-    
-    init(addr, initializer);
   }
 
-  public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer)
-      throws ConnectTimeoutException {
+  // should be called from a subclass
+  protected void init(ChannelInitializer<Channel> initializer) {
     this.bootstrap = new Bootstrap();
     this.bootstrap
-      .channel(NioSocketChannel.class)
-      .handler(initializer)
-      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-      .option(ChannelOption.SO_REUSEADDR, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
-      .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
-      .option(ChannelOption.TCP_NODELAY, true);
-
-    connect(addr);
+        .group(RpcChannelFactory.getSharedClientEventloopGroup())
+        .channel(NioSocketChannel.class)
+        .handler(initializer)
+        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
+        .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
+        .option(ChannelOption.TCP_NODELAY, true);
   }
 
-  private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+  public RpcClientManager.RpcConnectionKey getKey() {
+    return key;
+  }
 
-    this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
-            .connect(address)
-            .addListener(listener);
+  protected final Class<?> getServiceClass() throws ClassNotFoundException {
+    String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+    return Class.forName(serviceClassName);
   }
-  
-  private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch);
-    connectUsingNetty(addr, listener);
 
+  @SuppressWarnings("unchecked")
+  protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
     try {
-      latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-    }
-
-    if (!channelFuture.isSuccess()) {
-      throw new ConnectTimeoutException("Connect error to " + addr +
-          " caused by " + ExceptionUtils.getMessage(channelFuture.cause()));
+      return (T) stubMethod.invoke(null, rpcChannel);
+    } catch (Exception e) {
+      throw new RemoteException(e.getMessage(), e);
     }
   }
 
-  public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
-    if(addr.isUnresolved()){
-       addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+  public abstract <T> T getStub();
+
+
+  private InetSocketAddress resolveAddress(InetSocketAddress address) {
+    if (address.isUnresolved()) {
+      return RpcUtils.createSocketAddr(address.getHostName(), address.getPort());
     }
+    return address;
+  }
 
-    handleConnectionInternally(addr);
+  private ChannelFuture doConnect(SocketAddress address) {
+    return this.channelFuture = bootstrap.clone().connect(address);
   }
 
-  class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
-    private final AtomicInteger retryCount = new AtomicInteger();
-    private final InetSocketAddress address;
-    private final CountDownLatch latch;
 
-    RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
-      this.address = address;
-      this.latch = latch;
+  public synchronized void connect() throws ConnectTimeoutException {
+    if (isConnected()) return;
+
+    final AtomicInteger retries = new AtomicInteger();
+    InetSocketAddress address = key.addr;
+    if (address.isUnresolved()) {
+      address = resolveAddress(address);
     }
 
-    @Override
-    public void operationComplete(ChannelFuture channelFuture) throws Exception {
-      if (!channelFuture.isSuccess()) {
-        channelFuture.channel().close();
+    /* do not call await() inside handler */
+    ChannelFuture f = doConnect(address).awaitUninterruptibly();
+    retries.incrementAndGet();
+
+    if (!f.isSuccess() && numRetries > 0) {
+      doReconnect(address, f, retries);
+    }
+  }
 
-        if (numRetries > retryCount.getAndIncrement()) {
-          final GenericFutureListener<ChannelFuture> currentListener = this;
+  private void doReconnect(final InetSocketAddress address, ChannelFuture future, AtomicInteger retries)
+      throws ConnectTimeoutException {
 
-          RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() {
-            @Override
-            public void run() {
-              connectUsingNetty(address, currentListener);
-            }
-          }, PAUSE, TimeUnit.MILLISECONDS);
+    for (; ; ) {
+      if (numRetries >= retries.getAndIncrement()) {
 
-          LOG.debug("Connecting to " + address + " has been failed. Retrying to connect.");
+        LOG.warn(future.cause().getMessage() + " Try to reconnect");
+        try {
+          Thread.sleep(PAUSE);
+        } catch (InterruptedException e) {
         }
-        else {
-          latch.countDown();
 
-          LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+        this.channelFuture = doConnect(address).awaitUninterruptibly();
+        if (this.channelFuture.isDone() && this.channelFuture.isSuccess()) {
+          break;
         }
-      }
-      else {
-        latch.countDown();
+      } else {
+        throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + numRetries
+            + " caused by: " + future.cause());
       }
     }
   }
 
-  public boolean isActive() {
-    return getChannel().isActive();
+  public Channel getChannel() {
+    return channelFuture == null ? null : channelFuture.channel();
   }
 
-  public InetSocketAddress getRemoteAddress() {
-    if (channelFuture == null || channelFuture.channel() == null) {
-      return null;
-    }
-    return (InetSocketAddress) channelFuture.channel().remoteAddress();
+  public boolean isConnected() {
+    Channel channel = getChannel();
+    return channel != null && channel.isOpen() && channel.isActive();
   }
 
-  public Channel getChannel() {
-    return channelFuture.channel();
+  public SocketAddress getRemoteAddress() {
+    Channel channel = getChannel();
+    return channel == null ? null : channel.remoteAddress();
   }
 
   @Override
   public void close() {
-    if (channelFuture != null && getChannel().isActive()) {
-      getChannel().close();
-    }
-
-    if (this.bootstrap != null) {
-      InetSocketAddress address = getRemoteAddress();
-      if (address != null) {
-        LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());
-      }
+    Channel channel = getChannel();
+    if (channel != null && channel.isOpen()) {
+      LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+      channel.close().awaitUninterruptibly();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
index 6a340dc..74eb650 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.MessageLite;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
@@ -26,16 +27,21 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
-import com.google.protobuf.MessageLite;
+import io.netty.handler.timeout.IdleStateHandler;
 
 class ProtoChannelInitializer extends ChannelInitializer<Channel> {
   private final MessageLite defaultInstance;
   private final ChannelHandler handler;
+  private final int idleTimeSeconds;
 
   public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
+    this(handler, defaultInstance, 0);
+  }
+
+  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, int idleTimeSeconds) {
     this.handler = handler;
     this.defaultInstance = defaultInstance;
+    this.idleTimeSeconds = idleTimeSeconds;
   }
 
   @Override
@@ -45,6 +51,7 @@ class ProtoChannelInitializer extends ChannelInitializer<Channel> {
     pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
     pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
     pipeline.addLast("protobufEncoder", new ProtobufEncoder());
+    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeSeconds)); // zero disables the idle check
     pipeline.addLast("handler", handler);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
new file mode 100644
index 0000000..f05fb97
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@ThreadSafe
+public class RpcClientManager {
+  private static final Log LOG = LogFactory.getLog(RpcClientManager.class);
+
+  public static final int RPC_RETRIES = 3;
+
+  /* Idle timeout in seconds. If all requests are done and the client stays idle, it will be removed. */
+  public static final int RPC_IDLE_TIMEOUT = 43200; // 12 hour
+
+  /* entries will be removed by ConnectionCloseFutureListener */
+  private static final Map<RpcConnectionKey, NettyClientBase>
+      clients = Collections.synchronizedMap(new HashMap<RpcConnectionKey, NettyClientBase>());
+
+  private static RpcClientManager instance;
+
+  static {
+    InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
+    instance = new RpcClientManager();
+  }
+
+  private RpcClientManager() {
+  }
+
+  public static RpcClientManager getInstance() {
+    return instance;
+  }
+
+  private NettyClientBase makeClient(RpcConnectionKey rpcConnectionKey)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+    NettyClientBase client;
+    if (rpcConnectionKey.asyncMode) {
+      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT);
+    } else {
+      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT);
+    }
+    return client;
+  }
+
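+  /**
+   * Connects a {@link NettyClientBase} to the remote {@link NettyServerBase} and returns the RPC client.
+   * The client is shared per protocol, address and mode, and is removed from the shared map when closed.
+   * @param addr address of the remote server
+   * @param protocolClass protocol class the client is created for
+   * @param asyncMode true for an {@link AsyncRpcClient}, false for a {@link BlockingRpcClient}
+   * @return the shared client for this key; connect() is called first if it is not connected
+   * @throws NoSuchMethodException propagated from client creation
+   * @throws ClassNotFoundException propagated from client creation
+   * @throws ConnectTimeoutException propagated from client creation or from connect()
+   */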
+  public NettyClientBase getClient(InetSocketAddress addr,
+                                   Class<?> protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
+
+    NettyClientBase client;
+    synchronized (clients) {
+      client = clients.get(key);
+      if (client == null) {
+        clients.put(key, client = makeClient(key));
+      }
+    }
+
+    if (!client.isConnected()) {
+      client.connect();
+      client.getChannel().closeFuture().addListener(new ConnectionCloseFutureListener(key));
+    }
+    assert client.isConnected();
+    return client;
+  }
+
+  /**
+   * Closes every shared client. Works on a snapshot of the map because the close listener registered
+   * in getClient removes entries from the map, which must not happen while its live view is iterated.
+   */
+  public static void close() {
+    LOG.info("Closing RPC client manager");
+
+    for (NettyClientBase eachClient : clients.values().toArray(new NettyClientBase[0])) {
+      try {
+        eachClient.close();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the client manager and shuts down the Netty RPC worker pool.
+   * After shutdown, the manager cannot be reused.
+   */
+  public static void shutdown() {
+    close();
+    RpcChannelFactory.shutdownGracefully();
+  }
+
+  protected static NettyClientBase remove(RpcConnectionKey key) {
+    LOG.debug("Removing shared rpc client :" + key);
+    return clients.remove(key);
+  }
+
+  protected static boolean contains(RpcConnectionKey key) {
+    return clients.containsKey(key);
+  }
+
+  public static void cleanup(NettyClientBase... clients) {
+    for (NettyClientBase client : clients) {
+      if (client != null) {
+        try {
+          client.close();
+        } catch (Exception e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Exception in closing " + client.getKey(), e);
+          }
+        }
+      }
+    }
+  }
+
+  static class RpcConnectionKey {
+    final InetSocketAddress addr;
+    final Class<?> protocolClass;
+    final boolean asyncMode;
+
+    final String description;
+
+    public RpcConnectionKey(InetSocketAddress addr,
+                            Class<?> protocolClass, boolean asyncMode) {
+      this.addr = addr;
+      this.protocolClass = protocolClass;
+      this.asyncMode = asyncMode;
+      this.description = "[" + protocolClass + "] " + addr + "," + asyncMode;
+    }
+
+    @Override
+    public String toString() {
+      return description;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof RpcConnectionKey)) {
+        return false;
+      }
+
+      return toString().equals(obj.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      return description.hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
deleted file mode 100644
index 43feeb1..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.common.base.Objects;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import io.netty.channel.ConnectTimeoutException;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.util.concurrent.GlobalEventExecutor;
-import io.netty.util.internal.logging.CommonsLoggerFactory;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-public class RpcConnectionPool {
-  private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
-
-  private Map<RpcConnectionKey, NettyClientBase> connections =
-      new HashMap<RpcConnectionKey, NettyClientBase>();
-  private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-
-  private static RpcConnectionPool instance;
-  private final Object lockObject = new Object();
-
-  public final static int RPC_RETRIES = 3;
-
-  private RpcConnectionPool() {
-  }
-
-  public synchronized static RpcConnectionPool getPool() {
-    if(instance == null) {
-      InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
-      instance = new RpcConnectionPool();
-    }
-    return instance;
-  }
-
-  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    NettyClientBase client;
-    if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, 
-          RPC_RETRIES);
-    } else {
-      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, 
-          RPC_RETRIES);
-    }
-    accepted.add(client.getChannel());
-    return client;
-  }
-
-  public NettyClientBase getConnection(InetSocketAddress addr,
-                                       Class<?> protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
-    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
-    NettyClientBase client = connections.get(key);
-
-    if (client == null) {
-      synchronized (lockObject){
-        client = connections.get(key);
-        if (client == null) {
-          client = makeConnection(key);
-          connections.put(key, client);
-        }
-      }
-    }
-
-    if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) {
-      LOG.warn("Try to reconnect : " + addr);
-      client.connect(addr);
-    }
-    return client;
-  }
-
-  public void releaseConnection(NettyClientBase client) {
-    if (client == null) return;
-
-    try {
-      synchronized (lockObject) {
-        if (!client.getChannel().isOpen()) {
-          connections.remove(client.getKey());
-          client.close();
-        }
-      }
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size());
-
-      }
-    } catch (Exception e) {
-      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
-    }
-  }
-
-  public void closeConnection(NettyClientBase client) {
-    if (client == null) {
-      return;
-    }
-
-    try {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Close connection [" + client.getKey() + "]");
-      }
-
-      synchronized (lockObject) {
-        connections.remove(client.getKey());
-        client.close();
-      }
-
-    } catch (Exception e) {
-      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
-    }
-  }
-
-  public synchronized void close() {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Pool Closed");
-    }
-    synchronized(lockObject) {
-      for(NettyClientBase eachClient: connections.values()) {
-        try {
-          eachClient.close();
-        } catch (Exception e) {
-          LOG.error("close client pool error", e);
-        }
-      }
-
-      connections.clear();
-    }
-
-    try {
-      accepted.close();
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    }
-  }
-
-  public synchronized void shutdown(){
-    close();
-    RpcChannelFactory.shutdownGracefully();
-  }
-
-  static class RpcConnectionKey {
-    final InetSocketAddress addr;
-    final Class<?> protocolClass;
-    final boolean asyncMode;
-
-    public RpcConnectionKey(InetSocketAddress addr,
-                            Class<?> protocolClass, boolean asyncMode) {
-      this.addr = addr;
-      this.protocolClass = protocolClass;
-      this.asyncMode = asyncMode;
-    }
-
-    @Override
-    public String toString() {
-      return "["+ protocolClass + "] " + addr + "," + asyncMode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if(!(obj instanceof RpcConnectionKey)) {
-        return false;
-      }
-
-      return toString().equals(obj.toString());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(addr, asyncMode);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index fb1cec2..2804a03 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,13 +18,11 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.protobuf.ServiceException;
 
 public abstract class ServerCallable<T> {
   protected InetSocketAddress addr;
@@ -33,21 +31,16 @@ public abstract class ServerCallable<T> {
   protected Class<?> protocol;
   protected boolean asyncMode;
   protected boolean closeConn;
-  protected RpcConnectionPool connPool;
+  protected RpcClientManager manager;
 
   public abstract T call(NettyClientBase client) throws Exception;
 
-  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
-    this(connPool, addr, protocol, asyncMode, false);
-  }
-
-  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
-                        boolean asyncMode, boolean closeConn) {
-    this.connPool = connPool;
+  public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?> protocol,
+                        boolean asyncMode) {
+    this.manager = manager;
     this.addr = addr;
     this.protocol = protocol;
     this.asyncMode = asyncMode;
-    this.closeConn = closeConn;
   }
 
   public void beforeCall() {
@@ -74,26 +67,24 @@ public abstract class ServerCallable<T> {
    * Run this instance with retries, timed waits,
    * and refinds of missing regions.
    *
-   * @param <T> the type of the return value
    * @return an object of type T
    * @throws com.google.protobuf.ServiceException if a remote or network exception occurs
    */
+
   public T withRetries() throws ServiceException {
     //TODO configurable
     final long pause = 500; //ms
     final int numRetries = 3;
-    List<Throwable> exceptions = new ArrayList<Throwable>();
 
     for (int tries = 0; tries < numRetries; tries++) {
       NettyClientBase client = null;
       try {
         beforeCall();
         if(addr != null) {
-          client = connPool.getConnection(addr, protocol, asyncMode);
+          client = manager.getClient(addr, protocol, asyncMode);
         }
         return call(client);
       } catch (IOException ioe) {
-        exceptions.add(ioe);
         if(abort) {
           throw new ServiceException(ioe.getMessage(), ioe);
         }
@@ -105,9 +96,7 @@ public abstract class ServerCallable<T> {
       } finally {
         afterCall();
         if(closeConn) {
-          connPool.closeConnection(client);
-        } else {
-          connPool.releaseConnection(client);
+          RpcClientManager.cleanup(client);
         }
       }
       try {
@@ -122,7 +111,6 @@ public abstract class ServerCallable<T> {
 
   /**
    * Run this instance against the server once.
-   * @param <T> the type of the return value
    * @return an object of type T
    * @throws java.io.IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
@@ -131,7 +119,7 @@ public abstract class ServerCallable<T> {
     NettyClientBase client = null;
     try {
       beforeCall();
-      client = connPool.getConnection(addr, protocol, asyncMode);
+      client = manager.getClient(addr, protocol, asyncMode);
       return call(client);
     } catch (Throwable t) {
       Throwable t2 = translateException(t);
@@ -143,9 +131,7 @@ public abstract class ServerCallable<T> {
     } finally {
       afterCall();
       if(closeConn) {
-        connPool.closeConnection(client);
-      } else {
-        connPool.releaseConnection(client);
+        RpcClientManager.cleanup(client);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 31d5265..1e4959b 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcCallback;
+import io.netty.channel.ConnectTimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.test.DummyProtocol;
@@ -34,8 +35,6 @@ import org.junit.rules.ExternalResource;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
 
-import io.netty.channel.ConnectTimeoutException;
-
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -60,17 +59,17 @@ public class TestAsyncRpc {
   Interface stub;
   DummyProtocolAsyncImpl service;
   int retries;
-  
+
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface SetupRpcConnection {
     boolean setupRpcServer() default true;
     boolean setupRpcClient() default true;
   }
-  
+
   @Rule
   public ExternalResource resource = new ExternalResource() {
-    
+
     private Description description;
 
     @Override
@@ -86,7 +85,7 @@ public class TestAsyncRpc {
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         setUpRpcServer();
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
         setUpRpcClient();
       }
@@ -103,7 +102,7 @@ public class TestAsyncRpc {
           fail(e.getMessage());
         }
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         try {
           tearDownRpcServer();
@@ -112,21 +111,25 @@ public class TestAsyncRpc {
         }
       }
     }
-    
+
   };
-  
+
   public void setUpRpcServer() throws Exception {
     service = new DummyProtocolAsyncImpl();
     server = new AsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
   }
-  
+
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    client = new AsyncRpcClient(DummyProtocol.class,
-        RpcUtils.getConnectAddress(server.getListenAddress()), retries);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.getConnectAddress(server.getListenAddress()),
+            DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    client.connect();
     stub = client.getStub();
   }
 
@@ -134,14 +137,14 @@ public class TestAsyncRpc {
   public static void tearDownClass() throws Exception {
     RpcChannelFactory.shutdownGracefully();
   }
-  
+
   public void tearDownRpcServer() throws Exception {
     if(server != null) {
       server.shutdown();
       server = null;
     }
   }
-  
+
   public void tearDownRpcClient() throws Exception {
     if(client != null) {
       client.close();
@@ -296,7 +299,11 @@ public class TestAsyncRpc {
     });
     serverThread.start();
 
-    client = new AsyncRpcClient(DummyProtocol.class, address, retries);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    client.connect();
+    assertTrue(client.isConnected());
     stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
 
@@ -310,7 +317,10 @@ public class TestAsyncRpc {
     InetSocketAddress address = new InetSocketAddress("test", 0);
     boolean expected = false;
     try {
-      new AsyncRpcClient(DummyProtocol.class, address, retries);
+      RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
+      NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
+      client.connect();
       fail();
     } catch (ConnectTimeoutException e) {
       expected = true;
@@ -318,14 +328,19 @@ public class TestAsyncRpc {
       fail();
     }
     assertTrue(expected);
+
   }
 
   @Test
   @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    client = new AsyncRpcClient(DummyProtocol.class,
-        RpcUtils.createUnresolved(hostAndPort), retries);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    client.connect();
+    assertTrue(client.isConnected());
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -336,4 +351,43 @@ public class TestAsyncRpc {
     assertEquals(future.get(), echoMessage);
     assertTrue(future.isDone());
   }
+
+  @Test
+  public void testIdleTimeout() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
+    client.connect();
+    assertTrue(client.isConnected());
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+
+    client.connect(); // try to reconnect
+    assertTrue(client.isConnected());
+    client.close();
+    assertFalse(client.isConnected());
+  }
+
+  @Test
+  public void testIdleTimeoutWithActiveRequest() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
+    client.connect();
+
+    assertTrue(client.isConnected());
+    Interface stub = client.getStub();
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.deley(null, echoMessage, future); //3 sec delay
+
+    assertFalse(future.isDone());
+    assertEquals(future.get(), echoMessage);
+    assertTrue(future.isDone());
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 07e2dca..8c0b475 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.rpc;
 
+import io.netty.channel.ConnectTimeoutException;
 import org.apache.tajo.rpc.test.DummyProtocol;
 import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
 import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
@@ -35,7 +36,6 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.concurrent.CountDownLatch;
@@ -51,17 +51,17 @@ public class TestBlockingRpc {
   private BlockingInterface stub;
   private DummyProtocolBlockingImpl service;
   private int retries;
-  
+
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface SetupRpcConnection {
     boolean setupRpcServer() default true;
     boolean setupRpcClient() default true;
   }
-  
+
   @Rule
   public ExternalResource resource = new ExternalResource() {
-    
+
     private Description description;
 
     @Override
@@ -73,11 +73,11 @@ public class TestBlockingRpc {
     @Override
     protected void before() throws Throwable {
       SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         setUpRpcServer();
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
         setUpRpcClient();
       }
@@ -86,7 +86,7 @@ public class TestBlockingRpc {
     @Override
     protected void after() {
       SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
         try {
           tearDownRpcClient();
@@ -94,7 +94,7 @@ public class TestBlockingRpc {
           fail(e.getMessage());
         }
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         try {
           tearDownRpcServer();
@@ -103,21 +103,26 @@ public class TestBlockingRpc {
         }
       }
     }
-    
+
   };
-  
+
   public void setUpRpcServer() throws Exception {
     service = new DummyProtocolBlockingImpl();
     server = new BlockingRpcServer(DummyProtocol.class, service,
         new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
   }
-  
+
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    client = new BlockingRpcClient(DummyProtocol.class,
-        RpcUtils.getConnectAddress(server.getListenAddress()), retries);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.getConnectAddress(server.getListenAddress()),
+            DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    client.connect();
+    assertTrue(client.isConnected());
     stub = client.getStub();
   }
 
@@ -125,14 +130,14 @@ public class TestBlockingRpc {
   public static void tearDownClass() throws Exception {
     RpcChannelFactory.shutdownGracefully();
   }
-  
+
   public void tearDownRpcServer() throws Exception {
     if(server != null) {
       server.shutdown();
       server = null;
     }
   }
-  
+
   public void tearDownRpcClient() throws Exception {
     if(client != null) {
       client.close();
@@ -159,7 +164,7 @@ public class TestBlockingRpc {
   @Test
   @SetupRpcConnection(setupRpcClient=false)
   public void testRpcWithServiceCallable() throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.getPool();
+    RpcClientManager manager = RpcClientManager.getInstance();
     final SumRequest request = SumRequest.newBuilder()
         .setX1(1)
         .setX2(2)
@@ -167,20 +172,20 @@ public class TestBlockingRpc {
         .setX4(2.0f).build();
 
     SumResponse response =
-    new ServerCallable<SumResponse>(pool,
-        server.getListenAddress(), DummyProtocol.class, false) {
-      @Override
-      public SumResponse call(NettyClientBase client) throws Exception {
-        BlockingInterface stub2 = client.getStub();
-        SumResponse response1 = stub2.sum(null, request);
-        return response1;
-      }
-    }.withRetries();
+        new ServerCallable<SumResponse>(manager,
+            server.getListenAddress(), DummyProtocol.class, false) {
+          @Override
+          public SumResponse call(NettyClientBase client) throws Exception {
+            BlockingInterface stub2 = client.getStub();
+            SumResponse response1 = stub2.sum(null, request);
+            return response1;
+          }
+        }.withRetries();
 
     assertEquals(8.15d, response.getResult(), 1e-15);
 
     response =
-        new ServerCallable<SumResponse>(pool,
+        new ServerCallable<SumResponse>(manager,
             server.getListenAddress(), DummyProtocol.class, false) {
           @Override
           public SumResponse call(NettyClientBase client) throws Exception {
@@ -191,7 +196,7 @@ public class TestBlockingRpc {
         }.withoutRetries();
 
     assertTrue(8.15d == response.getResult());
-    pool.close();
+    RpcClientManager.close();
   }
 
   @Test
@@ -213,6 +218,22 @@ public class TestBlockingRpc {
   }
 
   @Test
+  public void testThrowException2() throws Exception {
+    EchoMessage message = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    try {
+      stub.throwException(null, message);
+      fail("RpcCall should throw exception");
+    } catch (Throwable t) {
+      assertTrue(t instanceof TajoServiceException);
+    }
+
+    EchoMessage message1 = stub.deley(null, message);
+    assertEquals(message, message1);
+  }
+
+  @Test
   @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
   public void testConnectionRetry() throws Exception {
     retries = 10;
@@ -238,7 +259,11 @@ public class TestBlockingRpc {
     });
     serverThread.start();
 
-    client = new BlockingRpcClient(DummyProtocol.class, address, retries);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    client.connect();
+    assertTrue(client.isConnected());
     stub = client.getStub();
 
     EchoMessage response = stub.echo(null, message);
@@ -247,22 +272,21 @@ public class TestBlockingRpc {
 
   @Test
   public void testConnectionFailed() throws Exception {
-    boolean expected = false;
     NettyClientBase client = null;
-    
+    boolean expected = false;
     try {
       int port = server.getListenAddress().getPort() + 1;
-      client = new BlockingRpcClient(DummyProtocol.class,
-          RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries);
-      client.close();
-      fail("Connection should be failed.");
-    } catch (ConnectException ce) {
-      expected = true;
-    } catch (Throwable ce){
-      if (client != null) {
-        client.close();
-      }
+      RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(
+              RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
+              DummyProtocol.class, false);
+      client = new BlockingRpcClient(rpcConnectionKey, retries);
+      client.connect();
       fail();
+    } catch (ConnectTimeoutException e) {
+      expected = true;
+    } catch (Throwable e) {
+      fail(e.getMessage());
     }
     assertTrue(expected);
   }
@@ -329,8 +353,12 @@ public class TestBlockingRpc {
   @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    client = new BlockingRpcClient(DummyProtocol.class,
-        RpcUtils.createUnresolved(hostAndPort), retries);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    client.connect();
+    assertTrue(client.isConnected());
     BlockingInterface stub = client.getStub();
 
     EchoMessage message = EchoMessage.newBuilder()
@@ -338,4 +366,41 @@ public class TestBlockingRpc {
     EchoMessage response2 = stub.echo(null, message);
     assertEquals(MESSAGE, response2.getMessage());
   }
+
+  @Test
+  public void testIdleTimeout() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
+    client.connect();
+    assertTrue(client.isConnected());
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+
+    client.connect(); // try to reconnect
+    assertTrue(client.isConnected());
+    client.close();
+    assertFalse(client.isConnected());
+  }
+
+  @Test
+  public void testIdleTimeoutWithActiveRequest() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
+
+    client.connect();
+
+    assertTrue(client.isConnected());
+    BlockingInterface stub = client.getStub();
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    EchoMessage message = stub.deley(null, echoMessage); //3 sec delay
+    assertEquals(message, echoMessage);
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
new file mode 100644
index 0000000..5f86518
--- /dev/null
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestRpcClientManager {
+
+  @Test
+  public void testRaceCondition() throws Exception {
+    final int parallelCount = 50;
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), parallelCount);
+    server.start();
+
+    final InetSocketAddress address = server.getListenAddress();
+    final RpcClientManager manager = RpcClientManager.getInstance();
+
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+    List<Future> tasks = new ArrayList<Future>();
+    for (int i = 0; i < parallelCount; i++) {
+      tasks.add(executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              NettyClientBase client = null;
+              try {
+                client = manager.getClient(address, DummyProtocol.class, false);
+              } catch (Throwable e) {
+                fail(e.getMessage());
+              }
+              assertTrue(client.isConnected());
+            }
+          })
+      );
+    }
+
+    for (Future future : tasks) {
+      future.get();
+    }
+
+    NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false);
+    RpcClientManager.cleanup(clientBase);
+    server.shutdown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testCloseFuture() throws Exception {
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), 3);
+    server.start();
+
+    final RpcClientManager manager = RpcClientManager.getInstance();
+
+    NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true);
+    assertTrue(client.isConnected());
+    assertTrue(client.getChannel().isWritable());
+
+    RpcClientManager.RpcConnectionKey key = client.getKey();
+    assertTrue(RpcClientManager.contains(key));
+
+    client.close();
+    assertFalse(RpcClientManager.contains(key));
+    server.shutdown();
+  }
+}
\ No newline at end of file


Mime
View raw message