tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [2/2] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)
Date Thu, 30 Apr 2015 07:53:10 GMT
TAJO-1583: Remove ServerCallable in RPC client. (jinho)

Closes #556


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47554105
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47554105
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47554105

Branch: refs/heads/master
Commit: 475541057891518e08e5a18ebbbf916c1ad60c10
Parents: 9540f16
Author: Jinho Kim <jhkim@apache.org>
Authored: Thu Apr 30 16:51:56 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Thu Apr 30 16:51:56 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/catalog/AbstractCatalogClient.java     | 569 ++++++-------------
 .../org/apache/tajo/catalog/CatalogClient.java  |  49 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   8 +-
 .../tajo/catalog/LocalCatalogWrapper.java       |  20 +-
 .../tajo/client/CatalogAdminClientImpl.java     | 236 +++-----
 .../org/apache/tajo/client/QueryClientImpl.java | 328 +++++------
 .../apache/tajo/client/SessionConnection.java   | 275 ++++-----
 .../cli/tsql/TestDefaultCliOutputFormatter.java |   4 -
 .../apache/tajo/querymaster/TestKillQuery.java  |  63 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |   3 +-
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ----
 .../org/apache/tajo/rpc/ServerCallable.java     | 148 -----
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  39 --
 14 files changed, 618 insertions(+), 1230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8bda2bd..952f852 100644
--- a/CHANGES
+++ b/CHANGES
@@ -214,6 +214,8 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1583: Remove ServerCallable in RPC client. (jinho)
+
     TAJO-1587: Upgrade java version to 1.7 for Travis CI. (jihoon)
 
     TAJO-1559: Fix data model description (tinyint, smallint).

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 49be29a..766f6c2 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,16 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.ProtoUtil;
 
-import java.net.InetSocketAddress;
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -46,50 +42,27 @@ import java.util.List;
 /**
  * CatalogClient provides a client API to access the catalog server.
  */
-public abstract class AbstractCatalogClient implements CatalogService {
-  private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
+public abstract class AbstractCatalogClient implements CatalogService, Closeable {
+  protected final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
 
-  protected ServiceTracker serviceTracker;
-  protected RpcClientManager manager;
-  protected InetSocketAddress catalogServerAddr;
   protected TajoConf conf;
 
-  abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
-
-  public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
-    this.manager = RpcClientManager.getInstance();
-    this.catalogServerAddr = catalogServerAddr;
-    this.serviceTracker = ServiceTrackerFactory.get(conf);
+  public AbstractCatalogClient(TajoConf conf) {
     this.conf = conf;
   }
 
-  private InetSocketAddress getCatalogServerAddr() {
-    if (catalogServerAddr == null) {
-      return null;
-    } else {
-
-      if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        return catalogServerAddr;
-      } else {
-        return serviceTracker.getCatalogAddress();
-      }
-    }
-  }
+  abstract CatalogProtocolService.BlockingInterface getStub() throws ServiceException;
 
   @Override
   public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
+      CatalogProtocolService.BlockingInterface stub = getStub();
 
-          CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
-          builder.setTablespaceName(tablespaceName);
-          builder.setTablespaceUri(tablespaceUri);
-          return stub.createTablespace(null, builder.build()).getValue();
-        }
-      }.withRetries();
-    } catch (ServiceException e) {
+      CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
+      builder.setTablespaceName(tablespaceName);
+      builder.setTablespaceUri(tablespaceUri);
+      return stub.createTablespace(null, builder.build()).getValue();
+    } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
     }
@@ -98,12 +71,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -113,12 +82,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -128,46 +93,32 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTablespaceNames() {
     try {
-      return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<String> call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
-          return ProtoUtil.convertStrings(response);
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
+      return ProtoUtil.convertStrings(response);
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<String>();
     }
   }
   
   @Override
   public List<TablespaceProto> getAllTablespaces() {
     try {
-      return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TablespaceProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
-          return response.getTablespaceList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
+      return response.getTablespaceList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TablespaceProto>();
     }
   }
 
   @Override
   public TablespaceProto getTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public TablespaceProto call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -177,12 +128,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.alterTablespace(null, alterTablespace).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.alterTablespace(null, alterTablespace).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -192,18 +139,14 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
