tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [1/2] tajo git commit: TAJO-1563: Improve RPC error handling. (jinho)
Date Tue, 28 Apr 2015 09:28:32 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 1971d85fc -> 2e7d03dff


http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 57e436b..190beae 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,40 +18,49 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.ServiceException;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.io.Closeable;
 import java.lang.reflect.Method;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
-public abstract class NettyClientBase implements Closeable {
-  private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
-  private static final int CONNECTION_TIMEOUT = 60000;  // 60 sec
-  private static final long PAUSE = 1000; // 1 sec
-
-  private final int numRetries;
+public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable { // T: callback type of a request
+  public final static Log LOG = LogFactory.getLog(NettyClientBase.class);
 
   private Bootstrap bootstrap;
   private volatile ChannelFuture channelFuture;
-
-  protected final Class<?> protocol;
-  protected final AtomicInteger sequence = new AtomicInteger(0);
-
   private final RpcConnectionKey key;
+  private final int maxRetries; // retry limit used by connect() and invoke()
+  private boolean enableMonitor; // set in channelRegistered() when a MonitorClientHandler is present
+  // Channel event listeners (notified on channel (un)registration) and pending callbacks by request id.
+  private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners =
+      new ConcurrentHashMap<RpcConnectionKey, ChannelEventListener>();
+  private final ConcurrentMap<Integer, T> requests = new ConcurrentHashMap<Integer, T>();
 
   public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
       throws ClassNotFoundException, NoSuchMethodException {
     this.key = rpcConnectionKey;
-    this.protocol = rpcConnectionKey.protocolClass;
-    this.numRetries = numRetries;
+    this.maxRetries = numRetries;
   }
 
   // should be called from sub class
@@ -59,13 +68,13 @@ public abstract class NettyClientBase implements Closeable {
     this.bootstrap = new Bootstrap();
     this.bootstrap
         .group(RpcChannelFactory.getSharedClientEventloopGroup())
-      .channel(NioSocketChannel.class)
-      .handler(initializer)
-      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-      .option(ChannelOption.SO_REUSEADDR, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
-      .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
-      .option(ChannelOption.TCP_NODELAY, true);
+        .channel(NioSocketChannel.class)
+        .handler(initializer)
+        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, RpcConstants.DEFAULT_CONNECT_TIMEOUT)
+        .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
+        .option(ChannelOption.TCP_NODELAY, true);
   }
 
   public RpcClientManager.RpcConnectionKey getKey() {
@@ -73,23 +82,81 @@ public abstract class NettyClientBase implements Closeable {
   }
 
   protected final Class<?> getServiceClass() throws ClassNotFoundException {
-    String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+    String serviceClassName = getKey().protocolClass.getName() + "$" +
+        getKey().protocolClass.getSimpleName() + "Service"; // i.e. <Protocol>$<SimpleName>Service
     return Class.forName(serviceClassName);
   }
 
   @SuppressWarnings("unchecked")
-  protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
+  protected final <I> I getStub(Method stubMethod, Object rpcChannel) { // invokes static stubMethod(rpcChannel)
     try {
-      return (T) stubMethod.invoke(null, rpcChannel);
+      return (I) stubMethod.invoke(null, rpcChannel);
     } catch (Exception e) {
       throw new RemoteException(e.getMessage(), e);
     }
   }
 
-  public abstract <T> T getStub();
+  protected static RpcProtos.RpcRequest buildRequest(int seqId,
+                                        Descriptors.MethodDescriptor method,
+                                        Message param) {
+    RpcProtos.RpcRequest.Builder requestBuilder = RpcProtos.RpcRequest.newBuilder()
+        .setId(seqId)
+        .setMethodName(method.getName());
+    // The payload is optional: it is only set when param is non-null.
+    if (param != null) {
+      requestBuilder.setRequestMessage(param.toByteString());
+    }
+
+    return requestBuilder.build();
+  }
+
+  /** Writes rpcRequest and, once the write succeeds, registers callback for the response.
+   * If the write fails on an inactive channel, reconnects after DEFAULT_PAUSE ms and retries
+   * (while retry < maxRetries); otherwise callback is failed with a RecoverableException. */
+  protected void invoke(final RpcProtos.RpcRequest rpcRequest, final T callback, final int retry) {
+
+    ChannelPromise promise = getChannel().newPromise();
+    promise.addListener(new GenericFutureListener<ChannelFuture>() {
 
+      @Override
+      public void operationComplete(final ChannelFuture future) throws Exception {
 
-  private InetSocketAddress resolveAddress(InetSocketAddress address) {
+        if (future.isSuccess()) {
+          // Written: the response is dispatched to callback by channelRead0.
+          getHandler().registerCallback(rpcRequest.getId(), callback);
+        } else {
+
+          if (!future.channel().isActive() && retry < maxRetries) {
+
+            /* Channel is down: reconnect after a pause, then resend with retry + 1 */
+            LOG.warn(future.cause() + " Try to reconnect :" + getKey().addr);
+
+            final EventLoop loop = future.channel().eventLoop();
+            loop.schedule(new Runnable() {
+              @Override
+              public void run() {
+                doConnect(getKey().addr).addListener(new GenericFutureListener<ChannelFuture>() {
+                  @Override
+                  public void operationComplete(ChannelFuture future) throws Exception {
+                    invoke(rpcRequest, callback, retry + 1); // resent whether or not the reconnect succeeded
+                  }
+                });
+              }
+            }, RpcConstants.DEFAULT_PAUSE, TimeUnit.MILLISECONDS);
+          } else {
+
+            /* Retries exhausted, or the write failed on an active channel: fail this request only */
+            getHandler().registerCallback(rpcRequest.getId(), callback);
+            getHandler().exceptionCaught(getChannel().pipeline().lastContext(),
+                new RecoverableException(rpcRequest.getId(), future.cause()));
+          }
+        }
+      }
+    });
+    getChannel().writeAndFlush(rpcRequest, promise);
+  }
+
+  private static InetSocketAddress resolveAddress(InetSocketAddress address) {
     if (address.isUnresolved()) {
       return RpcUtils.createSocketAddr(address.getHostName(), address.getPort());
     }
@@ -100,11 +167,10 @@ public abstract class NettyClientBase implements Closeable {
     return this.channelFuture = bootstrap.clone().connect(address);
   }
 
-
-  public synchronized void connect() throws ConnectTimeoutException {
+  public synchronized void connect() throws ConnectException {
     if (isConnected()) return;
 
-    final AtomicInteger retries = new AtomicInteger();
+    int retries = 0;
     InetSocketAddress address = key.addr;
     if (address.isUnresolved()) {
       address = resolveAddress(address);
@@ -112,22 +178,26 @@ public abstract class NettyClientBase implements Closeable {
 
     /* do not call await() inside handler */
     ChannelFuture f = doConnect(address).awaitUninterruptibly();
-    retries.incrementAndGet();
 
-    if (!f.isSuccess() && numRetries > 0) {
-      doReconnect(address, f, retries);
+    if (!f.isSuccess()) {
+      if (maxRetries > 0) {
+        doReconnect(address, f, ++retries);
+      } else {
+        throw new ConnectException(ExceptionUtils.getMessage(f.cause()));
+      }
     }
   }
 
-  private void doReconnect(final InetSocketAddress address, ChannelFuture future, AtomicInteger retries)
-      throws ConnectTimeoutException {
+  private void doReconnect(final InetSocketAddress address, ChannelFuture future, int retries)
+      throws ConnectException {
 
     for (; ; ) {
-      if (numRetries >= retries.getAndIncrement()) {
+      if (maxRetries > retries) {
+        retries++;
 
-        LOG.warn(future.cause().getMessage() + " Try to reconnect");
+        LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr);
         try {
-          Thread.sleep(PAUSE);
+          Thread.sleep(RpcConstants.DEFAULT_PAUSE);
         } catch (InterruptedException e) {
         }
 
@@ -136,19 +206,21 @@ public abstract class NettyClientBase implements Closeable {
           break;
         }
       } else {
-        throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + numRetries
+        throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + retries
             + " caused by: " + future.cause());
       }
     }
   }
 
+  protected abstract NettyChannelInboundHandler getHandler(); // used to register and fail pending-request callbacks
+  // getChannel() returns null until doConnect() has assigned channelFuture.
   public Channel getChannel() {
     return channelFuture == null ? null : channelFuture.channel();
   }
 
   public boolean isConnected() {
     Channel channel = getChannel();
-    return channel != null && channel.isOpen() && channel.isActive();
+    return channel != null && channel.isActive();
   }
 
   public SocketAddress getRemoteAddress() {
@@ -156,12 +228,201 @@ public abstract class NettyClientBase implements Closeable {
     return channel == null ? null : channel.remoteAddress();
   }
 
+  /**
+   * @return the number of requests that have been sent but not yet answered or failed.
+   */
+  public int getActiveRequests() {
+    return requests.size();
+  }
+
+  /**
+   * Registers a channel event listener under the given key.
+   *
+   * @return true if registered, false if a listener was already registered for the key
+   */
+  public boolean subscribeEvent(RpcConnectionKey key, ChannelEventListener listener) {
+    return channelEventListeners.putIfAbsent(key, listener) == null;
+  }
+
+  /** Removes every registered channel event listener. */
+  public void removeSubscribers() {
+    channelEventListeners.clear();
+  }
+
+  /** @return a live view of the registered channel event listeners. */
+  public Collection<ChannelEventListener> getSubscribers() {
+    return channelEventListeners.values();
+  }
+
+  /** Prefixes {@code message} with the protocol class and the channel's remote address. */
+  private String getErrorMessage(String message) {
+    return "Exception [" + getKey().protocolClass.getCanonicalName() +
+        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
+        getChannel().remoteAddress()) + ")]: " + message;
+  }
+
   @Override
   public void close() {
+    // Fail every pending request before the channel is closed.
+    getHandler().sendExceptions(getClass().getSimpleName() + " terminates all the connections");
+
     Channel channel = getChannel();
     if (channel != null && channel.isOpen()) {
       LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
-      channel.close().awaitUninterruptibly();
+      channel.close().syncUninterruptibly();
+    }
+  }
+
+  /**
+   * Base inbound handler for RPC responses. It keeps pending callbacks in {@code requests},
+   * dispatches each response to its callback, and fails pending callbacks on errors.
+   */
+  protected abstract class NettyChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> {
+
+    /**
+     * Stores the callback of a request that has been written.
+     *
+     * @throws RemoteException if a callback is already registered for {@code seqId}
+     */
+    protected void registerCallback(int seqId, T callback) {
+      if (requests.putIfAbsent(seqId, callback) != null) {
+        throw new RemoteException(
+            getErrorMessage("Duplicate Sequence Id " + seqId));
+      }
+    }
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+      // A MonitorClientHandler in the pipeline means ping monitoring is on; this disables
+      // the idle-close logic in userEventTriggered.
+      MonitorClientHandler handler = ctx.pipeline().get(MonitorClientHandler.class);
+      if (handler != null) {
+        enableMonitor = true;
+      }
+
+      for (ChannelEventListener listener : getSubscribers()) {
+        listener.channelRegistered(ctx);
+      }
+      super.channelRegistered(ctx);
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+      for (ChannelEventListener listener : getSubscribers()) {
+        listener.channelUnregistered(ctx);
+      }
+      super.channelUnregistered(ctx);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+      super.channelActive(ctx);
+      LOG.debug("Connection established successfully : " + ctx.channel());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      super.channelInactive(ctx);
+      // No response can arrive on a closed channel, so fail everything still pending.
+      sendExceptions("Connection lost :" + getKey().addr);
+    }
+
+    @Override
+    protected final void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
+      T callback = requests.remove(response.getId());
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+      } else {
+        run(response, callback);
+      }
+    }
+
+    /**
+     * Handles a response received by {@link #channelRead0}.
+     * @param response response proto of type {@link RpcResponse}.
+     * @param callback callback of type {@code T}.
+     * @throws Exception
+     */
+    protected abstract void run(RpcResponse response, T callback) throws Exception;
+
+    /**
+     * Delivers an error to one pending callback (used by sendException and sendExceptions).
+     * @param requestId sequence id of request.
+     * @param callback callback of type {@code T}.
+     * @param message the error message to handle
+     */
+    protected abstract void handleException(int requestId, T callback, String message);
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+        throws Exception {
+
+      // commons-lang 2.x getRootCause() returns null when there is no nested cause; fall back
+      // to the throwable itself so its message and stack trace are not lost.
+      Throwable rootCause = ExceptionUtils.getRootCause(cause);
+      if (rootCause == null) {
+        rootCause = cause;
+      }
+      LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause);
+
+      if (cause instanceof RecoverableException) {
+        /* only the failed request is affected; the channel stays open */
+        sendException((RecoverableException) cause);
+      } else {
+        /* unrecoverable fatal error*/
+        sendExceptions(ExceptionUtils.getMessage(rootCause));
+        if (ctx.channel().isOpen()) {
+          ctx.close();
+        }
+      }
+    }
+
+    /**
+     * Sends an error to every pending callback. A callback removed concurrently
+     * (e.g. answered by channelRead0) yields null and is skipped.
+     */
+    private void sendExceptions(String message) {
+      for (int requestId : requests.keySet()) {
+        T callback = requests.remove(requestId);
+        if (callback != null) {
+          handleException(requestId, callback, message);
+        }
+      }
+    }
+
+    /**
+     * Sends an error to the callback of the failed request, if it is still pending.
+     */
+    private void sendException(RecoverableException e) {
+      T callback = requests.remove(e.getSeqId());
+
+      if (callback != null) {
+        handleException(e.getSeqId(), callback, ExceptionUtils.getRootCauseMessage(e));
+      }
+    }
+
+    /**
+     * Closes the channel on a reader-idle event when no request is pending and monitoring is
+     * disabled; a PING_EXPIRED monitor event is handled as a fatal error via exceptionCaught.
+     */
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+
+      if (!enableMonitor && evt instanceof IdleStateEvent) {
+        IdleStateEvent e = (IdleStateEvent) evt;
+        /* If all requests are done and the reader is idle, close the idle channel. */
+        if (e.state() == IdleState.READER_IDLE && requests.isEmpty()) {
+          ctx.close();
+          LOG.info("Idle connection closed successfully :" + ctx.channel());
+        }
+      } else if (evt instanceof MonitorStateEvent) {
+        MonitorStateEvent e = (MonitorStateEvent) evt;
+        if (e.state() == MonitorStateEvent.MonitorState.PING_EXPIRED) {
+          exceptionCaught(ctx, new ServiceException("Server has not respond: " + ctx.channel()));
+        }
+      }
+
+      super.userEventTriggered(ctx, evt);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
deleted file mode 100644
index 74eb650..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.MessageLite;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.protobuf.ProtobufDecoder;
-import io.netty.handler.codec.protobuf.ProtobufEncoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-import io.netty.handler.timeout.IdleStateHandler;
-
-class ProtoChannelInitializer extends ChannelInitializer<Channel> {
-  private final MessageLite defaultInstance;
-  private final ChannelHandler handler;
-  private final int idleTimeSeconds;
-
-  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
-    this(handler, defaultInstance, 0);
-  }
-
-  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, int idleTimeSeconds) {
-    this.handler = handler;
-    this.defaultInstance = defaultInstance;
-    this.idleTimeSeconds = idleTimeSeconds;
-  }
-
-  @Override
-  protected void initChannel(Channel channel) throws Exception {
-    ChannelPipeline pipeline = channel.pipeline();
-    pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
-    pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
-    pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
-    pipeline.addLast("protobufEncoder", new ProtobufEncoder());
-    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeSeconds)); //zero is disabling
-    pipeline.addLast("handler", handler);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
new file mode 100644
index 0000000..8787dee
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.MessageLite;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import io.netty.handler.timeout.IdleStateHandler;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Builds the client-side pipeline: idle detection, an optional MonitorClientHandler,
+ * varint32 length-prefixed framing and protobuf (de)serialization, then the RPC handler.
+ */
+class ProtoClientChannelInitializer extends ChannelInitializer<Channel> {
+  private final ChannelHandler handler;
+  private final MessageLite defaultInstance;
+  private final long timeoutTimeNanos;
+  private final boolean enablePing;
+
+  /**
+   * @param handler          the RPC handler appended last to the pipeline
+   * @param defaultInstance  prototype message used by the protobuf decoder
+   * @param timeoutTimeNanos reader-idle timeout; the writer-idle timeout is half of it
+   * @param enablePing       whether to install a {@link MonitorClientHandler}
+   */
+  public ProtoClientChannelInitializer(ChannelHandler handler, MessageLite defaultInstance,
+                                       long timeoutTimeNanos,
+                                       boolean enablePing) {
+    this.enablePing = enablePing;
+    this.timeoutTimeNanos = timeoutTimeNanos;
+    this.defaultInstance = defaultInstance;
+    this.handler = handler;
+  }
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+    final long writerIdleNanos = timeoutTimeNanos / 2;
+    final ChannelPipeline p = channel.pipeline();
+
+    p.addLast("idleStateHandler",
+        new IdleStateHandler(timeoutTimeNanos, writerIdleNanos, 0, TimeUnit.NANOSECONDS));
+    if (enablePing) {
+      p.addLast("MonitorClientHandler", new MonitorClientHandler());
+    }
+    p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
+        .addLast("protobufDecoder", new ProtobufDecoder(defaultInstance))
+        .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
+        .addLast("protobufEncoder", new ProtobufEncoder())
+        .addLast("handler", handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java
new file mode 100644
index 0000000..91d331e
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * Declares access to the protocol-specific stub of an RPC client.
+ */
+interface ProtoDeclaration {
+  /**
+   * @param <S> the stub type expected by the caller (unchecked)
+   * @return the protocol stub of this client
+   */
+  <S> S getStub();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java
new file mode 100644
index 0000000..3e2fe0d
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.protobuf.MessageLite;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+
+/**
+ * Builds the server-side pipeline: a MonitorServerHandler, varint32 length-prefixed
+ * framing, protobuf (de)serialization and finally the RPC handler.
+ */
+class ProtoServerChannelInitializer extends ChannelInitializer<Channel> {
+  private final ChannelHandler handler;
+  private final MessageLite defaultInstance;
+
+  /**
+   * @param handler         the RPC handler appended last to the pipeline
+   * @param defaultInstance prototype message used by the protobuf decoder
+   */
+  public ProtoServerChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
+    this.defaultInstance = defaultInstance;
+    this.handler = handler;
+  }
+
+  @Override
+  protected void initChannel(Channel channel) throws Exception {
+    ChannelPipeline pipeline = channel.pipeline();
+    pipeline.addLast("MonitorServerHandler", new MonitorServerHandler())
+        .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
+        .addLast("protobufDecoder", new ProtobufDecoder(defaultInstance))
+        .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
+        .addLast("protobufEncoder", new ProtobufEncoder())
+        .addLast("handler", handler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java
new file mode 100644
index 0000000..1d22663
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * A {@link RemoteException} that concerns a single request, identified by its sequence id.
+ * The failure can be handled for that request alone, without treating the whole
+ * connection as broken.
+ */
+public class RecoverableException extends RemoteException {
+  private final int seqId;
+
+  public RecoverableException(int seqId, String message) {
+    super(message);
+    this.seqId = seqId;
+  }
+
+  /**
+   * Wraps {@code t}, reusing its message; a null {@code t} yields a null message instead of an NPE.
+   */
+  public RecoverableException(int seqId, Throwable t) {
+    this(seqId, t == null ? null : t.getMessage(), t);
+  }
+
+  public RecoverableException(int seqId, String message, Throwable t) {
+    super(message, t);
+    this.seqId = seqId;
+  }
+
+  /** @return the sequence id of the request that failed. */
+  public int getSeqId() {
+    return seqId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index f05fb97..f8def7f 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -18,26 +18,28 @@
 
 package org.apache.tajo.rpc;
 
-import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.internal.logging.CommonsLoggerFactory;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import javax.annotation.concurrent.ThreadSafe;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 @ThreadSafe
 public class RpcClientManager {
   private static final Log LOG = LogFactory.getLog(RpcClientManager.class);
 
-  public static final int RPC_RETRIES = 3;
-
-  /* If all requests is done and client is idle state, client will be removed. */
-  public static final int RPC_IDLE_TIMEOUT = 43200; // 12 hour
+  private volatile int timeoutSeconds = RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS; // presumably applied to new clients - TODO confirm
+  private volatile int retries = RpcConstants.DEFAULT_RPC_RETRIES; // presumably passed to makeClient(); verify against callers
 
   /* entries will be removed by ConnectionCloseFutureListener */
   private static final Map<RpcConnectionKey, NettyClientBase>
@@ -57,45 +59,86 @@ public class RpcClientManager {
     return instance;
   }
 
-  private NettyClientBase makeClient(RpcConnectionKey rpcConnectionKey)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+  private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey,
+                                                   int retries,
+                                                   long timeout,
+                                                   TimeUnit timeUnit,
+                                                   boolean enablePing)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
     NettyClientBase client;
     if (rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT);
+      client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing);
     } else {
-      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT);
+      client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing);
     }
-    return client;
+    return (T) client;
   }
 
   /**
    * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol.
    * This client will be shared per protocol and address. Client is removed in shared map when a client is closed
-   * @param addr
-   * @param protocolClass
-   * @param asyncMode
-   * @return
-   * @throws NoSuchMethodException
-   * @throws ClassNotFoundException
-   * @throws ConnectTimeoutException
    */
-  public NettyClientBase getClient(InetSocketAddress addr,
-                                   Class<?> protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+  public <T extends NettyClientBase> T getClient(InetSocketAddress addr,
+                                                 Class<?> protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
     RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
 
     NettyClientBase client;
     synchronized (clients) {
       client = clients.get(key);
       if (client == null) {
-        clients.put(key, client = makeClient(key));
+        clients.put(key, client = makeClient(key, retries, getTimeoutSeconds(), TimeUnit.SECONDS, true));
       }
     }
 
     if (!client.isConnected()) {
-      client.connect();
-      client.getChannel().closeFuture().addListener(new ConnectionCloseFutureListener(key));
+
+      final NettyClientBase target = client;
+      client.subscribeEvent(target.getKey(), new ChannelEventListener() {
+        @Override
+        public void channelRegistered(ChannelHandlerContext ctx) {
+          /* Register client to managed map */
+          clients.put(target.getKey(), target);
+          target.getChannel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey()));
+        }
+
+        @Override
+        public void channelUnregistered(ChannelHandlerContext ctx) {
+          // nothing to do
+        }
+      });
     }
+
+    client.connect();
+    assert client.isConnected();
+    return (T) client;
+  }
+
+  /**
+   * Connects a new, unshared {@link NettyClientBase} to the remote {@link NettyServerBase} for the given protocol.
+   * Unlike getClient, the client is not registered in the shared client map; the caller must close it.
+   */
+  public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
+                                                              Class<?> protocolClass,
+                                                              boolean asyncMode,
+                                                              int retries,
+                                                              long timeout,
+                                                              TimeUnit timeUnit,
+                                                              boolean enablePing)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+    return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing);
+  }
+
+  public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key,
+                                                              int retries,
+                                                              long timeout,
+                                                              TimeUnit timeUnit,
+                                                              boolean enablePing)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+    T client = makeClient(key, retries, timeout, timeUnit, enablePing);
+    client.connect();
     assert client.isConnected();
     return client;
   }
