tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/2] tajo git commit: TAJO-1860: Refactor Rpc clients to take Connection Parameters.
Date Wed, 23 Sep 2015 02:38:55 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 5a1558613 -> 1eb100459


http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index a860d51..f8d5c45 100644
--- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -19,21 +19,32 @@
 package org.apache.tajo.jdbc;
 
 import com.google.common.collect.Maps;
+import io.netty.channel.ConnectTimeoutException;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.SQLExceptionUtil;
+import org.apache.tajo.util.UriUtil;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.sql.*;
 import java.util.*;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.error.Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION;
+import static org.apache.tajo.error.Errors.ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION;
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
@@ -197,7 +208,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
   public void testResultSetCompression() throws Exception {
     String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(),
         TajoConstants.DEFAULT_DATABASE_NAME);
-    connUri = connUri + "?" + SessionVars.COMPRESSED_RESULT_TRANSFER.keyname() + "=true";
+    connUri = connUri + "?useCompression=true";
     Connection conn = DriverManager.getConnection(connUri);
     assertTrue(conn.isValid(100));
 
@@ -593,7 +604,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
     try {
       if (!testingCluster.isHiveCatalogStoreRunning()) {
         String connUri = buildConnectionUri(tajoMasterAddress.getHostName(),
-          tajoMasterAddress.getPort(), "TestTajoJdbc");
+            tajoMasterAddress.getPort(), "TestTajoJdbc");
 
         conn = DriverManager.getConnection(connUri);
         assertTrue(conn.isValid(100));

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
index 8f84226..1050fc0 100644
--- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java
@@ -21,7 +21,10 @@ package org.apache.tajo.jdbc;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.exception.SQLExceptionUtil;
+import org.apache.tajo.util.UriUtil;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -29,9 +32,13 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.sql.*;
+import java.util.Properties;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.error.Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION;
 import static org.apache.tajo.exception.SQLExceptionUtil.toSQLState;
 import static org.apache.tajo.jdbc.TestTajoJdbc.buildConnectionUri;
 import static org.junit.Assert.*;
@@ -53,7 +60,7 @@ public class TestTajoJdbcNegative extends QueryTestCaseBase {
   @Test(expected = SQLException.class)
   public void testGetConnection() throws SQLException {
     DriverManager.getConnection("jdbc:taju://" + tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort()
-      + "/default");
+        + "/default");
   }
 
   @Test
@@ -191,4 +198,49 @@ public class TestTajoJdbcNegative extends QueryTestCaseBase {
       }
     }
   }
+
+  private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException {
+    try (Socket socket = new Socket())  {
+      // Try to connect to a private address in the 10.x.y.z range.
+      // These addresses are usually not routed, so an attempt to
+      // connect to them will hang the connection attempt, which is
+      // what we want to simulate in this test.
+      socket.connect(new InetSocketAddress(host, port), connectTimeout);
+      // Abort the test if we can connect.
+      Assume.assumeTrue(false);
+    } catch (SocketTimeoutException x) {
+      // Expected timeout during connect, continue the test.
+      Assume.assumeTrue(true);
+    } catch (Throwable x) {
+      // Abort if any other exception happens.
+      Assume.assumeTrue(false);
+    }
+  }
+
+  @Test(timeout = 5000)
+  public final void testConnectTimeout() throws Exception {
+    final String host = "10.255.255.1";
+    final int port = 80;
+    int connectTimeout = 1000;
+    assumeConnectTimeout(host, port, connectTimeout);
+
+    long startTime = Long.MIN_VALUE;
+    long endTime;
+    try {
+      // artificially cause connection timeout
+      String connUri = buildConnectionUri(host, port, DEFAULT_DATABASE_NAME);
+      connUri = UriUtil.addParam(connUri, "connectTimeout", "1"); // 1 second
+      connUri = UriUtil.addParam(connUri, "retry", "0"); // no retries
+      startTime = System.currentTimeMillis();
+      new JdbcConnection(connUri, new Properties());
+      fail("Must be failed");
+    } catch (SQLException t) {
+      endTime = System.currentTimeMillis();
+      assertEquals(t.getSQLState(), SQLExceptionUtil.toSQLState(CLIENT_CONNECTION_EXCEPTION));
+      assertEquals("connection timed out: /10.255.255.1:80", t.getMessage());
+      // The default connect timeout is 15 seconds, so if the attempt fails in under 2 seconds
+      // we can be sure that the connectTimeout parameter took effect.
+      assertTrue(((endTime - startTime) / 1000) < 2);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
index ab0826f..601f3d2 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
@@ -18,15 +18,33 @@
 
 package org.apache.tajo.rpc;
 
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Constants for RPC
+ */
 public class RpcConstants {
 
   public static final String PING_PACKET = "TAJO";
-  public static final String RPC_CLIENT_RETRY_MAX = "tajo.rpc.client.retry.max";
-  public static final String RPC_CLIENT_TIMEOUT_SECS = "tajo.rpc.client.timeout-secs";
-
-  public static final int DEFAULT_RPC_RETRIES = 3;
-  public static final int DEFAULT_RPC_TIMEOUT_SECONDS = 180;
-  public static final int DEFAULT_CONNECT_TIMEOUT = 20000;  // 20 sec
   public static final int DEFAULT_PAUSE = 1000; // 1 sec
-  public static final int DEFAULT_FUTURE_TIMEOUT_SECONDS = 10;
+  public static final int FUTURE_TIMEOUT_SECONDS_DEFAULT = 10;
+
+  /** How many times the connect will retry */
+  public static final String CLIENT_RETRY_NUM = "tajo.rpc.client.retry-num";
+  public static final int CLIENT_RETRY_NUM_DEFAULT = 0;
+
+  /** Client connection timeout (milliseconds) */
+  public static final String CLIENT_CONNECTION_TIMEOUT = "tajo.rpc.client.connection-timeout-ms";
+  /** Default client connection timeout 15 seconds */
+  public final static long CLIENT_CONNECTION_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(15);
+
+  /**
+   * Socket timeout (milliseconds).
+   */
+  public static final String CLIENT_SOCKET_TIMEOUT = "tajo.rpc.client.socket-timeout-ms";
+  /** Default socket timeout - 180 seconds */
+  public final static long CLIENT_SOCKET_TIMEOUT_DEFAULT =  TimeUnit.SECONDS.toMillis(180);
+
+  public static final String CLIENT_HANG_DETECTION = "tajo.rpc.client.hang-detection";
+  public final static boolean CLIENT_HANG_DETECTION_DEFAULT =  false;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 6fb62d4..c613bac 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -18,56 +18,55 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.*;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.EventLoopGroup;
-import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.lang.reflect.Method;
-import java.util.concurrent.TimeUnit;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.tajo.rpc.RpcConstants.*;
+
 public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallback> {
 
   private final Method stubMethod;
   private final ProxyRpcChannel rpcChannel;
   private final NettyChannelInboundHandler handler;
 
-  @VisibleForTesting
-  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
-      throws ClassNotFoundException, NoSuchMethodException {
-    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup());
-  }
 
   /**
    * Intentionally make this method package-private, avoiding user directly
    * new an instance through this constructor.
    *
-   * @param rpcConnectionKey
-   * @param retries          retry operation number of times
-   * @param timeout          disable ping, it trigger timeout event on idle-state.
-   *                         otherwise it is request timeout on active-state
-   * @param timeUnit         TimeUnit
-   * @param enablePing       enable to detect remote peer hangs
-   * @param eventLoopGroup   thread pool of netty's
+   * @param rpcConnectionKey  RpcConnectionKey
+   * @param eventLoopGroup    Thread pool of netty's
+   * @param rpcParams         Rpc connection parameters (see RpcConstants)
+   *
    * @throws ClassNotFoundException
    * @throws NoSuchMethodException
    */
-  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing,
-                 EventLoopGroup eventLoopGroup)
+  AsyncRpcClient(EventLoopGroup eventLoopGroup,
+                 RpcConnectionKey rpcConnectionKey,
+                 Properties rpcParams)
       throws ClassNotFoundException, NoSuchMethodException {
-    super(rpcConnectionKey, retries);
+    super(rpcConnectionKey, rpcParams);
 
     this.stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
     this.rpcChannel = new ProxyRpcChannel();
     this.handler = new ClientChannelInboundHandler();
-    init(new ProtoClientChannelInitializer(handler,
-        RpcResponse.getDefaultInstance(),
-        timeUnit.toNanos(timeout),
-        enablePing), eventLoopGroup);
+
+    final long socketTimeoutMills = Long.parseLong(
+        rpcParams.getProperty(CLIENT_SOCKET_TIMEOUT, String.valueOf(CLIENT_SOCKET_TIMEOUT_DEFAULT)));
+
+    // Enable proactive hang detection
+    final boolean hangDetectionEnabled = Boolean.parseBoolean(
+        rpcParams.getProperty(CLIENT_HANG_DETECTION, String.valueOf(CLIENT_HANG_DETECTION_DEFAULT)));
+
+    init(new ProtoClientChannelInitializer(handler, RpcResponse.getDefaultInstance(), socketTimeoutMills,
+            hangDetectionEnabled), eventLoopGroup);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 4327003..35675b4 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,56 +18,57 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.*;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.EventLoopGroup;
-import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.Properties;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.tajo.rpc.RpcConstants.*;
+
 public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCallFuture> {
 
   private final Method stubMethod;
   private final ProxyRpcChannel rpcChannel;
   private final NettyChannelInboundHandler handler;
 
-  @VisibleForTesting
-  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
-      throws NoSuchMethodException, ClassNotFoundException {
-    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup());
-  }
-
   /**
    * Intentionally make this method package-private, avoiding user directly
    * new an instance through this constructor.
    *
-   * @param rpcConnectionKey
-   * @param retries          retry operation number of times
-   * @param timeout          disable ping, it trigger timeout event on idle-state.
-   *                         otherwise it is request timeout on active-state
-   * @param timeUnit         TimeUnit
-   * @param enablePing       enable to detect remote peer hangs
-   * @param eventLoopGroup   thread pool of netty's
+   * @param rpcConnectionKey     RpcConnectionKey
+   * @param eventLoopGroup       Thread pool of netty's
+   * @param rpcParams            Rpc connection parameters (see RpcConstants)
+   *
    * @throws ClassNotFoundException
    * @throws NoSuchMethodException
+   * @see RpcConstants
    */
-  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing,
-                    EventLoopGroup eventLoopGroup) throws ClassNotFoundException, NoSuchMethodException {
-    super(rpcConnectionKey, retries);
+  public BlockingRpcClient(EventLoopGroup eventLoopGroup,
+                           RpcConnectionKey rpcConnectionKey,
+                           Properties rpcParams)
+      throws ClassNotFoundException, NoSuchMethodException {
+    super(rpcConnectionKey, rpcParams);
 
     this.stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
     this.rpcChannel = new ProxyRpcChannel();
     this.handler = new ClientChannelInboundHandler();
-    init(new ProtoClientChannelInitializer(handler,
-        RpcResponse.getDefaultInstance(),
-        timeUnit.toNanos(timeout),
-        enablePing), eventLoopGroup);
+
+    long socketTimeoutMills = Long.parseLong(
+        rpcParams.getProperty(CLIENT_SOCKET_TIMEOUT, String.valueOf(CLIENT_SOCKET_TIMEOUT_DEFAULT)));
+
+    // Enable proactive hang detection
+    final boolean hangDetectionEnabled = Boolean.parseBoolean(
+        rpcParams.getProperty(CLIENT_HANG_DETECTION, String.valueOf(CLIENT_HANG_DETECTION_DEFAULT)));
+
+    init(new ProtoClientChannelInitializer(handler, RpcResponse.getDefaultInstance(), socketTimeoutMills,
+            hangDetectionEnabled), eventLoopGroup);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index e5485da..6008c4c 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
@@ -31,7 +32,6 @@ import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.io.Closeable;
@@ -41,27 +41,50 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Collection;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.tajo.rpc.RpcConstants.*;
+
 public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable {
   public final static Log LOG = LogFactory.getLog(NettyClientBase.class);
 
-  private Bootstrap bootstrap;
-  private volatile ChannelFuture channelFuture;
   private final RpcConnectionKey key;
-  private final int maxRetries;
+  /** Number to retry for connection and RPC invocation */
+  private final int maxRetryNum;
+  /** Connection Timeout */
+  private final long connTimeoutMillis;
   private boolean enableMonitor;
-
-  private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners =
-      new ConcurrentHashMap<>();
+  private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners = new ConcurrentHashMap<>();
   private final ConcurrentMap<Integer, T> requests = new ConcurrentHashMap<>();
 
-  public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
+  private Bootstrap bootstrap;
+  private volatile ChannelFuture channelFuture;
+
+  /**
+   * Creates the client base, reading the retry count ({@link RpcConstants#CLIENT_RETRY_NUM}) and
+   * the connection timeout in milliseconds ({@link RpcConstants#CLIENT_CONNECTION_TIMEOUT}).
+   *
+   * @param rpcConnectionKey RpcConnectionKey
+   * @param rpcParams        Rpc connection parameters; absent keys use the RpcConstants defaults
+   * @throws IllegalArgumentException if the connection timeout exceeds Integer.MAX_VALUE
+   * @throws ClassNotFoundException
+   * @throws NoSuchMethodException
+   */
+  public NettyClientBase(RpcConnectionKey rpcConnectionKey, Properties rpcParams)
       throws ClassNotFoundException, NoSuchMethodException {
     this.key = rpcConnectionKey;
-    this.maxRetries = numRetries;
+
+    this.maxRetryNum = Integer.parseInt(
+        rpcParams.getProperty(CLIENT_RETRY_NUM, String.valueOf(CLIENT_RETRY_NUM_DEFAULT)));
+
+    this.connTimeoutMillis = Long.parseLong(
+        rpcParams.getProperty(CLIENT_CONNECTION_TIMEOUT, String.valueOf(CLIENT_CONNECTION_TIMEOUT_DEFAULT)));
+
+    // Parsed as long above; Netty only takes an int, so reject values that would overflow the later cast.
+    Preconditions.checkArgument(this.connTimeoutMillis <= Integer.MAX_VALUE, "Too long connection timeout");
   }
 
   // should be called from sub class
@@ -73,12 +96,12 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
         .handler(initializer)
         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
         .option(ChannelOption.SO_REUSEADDR, true)
-        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, RpcConstants.DEFAULT_CONNECT_TIMEOUT)
+        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connTimeoutMillis)
         .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
         .option(ChannelOption.TCP_NODELAY, true);
   }
 
-  public RpcClientManager.RpcConnectionKey getKey() {
+  public RpcConnectionKey getKey() {
     return key;
   }
 
@@ -132,7 +155,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
           getHandler().registerCallback(rpcRequest.getId(), callback);
         } else {
 
-          if (!future.channel().isActive() && retry < maxRetries) {
+          if (!future.channel().isActive() && retry < maxRetryNum) {
 
             /* schedule the current request for the retry */
             LOG.warn(future.cause() + " Try to reconnect :" + getKey().addr);
@@ -173,6 +196,14 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
     return this.channelFuture = bootstrap.clone().connect(address);
   }
 
+  private ConnectException makeConnectException(InetSocketAddress address, ChannelFuture future) {
+    if (future.cause() instanceof UnresolvedAddressException) {
+      return new ConnectException("Can't resolve host name: " + address.toString());
+    } else {
+      return new ConnectTimeoutException(future.cause().getMessage());
+    }
+  }
+
   public synchronized void connect() throws ConnectException {
     if (isConnected()) return;
 
@@ -186,10 +217,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
     ChannelFuture f = doConnect(address).awaitUninterruptibly();
 
     if (!f.isSuccess()) {
-      if (maxRetries > 0) {
+      if (maxRetryNum > 0) {
         doReconnect(address, f, ++retries);
       } else {
-        throw new ConnectException(ExceptionUtils.getMessage(f.cause()));
+        throw makeConnectException(address, f);
       }
     }
   }
@@ -198,7 +229,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
       throws ConnectException {
 
     for (; ; ) {
-      if (maxRetries > retries) {
+      if (maxRetryNum > retries) {
         retries++;
 
         if(getChannel().eventLoop().isShuttingDown()) {
@@ -218,12 +249,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
         }
       } else {
         LOG.error("Max retry count has been exceeded. attempts=" + retries + " caused by: " + future.cause());
-
-        if (future.cause() instanceof UnresolvedAddressException) {
-          throw new ConnectException("Can't resolve host name: " + address.toString());
-        } else {
-          throw new ConnectTimeoutException(future.cause().getMessage());
-        }
+        throw makeConnectException(address, future);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
index 8787dee..5d544cb 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java
@@ -34,26 +34,33 @@ import java.util.concurrent.TimeUnit;
 class ProtoClientChannelInitializer extends ChannelInitializer<Channel> {
   private final MessageLite defaultInstance;
   private final ChannelHandler handler;
-  private final long timeoutTimeNanos;
-  private final boolean enablePing;
+  private final long idleTimeout;
+  private final boolean hangDetection;
 
+  /**
+   * Channel Pipe Initializer
+   *
+   * @param handler          Channel Handler
+   * @param defaultInstance  Default Rpc Proto instance
+   * @param idleTimeout      Idle timeout (milliseconds)
+   */
   public ProtoClientChannelInitializer(ChannelHandler handler, MessageLite defaultInstance,
-                                       long timeoutTimeNanos,
-                                       boolean enablePing) {
+                                       long idleTimeout, boolean hangDetection) {
     this.handler = handler;
     this.defaultInstance = defaultInstance;
-    this.timeoutTimeNanos = timeoutTimeNanos;
-    this.enablePing = enablePing;
+    this.idleTimeout = idleTimeout;
+    this.hangDetection = hangDetection;
   }
 
   @Override
   protected void initChannel(Channel channel) throws Exception {
     ChannelPipeline pipeline = channel.pipeline();
     pipeline.addLast("idleStateHandler",
-        new IdleStateHandler(timeoutTimeNanos, timeoutTimeNanos / 2, 0, TimeUnit.NANOSECONDS));
-
-    if (enablePing) pipeline.addLast("MonitorClientHandler", new MonitorClientHandler());
+        new IdleStateHandler(idleTimeout, idleTimeout / 2, 0, TimeUnit.MILLISECONDS));
 
+    if (hangDetection) {
+      pipeline.addLast("MonitorClientHandler", new MonitorClientHandler());
+    }
     pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
     pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
     pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index c801b8a..032cf35 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -33,15 +33,12 @@ import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.Properties;
 
 @ThreadSafe
 public class RpcClientManager {
   private static final Log LOG = LogFactory.getLog(RpcClientManager.class);
 
-  private volatile int timeoutSeconds = RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS;
-  private volatile int retries = RpcConstants.DEFAULT_RPC_RETRIES;
-
   /* entries will be removed by ConnectionCloseFutureListener */
   private static final Map<RpcConnectionKey, NettyClientBase>
       clients = Collections.synchronizedMap(new HashMap<RpcConnectionKey, NettyClientBase>());
@@ -61,26 +58,23 @@ public class RpcClientManager {
   }
 
   private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey,
-                                                   int retries,
-                                                   long timeout,
-                                                   TimeUnit timeUnit,
-                                                   boolean enablePing)
+                                                   Properties rpcParams)
       throws NoSuchMethodException, ConnectException, ClassNotFoundException {
-    return makeClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, NettyUtils.getDefaultEventLoopGroup());
+
+
+    return makeClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, rpcParams);
   }
 
-  private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey,
-                                                   int retries,
-                                                   long timeout,
-                                                   TimeUnit timeUnit,
-                                                   boolean enablePing,
-                                                   EventLoopGroup eventLoopGroup)
+  private <T extends NettyClientBase> T makeClient(EventLoopGroup eventLoopGroup,
+                                                   RpcConnectionKey rpcConnectionKey,
+                                                   Properties rpcParams)
       throws NoSuchMethodException, ClassNotFoundException, ConnectException {
     NettyClientBase client;
     if (rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup);
+      client = new AsyncRpcClient(eventLoopGroup, rpcConnectionKey, rpcParams);
+
     } else {
-      client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup);
+      client = new BlockingRpcClient(eventLoopGroup, rpcConnectionKey, rpcParams);
     }
     return (T) client;
   }
@@ -90,7 +84,9 @@ public class RpcClientManager {
    * This client will be shared per protocol and address. Client is removed in shared map when a client is closed
    */
   public <T extends NettyClientBase> T getClient(InetSocketAddress addr,
-                                                 Class<?> protocolClass, boolean asyncMode)
+                                                 Class<?> protocolClass,
+                                                 boolean asyncMode,
+                                                 Properties rpcParams)
       throws NoSuchMethodException, ClassNotFoundException, ConnectException {
     RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
 
@@ -98,7 +94,7 @@ public class RpcClientManager {
     synchronized (clients) {
       client = clients.get(key);
       if (client == null) {
-        clients.put(key, client = makeClient(key, retries, getTimeoutSeconds(), TimeUnit.SECONDS, true));
+        clients.put(key, client = makeClient(key, rpcParams));
       }
     }
 
@@ -129,35 +125,30 @@ public class RpcClientManager {
    * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol.
    * This client does not managed. It should close.
    */
-  public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
+  public <T extends NettyClientBase> T newClient(InetSocketAddress addr,
                                                               Class<?> protocolClass,
                                                               boolean asyncMode,
-                                                              int retries,
-                                                              long timeout,
-                                                              TimeUnit timeUnit,
-                                                              boolean enablePing)
-      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
-
-    return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing);
-  }
-
-  public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
-                                                              Class<?> protocolClass,
-                                                              boolean asyncMode)
+                                                              Properties rpcParams)
       throws NoSuchMethodException, ClassNotFoundException, ConnectException {
 
-    return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode),
-        retries, getTimeoutSeconds(), TimeUnit.SECONDS, true);
+    return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), rpcParams);
   }
 
+  /**
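+   * Creates a new rpc client for {@code key} using {@code connectionParameters}, connects it, and returns it.
+   * @param key                 RpcConnectionKey
+   * @param <T>                 Rpc client type (a subclass of NettyClientBase)
+   * @return                    The newly created rpc client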
+   * @throws NoSuchMethodException
+   * @throws ClassNotFoundException
+   * @throws ConnectException
+   */
   public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key,
-                                                              int retries,
-                                                              long timeout,
-                                                              TimeUnit timeUnit,
-                                                              boolean enablePing)
+                                                              Properties connectionParameters)
+
       throws NoSuchMethodException, ClassNotFoundException, ConnectException {
 
-    T client = makeClient(key, retries, timeout, timeUnit, enablePing);
+    T client = makeClient(key, connectionParameters);
     client.connect();
     assert client.isConnected();
     return client;
@@ -165,12 +156,11 @@ public class RpcClientManager {
 
   public synchronized <T extends NettyClientBase> T newBlockingClient(InetSocketAddress addr,
                                                                       Class<?> protocolClass,
-                                                                      int retries,
-                                                                      EventLoopGroup eventLoopGroup)
+                                                                      EventLoopGroup eventLoopGroup,
+                                                                      Properties rpcParams)
       throws NoSuchMethodException, ClassNotFoundException, ConnectException {
 
-    T client = makeClient(new RpcConnectionKey(addr, protocolClass, false),
-        retries, 0, TimeUnit.SECONDS, false, eventLoopGroup);
+    T client = makeClient(eventLoopGroup, new RpcConnectionKey(addr, protocolClass, false), rpcParams);
     client.connect();
     assert client.isConnected();
     return client;
@@ -220,61 +210,10 @@ public class RpcClientManager {
     }
   }
 
-  public int getTimeoutSeconds() {
-    return timeoutSeconds;
-  }
-
-  public void setTimeoutSeconds(int timeoutSeconds) {
-    this.timeoutSeconds = timeoutSeconds;
-  }
-
-  public int getRetries() {
-    return retries;
-  }
-
-  public void setRetries(int retries) {
-    this.retries = retries;
-  }
-
-  static class RpcConnectionKey {
-    final InetSocketAddress addr;
-    final Class<?> protocolClass;
-    final boolean asyncMode;
-
-    final String description;
-
-    public RpcConnectionKey(InetSocketAddress addr,
-                            Class<?> protocolClass, boolean asyncMode) {
-      this.addr = addr;
-      this.protocolClass = protocolClass;
-      this.asyncMode = asyncMode;
-      this.description = "[" + protocolClass + "] " + addr + "," + asyncMode;
-    }
-
-    @Override
-    public String toString() {
-      return description;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof RpcConnectionKey)) {
-        return false;
-      }
-
-      return toString().equals(obj.toString());
-    }
-
-    @Override
-    public int hashCode() {
-      return description.hashCode();
-    }
-  }
-
   static class ClientCloseFutureListener implements GenericFutureListener {
-    private RpcClientManager.RpcConnectionKey key;
+    private RpcConnectionKey key;
 
-    public ClientCloseFutureListener(RpcClientManager.RpcConnectionKey key) {
+    public ClientCloseFutureListener(RpcConnectionKey key) {
       this.key = key;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java
new file mode 100644
index 0000000..2804010
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import java.net.InetSocketAddress;
+
+public class RpcConnectionKey {
+  final InetSocketAddress addr;
+  final Class<?> protocolClass;
+  final boolean asyncMode;
+
+  final String description;
+
+  public RpcConnectionKey(InetSocketAddress addr,
+                          Class<?> protocolClass, boolean asyncMode) {
+    this.addr = addr;
+    this.protocolClass = protocolClass;
+    this.asyncMode = asyncMode;
+    this.description = "[" + protocolClass + "] " + addr + "," + asyncMode;
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof RpcConnectionKey)) {
+      return false;
+    }
+
+    return toString().equals(obj.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    return description.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 8a3f385..6427ffe 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -42,6 +42,7 @@ import java.lang.annotation.Target;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -61,7 +62,7 @@ public class TestAsyncRpc {
   Interface stub;
   DummyProtocolAsyncImpl service;
   int retries;
-  RpcClientManager.RpcConnectionKey rpcConnectionKey;
+  RpcConnectionKey rpcConnectionKey;
   RpcClientManager manager = RpcClientManager.getInstance();
 
   @Retention(RetentionPolicy.RUNTIME)
@@ -129,10 +130,16 @@ public class TestAsyncRpc {
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    rpcConnectionKey = new RpcClientManager.RpcConnectionKey(
-        RpcUtils.getConnectAddress(server.getListenAddress()),
-        DummyProtocol.class, true);
-    client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true);
+    rpcConnectionKey = new RpcConnectionKey(
+        RpcUtils.getConnectAddress(server.getListenAddress()), DummyProtocol.class, true);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(10)));
+    connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true");
+
+
+    client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
     stub = client.getStub();
   }
@@ -347,10 +354,13 @@ public class TestAsyncRpc {
     });
     serverThread.start();
 
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
-    AsyncRpcClient client = manager.newClient(rpcConnectionKey,
-        retries, 0, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(address, DummyProtocol.class, true);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Interface stub = client.getStub();
@@ -377,9 +387,13 @@ public class TestAsyncRpc {
         .setMessage(MESSAGE).build();
     CallFuture<EchoMessage> future = new CallFuture<>();
 
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
-    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(address, DummyProtocol.class, true);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+
+    AsyncRpcClient client = new AsyncRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams);
     try {
       client.connect();
       fail();
@@ -409,9 +423,13 @@ public class TestAsyncRpc {
     boolean expected = false;
     AsyncRpcClient client = null;
     try {
-      RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
-      client = new AsyncRpcClient(rpcConnectionKey, retries);
+      RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionKey(address, DummyProtocol.class, true);
+
+      Properties connParams = new Properties();
+      connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+
+      client = new AsyncRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams);
       client.connect();
       fail();
     } catch (ConnectException e) {
@@ -429,10 +447,14 @@ public class TestAsyncRpc {
   @SetupRpcConnection(setupRpcClient = false)
   public void testUnresolvedAddress2() throws Exception {
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(
             RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
-    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+
+    AsyncRpcClient client = new AsyncRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams);
     client.connect();
     try {
       assertTrue(client.isConnected());
@@ -453,9 +475,12 @@ public class TestAsyncRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testStubRecovery() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
-    AsyncRpcClient client = manager.newClient(rpcConnectionKey, 2, 0, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(2));
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams);
 
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -484,10 +509,15 @@ public class TestAsyncRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testIdleTimeout() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
-    //500 millis idle timeout
-    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+
+    // 500 millis idle timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Thread.sleep(600);  //timeout
@@ -504,11 +534,16 @@ public class TestAsyncRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testPingOnIdle() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
 
-    //500 millis request timeout
-    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    // 500 millis socket timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+    connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true");
+
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Thread.sleep(600);
@@ -522,10 +557,15 @@ public class TestAsyncRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testIdleTimeoutWithActiveRequest() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
-    //500 millis idle timeout
-    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+
+    // 500 millis idle timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Interface stub = client.getStub();
@@ -547,11 +587,16 @@ public class TestAsyncRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testRequestTimeoutOnBusy() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+
+    // 500 millis socket timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries));
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+    connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true");
 
-    //500 millis request timeout
-    AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Interface stub = client.getStub();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 0fae7ee..0687d0b 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -39,6 +39,7 @@ import java.lang.annotation.Target;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -119,11 +120,16 @@ public class TestBlockingRpc {
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, "1");
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(10)));
+    connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true");
+
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(
             RpcUtils.getConnectAddress(server.getListenAddress()),
             DummyProtocol.class, false);
-    client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true);
+    client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
     stub = client.getStub();
   }
@@ -317,11 +323,13 @@ public class TestBlockingRpc {
     });
     serverThread.start();
 
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(address, DummyProtocol.class, false);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
 
-    BlockingRpcClient client = manager.newClient(rpcConnectionKey,
-        retries, 0, TimeUnit.MILLISECONDS, false);
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     BlockingInterface stub = client.getStub();
@@ -342,9 +350,14 @@ public class TestBlockingRpc {
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
 
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false);
-    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(address, DummyProtocol.class, false);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+
+    BlockingRpcClient client = new BlockingRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey,
+        connParams);
 
     try {
       client.connect();
@@ -370,9 +383,13 @@ public class TestBlockingRpc {
     boolean expected = false;
     BlockingRpcClient client = null;
     try {
-      RpcClientManager.RpcConnectionKey rpcConnectionKey =
-          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true);
-      client = new BlockingRpcClient(rpcConnectionKey, retries);
+      RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionKey(address, DummyProtocol.class, true);
+
+      Properties connParams = new Properties();
+      connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+
+      client = new BlockingRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams);
       client.connect();
       fail();
     } catch (ConnectException e) {
@@ -388,11 +405,17 @@ public class TestBlockingRpc {
   @Test(timeout = 120000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testUnresolvedAddress2() throws Exception {
+
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(
             RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
-    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries);
+
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+
+    BlockingRpcClient client =
+        new BlockingRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams);
     client.connect();
     assertTrue(client.isConnected());
 
@@ -410,9 +433,11 @@ public class TestBlockingRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testStubRecovery() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
-    BlockingRpcClient client = manager.newClient(rpcConnectionKey, 1, 0, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(1));
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams);
 
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -440,10 +465,15 @@ public class TestBlockingRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testIdleTimeout() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
-    //500 millis idle timeout
-    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+
+    // 500 millis socket timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Thread.sleep(600);   //timeout
@@ -460,11 +490,16 @@ public class TestBlockingRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testPingOnIdle() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+
+    // 500 millis socket timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+    connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true");
 
-    //500 millis request timeout
-    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     Thread.sleep(600);
@@ -478,10 +513,15 @@ public class TestBlockingRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testIdleTimeoutWithActiveRequest() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
-    //500 millis idle timeout
-    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+
+    // 500 millis socket timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     BlockingInterface stub = client.getStub();
@@ -500,11 +540,16 @@ public class TestBlockingRpc {
   @Test(timeout = 60000)
   @SetupRpcConnection(setupRpcClient = false)
   public void testRequestTimeoutOnBusy() throws Exception {
-    RpcClientManager.RpcConnectionKey rpcConnectionKey =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+    RpcConnectionKey rpcConnectionKey =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false);
+
+    // 500 millis socket timeout
+    Properties connParams = new Properties();
+    connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + "");
+    connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500));
+    connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true");
 
-    //500 millis request timeout
-    BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true);
+    BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams);
     assertTrue(client.isConnected());
 
     BlockingInterface stub = client.getStub();

http://git-wip-us.apache.org/repos/asf/tajo/blob/1eb10045/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
index 1053de6..160c6a3 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -25,10 +25,10 @@ import org.junit.Test;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -54,7 +54,7 @@ public class TestRpcClientManager {
               public void run() {
                 NettyClientBase client = null;
                 try {
-                  client = manager.getClient(address, DummyProtocol.class, false);
+                  client = manager.getClient(address, DummyProtocol.class, false, new Properties());
                 } catch (Throwable e) {
                   fail(e.getMessage());
                 }
@@ -68,7 +68,7 @@ public class TestRpcClientManager {
         future.get();
       }
 
-      NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false);
+      NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false, new Properties());
       RpcClientManager.cleanup(clientBase);
     } finally {
       server.shutdown();
@@ -87,11 +87,11 @@ public class TestRpcClientManager {
 
     try {
 
-      NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true);
+      NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true, new Properties());
       assertTrue(client.isConnected());
       assertTrue(client.getChannel().isWritable());
 
-      RpcClientManager.RpcConnectionKey key = client.getKey();
+      RpcConnectionKey key = client.getKey();
       assertTrue(RpcClientManager.contains(key));
 
       client.close();
@@ -113,10 +113,10 @@ public class TestRpcClientManager {
 
     try {
 
-      NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true);
+      NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true, new Properties());
       assertTrue(client.isConnected());
 
-      RpcClientManager.RpcConnectionKey key = client.getKey();
+      RpcConnectionKey key = client.getKey();
       assertTrue(RpcClientManager.contains(key));
 
       client.close();
@@ -144,17 +144,17 @@ public class TestRpcClientManager {
     NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress("127.0.0.1", 0), 3);
     server.start();
-    RpcClientManager.RpcConnectionKey key =
-        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
+    RpcConnectionKey key =
+        new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true);
     RpcClientManager.close();
     RpcClientManager manager = RpcClientManager.getInstance();
 
     try {
-      NettyClientBase client1 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false);
+      NettyClientBase client1 = manager.newClient(key, new Properties());
       assertTrue(client1.isConnected());
       assertFalse(RpcClientManager.contains(key));
 
-      NettyClientBase client2 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false);
+      NettyClientBase client2 = manager.newClient(key, new Properties());
       assertTrue(client2.isConnected());
       assertFalse(RpcClientManager.contains(key));
 


Mime
View raw message