tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-274: Maintaining connectivity to Tajo master regardless of the restart of the Tajo master. (Keuntae Park via hyunsik)
Date Mon, 28 Oct 2013 12:37:53 GMT
TAJO-274: Maintaining connectivity to Tajo master regardless of the restart of the Tajo master. (Keuntae Park via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2f094504
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2f094504
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2f094504

Branch: refs/heads/master
Commit: 2f09450442bdbdda2a34f9ad4ec66643344a17fd
Parents: 3ed1514
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Oct 28 21:13:57 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Oct 28 21:34:19 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../tajo/catalog/AbstractCatalogClient.java     | 250 ++++++++----
 .../org/apache/tajo/catalog/CatalogClient.java  |  29 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   4 +
 .../tajo/catalog/LocalCatalogWrapper.java       |  16 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  |   5 +-
 .../java/org/apache/tajo/client/TajoClient.java | 387 ++++++++++++-------
 .../apache/tajo/master/TajoContainerProxy.java  |  27 +-
 .../tajo/master/querymaster/QueryMaster.java    |  53 ++-
 .../master/querymaster/QueryMasterTask.java     |  21 +-
 .../tajo/worker/TajoResourceAllocator.java      |  19 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  76 ++--
 .../main/java/org/apache/tajo/worker/Task.java  |   7 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  76 ++--
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  46 ++-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  53 ++-
 .../java/org/apache/tajo/rpc/CallFuture.java    |  10 +
 .../apache/tajo/rpc/DefaultRpcController.java   |  60 +++
 .../org/apache/tajo/rpc/NettyClientBase.java    |   5 +-
 .../tajo/rpc/RetriesExhaustedException.java     | 104 +++++
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 134 +++++++
 .../org/apache/tajo/rpc/ServerCallable.java     | 144 +++++++
 tajo-rpc/src/main/proto/TestProtocol.proto      |   1 +
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |  18 +
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  34 ++
 .../rpc/test/impl/DummyProtocolAsyncImpl.java   |   5 +
 .../test/impl/DummyProtocolBlockingImpl.java    |   5 +
 28 files changed, 1231 insertions(+), 366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f87064c..3f309cc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,9 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-274: Maintaining connectivity to Tajo master regardless of the restart
+    of the Tajo master. (Keuntae Park via hyunsik)
+
     TAJO-287: Refactor TableDesc, TableMeta, and Fragment. (hyunsik)
 
     TAJO-275: Separating QueryMaster and TaskRunner roles in worker. 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index ad085f8..5f79f10 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -24,9 +24,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -36,72 +39,94 @@ import java.util.List;
  */
 public abstract class AbstractCatalogClient implements CatalogService {
   private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
-  protected CatalogProtocolService.BlockingInterface stub;
 
-  protected void setStub(CatalogProtocolService.BlockingInterface stub) {
-    this.stub = stub;
-  }
+  protected RpcConnectionPool pool;
+  protected InetSocketAddress catalogServerAddr;
+  protected TajoConf conf;
+
+  abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
 
-  protected CatalogProtocolService.BlockingInterface getStub() {
-    return this.stub;
+  public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
+    this.pool = RpcConnectionPool.getPool(conf);
+    this.catalogServerAddr = catalogServerAddr;
+    this.conf= conf;
   }
 
   @Override
   public final TableDesc getTableDesc(final String name) {
     try {
-      return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder().setValue(name).build()));
+      return new ServerCallable<TableDesc>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public TableDesc call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder().setValue(name).build()));
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
   }
 
   @Override
   public final Collection<String> getAllTableNames() {
-    List<String> protos = new ArrayList<String>();
-    GetAllTableNamesResponse response;
-
     try {
-      response = stub.getAllTableNames(null, NullProto.newBuilder().build());
+      return new ServerCallable<Collection<String>>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Collection<String> call(NettyClientBase client) throws ServiceException {
+          List<String> protos = new ArrayList<String>();
+          GetAllTableNamesResponse response;
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          response = stub.getAllTableNames(null, NullProto.newBuilder().build());
+          int size = response.getTableNameCount();
+          for (int i = 0; i < size; i++) {
+            protos.add(response.getTableName(i));
+          }
+          return protos;
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
-    int size = response.getTableNameCount();
-    for (int i = 0; i < size; i++) {
-      protos.add(response.getTableName(i));
-    }
-    return protos;
   }
 
   @Override
   public final Collection<FunctionDesc> getFunctions() {
-    List<FunctionDesc> list = new ArrayList<FunctionDesc>();
-    GetFunctionsResponse response;
     try {
-      response = stub.getFunctions(null, NullProto.newBuilder().build());
+      return new ServerCallable<Collection<FunctionDesc>>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
+          List<FunctionDesc> list = new ArrayList<FunctionDesc>();
+          GetFunctionsResponse response;
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          response = stub.getFunctions(null, NullProto.newBuilder().build());
+          int size = response.getFunctionDescCount();
+          for (int i = 0; i < size; i++) {
+            try {
+              list.add(new FunctionDesc(response.getFunctionDesc(i)));
+            } catch (ClassNotFoundException e) {
+              LOG.error(e);
+              return null;
+            }
+          }
+          return list;
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
-    int size = response.getFunctionDescCount();
-    for (int i = 0; i < size; i++) {
-      try {
-        list.add(new FunctionDesc(response.getFunctionDesc(i)));
-      } catch (ClassNotFoundException e) {
-        LOG.error(e);
-        return null;
-      }
-    }
-    return list;
   }
 
   @Override
   public final boolean addTable(final TableDesc desc) {
     try {
-      return stub.addTable(null, (TableDescProto) desc.getProto()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.addTable(null, (TableDescProto) desc.getProto()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
@@ -109,10 +134,15 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean deleteTable(final String name) {
     try {
-      return stub.deleteTable(null,
-          StringProto.newBuilder().setValue(name).build()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.deleteTable(null,
+              StringProto.newBuilder().setValue(name).build()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
@@ -120,81 +150,117 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existsTable(final String tableId) {
     try {
-      return stub
-          .existsTable(null, StringProto.newBuilder().setValue(tableId).build())
-          .getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub
+              .existsTable(null, StringProto.newBuilder().setValue(tableId).build())
+              .getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
 
   @Override
-  public final boolean addIndex(IndexDesc index) {
+  public final boolean addIndex(final IndexDesc index) {
     try {
-      return stub.addIndex(null, index.getProto()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.addIndex(null, index.getProto()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
 
   @Override
-  public final boolean existIndex(String indexName) {
+  public final boolean existIndex(final String indexName) {
     try {
-      return stub.existIndexByName(null, StringProto.newBuilder().
-          setValue(indexName).build()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.existIndexByName(null, StringProto.newBuilder().
+              setValue(indexName).build()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
 
   @Override
-  public boolean existIndex(String tableName, String columnName) {
-    GetIndexRequest.Builder builder = GetIndexRequest.newBuilder();
-    builder.setTableName(tableName);
-    builder.setColumnName(columnName);
+  public boolean existIndex(final String tableName, final String columnName) {
     try {
-      return stub.existIndex(null, builder.build()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          GetIndexRequest.Builder builder = GetIndexRequest.newBuilder();
+          builder.setTableName(tableName);
+          builder.setColumnName(columnName);
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.existIndex(null, builder.build()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
 
   @Override
-  public final IndexDesc getIndex(String indexName) {
+  public final IndexDesc getIndex(final String indexName) {
     try {
-      return new IndexDesc(
-          stub.getIndexByName(null,
-              StringProto.newBuilder().setValue(indexName).build()));
+      return new ServerCallable<IndexDesc>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public IndexDesc call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return new IndexDesc(
+              stub.getIndexByName(null,
+                  StringProto.newBuilder().setValue(indexName).build()));
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
   }
 
   @Override
-  public final IndexDesc getIndex(String tableName, String columnName) {
-    GetIndexRequest.Builder builder = GetIndexRequest.newBuilder();
-    builder.setTableName(tableName);
-    builder.setColumnName(columnName);
+  public final IndexDesc getIndex(final String tableName, final String columnName) {
     try {
-      return new IndexDesc(stub.getIndex(null, builder.build()));
+      return new ServerCallable<IndexDesc>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public IndexDesc call(NettyClientBase client) throws ServiceException {
+          GetIndexRequest.Builder builder = GetIndexRequest.newBuilder();
+          builder.setTableName(tableName);
+          builder.setColumnName(columnName);
+
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return new IndexDesc(stub.getIndex(null, builder.build()));
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
   }
 
   @Override
-  public boolean deleteIndex(String indexName) {
+  public boolean deleteIndex(final String indexName) {
     try {
-      return stub.delIndex(null,
-          StringProto.newBuilder().setValue(indexName).build()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.delIndex(null,
+              StringProto.newBuilder().setValue(indexName).build()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
@@ -202,21 +268,32 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return stub.createFunction(null, funcDesc.getProto()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.createFunction(null, funcDesc.getProto()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
 
   @Override
   public final boolean dropFunction(final String signature) {
-    UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
-    builder.setSignature(signature);
     try {
-      return stub.dropFunction(null, builder.build()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
+          builder.setSignature(signature);
+
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.dropFunction(null, builder.build()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }
@@ -228,7 +305,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
   @Override
   public final FunctionDesc getFunction(final String signature, FunctionType funcType, DataType... paramTypes) {
-    GetFunctionMetaRequest.Builder builder = GetFunctionMetaRequest.newBuilder();
+    final GetFunctionMetaRequest.Builder builder = GetFunctionMetaRequest.newBuilder();
     builder.setSignature(signature);
     if (funcType != null) {
       builder.setFunctionType(funcType);
@@ -239,9 +316,14 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
     FunctionDescProto descProto;
     try {
-      descProto = stub.getFunctionMeta(null, builder.build());
+      descProto = new ServerCallable<FunctionDescProto>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public FunctionDescProto call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.getFunctionMeta(null, builder.build());
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return null;
     }
     try {
@@ -259,7 +341,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
   @Override
   public final boolean containFunction(final String signature, FunctionType funcType, DataType... paramTypes) {
-    ContainFunctionRequest.Builder builder =
+    final ContainFunctionRequest.Builder builder =
         ContainFunctionRequest.newBuilder();
     if (funcType != null) {
       builder.setFunctionType(funcType);
@@ -268,10 +350,16 @@ public abstract class AbstractCatalogClient implements CatalogService {
     for (DataType type : paramTypes) {
       builder.addParameterTypes(type);
     }
+
     try {
-      return stub.containFunction(null, builder.build()).getValue();
+      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+        public Boolean call(NettyClientBase client) throws ServiceException {
+          CatalogProtocolService.BlockingInterface stub = getStub(client);
+          return stub.containFunction(null, builder.build()).getValue();
+        }
+      }.withRetries();
     } catch (ServiceException e) {
-      LOG.error(e);
+      LOG.error(e.getMessage(), e);
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 6240631..7666a97 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -18,50 +18,35 @@
 
 package org.apache.tajo.catalog;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.rpc.BlockingRpcClient;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 /**
  * CatalogClient provides a client API to access the catalog server.
  */
 public class CatalogClient extends AbstractCatalogClient {
-  private final Log LOG = LogFactory.getLog(CatalogClient.class);
-  private BlockingRpcClient client;
-
   /**
    * @throws java.io.IOException
    *
    */
   public CatalogClient(final TajoConf conf) throws IOException {
-    String catalogAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
-    connect(NetUtils.createSocketAddr(catalogAddr));
+    super(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS)));
   }
 
-  public CatalogClient(String host, int port) throws IOException {
-    connect(NetUtils.createSocketAddr(host, port));
+  public CatalogClient(TajoConf conf, String host, int port) throws IOException {
+    super(conf, NetUtils.createSocketAddr(host, port));
   }
 
-  private void connect(InetSocketAddress serverAddr) throws IOException {
-    String addrStr = NetUtils.normalizeInetSocketAddress(serverAddr);
-    LOG.info("Trying to connect the catalog (" + addrStr + ")");
-    try {
-      client = new BlockingRpcClient(CatalogProtocol.class, serverAddr);
-      setStub((BlockingInterface) client.getStub());
-    } catch (Exception e) {
-      throw new IOException("Can't connect the catalog server (" + addrStr +")");
-    }
-    LOG.info("Connected to the catalog server (" + addrStr + ")");
+  @Override
+  BlockingInterface getStub(NettyClientBase client) {
+    return client.getStub();
   }
 
   public void close() {
-    client.close();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index a6ef576..d8b2004 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -120,6 +120,10 @@ public class CatalogServer extends AbstractService {
     super.init(conf);
   }
 
+  public TajoConf getConf() {
+    return conf;
+  }
+
   public String getCatalogServerName() {
     return bindAddressStr + ", store=" + this.store.getClass().getSimpleName() + ", jdbc="
         + conf.get(CatalogConstants.JDBC_URI);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
index 13cbaa1..eb062fd 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
@@ -21,9 +21,8 @@
  */
 package org.apache.tajo.catalog;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.rpc.NettyClientBase;
 
 import java.io.IOException;
 
@@ -32,22 +31,29 @@ import java.io.IOException;
  * local.
  */
 public class LocalCatalogWrapper extends AbstractCatalogClient {
-  private static final Log LOG = LogFactory.getLog(LocalCatalogWrapper.class);
   private CatalogServer catalog;
+  private CatalogProtocol.CatalogProtocolService.BlockingInterface stub;
 
   public LocalCatalogWrapper(final TajoConf conf) throws IOException {
+    super(conf, null);
     this.catalog = new CatalogServer();
     this.catalog.init(conf);
     this.catalog.start();
-    setStub(catalog.getHandler());
+    this.stub = catalog.getHandler();
   }
 
   public LocalCatalogWrapper(final CatalogServer server) {
+    super(server.getConf(), null);
     this.catalog = server;
-    setStub(server.getHandler());
+    this.stub = server.getHandler();
   }
 
   public void shutdown() {
     this.catalog.stop();
   }
+
+  @Override
+  CatalogProtocol.CatalogProtocolService.BlockingInterface getStub(NettyClientBase client) {
+    return stub;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 360d7f0..5b779ad 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -169,6 +169,11 @@ public class TajoConf extends YarnConfiguration {
         (long)256 * 1048576),
 
     //////////////////////////////////
+    // RPC
+    //////////////////////////////////
+    RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
+
+    //////////////////////////////////
     // The Below is reserved
     //////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index 9aea8a5..c94260f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -289,7 +289,10 @@ public class TajoCli {
 
       } else { // submit a query to TajoMaster
         ClientProtos.GetQueryStatusResponse response = client.executeQuery(stripped);
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+        if (response == null) {
+          sout.println("response is null");
+        }
+        else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
           QueryId queryId = null;
           try {
             queryId = new QueryId(response.getQueryId());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
index f73fe2e..7e8aee0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -37,99 +37,94 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
-import org.apache.tajo.rpc.BlockingRpcClient;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.rpc.ServerCallable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @ThreadSafe
 public class TajoClient {
   private final Log LOG = LogFactory.getLog(TajoClient.class);
 
   private final TajoConf conf;
-  private BlockingRpcClient tasjoMasterClient;
-  private TajoMasterClientProtocolService.BlockingInterface tajoMasterService;
 
-  private Map<QueryId, QueryMasterClientProtocolService.BlockingInterface> queryMasterConnectionMap =
-          new HashMap<QueryId, QueryMasterClientProtocolService.BlockingInterface>();
+  private Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
 
-  private Map<QueryId, BlockingRpcClient> queryMasterClientMap =
-          new HashMap<QueryId, BlockingRpcClient>();
+  private InetSocketAddress tajoMasterAddr;
+
+  private RpcConnectionPool connPool;
 
   public TajoClient(TajoConf conf) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)));
+  }
+
+  public TajoClient(TajoConf conf, InetSocketAddress addr) throws IOException {
     this.conf = conf;
     this.conf.set("tajo.disk.scheduler.report.interval", "0");
-    String masterAddr = this.conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
-    InetSocketAddress addr = NetUtils.createSocketAddr(masterAddr);
-    connect(addr);
+    this.tajoMasterAddr = addr;
+
+    connPool = RpcConnectionPool.getPool(conf);
   }
 
   public TajoClient(InetSocketAddress addr) throws IOException {
-    this.conf = new TajoConf();
-    connect(addr);
+    this(new TajoConf(), addr);
   }
 
   public TajoClient(String hostname, int port) throws IOException {
-    this.conf = new TajoConf();
-    connect(NetUtils.createSocketAddr(hostname, port));
-  }
-
-  private void connect(InetSocketAddress addr) throws IOException {
-    try {
-      tasjoMasterClient = new BlockingRpcClient(TajoMasterClientProtocol.class, addr);
-      tajoMasterService = tasjoMasterClient.getStub();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+    this(new TajoConf(), NetUtils.createSocketAddr(hostname, port));
   }
 
   public void close() {
-    tasjoMasterClient.close();
-
-    for(BlockingRpcClient eachClient: queryMasterClientMap.values()) {
-      eachClient.close();
-    }
-    queryMasterClientMap.clear();
-    queryMasterConnectionMap.clear();
+    queryMasterMap.clear();
   }
 
-  public void closeQuery(QueryId queryId) {
-    if(queryMasterClientMap.containsKey(queryId)) {
+  /**
+   * Call to QueryMaster closing query resources
+   * @param queryId
+   */
+  public void closeQuery(final QueryId queryId) {
+    if(queryMasterMap.containsKey(queryId)) {
+      NettyClientBase qmClient = null;
       try {
-        queryMasterConnectionMap.get(queryId).killQuery(null, queryId.getProto());
+        qmClient = connPool.getConnection(queryMasterMap.get(queryId), QueryMasterClientProtocol.class, false);
+        QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
+        queryMasterService.killQuery(null, queryId.getProto());
       } catch (Exception e) {
+        connPool.closeConnection(qmClient);
+        qmClient = null;
         LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
+      } finally {
+        connPool.releaseConnection(qmClient);
+        queryMasterMap.remove(queryId);
       }
-      queryMasterClientMap.get(queryId).close();
-      LOG.info("Closed a QueryMaster connection (qid=" + queryId + ", addr="
-          + queryMasterClientMap.get(queryId).getRemoteAddress() + ")");
-      queryMasterClientMap.remove(queryId);
-      queryMasterConnectionMap.remove(queryId);
     }
   }
 
-  public boolean isConnected() {
-    return tasjoMasterClient.isConnected();
-  }
-
   /**
    * It submits a query statement and get a response immediately.
    * The response only contains a query id, and submission status.
    * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
    * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
    */
-  public GetQueryStatusResponse executeQuery(String sql) throws ServiceException {
-    QueryRequest.Builder builder = QueryRequest.newBuilder();
-    builder.setQuery(sql);
-
-    return tajoMasterService.submitQuery(null, builder.build());
+  public GetQueryStatusResponse executeQuery(final String sql) throws ServiceException {
+    return new ServerCallable<GetQueryStatusResponse>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public GetQueryStatusResponse call(NettyClientBase client) throws ServiceException {
+        final QueryRequest.Builder builder = QueryRequest.newBuilder();
+        builder.setQuery(sql);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.submitQuery(null, builder.build());
+      }
+    }.withRetries();
   }
 
   /**
@@ -140,11 +135,19 @@ public class TajoClient {
    *
    * @return If failed, return null.
    */
-  public ResultSet executeQueryAndGetResult(String sql)
+  public ResultSet executeQueryAndGetResult(final String sql)
       throws ServiceException, IOException {
-    QueryRequest.Builder builder = QueryRequest.newBuilder();
-    builder.setQuery(sql);
-    GetQueryStatusResponse response = tajoMasterService.submitQuery(null, builder.build());
+    GetQueryStatusResponse response = new ServerCallable<GetQueryStatusResponse>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public GetQueryStatusResponse call(NettyClientBase client) throws ServiceException {
+        final QueryRequest.Builder builder = QueryRequest.newBuilder();
+        builder.setQuery(sql);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.submitQuery(null, builder.build());
+      }
+    }.withRetries();
+
     QueryId queryId = new QueryId(response.getQueryId());
     if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
       return this.createNullResultSet(queryId);
@@ -159,40 +162,58 @@ public class TajoClient {
     builder.setQueryId(queryId.getProto());
 
     GetQueryStatusResponse res = null;
-    if(queryMasterConnectionMap.containsKey(queryId)) {
-      QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
-      res = queryMasterService.getQueryStatus(null, builder.build());
-    } else {
-      res = tajoMasterService.getQueryStatus(null, builder.build());
-
-      String queryMasterHost = res.getQueryMasterHost();
-      if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
-        connectionToQueryMaster(queryId, queryMasterHost, res.getQueryMasterPort());
-
-        QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
+    if(queryMasterMap.containsKey(queryId)) {
+      NettyClientBase qmClient = null;
+      try {
+        qmClient = connPool.getConnection(queryMasterMap.get(queryId),
+            QueryMasterClientProtocol.class, false);
+        QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
         res = queryMasterService.getQueryStatus(null, builder.build());
+      } catch (Exception e) {
+        connPool.closeConnection(qmClient);
+        qmClient = null;
+        throw new ServiceException(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(qmClient);
+      }
+    } else {
+      NettyClientBase tmClient = null;
+      try {
+        tmClient = connPool.getConnection(tajoMasterAddr,
+            TajoMasterClientProtocol.class, false);
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
+        res = tajoMasterService.getQueryStatus(null, builder.build());
+
+        String queryMasterHost = res.getQueryMasterHost();
+        if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
+          NettyClientBase qmClient = null;
+          try {
+            InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
+            qmClient = connPool.getConnection(
+                qmAddr, QueryMasterClientProtocol.class, false);
+            QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
+            res = queryMasterService.getQueryStatus(null, builder.build());
+
+            queryMasterMap.put(queryId, qmAddr);
+          } catch (Exception e) {
+            connPool.closeConnection(qmClient);
+            qmClient = null;
+            throw new ServiceException(e.getMessage(), e);
+          } finally {
+            connPool.releaseConnection(qmClient);
+          }
+        }
+      } catch (Exception e) {
+        connPool.closeConnection(tmClient);
+        tmClient = null;
+        throw new ServiceException(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(tmClient);
       }
     }
     return new QueryStatus(res);
   }
 
-  private void connectionToQueryMaster(QueryId queryId, String queryMasterHost, int queryMasterPort) {
-    try {
-      InetSocketAddress addr = NetUtils.createSocketAddr(queryMasterHost, queryMasterPort);
-      BlockingRpcClient client = new BlockingRpcClient(QueryMasterClientProtocol.class, addr);
-      QueryMasterClientProtocolService.BlockingInterface service = client.getStub();
-
-      queryMasterConnectionMap.put(queryId, service);
-      queryMasterClientMap.put(queryId, client);
-
-      LOG.info("Connected to Query Master (qid=" + queryId + ", addr=" + NetUtils.normalizeInetSocketAddress(addr)
-          + ")");
-    } catch (Exception e) {
-      LOG.error(e.getMessage());
-      throw new RuntimeException(e);
-    }
-  }
-
   private static boolean isQueryRunnning(QueryState state) {
     return state == QueryState.QUERY_NEW ||
         state == QueryState.QUERY_INIT ||
@@ -204,9 +225,9 @@ public class TajoClient {
 
   public ResultSet getQueryResult(QueryId queryId)
       throws ServiceException, IOException {
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        return createNullResultSet(queryId);
-      }
+    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+      return createNullResultSet(queryId);
+    }
 
     TableDesc tableDesc = getResultDesc(queryId);
     return new ResultSetImpl(this, queryId, conf, tableDesc);
@@ -253,59 +274,125 @@ public class TajoClient {
       return null;
     }
 
-    QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterConnectionMap.get(queryId);
-    if(queryMasterService == null) {
-      LOG.warn("No Connection to QueryMaster for " + queryId);
-      return null;
+    NettyClientBase client = null;
+    try {
+      InetSocketAddress queryMasterAddr = queryMasterMap.get(queryId);
+      if(queryMasterAddr == null) {
+        LOG.warn("No Connection to QueryMaster for " + queryId);
+        return null;
+      }
+      client = connPool.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
+      QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
+      GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
+      builder.setQueryId(queryId.getProto());
+      GetQueryResultResponse response = queryMasterService.getQueryResult(null,
+          builder.build());
+
+      return CatalogUtil.newTableDesc(response.getTableDesc());
+    } catch (Exception e) {
+      connPool.closeConnection(client);
+      client = null;
+      throw new ServiceException(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(client);
     }
-    GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
-    builder.setQueryId(queryId.getProto());
-    GetQueryResultResponse response = queryMasterService.getQueryResult(null,
-        builder.build());
-
-    return CatalogUtil.newTableDesc(response.getTableDesc());
   }
 
-  public boolean updateQuery(String sql) throws ServiceException {
-    QueryRequest.Builder builder = QueryRequest.newBuilder();
-    builder.setQuery(sql);
-
-    ResultCode resultCode =
-        tajoMasterService.updateQuery(null, builder.build()).getResultCode();
-    return resultCode == ResultCode.OK;
+  public boolean updateQuery(final String sql) throws ServiceException {
+    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        QueryRequest.Builder builder = QueryRequest.newBuilder();
+        builder.setQuery(sql);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        ResultCode resultCode =
+            tajoMasterService.updateQuery(null, builder.build()).getResultCode();
+        return resultCode == ResultCode.OK;
+      }
+    }.withRetries();
   }
 
-  public boolean existTable(String name) throws ServiceException {
-    StringProto.Builder builder = StringProto.newBuilder();
-    builder.setValue(name);
-    return tajoMasterService.existTable(null, builder.build()).getValue();
+  /**
+   * Test for the existence of table in catalog data.
+   * <p/>
+   * This will return true if table exists, false if not.
+   * @param name
+   * @return
+   * @throws ServiceException
+   */
+  public boolean existTable(final String name) throws ServiceException {
+    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        StringProto.Builder builder = StringProto.newBuilder();
+        builder.setValue(name);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.existTable(null, builder.build()).getValue();
+      }
+    }.withRetries();
   }
 
-  public boolean detachTable(String name) throws ServiceException {
-    StringProto.Builder builder = StringProto.newBuilder();
-    builder.setValue(name);
-    return tajoMasterService.detachTable(null, builder.build()).getValue();
+  /**
+   * Deletes table schema from catalog data, but doesn't delete data file in hdfs.
+   * @param tableName
+   * @return
+   * @throws ServiceException
+   */
+  public boolean detachTable(final String tableName) throws ServiceException {
+    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        StringProto.Builder builder = StringProto.newBuilder();
+        builder.setValue(tableName);
+        return tajoMasterService.detachTable(null, builder.build()).getValue();
+      }
+    }.withRetries();
   }
 
-  public TableDesc createExternalTable(String name, Schema schema, Path path, TableMeta meta)
+  public TableDesc createExternalTable(final String name, final Schema schema, final Path path, final TableMeta meta)
       throws SQLException, ServiceException {
-    CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
-    builder.setName(name);
-    builder.setSchema(schema.getProto());
-    builder.setMeta(meta.getProto());
-    builder.setPath(path.toUri().toString());
-    TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
-    if (res.getResultCode() == ResultCode.OK) {
-      return CatalogUtil.newTableDesc(res.getTableDesc());
-    } else {
-      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-    }
+    return new ServerCallable<TableDesc>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
+        builder.setName(name);
+        builder.setSchema(schema.getProto());
+        builder.setMeta(meta.getProto());
+        builder.setPath(path.toUri().toString());
+        TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+        if (res.getResultCode() == ResultCode.OK) {
+          return CatalogUtil.newTableDesc(res.getTableDesc());
+        } else {
+          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+        }
+      }
+    }.withRetries();
   }
 
-  public boolean dropTable(String name) throws ServiceException {
-    StringProto.Builder builder = StringProto.newBuilder();
-    builder.setValue(name);
-    return tajoMasterService.dropTable(null, builder.build()).getValue();
+  /**
+   * Deletes table schema from catalog data and deletes data file in hdfs
+   * @param tableName
+   * @return
+   * @throws ServiceException
+   */
+  public boolean dropTable(final String tableName) throws ServiceException {
+    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        StringProto.Builder builder = StringProto.newBuilder();
+        builder.setValue(tableName);
+        return tajoMasterService.dropTable(null, builder.build()).getValue();
+      }
+    }.withRetries();
+
   }
 
   /**
@@ -313,30 +400,48 @@ public class TajoClient {
    * represented as lower-case letters.
    */
   public List<String> getTableList() throws ServiceException {
-    GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
-    GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
-    return res.getTablesList();
+    return new ServerCallable<List<String>>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public List<String> call(NettyClientBase client) throws ServiceException {
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
+        GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
+        return res.getTablesList();
+      }
+    }.withRetries();
   }
 
-  public TableDesc getTableDesc(String tableName) throws SQLException, ServiceException {
-    GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
-    build.setTableName(tableName);
-    TableResponse res = tajoMasterService.getTableDesc(null, build.build());
-    if (res.getResultCode() == ResultCode.OK) {
-      return CatalogUtil.newTableDesc(res.getTableDesc());
-    } else {
-      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-    }
+  public TableDesc getTableDesc(final String tableName) throws SQLException, ServiceException {
+    return new ServerCallable<TableDesc>(conf, tajoMasterAddr,
+        TajoMasterClientProtocol.class, false) {
+      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        GetTableDescRequest.Builder build = GetTableDescRequest.newBuilder();
+        build.setTableName(tableName);
+        TableResponse res = tajoMasterService.getTableDesc(null, build.build());
+        if (res.getResultCode() == ResultCode.OK) {
+          return CatalogUtil.newTableDesc(res.getTableDesc());
+        } else {
+          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+        }
+      }
+    }.withRetries();
   }
 
-  public boolean killQuery(QueryId queryId)
+  public boolean killQuery(final QueryId queryId)
       throws ServiceException, IOException {
 
     QueryStatus status = getQueryStatus(queryId);
 
+    NettyClientBase tmClient = null;
     try {
       /* send a kill to the TM */
+      tmClient = connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
       tajoMasterService.killQuery(null, queryId.getProto());
+
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
       while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
@@ -350,9 +455,13 @@ public class TajoClient {
         currentTimeMillis = System.currentTimeMillis();
         status = getQueryStatus(queryId);
       }
-    } catch(ServiceException io) {
-      LOG.debug("Error when checking for application status", io);
+    } catch(Exception e) {
+      connPool.closeConnection(tmClient);
+      tmClient = null;
+      LOG.debug("Error when checking for application status", e);
       return false;
+    } finally {
+      connPool.releaseConnection(tmClient);
     }
 
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index be9d047..d542390 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -31,7 +31,9 @@ import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -160,11 +162,24 @@ public class TajoContainerProxy extends ContainerProxy {
           .build()
       );
     }
-    context.getQueryMasterContext().getWorkerContext().getTajoMasterRpcClient()
-        .releaseWorkerResource(null,
-            TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
-                .addAllWorkerResources(workerResourceProtos)
-                .build(),
-            NullCallback.get());
+
+    RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
+    NettyClientBase tmClient = null;
+    try {
+        tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+            TajoMasterProtocol.class, true);
+        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        masterClientService.releaseWorkerResource(null,
+          TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+              .addAllWorkerResources(workerResourceProtos)
+              .build(),
+          NullCallback.get());
+    } catch (Exception e) {
+      connPool.closeConnection(tmClient);
+      tmClient = null;
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 1198f3e..54d2370 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -32,10 +32,12 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.worker.TajoWorker;
@@ -44,10 +46,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
+import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
 
 // TODO - when exception, send error status to QueryJobManager
 public class QueryMaster extends CompositeService implements EventHandler {
@@ -81,6 +83,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private TajoWorker.WorkerContext workerContext;
 
+  private RpcConnectionPool connPool;
+
   public QueryMaster(TajoWorker.WorkerContext workerContext) {
     super(QueryMaster.class.getName());
     this.workerContext = workerContext;
@@ -89,7 +93,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
   public void init(Configuration conf) {
     LOG.info("QueryMaster init");
     try {
-      systemConf = (TajoConf)conf;
+      this.systemConf = (TajoConf)conf;
+      this.connPool = RpcConnectionPool.getPool(systemConf);
 
       querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT);
       queryMasterContext = new QueryMasterContext(systemConf);
@@ -160,7 +165,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
     LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
+    NettyClientBase tmClient = null;
     try {
+      tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
       TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
           .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
           .setPeerRpcPort(workerContext.getPeerRpcPort())
@@ -169,9 +179,16 @@ public class QueryMaster extends CompositeService implements EventHandler {
           .setState(state)
           .setQueryId(queryId.getProto());
 
-      workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeatBuilder.build(), NullCallback.get());
+      CallFuture<TajoHeartbeatResponse> callBack =
+          new CallFuture<TajoHeartbeatResponse>();
+
+      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
     } catch (Exception e) {
+      connPool.closeConnection(tmClient);
+      tmClient = null;
       LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
     }
   }
 
@@ -265,12 +282,20 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       if(queryMasterTask != null) {
         TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
-        CallFuture future = new CallFuture();
-        workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, future);
+        CallFuture<TajoHeartbeatResponse> future = new CallFuture<TajoHeartbeatResponse>();
+
+        NettyClientBase tmClient = null;
         try {
-          future.get(3, TimeUnit.SECONDS);
-        } catch (Throwable e) {
-          LOG.warn(e);
+          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
+          TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+          masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
+        } catch (Exception e) {
+          connPool.closeConnection(tmClient);
+          tmClient = null;
+          LOG.error(e.getMessage(), e);
+        } finally {
+          connPool.releaseConnection(tmClient);
         }
 
         try {
@@ -331,9 +356,23 @@ public class QueryMaster extends CompositeService implements EventHandler {
         }
         synchronized(queryMasterTasks) {
           for(QueryMasterTask eachTask: tempTasks) {
+            // Send a heartbeat for each task in tempTasks. The pooled connection is handed
+            // back (or closed on failure) on every iteration, like the other call sites here.
+            NettyClientBase tmClient = null;
             try {
+              tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+                  TajoMasterProtocol.class, true);
+              TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+
+              CallFuture<TajoHeartbeatResponse> callBack =
+                  new CallFuture<TajoHeartbeatResponse>();
+
               TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
-              workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+              masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
             } catch (Throwable t) {
+              connPool.closeConnection(tmClient);
+              tmClient = null;
               t.printStackTrace();
+            } finally {
+              connPool.releaseConnection(tmClient);
             }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 26cba45..3fb4ac0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -42,11 +42,14 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
@@ -159,8 +162,22 @@ public class QueryMasterTask extends CompositeService {
     LOG.info("Stopping QueryMasterTask:" + queryId);
 
     CallFuture future = new CallFuture();
-    queryMasterContext.getWorkerContext().getTajoMasterRpcClient()
-        .stopQueryMaster(null, queryId.getProto(), future);
+
+    RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
+          TajoMasterProtocol.class, true);
+      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      masterClientService.stopQueryMaster(null, queryId.getProto(), future);
+    } catch (Exception e) {
+      connPool.closeConnection(tmClient);
+      tmClient = null;
+      LOG.error(e.getMessage(), e);
+    } finally {
+      connPool.releaseConnection(tmClient);
+    }
+
     try {
       future.get(3, TimeUnit.SECONDS);
     } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 46a5807..275660a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -47,6 +47,8 @@ import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.io.IOException;
@@ -234,8 +236,21 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               .setExecutionBlockId(event.getExecutionBlockId().getProto())
               .build();
 
-      queryTaskContext.getQueryMasterContext().getWorkerContext().
-          getTajoMasterRpcClient().allocateWorkerResources(null, request, callBack);
+      RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
+      NettyClientBase tmClient = null;
+      try {
+        tmClient = connPool.getConnection(
+            queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+            TajoMasterProtocol.class, true);
+        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        masterClientService.allocateWorkerResources(null, request, callBack);
+      } catch (Exception e) {
+        connPool.closeConnection(tmClient);
+        tmClient = null;
+        LOG.error(e.getMessage(), e);
+      } finally {
+        connPool.releaseConnection(tmClient);
+      }
 
       TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
       while(!stopped.get()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 92e30bf..4d46a45 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,8 +37,9 @@ import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.master.querymaster.QueryMasterManagerService;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
@@ -52,7 +54,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -82,11 +83,6 @@ public class TajoWorker extends CompositeService {
 
   private InetSocketAddress tajoMasterAddress;
 
-  //to TajoMaster
-  private AsyncRpcClient tajoMasterRpc;
-
-  private TajoMasterProtocol.TajoMasterProtocolService tajoMasterRpcClient;
-
   private CatalogClient catalogClient;
 
   private WorkerContext workerContext;
@@ -113,6 +109,8 @@ public class TajoWorker extends CompositeService {
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
+  private RpcConnectionPool connPool;
+
   private String[] cmdArgs;
 
   public TajoWorker() throws Exception {
@@ -162,7 +160,8 @@ public class TajoWorker extends CompositeService {
     this.systemConf = (TajoConf)conf;
     RackResolver.init(systemConf);
 
-    workerContext = new WorkerContext();
+    this.connPool = RpcConnectionPool.getPool(systemConf);
+    this.workerContext = new WorkerContext();
 
     String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
 
@@ -232,8 +231,7 @@ public class TajoWorker extends CompositeService {
     super.init(conf);
 
     if(yarnContainerMode && queryMasterMode) {
-      String tajoMasterAddress = cmdArgs[2];
-      connectToTajoMaster(tajoMasterAddress);
+      tajoMasterAddress = NetUtils.createSocketAddr(cmdArgs[2]);
       connectToCatalog();
 
       QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
@@ -242,7 +240,7 @@ public class TajoWorker extends CompositeService {
     } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
       taskRunnerManager.startTask(cmdArgs);
     } else {
-      connectToTajoMaster(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+      tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
       connectToCatalog();
       workerHeartbeatThread = new WorkerHeartbeatThread();
       workerHeartbeatThread.start();
@@ -280,8 +278,8 @@ public class TajoWorker extends CompositeService {
       catalogClient.close();
     }
 
-    if(tajoMasterRpc != null) {
-      tajoMasterRpc.close();
+    if(connPool != null) {
+      connPool.close();
     }
 
     if(webServer != null && webServer.isAlive()) {
@@ -314,10 +312,6 @@ public class TajoWorker extends CompositeService {
       return tajoWorkerClientService;
     }
 
-    public TajoMasterProtocol.TajoMasterProtocolService getTajoMasterRpcClient() {
-      return tajoMasterRpcClient;
-    }
-
     public TaskRunnerManager getTaskRunnerManager() {
       return taskRunnerManager;
     }
@@ -364,6 +358,10 @@ public class TajoWorker extends CompositeService {
       return TajoWorker.this.numClusterSlots.get();
     }
 
+    public InetSocketAddress getTajoMasterAddress() {
+      return tajoMasterAddress;
+    }
+
     public int getPeerRpcPort() {
       return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
     }
@@ -381,27 +379,6 @@ public class TajoWorker extends CompositeService {
     stop();
   }
 
-  private void connectToTajoMaster(String tajoMasterAddrString) {
-    LOG.info("Connecting to TajoMaster (" + tajoMasterAddrString +")");
-    this.tajoMasterAddress = NetUtils.createSocketAddr(tajoMasterAddrString);
-
-    while(true) {
-      try {
-        tajoMasterRpc = new AsyncRpcClient(TajoMasterProtocol.class, this.tajoMasterAddress);
-        tajoMasterRpcClient = tajoMasterRpc.getStub();
-        break;
-      } catch (Exception e) {
-        LOG.error("Can't connect to TajoMaster[" + NetUtils.normalizeInetSocketAddress(tajoMasterAddress) + "], "
-            + e.getMessage(), e);
-      }
-
-      try {
-        Thread.sleep(3000);
-      } catch (InterruptedException e) {
-      }
-    }
-  }
-
   private void connectToCatalog() {
     try {
       catalogClient = new CatalogClient(systemConf);
@@ -456,8 +433,6 @@ public class TajoWorker extends CompositeService {
     }
 
     public void run() {
-      CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
-          new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
       LOG.info("Worker Resource Heartbeat Thread start.");
       int sendDiskInfoCount = 0;
       int pullServerPort = 0;
@@ -535,9 +510,15 @@ public class TajoWorker extends CompositeService {
             .setServerStatus(serverStatus)
             .build();
 
-        workerContext.getTajoMasterRpcClient().heartbeat(null, heartbeatProto, callBack);
-
+        NettyClientBase tmClient = null;
         try {
+          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+
+          tmClient = connPool.getConnection(tajoMasterAddress, TajoMasterProtocol.class, true);
+          TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+          masterClientService.heartbeat(callBack.getController(), heartbeatProto, callBack);
+
           TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
           if(response != null) {
             if(response.getNumClusterNodes() > 0) {
@@ -547,10 +528,19 @@ public class TajoWorker extends CompositeService {
             if(response.getNumClusterSlots() > 0) {
               workerContext.setNumClusterSlots(response.getNumClusterSlots());
             }
+          } else {
+            if(callBack.getController().failed()) {
+              throw new ServiceException(callBack.getController().errorText());
+            }
           }
         } catch (InterruptedException e) {
           break;
-        } catch (TimeoutException e) {
+        } catch (Exception e) {
+          connPool.closeConnection(tmClient);
+          tmClient = null;
+          LOG.error(e.getMessage(), e);
+        } finally {
+          connPool.releaseConnection(tmClient);
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index cca7242..922719d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -40,10 +40,9 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.*;
-import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
@@ -66,7 +65,7 @@ public class Task {
   private final QueryContext queryContext;
   private final FileSystem localFS;
   private final TaskRunner.TaskRunnerContext taskRunnerContext;
-  private final QueryMasterProtocol.QueryMasterProtocolService.Interface masterProxy;
+  private final QueryMasterProtocolService.Interface masterProxy;
   private final LocalDirAllocator lDirAllocator;
   private final QueryUnitAttemptId taskId;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index f0c9033..1920f25 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -39,15 +39,14 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.*;
 
@@ -70,7 +69,7 @@ public class TaskRunner extends AbstractService {
   private ContainerId containerId;
 
   // Cluster Management
-  private QueryMasterProtocolService.Interface master;
+  //private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
 
   // for temporal or intermediate files
   private FileSystem localFS;
@@ -102,16 +101,20 @@ public class TaskRunner extends AbstractService {
   private String baseDir;
   private Path baseDirPath;
 
-  private AsyncRpcClient client;
-
   private TaskRunnerManager taskRunnerManager;
 
   private long finishTime;
 
+  private RpcConnectionPool connPool;
+
+  private InetSocketAddress qmMasterAddr;
+
   public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
     super(TaskRunner.class.getName());
 
     this.taskRunnerManager = taskRunnerManager;
+    this.connPool = RpcConnectionPool.getPool(conf);
+
     try {
       final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
 
@@ -128,23 +131,23 @@ public class TaskRunner extends AbstractService {
       // QueryMaster's address
       String host = args[4];
       int port = Integer.parseInt(args[5]);
-      final InetSocketAddress masterAddr = NetUtils.createSocketAddrForHost(host, port);
+      this.qmMasterAddr = NetUtils.createSocketAddrForHost(host, port);
 
-      LOG.info("QueryMaster Address:" + masterAddr);
+      LOG.info("QueryMaster Address:" + qmMasterAddr);
       // TODO - 'load credential' should be implemented
       // Getting taskOwner
       UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
       //taskOwner.addToken(token);
 
-      // initialize QueryMasterProtocol as an actual task owner.
-      this.client =
-          taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
-            @Override
-            public AsyncRpcClient run() throws Exception {
-              return new AsyncRpcClient(QueryMasterProtocol.class, masterAddr);
-            }
-          });
-      this.master = client.getStub();
+      // initialize MasterWorkerProtocol as an actual task owner.
+//      this.client =
+//          taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
+//            @Override
+//            public AsyncRpcClient run() throws Exception {
+//              return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+//            }
+//          });
+//      this.master = client.getStub();
 
       this.executionBlockId = executionBlockId;
       this.queryId = executionBlockId.getQueryId();
@@ -213,10 +216,10 @@ public class TaskRunner extends AbstractService {
       }
     }
 
-    if(client != null) {
-      client.close();
-      client = null;
-    }
+//    if(client != null) {
+//      client.close();
+//      client = null;
+//    }
 
     LOG.info("Stop TaskRunner: " + executionBlockId);
     synchronized (this) {
@@ -237,9 +240,9 @@ public class TaskRunner extends AbstractService {
       return nodeId.toString();
     }
 
-    public QueryMasterProtocolService.Interface getMaster() {
-      return master;
-    }
+//    public TajoWorkerProtocolService.Interface getMaster() {
+//      return master;
+//    }
 
     public FileSystem getLocalFS() {
       return localFS;
@@ -282,12 +285,13 @@ public class TaskRunner extends AbstractService {
     return taskRunnerContext;
   }
 
-  static void fatalError(QueryMasterProtocolService.Interface proxy,
+  static void fatalError(QueryMasterProtocolService.Interface qmClientService,
                          QueryUnitAttemptId taskAttemptId, String message) {
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())
         .setErrorMessage(message);
-    proxy.fatalError(null, builder.build(), NullCallback.get());
+
+    qmClientService.fatalError(null, builder.build(), NullCallback.get());
   }
 
   public void run() {
@@ -303,7 +307,12 @@ public class TaskRunner extends AbstractService {
           QueryUnitRequestProto taskRequest = null;
 
           while(!stopped) {
+            NettyClientBase qmClient = null;
+            QueryMasterProtocolService.Interface qmClientService = null;
             try {
+              qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+              qmClientService = qmClient.getStub();
+
               if (callFuture == null) {
                 callFuture = new CallFuture<QueryUnitRequestProto>();
                 LOG.info("Request GetTask: " + getId());
@@ -311,7 +320,8 @@ public class TaskRunner extends AbstractService {
                     .setExecutionBlockId(executionBlockId.getProto())
                     .setContainerId(((ContainerIdPBImpl) containerId).getProto())
                     .build();
-                master.getTask(null, request, callFuture);
+
+                qmClientService.getTask(null, request, callFuture);
               }
               try {
                 // wait for an assigning task for 3 seconds
@@ -347,14 +357,14 @@ public class TaskRunner extends AbstractService {
 
                   QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
                   if (tasks.containsKey(taskAttemptId)) {
-                    fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
+                    fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
                     continue;
                   }
 
                   LOG.info("Initializing: " + taskAttemptId);
                   Task task;
                   try {
-                    task = new Task(taskAttemptId, taskRunnerContext, master,
+                    task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
                         new QueryUnitRequestImpl(taskRequest));
                     tasks.put(taskAttemptId, task);
 
@@ -365,7 +375,7 @@ public class TaskRunner extends AbstractService {
                     // task.run() is a blocking call.
                     task.run();
                   } catch (Throwable t) {
-                    fatalError(taskRunnerContext.getMaster(), taskAttemptId, t.getMessage());
+                    fatalError(qmClientService, taskAttemptId, t.getMessage());
                     t.printStackTrace();
                   } finally {
                     callFuture = null;
@@ -374,7 +384,11 @@ public class TaskRunner extends AbstractService {
                 }
               }
             } catch (Throwable t) {
+              connPool.closeConnection(qmClient);
+              qmClient = null;
               t.printStackTrace();
+            } finally {
+              connPool.releaseConnection(qmClient);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2f094504/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index b062eca..e8031c8 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+
 public class AsyncRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(RpcProtos.class);
 
@@ -47,6 +49,8 @@ public class AsyncRpcClient extends NettyClientBase {
   private final Class<?> protocol;
   private final Method stubMethod;
 
+  private RpcConnectionKey key;
+
   public AsyncRpcClient(final Class<?> protocol,
                         final InetSocketAddress addr)
       throws Exception {
@@ -62,10 +66,21 @@ public class AsyncRpcClient extends NettyClientBase {
         RpcResponse.getDefaultInstance());
     super.init(addr, pipeFactory);
     rpcChannel = new ProxyRpcChannel(getChannel());
+    this.key = new RpcConnectionKey(addr, protocol, true);
   }
 
-  public <T> T getStub() throws Exception {
-    return (T) stubMethod.invoke(null, rpcChannel);
+  @Override
+  public RpcConnectionKey getKey() {
+    return key;
+  }
+
+  @Override
+  public <T> T getStub() {
+    try {
+      return (T) stubMethod.invoke(null, rpcChannel);
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
   }
 
   public RpcChannel getRpcChannel() {
@@ -99,6 +114,7 @@ public class AsyncRpcClient extends NettyClientBase {
 
       handler.registerCallback(nextSeqId,
           new ResponseCallback(controller, responseType, done));
+
       channel.write(rpcRequest);
     }
 
@@ -131,20 +147,16 @@ public class AsyncRpcClient extends NettyClientBase {
       this.callback = callback;
     }
 
+    @Override
     public void run(RpcResponse rpcResponse) {
-
       // if hasErrorMessage is true, it means rpc-level errors.
-      // it does not call the callback function
+      // it does not call the callback function
       if (rpcResponse.hasErrorMessage()) {
-
         if (controller != null) {
           this.controller.setFailed(rpcResponse.getErrorMessage());
         }
         callback.run(null);
-        throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-
       } else { // if rpc call succeed
-
         try {
           Message responseMessage;
           if (!rpcResponse.hasResponseMessage()) {
@@ -184,7 +196,6 @@ public class AsyncRpcClient extends NettyClientBase {
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
         throws Exception {
-
       RpcResponse response = (RpcResponse) e.getMessage();
       ResponseCallback callback = requests.remove(response.getId());
 
@@ -200,7 +211,22 @@ public class AsyncRpcClient extends NettyClientBase {
         throws Exception {
       LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
       e.getChannel().close();
-      throw new RemoteException(getErrorMessage(addr.toString()), e.getCause());
+
+      for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
+        ResponseCallback callback = callbackEntry.getValue();
+        Integer id = callbackEntry.getKey();
+
+        RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
+            .setErrorMessage(e.toString())
+            .setId(id);
+
+        callback.run(responseBuilder.build());
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.error("" + e.getCause(), e.getCause());
+      } else {
+        LOG.error("RPC Exception:" + e.getCause());
+      }
     }
   }
 }
\ No newline at end of file


Mime
View raw message