@@ -125,11 +168,6 @@ public class RpcClientManager {
     RpcChannelFactory.shutdownGracefully();
   }
 
-  protected static NettyClientBase remove(RpcConnectionKey key) {
-    LOG.debug("Removing shared rpc client :" + key);
-    return clients.remove(key);
-  }
-
   protected static boolean contains(RpcConnectionKey key) {
     return clients.containsKey(key);
   }
@@ -148,6 +186,22 @@ public class RpcClientManager {
     }
   }
 
+  public int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
+  public void setTimeoutSeconds(int timeoutSeconds) {
+    this.timeoutSeconds = timeoutSeconds;
+  }
+
+  public int getRetries() {
+    return retries;
+  }
+
+  public void setRetries(int retries) {
+    this.retries = retries;
+  }
+
   static class RpcConnectionKey {
     final InetSocketAddress addr;
     final Class<?> protocolClass;
@@ -182,4 +236,17 @@ public class RpcClientManager {
       return description.hashCode();
     }
   }
+
+  static class ClientCloseFutureListener implements GenericFutureListener<Future<? super Void>> {
+    private final RpcClientManager.RpcConnectionKey key;
+
+    public ClientCloseFutureListener(RpcClientManager.RpcConnectionKey key) {
+      this.key = key;
+    }
+
+    @Override
+    public void operationComplete(Future<? super Void> future) throws Exception {
+      clients.remove(key); // channel closed: drop the shared entry so getClient() creates a new one
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
index 824ea38..35cc945 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto
@@ -28,6 +28,7 @@ service DummyProtocolService {
   rpc echo (EchoMessage) returns (EchoMessage);
   rpc getError (EchoMessage) returns (EchoMessage);
   rpc getNull (EchoMessage) returns (EchoMessage);
-  rpc deley (EchoMessage) returns (EchoMessage);
+  rpc delay (EchoMessage) returns (EchoMessage);
+  rpc busy (EchoMessage) returns (EchoMessage);
   rpc throwException (EchoMessage) returns (EchoMessage);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 68f170c..f8c6d32 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -39,9 +39,11 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -59,17 +61,20 @@ public class TestAsyncRpc {
   Interface stub;
   DummyProtocolAsyncImpl service;
   int retries;
-  
+  RpcClientManager.RpcConnectionKey rpcConnectionKey;
+  RpcClientManager manager = RpcClientManager.getInstance();
+
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface SetupRpcConnection {
     boolean setupRpcServer() default true;
+
     boolean setupRpcClient() default true;
   }
-  
+
   @Rule
   public ExternalResource resource = new ExternalResource() {
-    
+
     private Description description;
 
     @Override
@@ -85,7 +90,7 @@ public class TestAsyncRpc {
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         setUpRpcServer();
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
         setUpRpcClient();
       }
@@ -102,7 +107,7 @@ public class TestAsyncRpc {
           fail(e.getMessage());
         }
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         try {
           tearDownRpcServer();
@@ -111,25 +116,24 @@ public class TestAsyncRpc {
         }
       }
     }
-    
+
   };
-  
+
   public void setUpRpcServer() throws Exception {
     service = new DummyProtocolAsyncImpl();
     server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0), 2);
+        service, new InetSocketAddress("127.0.0.1", 0), 3);
     server.start();
   }
