tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [2/2] tajo git commit: TAJO-1497: RPC client does not share a connection. (jinho)
Date Thu, 16 Apr 2015 09:17:43 GMT
TAJO-1497: RPC client does not share a connection. (jinho)

Closes #533


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7b78668b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7b78668b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7b78668b

Branch: refs/heads/master
Commit: 7b78668b7d90d268bb6065586fe880cda08571c4
Parents: 338a2b7
Author: Jinho Kim <jhkim@apache.org>
Authored: Thu Apr 16 18:16:37 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Thu Apr 16 18:16:37 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/catalog/AbstractCatalogClient.java     |  86 ++++-----
 .../tajo/client/CatalogAdminClientImpl.java     |  40 ++--
 .../org/apache/tajo/client/QueryClientImpl.java |  50 ++---
 .../apache/tajo/client/SessionConnection.java   |  40 ++--
 .../org/apache/tajo/master/QueryInProgress.java |   6 +-
 .../apache/tajo/master/TajoContainerProxy.java  |  38 ++--
 .../apache/tajo/querymaster/QueryMaster.java    |  24 +--
 .../tajo/worker/ExecutionBlockContext.java      |  29 +--
 .../tajo/worker/TajoResourceAllocator.java      |  20 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  73 ++++---
 .../java/org/apache/tajo/worker/TaskRunner.java |   2 -
 .../tajo/worker/WorkerHeartbeatService.java     |  10 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  26 +--
 .../main/java/org/apache/tajo/rpc/RpcUtils.java |  34 ----
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  58 +++---
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |  82 ++++----
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  88 +++++----
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |  85 ++++-----
 .../tajo/rpc/ConnectionCloseFutureListener.java |  35 ++++
 .../org/apache/tajo/rpc/NettyClientBase.java    | 124 ++++--------
 .../tajo/rpc/ProtoChannelInitializer.java       |  11 +-
 .../org/apache/tajo/rpc/RpcClientManager.java   | 185 ++++++++++++++++++
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 191 -------------------
 .../org/apache/tajo/rpc/ServerCallable.java     |  36 ++--
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |  72 +++++--
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  85 ++++++---
 .../apache/tajo/rpc/TestRpcClientManager.java   |  97 ++++++++++
 28 files changed, 837 insertions(+), 792 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index fe8dd8e..3938489 100644
--- a/CHANGES
+++ b/CHANGES
@@ -89,6 +89,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1497: RPC client does not share a connection. (jinho)
+
     TAJO-1467: Parenthesis at the start of SQL query is ignored. 
     (Keuntae Park)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 458d6e0..49be29a 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -30,7 +30,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