+      CatalogProtocolService.BlockingInterface stub = getStub();
 
-          CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
-          builder.setDatabaseName(databaseName);
-          if (tablespaceName != null) {
-            builder.setTablespaceName(tablespaceName);
-          }
-          return stub.createDatabase(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
+      builder.setDatabaseName(databaseName);
+      if (tablespaceName != null) {
+        builder.setTablespaceName(tablespaceName);
+      }
+      return stub.createDatabase(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -213,12 +156,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -228,12 +167,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -243,50 +178,36 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllDatabaseNames() {
     try {
-      return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<String> call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
-          return ProtoUtil.convertStrings(response);
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
+      return ProtoUtil.convertStrings(response);
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<String>();
     }
   }
   
   @Override
   public List<DatabaseProto> getAllDatabases() {
     try {
-      return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<DatabaseProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
-          return response.getDatabaseList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
+      return response.getDatabaseList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<DatabaseProto>();
     }
   }
 
   @Override
   public final TableDesc getTableDesc(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public TableDesc call(NettyClientBase client) throws ServiceException {
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -302,89 +223,60 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TableDescriptorProto> getAllTables() {
     try {
-      return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TableDescriptorProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
-          return response.getTableList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
+      return response.getTableList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TableDescriptorProto>();
     }
   }
   
   @Override
   public List<TableOptionProto> getAllTableOptions() {
     try {
-      return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TableOptionProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
-          return response.getTableOptionList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
+      return response.getTableOptionList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TableOptionProto>();
     }
   }
   
   @Override
   public List<TableStatsProto> getAllTableStats() {
     try {
-      return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TableStatsProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
-          return response.getStatList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
+      return response.getStatList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TableStatsProto>();
     }
   }
   
   @Override
   public List<ColumnProto> getAllColumns() {
     try {
-      return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<ColumnProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
-          return response.getColumnList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
+      return response.getColumnList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<ColumnProto>();
     }
   }
 
   @Override
   public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null,  builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -394,17 +286,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existPartitionMethod(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existPartitionMethod(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -415,18 +302,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public final PartitionDescProto getPartition(final String databaseName, final String tableName,
                                                final String partitionName) {
     try {
-      return new ServerCallable<PartitionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public PartitionDescProto call(NettyClientBase client) throws ServiceException {
-
-          PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
-          builder.setPartitionName(partitionName);
+      PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
+      builder.setPartitionName(partitionName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.getPartitionByPartitionName(null, builder.build());
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.getPartitionByPartitionName(null, builder.build());
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -436,94 +318,70 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<List<PartitionDescProto>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class,
-        false) {
-        public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException {
+      PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
-
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
-          return response.getPartitionList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
+      return response.getPartitionList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<PartitionDescProto>();
     }
   }
   @Override
   public List<TablePartitionProto> getAllPartitions() {
     try {
-      return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TablePartitionProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
-          return response.getPartList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
+      return response.getPartList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TablePartitionProto>();
     }
   }
 
   @Override
   public final Collection<String> getAllTableNames(final String databaseName) {
     try {
-      return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<String> call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
-          return ProtoUtil.convertStrings(response);
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
+      return ProtoUtil.convertStrings(response);
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<String>();
     }
   }
 
   @Override
   public final Collection<FunctionDesc> getFunctions() {
-    try {
-      return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
-          List<FunctionDesc> list = new ArrayList<FunctionDesc>();
-          GetFunctionsResponse response;
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          response = stub.getFunctions(null, NullProto.newBuilder().build());
-          int size = response.getFunctionDescCount();
-          for (int i = 0; i < size; i++) {
-            try {
-              list.add(new FunctionDesc(response.getFunctionDesc(i)));
-            } catch (ClassNotFoundException e) {
-              LOG.error(e, e);
-              return null;
-            }
-          }
+    List<FunctionDesc> list = new ArrayList<FunctionDesc>();
+    try {
+      GetFunctionsResponse response;
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      response = stub.getFunctions(null, NullProto.newBuilder().build());
+      int size = response.getFunctionDescCount();
+      for (int i = 0; i < size; i++) {
+        try {
+          list.add(new FunctionDesc(response.getFunctionDesc(i)));
+        } catch (ClassNotFoundException e) {
+          LOG.error(e, e);
           return list;
         }
-      }.withRetries();
+      }
+      return list;
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return list;
     }
   }
 
   @Override
   public final boolean createTable(final TableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.createTable(null, desc.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.createTable(null, desc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -537,17 +395,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
     final String simpleName = splitted[1];
 
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(simpleName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(simpleName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropTable(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropTable(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -561,17 +414,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
           "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
     }
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existsTable(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existsTable(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -586,12 +434,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createIndex(final IndexDesc index) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.createIndex(null, index.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.createIndex(null, index.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -601,16 +445,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          IndexNameProto.Builder builder = IndexNameProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setIndexName(indexName);
+      IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setIndexName(indexName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existIndexByName(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existIndexByName(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -620,17 +460,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
 
-          GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
-          builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-          builder.setColumnName(columnName);
+      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+      builder.setColumnName(columnName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existIndexByColumn(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existIndexByColumn(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -640,17 +476,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public IndexDesc call(NettyClientBase client) throws ServiceException {
-
-          IndexNameProto.Builder builder = IndexNameProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setIndexName(indexName);
+      IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setIndexName(indexName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return new IndexDesc(stub.getIndexByName(null, builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return new IndexDesc(stub.getIndexByName(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -662,17 +493,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
                                           final String tableName,
                                           final String columnName) {
     try {
-      return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public IndexDesc call(NettyClientBase client) throws ServiceException {
+      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+      builder.setColumnName(columnName);
 
-          GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
-          builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-          builder.setColumnName(columnName);
-
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -683,17 +509,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public boolean dropIndex(final String databaseName,
                            final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
+      IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setIndexName(indexName);
 
-          IndexNameProto.Builder builder = IndexNameProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setIndexName(indexName);
-
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropIndex(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropIndex(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -703,30 +524,20 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<IndexProto> getAllIndexes() {
     try {
-      return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<IndexProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
-          return response.getIndexList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
+      return response.getIndexList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<IndexProto>();
     }
   }
 
   @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.createFunction(null, funcDesc.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.createFunction(null, funcDesc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -736,15 +547,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean dropFunction(final String signature) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
-          builder.setSignature(signature);
+      UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
+      builder.setSignature(signature);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropFunction(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropFunction(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -769,24 +576,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
     FunctionDescProto descProto = null;
     try {
-      descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public FunctionDescProto call(NettyClientBase client) throws ServiceException {
-          try {
-            CatalogProtocolService.BlockingInterface stub = getStub(client);
-            return stub.getFunctionMeta(null, builder.build());
-          } catch (NoSuchFunctionException e) {
-            abort();
-            throw e;
-          }
-        }
-      }.withRetries();
-    } catch(ServiceException e) {
-      // this is not good. we need to define user massage exception
-      if(e.getCause() instanceof NoSuchFunctionException){
-        LOG.debug(e.getMessage());
-      } else {
-        LOG.error(e.getMessage(), e);
-      }
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      descProto = stub.getFunctionMeta(null, builder.build());
+    } catch (NoSuchFunctionException e) {
+      LOG.debug(e.getMessage());
+    } catch (ServiceException e) {
+      LOG.error(e.getMessage(), e);
     }
 
     if (descProto == null) {
@@ -819,27 +614,21 @@ public abstract class AbstractCatalogClient implements CatalogService {
     }
 
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.containFunction(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.containFunction(null, builder.build()).getValue();
+    } catch (InvalidOperationException e) {
+      LOG.error(e.getMessage());
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return false;
     }
+    return false;
   }
 
   @Override
   public final boolean alterTable(final AlterTableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.alterTable(null, desc.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.alterTable(null, desc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -849,12 +638,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.updateTableStats(null, updateTableStatsProto).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.updateTableStats(null, updateTableStatsProto).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 7666a97..80ded4a 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -18,35 +18,72 @@
 
 package org.apache.tajo.catalog;
 
+import com.google.protobuf.ServiceException;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CatalogClient provides a client API to access the catalog server.
  */
 public class CatalogClient extends AbstractCatalogClient {
+  protected NettyClientBase client;
+  protected ServiceTracker serviceTracker;
+  protected InetSocketAddress catalogServerAddr;
   /**
    * @throws java.io.IOException
    *
    */
   public CatalogClient(final TajoConf conf) throws IOException {
-    super(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS)));
+    super(conf);
+    this.catalogServerAddr = NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS));
+    this.serviceTracker = ServiceTrackerFactory.get(conf);
   }
 
-  public CatalogClient(TajoConf conf, String host, int port) throws IOException {
-    super(conf, NetUtils.createSocketAddr(host, port));
-  }
 
   @Override
-  BlockingInterface getStub(NettyClientBase client) {
-    return client.getStub();
+  BlockingInterface getStub() throws ServiceException {
+    return getCatalogConnection().getStub();
+  }
+
+  private InetSocketAddress getCatalogServerAddr() {
+    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      return catalogServerAddr;
+    } else {
+      return serviceTracker.getCatalogAddress();
+    }
   }
 
+  public synchronized NettyClientBase getCatalogConnection() throws ServiceException {
+    if (client != null && client.isConnected()) return client;
+    else {
+      try {
+        if (client != null && client.isConnected()) return client;
+        RpcClientManager.cleanup(client);
+
+        int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES);
+        // Client do not closed on idle state for support high available
+        this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false,
+            retry, 0, TimeUnit.SECONDS, false);
+      } catch (Exception e) {
+        throw new ServiceException(e);
+      }
+      return client;
+    }
+  }
+
+  @Override
   public void close() {
+    RpcClientManager.cleanup(client);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index e9fb177..f2e9795 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,7 +33,6 @@ import org.apache.tajo.annotation.ThreadSafe;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
 import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.store.CatalogStore;
 import org.apache.tajo.catalog.store.DerbyStore;
@@ -61,7 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
 
 /**
  * This class provides the catalog service. The catalog service enables clients
@@ -1192,7 +1190,7 @@ public class CatalogServer extends AbstractService {
       if (functions.containsKey(funcDesc.getSignature())) {
         FunctionDescProto found = findFunctionStrictType(funcDesc, true);
         if (found != null) {
-          throw new AlreadyExistsFunctionException(signature.toString());
+          throw new ServiceException(new AlreadyExistsFunctionException(signature.toString()));
         }
       }
 
@@ -1209,7 +1207,7 @@ public class CatalogServer extends AbstractService {
         throws ServiceException {
 
       if (!containFunction(request.getSignature())) {
-        throw new NoSuchFunctionException(request.getSignature(), new DataType[] {});
+        throw new ServiceException(new NoSuchFunctionException(request.getSignature(), new DataType[]{}));
       }
 
       functions.remove(request.getSignature());
@@ -1231,7 +1229,7 @@ public class CatalogServer extends AbstractService {
       }
 
       if (function == null) {
-        throw new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList());
+        throw new ServiceException(new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList()));
       } else {
         return function;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
index df9bd2c..35e9e2e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
@@ -22,9 +22,6 @@
 package org.apache.tajo.catalog;
 
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-
-import java.io.IOException;
 
 /**
  * This class provides a catalog service interface in
@@ -34,20 +31,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
   private CatalogServer catalog;
   private CatalogProtocol.CatalogProtocolService.BlockingInterface stub;
 
-  public LocalCatalogWrapper(final TajoConf conf) throws IOException {
-    super(conf, null);
-    this.catalog = new CatalogServer();
-    this.catalog.init(conf);
-    this.catalog.start();
-    this.stub = catalog.getHandler();
-  }
-
   public LocalCatalogWrapper(final CatalogServer server) {
     this(server, server.getConf());
   }
 
   public LocalCatalogWrapper(final CatalogServer server, final TajoConf conf) {
-    super(conf, null);
+    super(conf);
     this.catalog = server;
     this.stub = server.getHandler();
   }
@@ -57,7 +46,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
   }
 
   @Override
-  CatalogProtocol.CatalogProtocolService.BlockingInterface getStub(NettyClientBase client) {
+  CatalogProtocol.CatalogProtocolService.BlockingInterface getStub() {
     return stub;
   }
+
+  @Override
+  public void close() {
+    //nothing to do
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 9d0e427..9397fcf 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -27,10 +27,8 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.SQLStates;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
 
 import java.io.IOException;
 import java.net.URI;
@@ -48,79 +46,45 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
 
   @Override
   public boolean createDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMaster = client.getStub();
-        return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMaster = client.getStub();
+    return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
   }
 
   @Override
   public boolean existDatabase(final String databaseName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMaster = client.getStub();
-        return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMaster = client.getStub();
+    return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
   }
 
   @Override
   public boolean dropDatabase(final String databaseName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
   }
 
   @Override
   public List<String> getAllDatabaseNames() throws ServiceException {
 
-    return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public List<String> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
   }
 
   public boolean existTable(final String tableName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
   }
 
   @Override
@@ -133,32 +97,25 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
                                        final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
       throws SQLException, ServiceException {
 
-    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setName(tableName);
-        builder.setSchema(schema.getProto());
-        builder.setMeta(meta.getProto());
-        builder.setPath(path.toString());
-        if (partitionMethodDesc != null) {
-          builder.setPartition(partitionMethodDesc.getProto());
-        }
-        ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
-          return CatalogUtil.newTableDesc(res.getTableDesc());
-        } else {
-          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-        }
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setName(tableName);
+    builder.setSchema(schema.getProto());
+    builder.setMeta(meta.getProto());
+    builder.setPath(path.toString());
+    if (partitionMethodDesc != null) {
+      builder.setPartition(partitionMethodDesc.getProto());
+    }
+    ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+      return CatalogUtil.newTableDesc(res.getTableDesc());
+    } else {
+      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+    }
   }
 
   @Override
@@ -169,94 +126,67 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
 
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setName(tableName);
-        builder.setPurge(purge);
-        return tajoMasterService.dropTable(null, builder.build()).getValue();
-      }
-
-    }.withRetries();
+    ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setName(tableName);
+    builder.setPurge(purge);
+    return tajoMasterService.dropTable(null, builder.build()).getValue();
 
   }
 
   @Override
   public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
-    return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public List<String> call(NettyClientBase client) throws ServiceException {
 
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        if (databaseName != null) {
-          builder.setDatabaseName(databaseName);
-        }
-        ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
-        return res.getTablesList();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    if (databaseName != null) {
+      builder.setDatabaseName(databaseName);
+    }
+    ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
+    return res.getTablesList();
   }
 
   @Override
   public TableDesc getTableDesc(final String tableName) throws ServiceException {
 
-    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setTableName(tableName);
-        ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
-          return CatalogUtil.newTableDesc(res.getTableDesc());
-        } else {
-          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-        }
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setTableName(tableName);
+    ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
+    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+      return CatalogUtil.newTableDesc(res.getTableDesc());
+    } else {
+      throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
+    }
   }
 
   @Override
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
 
-    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
-        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
-      public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        String paramFunctionName = functionName == null ? "" : functionName;
-        ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
-            connection.convertSessionedString(paramFunctionName));
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
-          return res.getFunctionsList();
-        } else {
-          throw new SQLException(res.getErrorMessage());
-        }
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    String paramFunctionName = functionName == null ? "" : functionName;
+    ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+        connection.convertSessionedString(paramFunctionName));
+    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+      return res.getFunctionsList();
+    } else {
+      throw new ServiceException(new SQLException(res.getErrorMessage()));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 99c58b6..53889fe 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -28,11 +28,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.io.IOException;
@@ -40,6 +39,7 @@ import java.net.InetSocketAddress;
 import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.tajo.ipc.ClientProtos.*;
 import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
@@ -102,7 +102,7 @@ public class QueryClientImpl implements QueryClient {
   public void closeNonForwardQuery(QueryId queryId) {
     NettyClientBase tmClient = null;
     try {
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub();
       connection.checkSessionAndGet(tmClient);
 
@@ -153,50 +153,37 @@ public class QueryClientImpl implements QueryClient {
 
   @Override
   public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
 
-    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    final QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(sql);
+    builder.setIsJson(false);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
 
-        connection.checkSessionAndGet(client);
-
-        final QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(sql);
-        builder.setIsJson(false);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
-        SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
-        if (response.getResultCode() == ResultCode.OK) {
-          connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-        }
-        return response;
-      }
-    }.withRetries();
+    SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+    if (response.getResultCode() == ResultCode.OK) {
+      connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+    }
+    return response;
   }
 
   @Override
   public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
 
-    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
 
-        final QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(json);
-        builder.setIsJson(true);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    final QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(json);
+    builder.setIsJson(true);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
 
-        return tajoMasterService.submitQuery(null, builder.build());
-      }
-    }.withRetries();
+    return tajoMasterService.submitQuery(null, builder.build());
   }
 
   @Override
@@ -308,7 +295,7 @@ public class QueryClientImpl implements QueryClient {
 
     NettyClientBase tmClient = null;
     try {
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       connection.checkSessionAndGet(tmClient);
       builder.setSessionId(connection.sessionId);
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
@@ -348,7 +335,7 @@ public class QueryClientImpl implements QueryClient {
 
     try {
 
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       connection.checkSessionAndGet(tmClient);
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
@@ -369,42 +356,26 @@ public class QueryClientImpl implements QueryClient {
       throws ServiceException {
 
     try {
-      final ServerCallable<ClientProtos.SerializedResultSet> callable =
-          new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
-              TajoMasterClientProtocol.class, false) {
-
-            public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
-
-              connection.checkSessionAndGet(client);
-              TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-              GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
-              builder.setSessionId(connection.sessionId);
-              builder.setQueryId(queryId.getProto());
-              builder.setFetchRowNum(fetchRowNum);
-              try {
-                GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
-                if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-                  abort();
-                  throw new ServiceException(response.getErrorMessage());
-                }
-
-                return response.getResultSet();
-              } catch (ServiceException e) {
-                abort();
-                throw e;
-              } catch (Throwable t) {
-                throw new ServiceException(t.getMessage(), t);
-              }
-            }
-          };
-
-      ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
+      NettyClientBase client = connection.getTajoMasterConnection();
+      connection.checkSessionAndGet(client);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+      GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+      builder.setSessionId(connection.sessionId);
+      builder.setQueryId(queryId.getProto());
+      builder.setFetchRowNum(fetchRowNum);
+
+      GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+      if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+        throw new ServiceException(response.getErrorMessage());
+      }
+
+      ClientProtos.SerializedResultSet resultSet = response.getResultSet();
 
       return new TajoMemoryResultSet(queryId,
-          new Schema(serializedResultSet.getSchema()),
-          serializedResultSet.getSerializedTuplesList(),
-          serializedResultSet.getSerializedTuplesCount(),
+          new Schema(resultSet.getSchema()),
+          resultSet.getSerializedTuplesList(),
+          resultSet.getSerializedTuplesCount(),
           getClientSideSessionVars());
     } catch (ServiceException e) {
       throw e;
@@ -416,119 +387,86 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public boolean updateQuery(final String sql) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public Boolean call(NettyClientBase client) throws ServiceException {
+    QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(sql);
+    builder.setIsJson(false);
+    ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
 
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(sql);
-        builder.setIsJson(false);
-        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
-          connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-          return true;
-        } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
-          }
-          return false;
-        }
+    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+      connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+      return true;
+    } else {
+      if (response.hasErrorMessage()) {
+        LOG.error("ERROR: " + response.getErrorMessage());
       }
-    }.withRetries();
+      return false;
+    }
   }
 
   @Override
   public boolean updateQueryWithJson(final String json) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(json);
-        builder.setIsJson(true);
-        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
-          return true;
-        } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
-          }
-          return false;
-        }
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+    QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(json);
+    builder.setIsJson(true);
+    ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+      return true;
+    } else {
+      if (response.hasErrorMessage()) {
+        LOG.error("ERROR: " + response.getErrorMessage());
       }
-    }.withRetries();
+      return false;
+    }
   }
 
   @Override
   public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
-        return res.getQueryListList();
-
-      }
-    }.withRetries();
+    ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
+    return res.getQueryListList();
   }
 
   @Override
   public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
-        return res.getQueryListList();
-
-      }
-    }.withRetries();
+    ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
+    return res.getQueryListList();
   }
 
   @Override
   public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
-        return res.getWorkerListList();
-      }
-
-    }.withRetries();
+    ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
+    return res.getWorkerListList();
   }
 
   @Override
@@ -540,7 +478,7 @@ public class QueryClientImpl implements QueryClient {
     NettyClientBase tmClient = null;
     try {
       /* send a kill to the TM */
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
       connection.checkSessionAndGet(tmClient);
@@ -581,25 +519,20 @@ public class QueryClientImpl implements QueryClient {
   }
   
   public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
-    return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-      public QueryInfoProto call(NettyClientBase client) throws ServiceException {
-        connection.checkSessionAndGet(client);
-
-        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQueryId(queryId.getProto());
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
-          return res.getQueryInfo();
-        } else {
-          abort();
-          throw new ServiceException(res.getErrorMessage());
-        }
-      }
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+
+    QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQueryId(queryId.getProto());
+
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
+    if (res.getResultCode() == ResultCode.OK) {
+      return res.getQueryInfo();
+    } else {
+      throw new ServiceException(res.getErrorMessage());
+    }
   }
 
   public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@ -611,24 +544,31 @@ public class QueryClientImpl implements QueryClient {
     InetSocketAddress qmAddress = new InetSocketAddress(
         queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
 
-    return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
-        QueryMasterClientProtocol.class, false) {
-      public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
-        connection.checkSessionAndGet(client);
+    RpcClientManager manager = RpcClientManager.getInstance();
+    NettyClientBase queryMasterClient;
+    try {
+      queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+          manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
 
-        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQueryId(queryId.getProto());
+    try {
+      connection.checkSessionAndGet(connection.getTajoMasterConnection());
 
-        QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
-        GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
-          return res.getQueryHistory();
-        } else {
-          abort();
-          throw new ServiceException(res.getErrorMessage());
-        }
+      QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+      builder.setSessionId(connection.sessionId);
+      builder.setQueryId(queryId.getProto());
+
+      QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+      GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
+      if (res.getResultCode() == ResultCode.OK) {
+        return res.getQueryHistory();
+      } else {
+        throw new ServiceException(res.getErrorMessage());
       }
-    }.withRetries();
+    } finally {
+      queryMasterClient.close();
+    }
   }
 }


Mime
View raw message