-  
+
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(
-              RpcUtils.getConnectAddress(server.getListenAddress()),
-              DummyProtocol.class, true);
-    client = new AsyncRpcClient(rpcConnectionKey, retries);
-    client.connect();
+    rpcConnectionKey = new RpcClientManager.RpcConnectionKey(
+        RpcUtils.getConnectAddress(server.getListenAddress()),
+        DummyProtocol.class, true);
+    client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true);
+    assertTrue(client.isConnected());
     stub = client.getStub();
   }
 
@@ -137,16 +141,16 @@ public class TestAsyncRpc {
   public static void tearDownClass() throws Exception {
     RpcChannelFactory.shutdownGracefully();
   }
-  
+
   public void tearDownRpcServer() throws Exception {
-    if(server != null) {
+    if (server != null) {
       server.shutdown();
       server = null;
     }
   }
-  
+
   public void tearDownRpcClient() throws Exception {
-    if(client != null) {
+    if (client != null) {
       client.close();
       client = null;
     }
@@ -172,35 +176,37 @@ public class TestAsyncRpc {
     });
 
 
+    final CountDownLatch barrier = new CountDownLatch(1);
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
     RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() {
       @Override
       public void run(EchoMessage parameter) {
+        assertNotNull(parameter);
         echo = parameter.getMessage();
         assertEquals(MESSAGE, echo);
         calledMarker = true;
+        barrier.countDown();
       }
     };
     stub.echo(null, echoMessage, callback);
-    Thread.sleep(1000);
+
+    assertTrue(barrier.await(1000, TimeUnit.MILLISECONDS));
     assertTrue(calledMarker);
   }
 
-  private CountDownLatch testNullLatch;
-
   @Test
   public void testGetNull() throws Exception {
-    testNullLatch = new CountDownLatch(1);
+    final CountDownLatch barrier = new CountDownLatch(1);
     stub.getNull(null, null, new RpcCallback<EchoMessage>() {
       @Override
       public void run(EchoMessage parameter) {
         assertNull(parameter);
         LOG.info("testGetNull retrieved");
-        testNullLatch.countDown();
+        barrier.countDown();
       }
     });
-    assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
+    assertTrue(barrier.await(1000, TimeUnit.MILLISECONDS));
     assertTrue(service.getNullCalled);
   }
 
@@ -209,70 +215,112 @@ public class TestAsyncRpc {
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-    stub.deley(null, echoMessage, future);
+    stub.delay(future.getController(), echoMessage, future);
 
     assertFalse(future.isDone());
-    assertEquals(future.get(), echoMessage);
+    assertEquals(echoMessage, future.get());
     assertTrue(future.isDone());
   }
 
   @Test
   public void testCallFutureTimeout() throws Exception {
     boolean timeout = false;
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    EchoMessage echoMessage = EchoMessage.newBuilder().setMessage(MESSAGE).build();
     try {
-      EchoMessage echoMessage = EchoMessage.newBuilder()
-          .setMessage(MESSAGE).build();
-      CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-      stub.deley(null, echoMessage, future);
-
+      stub.delay(future.getController(), echoMessage, future);
       assertFalse(future.isDone());
-      future.get(1, TimeUnit.SECONDS);
+      future.get(100, TimeUnit.MILLISECONDS);
     } catch (TimeoutException te) {
       timeout = true;
     }
+    assertFalse(future.getController().failed());
     assertTrue(timeout);
   }
 
   @Test
-  public void testCallFutureDisconnected() throws Exception {
+  public void testThrowException() throws Exception {
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.throwException(future.getController(), echoMessage, future);
 
-    tearDownRpcServer();
-
-    stub.echo(future.getController(), echoMessage, future);
-    EchoMessage response = future.get();
+    assertFalse(future.isDone());
+    EchoMessage result = null;
+    try {
+      result = future.get();
+    } catch (ExecutionException e) {
+    }
 
-    assertNull(response);
+    assertEquals(null, result);
+    assertTrue(future.isDone());
     assertTrue(future.getController().failed());
-    assertTrue(future.getController().errorText() != null);
+    assertNotNull(future.getController().errorText());
   }
 
   @Test
-  public void testStubDisconnected() throws Exception {
+  public void testThrowException2() throws Exception {
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.throwException(null, echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertNull(future.get());
+
+    assertTrue(future.isDone());
+    assertFalse(future.getController().failed());
+    assertNull(future.getController().errorText());
+  }
 
+  @Test(timeout = 60000)
+  public void testServerShutdown1() throws Exception {
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
 
-    if (server != null) {
-      server.shutdown(true);
-      server = null;
+    tearDownRpcServer();
+
+    stub.echo(future.getController(), echoMessage, future);
+
+    EchoMessage result = null;
+    try {
+      result = future.get();
+    } catch (ExecutionException e) {
     }
 
-    stub = client.getStub();
+    assertEquals(null, result);
+    assertTrue(future.isDone());
+    assertTrue(future.getController().failed());
+    assertNotNull(future.getController().errorText(), future.getController().errorText());
+  }
+
+  @Test(timeout = 60000)
+  public void testServerShutdown2() throws Exception {
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+    tearDownRpcServer();
+
+    Interface stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
-    EchoMessage response = future.get();
 
-    assertNull(response);
+    EchoMessage result = null;
+    try {
+      result = future.get();
+    } catch (ExecutionException e) {
+    }
+
+    assertEquals(null, result);
+    assertTrue(future.isDone());
     assertTrue(future.getController().failed());
-    assertTrue(future.getController().errorText() != null);
+    assertNotNull(future.getController().errorText(), future.getController().errorText());
   }
 
   @Test
-  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
-  public void testConnectionRetry() throws Exception {
+  @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false)
+  public void testClientRetryOnStartup() throws Exception {
     retries = 10;
     ServerSocket serverSocket = new ServerSocket(0);
     final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
@@ -300,94 +348,228 @@ public class TestAsyncRpc {
     serverThread.start();
 
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
-    client = new AsyncRpcClient(rpcConnectionKey, retries);
-    client.connect();
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey,
+        retries, 0, TimeUnit.MILLISECONDS, false);
     assertTrue(client.isConnected());
-    stub = client.getStub();
+
+    Interface stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
 
     assertFalse(future.isDone());
     assertEquals(echoMessage, future.get());
     assertTrue(future.isDone());
+    client.close();
+    server.shutdown();
   }
 
-  @Test
-  public void testConnectionFailure() throws Exception {
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false)
+  public void testClientRetryFailureOnStartup() throws Exception {
+    retries = 2;
+    // reserve a free port, then release it so that nothing is listening on it
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
+
+    EchoMessage echoMessage = EchoMessage.newBuilder().setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
+    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries);
+    try {
+      try {
+        client.connect();
+        fail("connect() should fail after " + retries + " retries");
+      } catch (ConnectTimeoutException e) {
+        assertFalse(e.getMessage(), client.isConnected());
+      }
+      Interface stub = client.getStub(); // local stub; do not overwrite the shared test field
+      stub.echo(future.getController(), echoMessage, future);
+      EchoMessage result = null;
+      try {
+        result = future.get();
+      } catch (ExecutionException ignored) {
+        // a failed call may surface here; the controller assertions below check the failure
+      }
+      assertNull(result);
+      assertTrue(future.isDone());
+      assertTrue(future.getController().failed());
+      assertNotNull(future.getController().errorText(), future.getController().errorText());
+    } finally {
+      client.close(); // the client is not managed by RpcClientManager, so close it here
+    }
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false)
+  public void testUnresolvedAddress() throws Exception {
     InetSocketAddress address = new InetSocketAddress("test", 0);
     boolean expected = false;
+    AsyncRpcClient client = null;
     try {
       RpcClientManager.RpcConnectionKey rpcConnectionKey =
           new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
-      NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
+      client = new AsyncRpcClient(rpcConnectionKey, retries);
       client.connect();
       fail();
-    } catch (ConnectTimeoutException e) {
+    } catch (ConnectException e) {
       expected = true;
     } catch (Throwable throwable) {
-      fail();
+      fail(throwable.getMessage());
+    } finally {
+      if (client != null) { client.close(); } // null when construction failed; avoid masking the error
     }
     assertTrue(expected);
 
   }
 
-  @Test
-  @SetupRpcConnection(setupRpcClient=false)
-  public void testUnresolvedAddress() throws Exception {
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testUnresolvedAddress2() throws Exception {
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(
-              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
-    client = new AsyncRpcClient(rpcConnectionKey, retries);
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
+    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries);
     client.connect();
-    assertTrue(client.isConnected());
-    Interface stub = client.getStub();
+    try {
+      assertTrue(client.isConnected());
+      Interface stub = client.getStub();
+      EchoMessage echoMessage = EchoMessage.newBuilder()
+          .setMessage(MESSAGE).build();
+      CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+      stub.echo(future.getController(), echoMessage, future);
+
+      assertFalse(future.isDone());
+      assertEquals(echoMessage, future.get());
+      assertTrue(future.isDone());
+    } finally {
+      client.close();
+    }
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testStubRecovery() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, 2, 0, TimeUnit.MILLISECONDS, false);
+
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
-    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-    stub.deley(null, echoMessage, future);
+    int repeat = 5;
 
-    assertFalse(future.isDone());
-    assertEquals(future.get(), echoMessage);
-    assertTrue(future.isDone());
+    assertTrue(client.isConnected());
+    Interface stub = client.getStub();
+
+    client.close(); // close connection
+    assertFalse(client.isConnected());
+
+    for (int i = 0; i < repeat; i++) {
+      try {
+        CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+        stub.echo(future.getController(), echoMessage, future);
+        assertEquals(echoMessage, future.get());
+        assertTrue(future.isDone());
+        assertTrue(client.isConnected());
+      } finally {
+        client.close(); // close connection
+        assertFalse(client.isConnected());
+      }
+    }
   }
 
-  @Test
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
   public void testIdleTimeout() throws Exception {
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
         new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
-    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
-    client.connect();
+    //500 millis idle timeout
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
     assertTrue(client.isConnected());
 
-    Thread.sleep(2000);
+    Thread.sleep(600);  //timeout
     assertFalse(client.isConnected());
 
     client.connect(); // try to reconnect
     assertTrue(client.isConnected());
-    client.close();
+
+    Thread.sleep(600);  //timeout
     assertFalse(client.isConnected());
+    client.close();
   }
 
-  @Test
-  public void testIdleTimeoutWithActiveRequest() throws Exception {
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testPingOnIdle() throws Exception {
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
         new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
-    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
-    client.connect();
 
+    //500 millis request timeout
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    assertTrue(client.isConnected());
+
+    Thread.sleep(600);
+    assertTrue(client.isConnected());
+
+    Thread.sleep(600);
+    assertTrue(client.isConnected());
+    client.close();
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testIdleTimeoutWithActiveRequest() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    //500 millis idle timeout
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
     assertTrue(client.isConnected());
+
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
-    stub.deley(null, echoMessage, future); //3 sec delay
+    stub.delay(future.getController(), echoMessage, future); //3 sec delay
+    assertTrue(client.isConnected());
 
     assertFalse(future.isDone());
-    assertEquals(future.get(), echoMessage);
+    assertEquals(echoMessage, future.get());
     assertTrue(future.isDone());
 
-    Thread.sleep(2000);
+    assertTrue(client.getActiveRequests() == 0);
+    Thread.sleep(600);  //timeout
     assertFalse(client.isConnected());
   }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testRequestTimeoutOnBusy() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    // 500 millis request timeout, ping enabled
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    try {
+      assertTrue(client.isConnected());
+      Interface stub = client.getStub();
+      EchoMessage echoMessage = EchoMessage.newBuilder().setMessage(MESSAGE).build();
+      CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+      stub.busy(future.getController(), echoMessage, future); //30 sec delay
+      assertFalse(future.isDone());
+
+      EchoMessage result = null;
+      try {
+        result = future.get();
+      } catch (ExecutionException ignored) {
+        // a failed call may surface here; the controller assertions below check the failure
+      }
+      assertNull(result);
+      assertTrue(future.getController().errorText(), future.getController().failed());
+      assertEquals(0, client.getActiveRequests()); // the timed-out request must be released
+    } finally {
+      client.close(); // release the connection even when an assertion fails
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 568eb63..6f7fdd1 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -36,6 +36,7 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.concurrent.CountDownLatch;
@@ -50,18 +51,20 @@ public class TestBlockingRpc {
   private BlockingRpcClient client;
   private BlockingInterface stub;
   private DummyProtocolBlockingImpl service;
+  RpcClientManager manager = RpcClientManager.getInstance();
   private int retries;
-  
+
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface SetupRpcConnection {
     boolean setupRpcServer() default true;
+
     boolean setupRpcClient() default true;
   }
-  
+
   @Rule
   public ExternalResource resource = new ExternalResource() {
-    
+
     private Description description;
 
     @Override
@@ -73,11 +76,11 @@ public class TestBlockingRpc {
     @Override
     protected void before() throws Throwable {
       SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         setUpRpcServer();
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
         setUpRpcClient();
       }
@@ -86,7 +89,7 @@ public class TestBlockingRpc {
     @Override
     protected void after() {
       SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
         try {
           tearDownRpcClient();
@@ -94,7 +97,7 @@ public class TestBlockingRpc {
           fail(e.getMessage());
         }
       }
-      
+
       if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
         try {
           tearDownRpcServer();
@@ -103,25 +106,24 @@ public class TestBlockingRpc {
         }
       }
     }
-    
+
   };
-  
+
   public void setUpRpcServer() throws Exception {
     service = new DummyProtocolBlockingImpl();
     server = new BlockingRpcServer(DummyProtocol.class, service,
         new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
   }
-  
+
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(
-              RpcUtils.getConnectAddress(server.getListenAddress()),
-              DummyProtocol.class, false);
-    client = new BlockingRpcClient(rpcConnectionKey, retries);
-    client.connect();
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.getConnectAddress(server.getListenAddress()),
+            DummyProtocol.class, false);
+    client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true);
     assertTrue(client.isConnected());
     stub = client.getStub();
   }
@@ -130,16 +132,16 @@ public class TestBlockingRpc {
   public static void tearDownClass() throws Exception {
     RpcChannelFactory.shutdownGracefully();
   }
-  
+
   public void tearDownRpcServer() throws Exception {
-    if(server != null) {
+    if (server != null) {
       server.shutdown();
       server = null;
     }
   }
-  
+
   public void tearDownRpcClient() throws Exception {
-    if(client != null) {
+    if (client != null) {
       client.close();
       client = null;
     }
@@ -162,7 +164,14 @@ public class TestBlockingRpc {
   }
 
   @Test
-  @SetupRpcConnection(setupRpcClient=false)
+  public void testGetNull() throws Exception {
+    assertNull(stub.getNull(null, null));
+    assertTrue(service.getNullCalled);
+  }
+
+  @Test
+  @SetupRpcConnection(setupRpcClient = false)
+  @Deprecated // serverCallable will be remove
   public void testRpcWithServiceCallable() throws Exception {
     RpcClientManager manager = RpcClientManager.getInstance();
     final SumRequest request = SumRequest.newBuilder()
@@ -172,15 +181,15 @@ public class TestBlockingRpc {
         .setX4(2.0f).build();
 
     SumResponse response =
-    new ServerCallable<SumResponse>(manager,
-        server.getListenAddress(), DummyProtocol.class, false) {
-      @Override
-      public SumResponse call(NettyClientBase client) throws Exception {
-        BlockingInterface stub2 = client.getStub();
-        SumResponse response1 = stub2.sum(null, request);
-        return response1;
-      }
-    }.withRetries();
+        new ServerCallable<SumResponse>(manager,
+            server.getListenAddress(), DummyProtocol.class, false) {
+          @Override
+          public SumResponse call(NettyClientBase client) throws Exception {
+            BlockingInterface stub2 = client.getStub();
+            SumResponse response1 = stub2.sum(null, request);
+            return response1;
+          }
+        }.withRetries();
 
     assertEquals(8.15d, response.getResult(), 1e-15);
 
@@ -207,10 +216,8 @@ public class TestBlockingRpc {
     try {
       stub.throwException(null, message);
       fail("RpcCall should throw exception");
-    } catch (Throwable t) {
-      assertTrue(t instanceof TajoServiceException);
-      assertEquals("Exception Test", t.getMessage());
-      TajoServiceException te = (TajoServiceException)t;
+    } catch (TajoServiceException te) {
+      assertEquals("Exception Test", te.getMessage());
       assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol());
       assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(),
           te.getRemoteAddress());
@@ -222,83 +229,57 @@ public class TestBlockingRpc {
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
 
+    DefaultRpcController controller = new DefaultRpcController();
     try {
-      stub.throwException(null, message);
+      stub.throwException(controller, message);
       fail("RpcCall should throw exception");
-    } catch (Throwable t) {
-      assertTrue(t instanceof TajoServiceException);
+    } catch (TajoServiceException t) {
+      assertTrue(controller.failed());
+      assertNotNull(controller.errorText());
     }
 
-    EchoMessage message1 = stub.deley(null, message);
+    controller.reset();
+    EchoMessage message1 = stub.delay(controller, message);
     assertEquals(message, message1);
+    assertFalse(controller.failed());
+    assertNull(controller.errorText());
   }
 
-  @Test
-  @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
-  public void testConnectionRetry() throws Exception {
-    retries = 10;
-    ServerSocket serverSocket = new ServerSocket(0);
-    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
-    serverSocket.close();
-
+  @Test(timeout = 60000)
+  public void testServerShutdown1() throws Exception {
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
 
-    //lazy startup
-    Thread serverThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(1000);
-          server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
-        } catch (Exception e) {
-          fail(e.getMessage());
-        }
-        server.start();
-      }
-    });
-    serverThread.start();
-
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false);
-    client = new BlockingRpcClient(rpcConnectionKey, retries);
-    client.connect();
-    assertTrue(client.isConnected());
-    stub = client.getStub();
-
-    EchoMessage response = stub.echo(null, message);
-    assertEquals(MESSAGE, response.getMessage());
-  }
-
-  @Test
-  public void testConnectionFailed() throws Exception {
-    NettyClientBase client = null;
-    boolean expected = false;
+    tearDownRpcServer();
+    boolean expect = false;
     try {
-      int port = server.getListenAddress().getPort() + 1;
-      RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(
-              RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
-              DummyProtocol.class, false);
-      client = new BlockingRpcClient(rpcConnectionKey, retries);
-      client.connect();
-      fail();
-    } catch (ConnectTimeoutException e) {
-      expected = true;
-    } catch (Throwable e) {
-      fail(e.getMessage());
+      EchoMessage response = stub.echo(null, message);
+      fail(response.getMessage());
+    } catch (TajoServiceException e) {
+      expect = true;
     }
-    assertTrue(expected);
+    assertTrue(expect);
   }
 
-  @Test
-  public void testGetNull() throws Exception {
-    assertNull(stub.getNull(null, null));
-    assertTrue(service.getNullCalled);
+  @Test(timeout = 60000)
+  public void testServerShutdown2() throws Exception {
+    EchoMessage message = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    tearDownRpcServer();
+    boolean expect = false;
+    try {
+      BlockingInterface stub = client.getStub();
+      EchoMessage response = stub.echo(null, message);
+      fail(response.getMessage());
+    } catch (TajoServiceException e) {
+      expect = true;
+    }
+    assertTrue(expect);
   }
 
   @Test
-  public void testShutdown() throws Exception {
+  public void testServerShutdown3() throws Exception {
     final StringBuilder error = new StringBuilder();
     Thread callThread = new Thread() {
       public void run() {
@@ -306,11 +287,11 @@ public class TestBlockingRpc {
           EchoMessage message = EchoMessage.newBuilder()
               .setMessage(MESSAGE)
               .build();
-          stub.deley(null, message);
+          stub.delay(null, message);
         } catch (Exception e) {
           error.append(e.getMessage());
         }
-        synchronized(error) {
+        synchronized (error) {
           error.notifyAll();
         }
       }
@@ -340,67 +321,244 @@ public class TestBlockingRpc {
 
     assertTrue(latch.getCount() == 0);
 
-    synchronized(error) {
+    synchronized (error) {
       error.wait(5 * 1000);
     }
 
-    if(!error.toString().isEmpty()) {
+    if (!error.toString().isEmpty()) {
       fail(error.toString());
     }
   }
 
-  @Test
-  @SetupRpcConnection(setupRpcClient=false)
-  public void testUnresolvedAddress() throws Exception {
-    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false)
+  public void testClientRetryOnStartup() throws Exception {
+    retries = 10;
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
+
+    EchoMessage message = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    //lazy startup
+    Thread serverThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(1000);
+          server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+        server.start();
+      }
+    });
+    serverThread.start();
+
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(
-              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
-    client = new BlockingRpcClient(rpcConnectionKey, retries);
-    client.connect();
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false);
+
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey,
+        retries, 0, TimeUnit.MILLISECONDS, false);
     assertTrue(client.isConnected());
+
     BlockingInterface stub = client.getStub();
+    EchoMessage response = stub.echo(null, message);
+    assertEquals(MESSAGE, response.getMessage());
+    client.close();
+    server.shutdown();
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false)
+  public void testClientRetryFailureOnStartup() throws Exception {
+    retries = 2;
+    ServerSocket serverSocket = new ServerSocket(0);
+    final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+    serverSocket.close();
 
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
-    EchoMessage response2 = stub.echo(null, message);
-    assertEquals(MESSAGE, response2.getMessage());
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false);
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries);
+    try { // always release the client, even when an assertion below fails
+      try {
+        client.connect();
+        fail("connect() should time out: no server is listening on " + address);
+      } catch (ConnectTimeoutException e) {
+        assertFalse(e.getMessage(), client.isConnected());
+      }
+      BlockingInterface stub = client.getStub();
+      try {
+        stub.echo(null, message); // the result is irrelevant; the call itself must fail
+        fail("echo() should fail while the client is disconnected");
+      } catch (TajoServiceException e) {
+        assertFalse(e.getMessage(), client.isConnected());
+      }
+    } finally {
+      RpcClientManager.cleanup(client);
+    }
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false)
+  public void testUnresolvedAddress() throws Exception {
+    // The host name "test" is expected not to resolve, so connect() must fail.
+    InetSocketAddress address = new InetSocketAddress("test", 0);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
+    // Created before the try block so that the finally clause never sees a null client.
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries);
+    boolean expected = false;
+
+    try {
+      client.connect();
+      fail("connect() to an unresolvable address should throw ConnectException");
+    } catch (ConnectException e) {
+      expected = true;
+    } finally {
+      client.close(); // any other exception propagates with its original stack trace
+    }
+    assertTrue(expected);
   }
 
   @Test
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testUnresolvedAddress2() throws Exception {
+    String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(
+            RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries);
+    client.connect();
+    assertTrue(client.isConnected());
+
+    try {
+      BlockingInterface stub = client.getStub();
+      EchoMessage message = EchoMessage.newBuilder()
+          .setMessage(MESSAGE).build();
+      EchoMessage response2 = stub.echo(null, message);
+      assertEquals(MESSAGE, response2.getMessage());
+    } finally {
+      client.close();
+    }
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testStubRecovery() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, 1, 0, TimeUnit.MILLISECONDS, false);
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    int repeat = 5;
+
+    assertTrue(client.isConnected());
+    BlockingInterface stub = client.getStub();
+
+    client.close(); // close connection
+    assertFalse(client.isConnected());
+
+    for (int i = 0; i < repeat; i++) {
+      try {
+        EchoMessage response = stub.echo(null, echoMessage);
+        assertEquals(MESSAGE, response.getMessage());
+        assertTrue(client.isConnected());
+      } finally {
+        client.close(); // close connection
+        assertFalse(client.isConnected());
+      }
+    }
+  }
+
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
   public void testIdleTimeout() throws Exception {
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
         new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
-    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
-    client.connect();
+    //500 millis idle timeout
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
     assertTrue(client.isConnected());
 
-    Thread.sleep(2000);
+    Thread.sleep(600);   //timeout
     assertFalse(client.isConnected());
 
     client.connect(); // try to reconnect
     assertTrue(client.isConnected());
-    client.close();
+
+    Thread.sleep(600);  //timeout
     assertFalse(client.isConnected());
+    client.close();
   }
 
-  @Test
-  public void testIdleTimeoutWithActiveRequest() throws Exception {
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testPingOnIdle() throws Exception {
     RpcClientManager.RpcConnectionKey rpcConnectionKey =
         new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
-    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout
 
-    client.connect();
+    //500 millis request timeout
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    assertTrue(client.isConnected());
+
+    Thread.sleep(600);
+    assertTrue(client.isConnected());
 
+    Thread.sleep(600);
     assertTrue(client.isConnected());
+    client.close();
+  }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testIdleTimeoutWithActiveRequest() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    //500 millis idle timeout
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
+    assertTrue(client.isConnected());
+
     BlockingInterface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
 
-    EchoMessage message = stub.deley(null, echoMessage); //3 sec delay
+    EchoMessage message = stub.delay(null, echoMessage); //3 sec delay
     assertEquals(message, echoMessage);
+    assertTrue(client.isConnected());
 
-    Thread.sleep(2000);
+    assertEquals(0, client.getActiveRequests()); // the delayed request has completed
+    Thread.sleep(600);  //timeout
     assertFalse(client.isConnected());
   }
+
+  @Test(timeout = 60000)
+  @SetupRpcConnection(setupRpcClient = false)
+  public void testRequestTimeoutOnBusy() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+
+    //500 millis request timeout
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    assertTrue(client.isConnected());
+
+    BlockingInterface stub = client.getStub();
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    boolean expected = false;
+    try {
+      stub.busy(null, echoMessage); // the server side sleeps for 30 sec
+      fail("busy() should be aborted by the 500 ms request timeout");
+    } catch (TajoServiceException e) {
+      expected = true;
+    } finally {
+      client.close();
+    }
+    assertTrue(expected);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
index 5f86518..71a2b6f 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -28,10 +28,9 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class TestRpcClientManager {
 
@@ -39,59 +38,141 @@ public class TestRpcClientManager {
   public void testRaceCondition() throws Exception {
     final int parallelCount = 50;
     final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+
     NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress("127.0.0.1", 0), parallelCount);
     server.start();
+    try {
+      final InetSocketAddress address = server.getListenAddress();
+      final RpcClientManager manager = RpcClientManager.getInstance();
 
-    final InetSocketAddress address = server.getListenAddress();
-    final RpcClientManager manager = RpcClientManager.getInstance();
-
-    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
-    List<Future> tasks = new ArrayList<Future>();
-    for (int i = 0; i < parallelCount; i++) {
-      tasks.add(executor.submit(new Runnable() {
-            @Override
-            public void run() {
-              NettyClientBase client = null;
-              try {
-                client = manager.getClient(address, DummyProtocol.class, false);
-              } catch (Throwable e) {
-                fail(e.getMessage());
+      List<Future> tasks = new ArrayList<Future>();
+      for (int i = 0; i < parallelCount; i++) {
+        tasks.add(executor.submit(new Runnable() {
+              @Override
+              public void run() {
+                NettyClientBase client = null;
+                try {
+                  client = manager.getClient(address, DummyProtocol.class, false);
+                } catch (Throwable e) {
+                  fail(e.getMessage());
+                }
+                assertTrue(client.isConnected());
               }
-              assertTrue(client.isConnected());
-            }
-          })
-      );
+            })
+        );
+      }
+
+      for (Future future : tasks) {
+        future.get();
+      }
+
+      NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false);
+      RpcClientManager.cleanup(clientBase);
+    } finally {
+      server.shutdown();
+      executor.shutdown();
+      RpcClientManager.close();
     }
+  }
+
+  @Test
+  public void testClientCloseEvent() throws Exception {
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), 3);
+    server.start();
+    RpcClientManager manager = RpcClientManager.getInstance();
+
+    try {
+
+      NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true);
+      assertTrue(client.isConnected());
+      assertTrue(client.getChannel().isWritable());
+
+      RpcClientManager.RpcConnectionKey key = client.getKey();
+      assertTrue(RpcClientManager.contains(key));
 
-    for (Future future : tasks) {
-      future.get();
+      client.close();
+      assertFalse(RpcClientManager.contains(key));
+    } finally {
+      server.shutdown();
+      RpcClientManager.close();
     }
+  }
+
+  @Test
+  public void testClientCloseEventWithReconnect() throws Exception {
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), 3);
+    server.start();
+    int repeat = 10;
+    RpcClientManager manager = RpcClientManager.getInstance();
+
+    try {
 
-    NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false);
-    RpcClientManager.cleanup(clientBase);
-    server.shutdown();
-    executor.shutdown();
+      NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true);
+      assertTrue(client.isConnected());
+
+      RpcClientManager.RpcConnectionKey key = client.getKey();
+      assertTrue(RpcClientManager.contains(key));
+
+      client.close();
+      assertFalse(client.isConnected());
+      assertFalse(RpcClientManager.contains(key));
+
+      for (int i = 0; i < repeat; i++) {
+        client.connect();
+        assertTrue(client.isConnected());
+        assertTrue(RpcClientManager.contains(key));
+
+        client.close();
+        assertFalse(client.isConnected());
+        assertFalse(RpcClientManager.contains(key));
+      }
+    } finally {
+      server.shutdown();
+      RpcClientManager.close();
+    }
   }
 
   @Test
-  public void testCloseFuture() throws Exception {
+  public void testUnManagedClient() throws Exception {
     final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
     NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress("127.0.0.1", 0), 3);
     server.start();
+    RpcClientManager.RpcConnectionKey key =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    RpcClientManager.close();
+    RpcClientManager manager = RpcClientManager.getInstance();
+
+    try {
+      NettyClientBase client1 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false);
+      assertTrue(client1.isConnected());
+      assertFalse(RpcClientManager.contains(key));
 
-    final RpcClientManager manager = RpcClientManager.getInstance();
+      NettyClientBase client2 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false);
+      assertTrue(client2.isConnected());
+      assertFalse(RpcClientManager.contains(key));
 
-    NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true);
-    assertTrue(client.isConnected());
-    assertTrue(client.getChannel().isWritable());
+      assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress());
+      assertNotEquals(client1.getChannel(), client2.getChannel());
 
-    RpcClientManager.RpcConnectionKey key = client.getKey();
-    assertTrue(RpcClientManager.contains(key));
+      client1.close();
+      assertFalse(client1.isConnected());
+      assertTrue(client2.isConnected());
 
-    client.close();
-    assertFalse(RpcClientManager.contains(key));
-    server.shutdown();
+      client1.connect();
+      client2.close();
+      assertFalse(client2.isConnected());
+      assertTrue(client1.isConnected());
+
+      RpcClientManager.cleanup(client1, client2);
+    } finally {
+      server.shutdown();
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
index 0ca7563..abcc057 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
@@ -68,7 +68,7 @@ public class DummyProtocolAsyncImpl implements Interface {
   }
 
   @Override
-  public void deley(RpcController controller, EchoMessage request,
+  public void delay(RpcController controller, EchoMessage request,
                     RpcCallback<EchoMessage> done) {
     try {
       Thread.sleep(3000);
@@ -79,8 +79,25 @@ public class DummyProtocolAsyncImpl implements Interface {
     done.run(request);
   }
 
+  @Override
+  public void busy(RpcController controller, EchoMessage request,
+                   RpcCallback<EchoMessage> done) {
+    try {
+      Thread.sleep(30000);
+    } catch (InterruptedException e) {
+      LOG.error(e.getMessage());
+    }
+    done.run(request);
+  }
+
+  @Override
   public void throwException(RpcController controller, EchoMessage request,
                              RpcCallback<EchoMessage> done) {
-    done.run(request);
+    if(controller != null) {
+      controller.setFailed("throwException");
+      done.run(request);
+    } else {
+      throw new RuntimeException("throwException");
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
index 8d4b597..40eb18f 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java
@@ -65,7 +65,7 @@ public class DummyProtocolBlockingImpl implements BlockingInterface {
   }
 
   @Override
-  public EchoMessage deley(RpcController controller, EchoMessage request)
+  public EchoMessage delay(RpcController controller, EchoMessage request)
       throws ServiceException {
     try {
       Thread.sleep(3000);
@@ -76,6 +76,17 @@ public class DummyProtocolBlockingImpl implements BlockingInterface {
     return request;
   }
 
+  @Override
+  public EchoMessage busy(RpcController controller, EchoMessage request) {
+    try {
+      Thread.sleep(30000);
+    } catch (InterruptedException e) {
+      LOG.error(e.getMessage());
+    }
+    return request;
+  }
+
+  @Override
   public EchoMessage throwException(RpcController controller, EchoMessage request)
       throws ServiceException {
     throw new ServiceException("Exception Test");


Mime
View raw message