@@ -50,14 +50,14 @@ public abstract class AbstractCatalogClient implements CatalogService {
   private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
 
   protected ServiceTracker serviceTracker;
-  protected RpcConnectionPool pool;
+  protected RpcClientManager manager;
   protected InetSocketAddress catalogServerAddr;
   protected TajoConf conf;
 
   abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
 
   public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
-    this.pool = RpcConnectionPool.getPool();
+    this.manager = RpcClientManager.getInstance();
     this.catalogServerAddr = catalogServerAddr;
     this.serviceTracker = ServiceTrackerFactory.get(conf);
     this.conf = conf;
@@ -79,7 +79,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
 
@@ -98,7 +98,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
@@ -113,7 +113,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
@@ -128,7 +128,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTablespaceNames() {
     try {
-      return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
@@ -144,7 +144,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TablespaceProto> getAllTablespaces() {
     try {
-      return new ServerCallable<List<TablespaceProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<TablespaceProto> call(NettyClientBase client) throws Exception {
@@ -162,7 +162,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public TablespaceProto getTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<TablespaceProto>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public TablespaceProto call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
@@ -177,7 +177,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.alterTablespace(null, alterTablespace).getValue();
@@ -192,7 +192,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
 
@@ -213,7 +213,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
@@ -228,7 +228,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
@@ -243,7 +243,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllDatabaseNames() {
     try {
-      return new ServerCallable<Collection<String>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
@@ -259,7 +259,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<DatabaseProto> getAllDatabases() {
     try {
-      return new ServerCallable<List<DatabaseProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<DatabaseProto> call(NettyClientBase client) throws Exception {
@@ -277,7 +277,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final TableDesc getTableDesc(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<TableDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public TableDesc call(NettyClientBase client) throws ServiceException {
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
           builder.setDatabaseName(databaseName);
@@ -302,7 +302,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TableDescriptorProto> getAllTables() {
     try {
-      return new ServerCallable<List<TableDescriptorProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<TableDescriptorProto> call(NettyClientBase client) throws Exception {
@@ -320,7 +320,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TableOptionProto> getAllTableOptions() {
     try {
-      return new ServerCallable<List<TableOptionProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<TableOptionProto> call(NettyClientBase client) throws Exception {
@@ -338,7 +338,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TableStatsProto> getAllTableStats() {
     try {
-      return new ServerCallable<List<TableStatsProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<TableStatsProto> call(NettyClientBase client) throws Exception {
@@ -356,7 +356,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<ColumnProto> getAllColumns() {
     try {
-      return new ServerCallable<List<ColumnProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<ColumnProto> call(NettyClientBase client) throws Exception {
@@ -374,7 +374,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<PartitionMethodDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -394,7 +394,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -415,7 +415,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public final PartitionDescProto getPartition(final String databaseName, final String tableName,
                                                final String partitionName) {
     try {
-      return new ServerCallable<PartitionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<PartitionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public PartitionDescProto call(NettyClientBase client) throws ServiceException {
 
           PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
@@ -436,7 +436,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<List<PartitionDescProto>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class,
+      return new ServerCallable<List<PartitionDescProto>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class,
         false) {
         public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException {
 
@@ -457,7 +457,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TablePartitionProto> getAllPartitions() {
     try {
-      return new ServerCallable<List<TablePartitionProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<TablePartitionProto> call(NettyClientBase client) throws Exception {
@@ -475,7 +475,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTableNames(final String databaseName) {
     try {
-      return new ServerCallable<Collection<String>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
@@ -491,7 +491,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<FunctionDesc> getFunctions() {
     try {
-      return new ServerCallable<Collection<FunctionDesc>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
           List<FunctionDesc> list = new ArrayList<FunctionDesc>();
           GetFunctionsResponse response;
@@ -518,7 +518,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createTable(final TableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createTable(null, desc.getProto()).getValue();
@@ -537,7 +537,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
     final String simpleName = splitted[1];
 
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -561,7 +561,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
           "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
     }
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
@@ -586,7 +586,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createIndex(final IndexDesc index) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createIndex(null, index.getProto()).getValue();
@@ -601,7 +601,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           IndexNameProto.Builder builder = IndexNameProto.newBuilder();
           builder.setDatabaseName(databaseName);
@@ -620,7 +620,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
@@ -640,7 +640,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
 
           IndexNameProto.Builder builder = IndexNameProto.newBuilder();
@@ -662,7 +662,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
                                           final String tableName,
                                           final String columnName) {
     try {
-      return new ServerCallable<IndexDesc>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
 
           GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
@@ -683,7 +683,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public boolean dropIndex(final String databaseName,
                            final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
 
           IndexNameProto.Builder builder = IndexNameProto.newBuilder();
@@ -703,7 +703,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<IndexProto> getAllIndexes() {
     try {
-      return new ServerCallable<List<IndexProto>>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
 
         @Override
         public List<IndexProto> call(NettyClientBase client) throws Exception {
@@ -721,7 +721,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createFunction(null, funcDesc.getProto()).getValue();
@@ -736,7 +736,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean dropFunction(final String signature) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
           builder.setSignature(signature);
@@ -769,7 +769,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
     FunctionDescProto descProto = null;
     try {
-      descProto = new ServerCallable<FunctionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public FunctionDescProto call(NettyClientBase client) throws ServiceException {
           try {
             CatalogProtocolService.BlockingInterface stub = getStub(client);
@@ -819,7 +819,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
     }
 
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.containFunction(null, builder.build()).getValue();
@@ -834,7 +834,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean alterTable(final AlterTableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.alterTable(null, desc.getProto()).getValue();
@@ -849,7 +849,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) {
     try {
-      return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.updateTableStats(null, updateTableStatsProto).getValue();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 6347ad1..9d0e427 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -48,8 +48,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
 
   @Override
   public boolean createDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
 
@@ -64,8 +64,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public boolean existDatabase(final String databaseName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
 
@@ -80,8 +80,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public boolean dropDatabase(final String databaseName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
 
@@ -96,8 +96,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public List<String> getAllDatabaseNames() throws ServiceException {
 
-    return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public List<String> call(NettyClientBase client) throws ServiceException {
 
@@ -111,8 +111,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
 
   public boolean existTable(final String tableName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         connection.checkSessionAndGet(client);
@@ -133,8 +133,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
                                        final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
       throws SQLException, ServiceException {
 
-    return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
 
@@ -169,8 +169,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
 
@@ -190,8 +190,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
 
   @Override
   public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
-    return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public List<String> call(NettyClientBase client) throws ServiceException {
 
@@ -213,8 +213,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public TableDesc getTableDesc(final String tableName) throws ServiceException {
 
-    return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
 
@@ -238,8 +238,8 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
 
-    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.connPool,
-        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 
       public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 4444a31..99c58b6 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.client;
 
 import com.google.protobuf.ServiceException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.*;
@@ -33,7 +32,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.util.ProtoUtil;
 
@@ -115,8 +113,6 @@ public class QueryClientImpl implements QueryClient {
       tajoMaster.closeNonForwardQuery(null, builder.build());
     } catch (Exception e) {
       LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
-    } finally {
-      connection.connPool.closeConnection(tmClient);
     }
   }
 
@@ -158,8 +154,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
 
-    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
 
@@ -184,8 +180,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
 
-    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
 
@@ -321,8 +317,6 @@ public class QueryClientImpl implements QueryClient {
 
     } catch (Exception e) {
       throw new ServiceException(e.getMessage(), e);
-    } finally {
-      connection.connPool.releaseConnection(tmClient);
     }
     return new QueryStatus(res);
   }
@@ -367,8 +361,6 @@ public class QueryClientImpl implements QueryClient {
 
     } catch (Exception e) {
       throw new ServiceException(e.getMessage(), e);
-    } finally {
-      connection.connPool.releaseConnection(tmClient);
     }
   }
 
@@ -378,8 +370,8 @@ public class QueryClientImpl implements QueryClient {
 
     try {
       final ServerCallable<ClientProtos.SerializedResultSet> callable =
-          new ServerCallable<ClientProtos.SerializedResultSet>(connection.connPool, connection.getTajoMasterAddr(),
-              TajoMasterClientProtocol.class, false, true) {
+          new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
+              TajoMasterClientProtocol.class, false) {
 
             public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
 
@@ -424,8 +416,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public boolean updateQuery(final String sql) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
 
@@ -454,8 +446,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public boolean updateQueryWithJson(final String json) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
 
@@ -482,8 +474,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
 
@@ -502,8 +494,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
 
@@ -522,8 +514,8 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
 
@@ -574,8 +566,6 @@ public class QueryClientImpl implements QueryClient {
 
     } catch(Exception e) {
       LOG.debug("Error when checking for application status", e);
-    } finally {
-      connection.connPool.releaseConnection(tmClient);
     }
     return status;
   }
@@ -591,8 +581,8 @@ public class QueryClientImpl implements QueryClient {
   }
   
   public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
-    return new ServerCallable<QueryInfoProto>(connection.connPool, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
       public QueryInfoProto call(NettyClientBase client) throws ServiceException {
         connection.checkSessionAndGet(client);
 
@@ -621,8 +611,8 @@ public class QueryClientImpl implements QueryClient {
     InetSocketAddress qmAddress = new InetSocketAddress(
         queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
 
-    return new ServerCallable<QueryHistoryProto>(connection.connPool, qmAddress,
-        QueryMasterClientProtocol.class, false, true) {
+    return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
+        QueryMasterClientProtocol.class, false) {
       public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
         connection.checkSessionAndGet(client);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 6363198..d8152f4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -30,7 +30,7 @@ import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
@@ -55,7 +55,7 @@ public class SessionConnection implements Closeable {
 
   private final Log LOG = LogFactory.getLog(TajoClientImpl.class);
 
-  final RpcConnectionPool connPool;
+  final RpcClientManager manager;
 
   private String baseDatabase;
 
@@ -86,8 +86,8 @@ public class SessionConnection implements Closeable {
 
     this.properties = properties;
 
-    connPool = RpcConnectionPool.getPool();
-    userInfo = UserRoleInfo.getCurrentUser();
+    this.manager = RpcClientManager.getInstance();
+    this.userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
 
     this.serviceTracker = tracker;
@@ -99,12 +99,12 @@ public class SessionConnection implements Closeable {
 
   public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
       ConnectTimeoutException, ClassNotFoundException {
-    return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
+    return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
   }
 
   public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
       throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
-    return connPool.getConnection(addr, protocolClass, asyncMode);
+    return manager.getClient(addr, protocolClass, asyncMode);
   }
 
   protected KeyValueSet getProperties() {
@@ -127,7 +127,7 @@ public class SessionConnection implements Closeable {
   public boolean isConnected() {
     if(!closed.get()){
       try {
-        return connPool.getConnection(serviceTracker.getClientServiceAddress(),
+        return manager.getClient(serviceTracker.getClientServiceAddress(),
             TajoMasterClientProtocol.class, false).isConnected();
       } catch (Throwable e) {
         return false;
@@ -141,7 +141,7 @@ public class SessionConnection implements Closeable {
   }
 
   public String getCurrentDatabase() throws ServiceException {
-    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 
       public String call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -153,8 +153,8 @@ public class SessionConnection implements Closeable {
   }
 
   public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
-    return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Map<String, String> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -179,7 +179,7 @@ public class SessionConnection implements Closeable {
   }
 
   public Map<String, String> unsetSessionVariables(final List<String> variables)  throws ServiceException {
-    return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 
       public Map<String, String> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -209,7 +209,7 @@ public class SessionConnection implements Closeable {
   }
 
   public String getSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 
       public String call(NettyClientBase client) throws ServiceException {
 
@@ -229,7 +229,7 @@ public class SessionConnection implements Closeable {
   }
 
   public Boolean existSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -247,8 +247,8 @@ public class SessionConnection implements Closeable {
   }
 
   public Map<String, String> getAllSessionVariables() throws ServiceException {
-    return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class,
-        false, true) {
+    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
+        false) {
 
       public Map<String, String> call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -260,8 +260,8 @@ public class SessionConnection implements Closeable {
   }
 
   public Boolean selectDatabase(final String databaseName) throws ServiceException {
-    Boolean selected = new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
+    Boolean selected = new ServerCallable<Boolean>(manager, getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         checkSessionAndGet(client);
@@ -286,13 +286,13 @@ public class SessionConnection implements Closeable {
     // remove session
     NettyClientBase client = null;
     try {
-      client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+      client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
       tajoMaster.removeSession(null, sessionId);
     } catch (Throwable e) {
       // ignore
     } finally {
-      connPool.releaseConnection(client);
+      RpcClientManager.cleanup(client);
     }
   }
 
@@ -329,7 +329,7 @@ public class SessionConnection implements Closeable {
   }
 
   public boolean reconnect() throws Exception {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+    return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
 
       public Boolean call(NettyClientBase client) throws ServiceException {
         CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index 668a770..d2286cf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -33,7 +33,7 @@ import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
@@ -112,7 +112,7 @@ public class QueryInProgress {
     masterContext.getResourceManager().releaseQueryMaster(queryId);
 
     if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool().closeConnection(queryMasterRpc);
+      RpcClientManager.cleanup(queryMasterRpc);
     }
 
     try {
@@ -157,7 +157,7 @@ public class QueryInProgress {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
     queryMasterRpc =
-        RpcConnectionPool.getPool().getConnection(addr, QueryMasterProtocol.class, true);
+        RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true);
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 139359c..1fda7d4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -35,7 +35,7 @@ import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.worker.TajoWorker;
 
@@ -83,14 +83,12 @@ public class TajoContainerProxy extends ContainerProxy {
     NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
-      tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
+      tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
       tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
     } catch (Throwable e) {
       /* Worker RPC failure */
       context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage()));
-    } finally {
-      RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
   }
 
@@ -101,7 +99,7 @@ public class TajoContainerProxy extends ContainerProxy {
           .getQueryMasterManagerService().getBindAddr();
 
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
-      tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
+      tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -118,8 +116,6 @@ public class TajoContainerProxy extends ContainerProxy {
       tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get());
     } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
-    } finally {
-      RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
   }
 
@@ -160,21 +156,19 @@ public class TajoContainerProxy extends ContainerProxy {
       containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
     }
 
-    RpcConnectionPool connPool = RpcConnectionPool.getPool();
+    RpcClientManager manager = RpcClientManager.getInstance();
     NettyClientBase tmClient = null;
-    try {
-      ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker();
-      tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
-
-      QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
-        masterClientService.releaseWorkerResource(null,
-            QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
-                .setExecutionBlockId(executionBlockId.getProto())
-                .addAllContainerIds(containerIdProtos)
-                .build(),
-            NullCallback.get());
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
+
+    ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker();
+    tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+
+    QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
+    masterClientService.releaseWorkerResource(null,
+        QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
+            .setExecutionBlockId(executionBlockId.getProto())
+            .addAllContainerIds(containerIdProtos)
+            .build(),
+        NullCallback.get());
+
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 9cbfb95..67dae06 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -41,7 +41,7 @@ import org.apache.tajo.master.event.QueryStopEvent;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.NetUtils;
@@ -88,7 +88,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private TajoWorker.WorkerContext workerContext;
 
-  private RpcConnectionPool connPool;
+  private RpcClientManager manager;
 
   private ExecutorService eventExecutor;
 
@@ -104,7 +104,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
     try {
       this.systemConf = (TajoConf)conf;
-      this.connPool = RpcConnectionPool.getPool();
+      this.manager = RpcClientManager.getInstance();
 
       querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
       queryMasterContext = new QueryMasterContext(systemConf);
@@ -187,7 +187,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
-        rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
+        rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
             TajoWorkerProtocol.class, true);
         TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
 
@@ -197,8 +197,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
         continue;
       } catch (Exception e) {
         continue;
-      } finally {
-        connPool.releaseConnection(rpc);
       }
     }
   }
@@ -211,15 +209,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
     for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
-        rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
+        rpc = manager.getClient(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
             TajoWorkerProtocol.class, true);
         TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
 
         tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(rpc);
       }
     }
   }
@@ -234,7 +230,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       // update master address in worker context.
 
       ServiceTracker serviceTracker = workerContext.getServiceTracker();
-      rpc = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+      rpc = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
       QueryCoordinatorProtocolService masterService = rpc.getStub();
 
       CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
@@ -245,8 +241,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return workerResourcesRequest.getWorkerResourcesList();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(rpc);
     }
     return new ArrayList<WorkerResourceProto>();
   }
@@ -342,7 +336,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       NettyClientBase tmClient = null;
       try {
-        tmClient = connPool.getConnection(workerContext.getServiceTracker().getUmbilicalAddress(),
+        tmClient = manager.getClient(workerContext.getServiceTracker().getUmbilicalAddress(),
             QueryCoordinatorProtocol.class, true);
 
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
@@ -352,8 +346,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
         //When tajo do stop cluster, tajo master maybe throw closed connection exception
 
         LOG.error(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(tmClient);
       }
 
       try {
@@ -445,7 +437,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
             try {
 
               ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker();
-              tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(),
+              tmClient = manager.getClient(serviceTracker.getUmbilicalAddress(),
                   QueryCoordinatorProtocol.class, true);
               QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 2377720..5ffc7a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -36,7 +36,7 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
@@ -79,7 +79,7 @@ public class ExecutionBlockContext {
   private ExecutionBlockSharedResource resource;
 
   private TajoQueryEngine queryEngine;
-  private RpcConnectionPool connPool;
+  private RpcClientManager connManager;
   private InetSocketAddress qmMasterAddr;
   private WorkerConnectionInfo queryMaster;
   private TajoConf systemConf;
@@ -100,7 +100,7 @@ public class ExecutionBlockContext {
                                ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable {
     this.manager = manager;
     this.executionBlockId = executionBlockId;
-    this.connPool = RpcConnectionPool.getPool();
+    this.connManager = RpcClientManager.getInstance();
     this.queryMaster = queryMaster;
     this.systemConf = conf;
     this.reporter = new Reporter();
@@ -139,12 +139,8 @@ public class ExecutionBlockContext {
     } catch (Throwable e) {
       try {
         NettyClientBase client = getQueryMasterConnection();
-        try {
-          QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
-          stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
-        } finally {
-          connPool.releaseConnection(client);
-        }
+        QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+        stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
       } catch (Throwable t) {
         //ignore
       }
@@ -158,11 +154,7 @@ public class ExecutionBlockContext {
 
   public NettyClientBase getQueryMasterConnection()
       throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
-    return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
-  }
-
-  public void releaseConnection(NettyClientBase connection) {
-    connPool.releaseConnection(connection);
+    return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true);
   }
 
   public void stop(){
@@ -274,12 +266,8 @@ public class ExecutionBlockContext {
 
   private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
     NettyClientBase client = getQueryMasterConnection();
-    try {
-      QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
-      stub.doneExecutionBlock(null, reporter, NullCallback.get());
-    } finally {
-      connPool.releaseConnection(client);
-    }
+    QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+    stub.doneExecutionBlock(null, reporter, NullCallback.get());
   }
 
   protected void reportExecutionBlock(ExecutionBlockId ebId) {
@@ -405,7 +393,6 @@ public class ExecutionBlockContext {
                 throw new RuntimeException(t);
               }
             } finally {
-              releaseConnection(client);
               if (remainingRetries > 0 && !reporterStop.get()) {
                 synchronized (reporterThread) {
                   try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 6798c33..7ba2ebc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -52,19 +52,14 @@ import org.apache.tajo.querymaster.StageState;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.net.InetSocketAddress;
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TajoResourceAllocator extends AbstractResourceAllocator {
@@ -203,15 +198,13 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort());
-      tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true);
+      tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
       tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(),
           NullCallback.get(PrimitiveProtos.BoolProto.class));
     } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
-    } finally {
-      RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc);
     }
   }
 
@@ -318,17 +311,16 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
           .build();
 
-      RpcConnectionPool connPool = RpcConnectionPool.getPool();
+
       NettyClientBase tmClient = null;
       try {
         ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
-        tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+        tmClient = RpcClientManager.getInstance().
+            getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
       } catch (Throwable e) {
         LOG.error(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(tmClient);
       }
 
       WorkerResourceAllocationResponse response = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index b08af2b..61a05dc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -426,50 +426,47 @@ public class Task {
       context.getHashShuffleAppenderManager().finalizeTask(taskId);
 
       NettyClientBase client = executionBlockContext.getQueryMasterConnection();
-      try {
-        QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
-        if (context.isStopped()) {
-          context.setExecutorProgress(0.0f);
 
-          if (context.getState() == TaskAttemptState.TA_KILLED) {
-            queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
-            executionBlockContext.killedTasksNum.incrementAndGet();
-          } else {
-            context.setState(TaskAttemptState.TA_FAILED);
-            TaskFatalErrorReport.Builder errorBuilder =
-                TaskFatalErrorReport.newBuilder()
-                    .setId(getId().getProto());
-            if (error != null) {
-              if (error.getMessage() == null) {
-                errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-              } else {
-                errorBuilder.setErrorMessage(error.getMessage());
-              }
-              errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
-            }
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+      if (context.isStopped()) {
+        context.setExecutorProgress(0.0f);
 
-            queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
-            executionBlockContext.failedTasksNum.incrementAndGet();
-          }
+        if (context.getState() == TaskAttemptState.TA_KILLED) {
+          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
+          executionBlockContext.killedTasksNum.incrementAndGet();
         } else {
-          // if successful
-          context.setProgress(1.0f);
-          context.setState(TaskAttemptState.TA_SUCCEEDED);
-          executionBlockContext.succeededTasksNum.incrementAndGet();
+          context.setState(TaskAttemptState.TA_FAILED);
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          if (error != null) {
+            if (error.getMessage() == null) {
+              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
+            } else {
+              errorBuilder.setErrorMessage(error.getMessage());
+            }
+            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
+          }
 
-          TaskCompletionReport report = getTaskCompletionReport();
-          queryMasterStub.done(null, report, NullCallback.get());
+          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
+          executionBlockContext.failedTasksNum.incrementAndGet();
         }
-        finishTime = System.currentTimeMillis();
-        LOG.info(context.getTaskId() + " completed. " +
-            "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
-            ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
-            + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-            + ", failed: " + executionBlockContext.failedTasksNum.intValue());
-        cleanupTask();
-      } finally {
-        executionBlockContext.releaseConnection(client);
+      } else {
+        // if successful
+        context.setProgress(1.0f);
+        context.setState(TaskAttemptState.TA_SUCCEEDED);
+        executionBlockContext.succeededTasksNum.incrementAndGet();
+
+        TaskCompletionReport report = getTaskCompletionReport();
+        queryMasterStub.done(null, report, NullCallback.get());
       }
+      finishTime = System.currentTimeMillis();
+      LOG.info(context.getTaskId() + " completed. " +
+          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
+          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
+          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
+          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
+      cleanupTask();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 058ea42..642c914 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -299,8 +299,6 @@ public class TaskRunner extends AbstractService {
               }
             } catch (Throwable t) {
               LOG.fatal(t.getMessage(), t);
-            } finally {
-              executionBlockContext.releaseConnection(client);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index e9f90ca..f8d5fd9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
@@ -54,7 +54,7 @@ public class WorkerHeartbeatService extends AbstractService {
 
   private final TajoWorker.WorkerContext context;
   private TajoConf systemConf;
-  private RpcConnectionPool connectionPool;
+  private RpcClientManager connectionManager;
   private WorkerHeartbeatThread thread;
   private static final float HDFS_DATANODE_STORAGE_SIZE;
 
@@ -74,7 +74,7 @@ public class WorkerHeartbeatService extends AbstractService {
     }
     this.systemConf = (TajoConf) conf;
 
-    connectionPool = RpcConnectionPool.getPool();
+    this.connectionManager = RpcClientManager.getInstance();
     super.serviceInit(conf);
   }
 
@@ -181,7 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
           CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
 
           ServiceTracker serviceTracker = context.getServiceTracker();
-          rmClient = connectionPool.getConnection(serviceTracker.getResourceTrackerAddress(),
+          rmClient = connectionManager.getClient(serviceTracker.getResourceTrackerAddress(),
               TajoResourceTrackerProtocol.class, true);
           TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
           resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
@@ -204,8 +204,6 @@ public class WorkerHeartbeatService extends AbstractService {
           LOG.warn("Heartbeat response is being delayed.", te);
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
-        } finally {
-          connectionPool.releaseConnection(rmClient);
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 4b76c73..f94bd78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -19,20 +19,15 @@
 package org.apache.tajo.worker.rule;
 
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rule.*;
 import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
-import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
-import java.net.InetSocketAddress;
-
 /**
  * With this rule, Tajo worker will check the connectivity to tajo master server.
  */
@@ -42,20 +37,11 @@ import java.net.InetSocketAddress;
 public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
   
   private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.getPool();
-    NettyClientBase masterClient = null;
-    InetSocketAddress masterAddress = null;
-    
-    try {
-      ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf);
-      masterClient = pool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
-      masterClient.getStub();
-    } finally {
-      if (masterClient != null) {
-        pool.releaseConnection(masterClient);
-      }
-    }
-    
+    RpcClientManager manager = RpcClientManager.getInstance();
+
+    ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf);
+    NettyClientBase masterClient = manager.getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
+    masterClient.getStub();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
index 3866b09..74c09e5 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -67,40 +67,6 @@ public class RpcUtils {
     return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
   }
 
-  public static class Timer {
-    private long remaining;
-    private long prev;
-    public Timer(long timeout) {
-      this.remaining = timeout;
-      this.prev = System.currentTimeMillis();
-    }
-
-    public boolean isTimedOut() {
-      return remaining <= 0;
-    }
-
-    public void elapsed() {
-      long current = System.currentTimeMillis();
-      remaining -= current - prev;
-      prev = current;
-    }
-
-    public void interval(long wait) {
-      if (wait <= 0 || isTimedOut()) {
-        return;
-      }
-      try {
-        Thread.sleep(Math.min(remaining, wait));
-      } catch (Exception ex) {
-        // ignore
-      }
-    }
-
-    public long remaining() {
-      return remaining;
-    }
-  }
-
   // non-blocking lock which passes only a ticket before cleared or removed
   public static class Scrutineer<T> {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 3d856ce..8f2c2a1 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -20,17 +20,16 @@ package org.apache.tajo.rpc;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.*;
-
 import io.netty.channel.*;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.GenericFutureListener;
-
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Map;
@@ -53,11 +52,16 @@ public class AsyncRpcClient extends NettyClientBase {
    */
   AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
       throws ClassNotFoundException, NoSuchMethodException {
+    this(rpcConnectionKey, retries, 0);
+  }
+
+  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds)
+      throws ClassNotFoundException, NoSuchMethodException {
     super(rpcConnectionKey, retries);
     stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
     rpcChannel = new ProxyRpcChannel();
     inboundHandler = new ClientChannelInboundHandler();
-    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
+    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds));
   }
 
   @Override
@@ -177,7 +181,7 @@ public class AsyncRpcClient extends NettyClientBase {
   }
 
   @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
+  private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> {
 
     void registerCallback(int seqId, ResponseCallback callback) {
 
@@ -188,25 +192,23 @@ public class AsyncRpcClient extends NettyClientBase {
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      if (msg instanceof RpcResponse) {
-        try {
-          RpcResponse response = (RpcResponse) msg;
-          ResponseCallback callback = requests.remove(response.getId());
+    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
+      ResponseCallback callback = requests.remove(response.getId());
 
-          if (callback == null) {
-            LOG.warn("Dangling rpc call");
-          } else {
-            callback.run(response);
-          }
-        } finally {
-          ReferenceCountUtil.release(msg);
-        }
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+      } else {
+        callback.run(response);
       }
     }
 
     @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+      super.channelActive(ctx);
+      LOG.info("Connection established successfully : " + ctx.channel().remoteAddress());
+    }
+
+    @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
       LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
@@ -218,9 +220,17 @@ public class AsyncRpcClient extends NettyClientBase {
       } else {
         LOG.error("RPC Exception:" + cause.getMessage());
       }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent) {
+        IdleStateEvent e = (IdleStateEvent) evt;
+        /* If all requests is done and event is triggered, channel will be closed. */
+        if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) {
+          ctx.close();
+          LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress());
+        }
       }
     }
   }


Mime
View raw message