tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [13/30] git commit: TAJO-1140: Separate TajoClient into fine grained parts.
Date Wed, 05 Nov 2014 09:52:09 GMT
TAJO-1140: Separate TajoClient into fine grained parts.

Closes #213


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f80beaf6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f80beaf6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f80beaf6

Branch: refs/heads/hbase_storage
Commit: f80beaf64d33850ee79e8ae32b33a852f56712f0
Parents: 8741e68
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Oct 27 22:16:30 2014 -0700
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Oct 27 22:16:30 2014 -0700

----------------------------------------------------------------------
 CHANGES                                         |    2 +
 .../tajo/cli/DefaultTajoCliOutputFormatter.java |    6 +-
 .../org/apache/tajo/cli/DescTableCommand.java   |    4 +-
 .../tajo/cli/InvalidClientSessionException.java |   27 -
 .../main/java/org/apache/tajo/cli/TajoCli.java  |   20 +-
 .../org/apache/tajo/cli/TajoShellCommand.java   |    2 +-
 .../apache/tajo/client/CatalogAdminClient.java  |  137 +++
 .../tajo/client/CatalogAdminClientImpl.java     |  266 +++++
 .../client/InvalidClientSessionException.java   |   27 +
 .../org/apache/tajo/client/QueryClient.java     |  116 ++
 .../org/apache/tajo/client/QueryClientImpl.java |  622 +++++++++++
 .../apache/tajo/client/SessionConnection.java   |  326 ++++++
 .../java/org/apache/tajo/client/TajoAdmin.java  |    8 +-
 .../java/org/apache/tajo/client/TajoClient.java | 1023 +-----------------
 .../org/apache/tajo/client/TajoClientImpl.java  |  215 ++++
 .../org/apache/tajo/client/TajoClientUtil.java  |   94 ++
 .../java/org/apache/tajo/client/TajoDump.java   |    4 +-
 .../org/apache/tajo/client/TajoGetConf.java     |    8 +-
 .../org/apache/tajo/client/TajoHAAdmin.java     |    8 +-
 .../apache/tajo/client/TajoHAClientUtil.java    |    2 +-
 .../org/apache/tajo/jdbc/FetchResultSet.java    |    5 +-
 .../org/apache/tajo/jdbc/TajoResultSet.java     |    7 +-
 .../org/apache/tajo/benchmark/BenchmarkSet.java |    5 +-
 .../org/apache/tajo/master/GlobalEngine.java    |    4 +-
 .../tajo/webapp/QueryExecutorServlet.java       |   19 +-
 .../apache/tajo/LocalTajoTestingUtility.java    |    3 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |    3 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   10 +-
 .../org/apache/tajo/client/TestTajoClient.java  |    4 +-
 .../java/org/apache/tajo/jdbc/TestTajoJdbc.java |    6 +-
 .../tajo/master/ha/TestHAServiceHDFSImpl.java   |    3 +-
 .../master/querymaster/TestQueryProgress.java   |    6 +-
 .../tajo/scheduler/TestFifoScheduler.java       |    8 +-
 .../org/apache/tajo/worker/TestHistory.java     |    3 +-
 .../org/apache/tajo/jdbc/JdbcConnection.java    |  444 ++++++++
 .../org/apache/tajo/jdbc/TajoConnection.java    |  437 --------
 .../apache/tajo/jdbc/TajoDatabaseMetaData.java  |   27 +-
 .../java/org/apache/tajo/jdbc/TajoDriver.java   |    2 +-
 .../apache/tajo/jdbc/TajoPreparedStatement.java |    4 +-
 .../org/apache/tajo/jdbc/TajoStatement.java     |    5 +-
 40 files changed, 2346 insertions(+), 1576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d239a19..ba2646d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -11,6 +11,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1140: Separate TajoClient into fine grained parts. (hyunsik)
+
     TAJO-1132: More detailed version info in tsql. (hyunsik)
 
     TAJO-1125: Separate logical plan and optimizer into a maven module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
index 5797778..c583aa2 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/DefaultTajoCliOutputFormatter.java
@@ -23,8 +23,8 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.client.QueryStatus;
-import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.InputStream;
@@ -56,10 +56,10 @@ public class DefaultTajoCliOutputFormatter implements TajoCliOutputFormatter {
     TableStats stat = tableDesc.getStats();
     String volume = stat == null ? (endOfTuple ? "0 B" : "unknown bytes") :
         FileUtil.humanReadableByteCount(stat.getNumBytes(), false);
-    long resultRows = stat == null ? TajoClient.UNKNOWN_ROW_NUMBER : stat.getNumRows();
+    long resultRows = stat == null ? QueryClient.UNKNOWN_ROW_NUMBER : stat.getNumRows();
 
     String displayRowNum;
-    if (resultRows == TajoClient.UNKNOWN_ROW_NUMBER) {
+    if (resultRows == QueryClient.UNKNOWN_ROW_NUMBER) {
 
       if (endOfTuple) {
         displayRowNum = totalPrintedRows + " rows";

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
index 8fab138..4b34858 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
@@ -23,7 +23,7 @@ import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 
@@ -82,7 +82,7 @@ public class DescTableCommand extends TajoShellCommand {
     if (desc.getStats() != null) {
 
       long row = desc.getStats().getNumRows();
-      String rowText = row == TajoClient.UNKNOWN_ROW_NUMBER ? "unknown" : row + "";
+      String rowText = row == QueryClient.UNKNOWN_ROW_NUMBER ? "unknown" : row + "";
       sb.append("number of rows: ").append(rowText).append("\n");
       sb.append("volume: ").append(
           FileUtil.humanReadableByteCount(desc.getStats().getNumBytes(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java b/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java
deleted file mode 100644
index 5c6c96e..0000000
--- a/tajo-client/src/main/java/org/apache/tajo/cli/InvalidClientSessionException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.cli;
-
-import com.google.protobuf.ServiceException;
-
-public class InvalidClientSessionException extends ServiceException {
-  public InvalidClientSessionException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 05b919c..c732fd9 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -21,17 +21,13 @@ package org.apache.tajo.cli;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
-
 import jline.UnsupportedTerminal;
 import jline.console.ConsoleReader;
-
 import org.apache.commons.cli.*;
 import org.apache.tajo.*;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.QueryStatus;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoHAClientUtil;
+import org.apache.tajo.client.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ipc.ClientProtos;
@@ -233,9 +229,9 @@ public class TajoCli {
       throw new RuntimeException("cannot find valid Tajo server address");
     } else if (hostName != null && port != null) {
       conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
-      client = new TajoClient(conf, baseDatabase);
+      client = new TajoClientImpl(conf, baseDatabase);
     } else if (hostName == null && port == null) {
-      client = new TajoClient(conf, baseDatabase);
+      client = new TajoClientImpl(conf, baseDatabase);
     }
 
     try {
@@ -565,7 +561,7 @@ public class TajoCli {
       if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
         displayFormatter.printResult(sout, sin, desc, responseTime, res);
       } else {
-        res = TajoClient.createResultSet(client, response);
+        res = TajoClientUtil.createResultSet(conf, client, response);
         displayFormatter.printResult(sout, sin, desc, responseTime, res);
       }
     } catch (Throwable t) {
@@ -597,17 +593,17 @@ public class TajoCli {
       while (true) {
         // TODO - configurable
         status = client.getQueryStatus(queryId);
-        if(TajoClient.isInPreNewState(status.getState())) {
+        if(TajoClientUtil.isQueryWaitingForSchedule(status.getState())) {
           Thread.sleep(Math.min(20 * initRetries, 1000));
           initRetries++;
           continue;
         }
 
-        if (TajoClient.isInRunningState(status.getState()) || status.getState() == QueryState.QUERY_SUCCEEDED) {
+        if (TajoClientUtil.isQueryRunning(status.getState()) || status.getState() == QueryState.QUERY_SUCCEEDED) {
           displayFormatter.printProgress(sout, status);
         }
 
-        if (TajoClient.isInCompleteState(status.getState()) && status.getState() != QueryState.QUERY_KILL_WAIT) {
+        if (TajoClientUtil.isQueryComplete(status.getState()) && status.getState() != QueryState.QUERY_KILL_WAIT) {
           break;
         } else {
           Thread.sleep(Math.min(200 * progressRetries, 1000));
@@ -626,7 +622,7 @@ public class TajoCli {
           float responseTime = ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0f);
           ClientProtos.GetQueryResultResponse response = client.getResultResponse(queryId);
           if (status.hasResult()) {
-            res = TajoClient.createResultSet(client, queryId, response);
+            res = TajoClientUtil.createResultSet(conf, client, queryId, response);
             TableDesc desc = new TableDesc(response.getTableDesc());
             displayFormatter.printResult(sout, sin, desc, responseTime, res);
           } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java
index 39f5377..138aec4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoShellCommand.java
@@ -40,7 +40,7 @@ public abstract class TajoShellCommand {
   protected int maxColumn;
 
   public TajoShellCommand(TajoCli.TajoCliContext context) {
-    maxColumn = context.getTajoClient().getConf().getIntVar(TajoConf.ConfVars.$CLI_MAX_COLUMN);
+    maxColumn = context.getConf().getIntVar(TajoConf.ConfVars.$CLI_MAX_COLUMN);
     this.context = context;
     client = context.getTajoClient();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
new file mode 100644
index 0000000..a36fc0e
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClient.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+
+import java.io.Closeable;
+import java.sql.SQLException;
+import java.util.List;
+
+public interface CatalogAdminClient extends Closeable {
+  /**
+   * Create a database.
+   *
+   * @param databaseName The database name to be created. This name is case sensitive.
+   * @return True if created successfully.
+   * @throws com.google.protobuf.ServiceException
+   */
+  public boolean createDatabase(final String databaseName) throws ServiceException;
+  /**
+   * Does the database exist?
+   *
+   * @param databaseName The database name to be checked. This name is case sensitive.
+   * @return True if so.
+   * @throws ServiceException
+   */
+  public boolean existDatabase(final String databaseName) throws ServiceException;
+  /**
+   * Drop the database.
+   *
+   * @param databaseName The database name to be dropped. This name is case sensitive.
+   * @return True if the database is dropped successfully.
+   * @throws ServiceException
+   */
+  public boolean dropDatabase(final String databaseName) throws ServiceException;
+
+  public List<String> getAllDatabaseNames() throws ServiceException;
+
+  /**
+   * Does the table exist?
+   *
+   * @param tableName The table name to be checked. This name is case sensitive.
+   * @return True if so.
+   */
+  public boolean existTable(final String tableName) throws ServiceException;
+
+  /**
+   * Create an external table.
+   *
+   * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
+   *                  If the table name is not qualified, the current database in the session will be used.
+   * @param schema The schema
+   * @param path The external table location
+   * @param meta Table meta
+   * @return the created table description.
+   * @throws java.sql.SQLException
+   * @throws ServiceException
+   */
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta) throws SQLException, ServiceException;
+
+  /**
+   * Create an external table.
+   *
+   * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
+   *                  If the table name is not qualified, the current database in the session will be used.
+   * @param schema The schema
+   * @param path The external table location
+   * @param meta Table meta
+   * @param partitionMethodDesc Table partition description
+   * @return the created table description.
+   * @throws SQLException
+   * @throws ServiceException
+   */
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
+      throws SQLException, ServiceException;
+
+  /**
+   * Drop a table.
+   *
+   * @param tableName The table name to be dropped. This name is case sensitive.
+   * @return True if the table is dropped successfully.
+   */
+  public boolean dropTable(final String tableName) throws ServiceException;
+
+  /**
+   * Drop a table.
+   *
+   * @param tableName The table name to be dropped. This name is case sensitive.
+   * @param purge If purge is true, this call will remove the entry in catalog as well as the table contents.
+   * @return True if the table is dropped successfully.
+   */
+  public boolean dropTable(final String tableName, final boolean purge) throws ServiceException;
+
+  /**
+   * Get a list of table names.
+   *
+   * @param databaseName The database name to show all tables. This name is case sensitive.
+   *                     If it is null, this method will show all tables
+   *                     in the current database of this session.
+   */
+  public List<String> getTableList(@Nullable final String databaseName) throws ServiceException;
+
+  /**
+   * Get a table description.
+   *
+   * @param tableName The table name to get. This name is case sensitive.
+   * @return Table description
+   */
+  public TableDesc getTableDesc(final String tableName) throws ServiceException;
+
+  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
new file mode 100644
index 0000000..496161d
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.jdbc.SQLStates;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.ServerCallable;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface;
+
+public class CatalogAdminClientImpl implements CatalogAdminClient {
+  private final SessionConnection connection;
+
+  public CatalogAdminClientImpl(SessionConnection connection) {
+    this.connection = connection;
+  }
+
+  @Override
+  public boolean createDatabase(final String databaseName) throws ServiceException {
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMaster = client.getStub();
+        return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public boolean existDatabase(final String databaseName) throws ServiceException {
+
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMaster = client.getStub();
+        return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public boolean dropDatabase(final String databaseName) throws ServiceException {
+
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public List<String> getAllDatabaseNames() throws ServiceException {
+
+    return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public List<String> call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
+      }
+
+    }.withRetries();
+  }
+
+  public boolean existTable(final String tableName) throws ServiceException {
+
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public TableDesc createExternalTable(String tableName, Schema schema, Path path, TableMeta meta)
+      throws SQLException, ServiceException {
+    return createExternalTable(tableName, schema, path, meta, null);
+  }
+
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
+      throws SQLException, ServiceException {
+
+    return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+
+        ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setName(tableName);
+        builder.setSchema(schema.getProto());
+        builder.setMeta(meta.getProto());
+        builder.setPath(path.toUri().toString());
+        if (partitionMethodDesc != null) {
+          builder.setPartition(partitionMethodDesc.getProto());
+        }
+        ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+          return CatalogUtil.newTableDesc(res.getTableDesc());
+        } else {
+          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+        }
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public boolean dropTable(String tableName) throws ServiceException {
+    return dropTable(tableName, false);
+  }
+
+  @Override
+  public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
+
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+
+        ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setName(tableName);
+        builder.setPurge(purge);
+        return tajoMasterService.dropTable(null, builder.build()).getValue();
+      }
+
+    }.withRetries();
+
+  }
+
+  @Override
+  public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
+    return new ServerCallable<List<String>>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public List<String> call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+
+        ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        if (databaseName != null) {
+          builder.setDatabaseName(databaseName);
+        }
+        ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
+        return res.getTablesList();
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public TableDesc getTableDesc(final String tableName) throws ServiceException {
+
+    return new ServerCallable<TableDesc>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+
+        ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setTableName(tableName);
+        ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
+        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+          return CatalogUtil.newTableDesc(res.getTableDesc());
+        } else {
+          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+        }
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
+
+    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.connPool,
+        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
+
+        connection.checkSessionAndGet(client);
+        BlockingInterface tajoMasterService = client.getStub();
+
+        String paramFunctionName = functionName == null ? "" : functionName;
+        ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+            connection.convertSessionedString(paramFunctionName));
+        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+          return res.getFunctionsList();
+        } else {
+          throw new SQLException(res.getErrorMessage());
+        }
+      }
+
+    }.withRetries();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java b/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java
new file mode 100644
index 0000000..acbc33f
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/InvalidClientSessionException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+
+public class InvalidClientSessionException extends ServiceException {
+  public InvalidClientSessionException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
new file mode 100644
index 0000000..dbbafb6
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClient.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.TajoIdProtos.SessionIdProto;
+
+/**
+ * Client contract covering session accessors, session variables, query submission,
+ * query status/result retrieval, and cluster/query listings.
+ */
+public interface QueryClient extends Closeable {
+
+  int UNKNOWN_ROW_NUMBER = -1;
+
+  // ---- session accessors ----
+
+  void setSessionId(SessionIdProto sessionId);
+
+  boolean isConnected();
+
+  SessionIdProto getSessionId();
+
+  String getBaseDatabase();
+
+  /** Narrows {@link Closeable#close()} so that it declares no checked exception. */
+  @Override
+  void close();
+
+  UserGroupInformation getUserInfo();
+
+  // ---- query resource cleanup ----
+
+  /**
+   * Call to QueryMaster closing query resources.
+   *
+   * @param queryId id of the query whose resources should be closed
+   */
+  void closeQuery(QueryId queryId);
+
+  void closeNonForwardQuery(QueryId queryId);
+
+  // ---- database and session variables ----
+
+  String getCurrentDatabase() throws ServiceException;
+
+  Boolean selectDatabase(String databaseName) throws ServiceException;
+
+  Boolean updateSessionVariables(Map<String, String> variables) throws ServiceException;
+
+  Boolean unsetSessionVariables(List<String> variables) throws ServiceException;
+
+  String getSessionVariable(String varname) throws ServiceException;
+
+  Boolean existSessionVariable(String varname) throws ServiceException;
+
+  Map<String, String> getAllSessionVariables() throws ServiceException;
+
+  // ---- query submission ----
+
+  /**
+   * Submits a query statement and gets a response immediately.
+   * The response only contains a query id and the submission status.
+   * To obtain the result, use {@link #getQueryResult(org.apache.tajo.QueryId)}.
+   */
+  ClientProtos.SubmitQueryResponse executeQuery(String sql) throws ServiceException;
+
+  ClientProtos.SubmitQueryResponse executeQueryWithJson(String json) throws ServiceException;
+
+  /**
+   * Submits a query statement and gets its result.
+   * Unlike {@link #executeQuery(String)}, this is a blocking method:
+   * it waits until the submitted query has finished.
+   *
+   * @return If failed, return null.
+   */
+  ResultSet executeQueryAndGetResult(String sql) throws ServiceException, IOException;
+
+  ResultSet executeJsonQueryAndGetResult(String json) throws ServiceException, IOException;
+
+  // ---- status and results ----
+
+  QueryStatus getQueryStatus(QueryId queryId) throws ServiceException;
+
+  ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException;
+
+  ResultSet createNullResultSet(QueryId queryId) throws IOException;
+
+  ClientProtos.GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException;
+
+  TajoMemoryResultSet fetchNextQueryResult(QueryId queryId, int fetchRowNum) throws ServiceException;
+
+  boolean updateQuery(String sql) throws ServiceException;
+
+  boolean updateQueryWithJson(String json) throws ServiceException;
+
+  // ---- listings and administration ----
+
+  List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException;
+
+  List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException;
+
+  List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException;
+
+  QueryStatus killQuery(QueryId queryId) throws ServiceException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
new file mode 100644
index 0000000..235ce19
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -0,0 +1,622 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.ClientProtos.*;
+import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
+import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+
+public class QueryClientImpl implements QueryClient {
+  private static final Log LOG = LogFactory.getLog(QueryClientImpl.class);
+  /** Session connection this client delegates to and obtains RPC clients from. */
+  private final SessionConnection connection;
+
+  /**
+   * Creates a query client on top of an existing session connection.
+   *
+   * @param connection the session connection to use; the reference is stored as given
+   */
+  public QueryClientImpl(SessionConnection connection) {
+    this.connection = connection;
+  }
+
+  /** Forwards the given session id to the underlying session connection. */
+  @Override
+  public void setSessionId(TajoIdProtos.SessionIdProto sessionId) {
+    connection.setSessionId(sessionId);
+  }
+
+  /** Returns whatever the underlying session connection reports as its connected state. */
+  @Override
+  public boolean isConnected() {
+    return connection.isConnected();
+  }
+
+  /** Returns the session id held by the underlying session connection. */
+  @Override
+  public TajoIdProtos.SessionIdProto getSessionId() {
+    return connection.getSessionId();
+  }
+
+  /** Returns the base database name reported by the underlying session connection. */
+  @Override
+  public String getBaseDatabase() {
+    return connection.getBaseDatabase();
+  }
+
+  /**
+   * No-op: this method releases nothing.
+   * NOTE(review): presumably whoever created the {@code SessionConnection} is responsible
+   * for closing it - confirm against the owning client.
+   */
+  @Override
+  public void close() {
+
+  }
+
+  /** Returns the user information held by the underlying session connection. */
+  @Override
+  public UserGroupInformation getUserInfo() {
+    return connection.getUserInfo();
+  }
+
+  /**
+   * Asks the QueryMaster recorded for the given query to close it, then closes that
+   * connection and forgets the recorded QueryMaster entry. Does nothing when no
+   * QueryMaster is recorded for the query. Failures are logged as warnings and are
+   * not propagated to the caller.
+   *
+   * @param queryId id of the query to close
+   */
+  @Override
+  public void closeQuery(QueryId queryId) {
+    if (!connection.queryMasterMap.containsKey(queryId)) {
+      return;
+    }
+
+    NettyClientBase queryMasterClient = null;
+    try {
+      queryMasterClient = connection.getConnection(queryId, QueryMasterClientProtocol.class, false);
+      QueryMasterClientProtocolService.BlockingInterface stub = queryMasterClient.getStub();
+      stub.closeQuery(null, queryId.getProto());
+    } catch (Exception e) {
+      LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
+    } finally {
+      // Runs on success and on failure alike, so the map entry never outlives this call.
+      connection.connPool.closeConnection(queryMasterClient);
+      connection.queryMasterMap.remove(queryId);
+    }
+  }
+
+  /**
+   * Sends a closeNonForwardQuery request for the given query to the TajoMaster and then
+   * closes the TajoMaster connection. Failures are logged as warnings and are not
+   * propagated to the caller.
+   *
+   * @param queryId id of the query to close
+   */
+  @Override
+  public void closeNonForwardQuery(QueryId queryId) {
+    NettyClientBase masterClient = null;
+    try {
+      masterClient = connection.getTajoMasterConnection(false);
+      TajoMasterClientProtocolService.BlockingInterface stub = masterClient.getStub();
+      connection.checkSessionAndGet(masterClient);
+
+      // The session id is read only after the session check above has run.
+      ClientProtos.QueryIdRequest.Builder request = ClientProtos.QueryIdRequest.newBuilder()
+          .setSessionId(getSessionId())
+          .setQueryId(queryId.getProto());
+      stub.closeNonForwardQuery(null, request.build());
+    } catch (Exception e) {
+      LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
+    } finally {
+      connection.connPool.closeConnection(masterClient);
+    }
+  }
+
+  /**
+   * Returns the current database as reported by the underlying session connection.
+   *
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public String getCurrentDatabase() throws ServiceException {
+    return connection.getCurrentDatabase();
+  }
+
+  /**
+   * Delegates database selection to the underlying session connection.
+   *
+   * @param databaseName name of the database to select
+   * @return the value returned by the session connection
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public Boolean selectDatabase(String databaseName) throws ServiceException {
+    return connection.selectDatabase(databaseName);
+  }
+
+  /**
+   * Delegates the session-variable update to the underlying session connection.
+   *
+   * @param variables variable names mapped to the values to set
+   * @return the value returned by the session connection
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public Boolean updateSessionVariables(Map<String, String> variables) throws ServiceException {
+    return connection.updateSessionVariables(variables);
+  }
+
+  /**
+   * Delegates the removal of session variables to the underlying session connection.
+   *
+   * @param variables names of the variables to unset
+   * @return the value returned by the session connection
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public Boolean unsetSessionVariables(List<String> variables) throws ServiceException {
+    return connection.unsetSessionVariables(variables);
+  }
+
+  /**
+   * Looks up a session variable through the underlying session connection.
+   *
+   * @param varname name of the session variable
+   * @return the value returned by the session connection
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public String getSessionVariable(String varname) throws ServiceException {
+    // Must go through the connection, like the sibling session methods; calling this
+    // method itself here recursed until StackOverflowError.
+    return connection.getSessionVariable(varname);
+  }
+
+  /**
+   * Checks for a session variable through the underlying session connection.
+   *
+   * @param varname name of the session variable
+   * @return the value returned by the session connection
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public Boolean existSessionVariable(String varname) throws ServiceException {
+    // Must go through the connection, like the sibling session methods; calling this
+    // method itself here recursed until StackOverflowError.
+    return connection.existSessionVariable(varname);
+  }
+
+  /**
+   * Fetches all session variables through the underlying session connection.
+   *
+   * @return the map returned by the session connection
+   * @throws ServiceException propagated from the session connection
+   */
+  @Override
+  public Map<String, String> getAllSessionVariables() throws ServiceException {
+    // Must go through the connection, like the sibling session methods; calling this
+    // method itself here recursed until StackOverflowError.
+    return connection.getAllSessionVariables();
+  }
+
+  /**
+   * Submits a SQL statement to the TajoMaster and returns the submission response.
+   * The request is flagged as non-JSON.
+   *
+   * @param sql the statement to submit
+   * @return the response of the submitQuery call
+   * @throws ServiceException propagated from the session check or the RPC
+   */
+  @Override
+  public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+    ServerCallable<ClientProtos.SubmitQueryResponse> submitCall =
+        new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(),
+            TajoMasterClientProtocol.class, false, true) {
+
+          public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
+            // The session id is read only after the session check has run.
+            connection.checkSessionAndGet(client);
+
+            final QueryRequest.Builder request = QueryRequest.newBuilder()
+                .setSessionId(connection.sessionId)
+                .setQuery(sql)
+                .setIsJson(false);
+
+            TajoMasterClientProtocolService.BlockingInterface master = client.getStub();
+            return master.submitQuery(null, request.build());
+          }
+        };
+
+    return submitCall.withRetries();
+  }
+
+  /**
+   * Submits a JSON-encoded query to the TajoMaster and returns the submission response.
+   * The request is flagged as JSON.
+   *
+   * @param json the query in JSON form
+   * @return the response of the submitQuery call
+   * @throws ServiceException propagated from the session check or the RPC
+   */
+  @Override
+  public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
+    ServerCallable<ClientProtos.SubmitQueryResponse> submitCall =
+        new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.connPool, connection.getTajoMasterAddr(),
+            TajoMasterClientProtocol.class, false, true) {
+
+          public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
+            // The session id is read only after the session check has run.
+            connection.checkSessionAndGet(client);
+
+            final QueryRequest.Builder request = QueryRequest.newBuilder()
+                .setSessionId(connection.sessionId)
+                .setQuery(json)
+                .setIsJson(true);
+
+            TajoMasterClientProtocolService.BlockingInterface master = client.getStub();
+            return master.submitQuery(null, request.build());
+          }
+        };
+
+    return submitCall.withRetries();
+  }
+
+  @Override
+  public ResultSet executeQueryAndGetResult(String sql) throws ServiceException, IOException {
+
+    ClientProtos.SubmitQueryResponse response = executeQuery(sql);
+
+    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+      throw new ServiceException(response.getErrorTrace());
+    }
+
+    QueryId queryId = new QueryId(response.getQueryId());
+
+    if (response.getIsForwarded()) {
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+        return this.createNullResultSet(queryId);
+      } else {
+        return this.getQueryResultAndWait(queryId);
+      }
+
+    } else {
+      // If a non-forwarded insert into query
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() == 0) {
+        return this.createNullResultSet(queryId);
+      } else {
+        if (response.hasResultSet() || response.hasTableDesc()) {
+          return TajoClientUtil.createResultSet(connection.getConf() , this, response);
+        } else {
+          return this.createNullResultSet(queryId);
+        }
+      }
+    }
+  }
+
+  @Override
+  public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException {
+
+    ClientProtos.SubmitQueryResponse response = executeQueryWithJson(json);
+
+    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+      throw new ServiceException(response.getErrorTrace());
+    }
+
+    QueryId queryId = new QueryId(response.getQueryId());
+
+    if (response.getIsForwarded()) {
+
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+        return this.createNullResultSet(queryId);
+      } else {
+        return this.getQueryResultAndWait(queryId);
+      }
+
+    } else {
+
+      if (response.hasResultSet() || response.hasTableDesc()) {
+        return TajoClientUtil.createResultSet(connection.getConf(), this, response);
+      } else {
+        return this.createNullResultSet(queryId);
+      }
+
+    }
+  }
+
+  /**
+   * Polls the status of a query every 500 ms until it is complete and then returns its result.
+   *
+   * @param queryId id of the query to wait for
+   * @return the query result when the query succeeded and has one; otherwise a null result set
+   * @throws ServiceException if the status cannot be fetched or no status is available
+   * @throws IOException if the waiting thread is interrupted, or propagated from result-set creation
+   */
+  private ResultSet getQueryResultAndWait(QueryId queryId) throws ServiceException, IOException {
+
+    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+      return createNullResultSet(queryId);
+    }
+
+    QueryStatus status = getQueryStatus(queryId);
+
+    while(status != null && !TajoClientUtil.isQueryComplete(status.getState())) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and stop waiting, rather than printing a stack trace
+        // and silently continuing to poll.
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for the query (" + queryId + ") to complete", e);
+      }
+
+      status = getQueryStatus(queryId);
+    }
+
+    // The loop tolerates a null status, so it must not be dereferenced blindly afterwards.
+    if (status == null) {
+      throw new ServiceException("Cannot get the status of the query (" + queryId + ")");
+    }
+
+    if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+      if (status.hasResult()) {
+        return getQueryResult(queryId);
+      } else {
+        return createNullResultSet(queryId);
+      }
+
+    } else {
+      LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState());
+
+      //TODO throw SQLException(?)
+      return createNullResultSet(queryId);
+    }
+  }
+
+  /**
+   * Fetches the status of a query.
+   * When a QueryMaster address is already recorded for the query, the status is requested
+   * from that QueryMaster. Otherwise the TajoMaster is asked first; if its response names a
+   * QueryMaster host, the status is requested again from that QueryMaster and the address
+   * is recorded for later calls.
+   *
+   * @param queryId id of the query
+   * @return a {@link QueryStatus} wrapping the last status response received
+   * @throws ServiceException wrapping any exception raised while fetching the status
+   */
+  @Override
+  public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException {
+
+    ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder();
+    builder.setQueryId(queryId.getProto());
+
+    ClientProtos.GetQueryStatusResponse res = null;
+
+    if(connection.queryMasterMap.containsKey(queryId)) {
+      // A QueryMaster address is already recorded: ask it directly.
+      NettyClientBase qmClient = null;
+
+      try {
+
+        qmClient = connection.connPool.getConnection(connection.queryMasterMap.get(queryId),
+            QueryMasterClientProtocol.class, false);
+        QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
+        res = queryMasterService.getQueryStatus(null, builder.build());
+
+      } catch (Exception e) {
+        throw new ServiceException(e.getMessage(), e);
+      } finally {
+        connection.connPool.releaseConnection(qmClient);
+      }
+
+    } else {
+
+      // No QueryMaster recorded yet: go through the TajoMaster.
+      NettyClientBase tmClient = null;
+
+      try {
+        tmClient = connection.getTajoMasterConnection(false);
+        connection.checkSessionAndGet(tmClient);
+        // The session id is only added on this path; it is read after the session check.
+        builder.setSessionId(connection.sessionId);
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
+
+        res = tajoMasterService.getQueryStatus(null, builder.build());
+
+        String queryMasterHost = res.getQueryMasterHost();
+
+        if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
+          // The TajoMaster named a QueryMaster: re-query it and overwrite the response.
+          NettyClientBase qmClient = null;
+
+          try {
+
+            InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
+            qmClient = connection.connPool.getConnection(
+                qmAddr, QueryMasterClientProtocol.class, false);
+            QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
+            res = queryMasterService.getQueryStatus(null, builder.build());
+
+            // Record the address only after the QueryMaster call above has succeeded.
+            connection.queryMasterMap.put(queryId, qmAddr);
+
+          } catch (Exception e) {
+            throw new ServiceException(e.getMessage(), e);
+          } finally {
+            connection.connPool.releaseConnection(qmClient);
+          }
+        }
+
+      } catch (Exception e) {
+        // Also catches the ServiceException thrown by the inner block, wrapping it once more.
+        throw new ServiceException(e.getMessage(), e);
+      } finally {
+        connection.connPool.releaseConnection(tmClient);
+      }
+    }
+    return new QueryStatus(res);
+  }
+
+  /**
+   * Builds a result set for a finished query from its result response.
+   * As a side effect, the user name carried by the response is written into the
+   * connection's configuration under {@code ConfVars.USERNAME}.
+   *
+   * @param queryId id of the query
+   * @return a null result set for the null query id; otherwise a result set over the result table
+   * @throws ServiceException if no result response is available for the query, or on RPC failure
+   * @throws IOException propagated from result-set creation
+   */
+  @Override
+  public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException {
+
+    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+      return createNullResultSet(queryId);
+    }
+
+    GetQueryResultResponse response = getResultResponse(queryId);
+    if (response == null) {
+      // getResultResponse() returns null when no QueryMaster address is recorded for the
+      // query; report that explicitly instead of failing with a NullPointerException below.
+      throw new ServiceException("No query result response is available for " + queryId);
+    }
+
+    TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc());
+    connection.getConf().setVar(ConfVars.USERNAME, response.getTajoUserName());
+
+    return new TajoResultSet(this, queryId, connection.getConf(), tableDesc);
+  }
+
+  /**
+   * Creates a {@link TajoResultSet} from this client and the query id only, without
+   * configuration or table description.
+   *
+   * @param queryId id of the query the result set belongs to
+   * @throws IOException declared by the interface; propagated from result-set construction
+   */
+  @Override
+  public ResultSet createNullResultSet(QueryId queryId) throws IOException {
+    return new TajoResultSet(this, queryId);
+  }
+
+  /**
+   * Requests the result response of a query from the QueryMaster recorded for it.
+   *
+   * @param queryId id of the query
+   * @return the QueryMaster's response, or {@code null} when the id is the null query id
+   *         or no QueryMaster address is recorded for the query
+   * @throws ServiceException wrapping any exception raised while contacting the QueryMaster
+   */
+  @Override
+  public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException {
+    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+      return null;
+    }
+
+    NettyClientBase client = null;
+
+    try {
+
+      InetSocketAddress queryMasterAddr = connection.queryMasterMap.get(queryId);
+      if(queryMasterAddr == null) {
+        // Callers must be prepared for a null return on this path.
+        LOG.warn("No Connection to QueryMaster for " + queryId);
+        return null;
+      }
+
+      client = connection.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
+      QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
+
+      GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
+      builder.setQueryId(queryId.getProto());
+      GetQueryResultResponse response = queryMasterService.getQueryResult(null,builder.build());
+
+      return response;
+
+    } catch (Exception e) {
+      throw new ServiceException(e.getMessage(), e);
+    } finally {
+      // client is still null here when the method returned early above.
+      connection.connPool.releaseConnection(client);
+    }
+  }
+
+  /**
+   * Fetches result rows of a query from the TajoMaster and wraps them in an in-memory result set.
+   *
+   * @param queryId     id of the query
+   * @param fetchRowNum value sent as the request's fetch-row-number field
+   * @return a {@link TajoMemoryResultSet} built from the schema and serialized tuples of the response
+   * @throws ServiceException if the response has result code ERROR, or wrapping any other failure
+   */
+  @Override
+  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum)
+      throws ServiceException {
+
+    try {
+      ServerCallable<ClientProtos.SerializedResultSet> callable =
+          new ServerCallable<ClientProtos.SerializedResultSet>(connection.connPool, connection.getTajoMasterAddr(),
+              TajoMasterClientProtocol.class, false, true) {
+
+            public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
+
+              connection.checkSessionAndGet(client);
+              TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+              GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+              builder.setSessionId(connection.sessionId);
+              builder.setQueryId(queryId.getProto());
+              builder.setFetchRowNum(fetchRowNum);
+              try {
+                GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+                if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+                  throw new ServiceException(response.getErrorTrace());
+                }
+
+                return response.getResultSet();
+              } catch (ServiceException e) {
+                // NOTE(review): abort() presumably tells ServerCallable to stop retrying - confirm.
+                abort();
+                throw e;
+              } catch (Throwable t) {
+                throw new ServiceException(t.getMessage(), t);
+              }
+            }
+          };
+
+      ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
+
+      return new TajoMemoryResultSet(
+          new Schema(serializedResultSet.getSchema()),
+          serializedResultSet.getSerializedTuplesList(),
+          serializedResultSet.getSerializedTuplesCount());
+    } catch (Exception e) {
+      // A ServiceException thrown above is caught here too and wrapped once more.
+      throw new ServiceException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public boolean updateQuery(final String sql) throws ServiceException {
+
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        QueryRequest.Builder builder = QueryRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setQuery(sql);
+        builder.setIsJson(false);
+        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+
+        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+          return true;
+        } else {
+          if (response.hasErrorMessage()) {
+            System.err.println("ERROR: " + response.getErrorMessage());
+          }
+          return false;
+        }
+      }
+    }.withRetries();
+  }
+
+  @Override
+  public boolean updateQueryWithJson(final String json) throws ServiceException {
+
+    return new ServerCallable<Boolean>(connection.connPool, connection.getTajoMasterAddr(),
+        TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+
+        connection.checkSessionAndGet(client);
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+        QueryRequest.Builder builder = QueryRequest.newBuilder();
+        builder.setSessionId(connection.sessionId);
+        builder.setQuery(json);
+        builder.setIsJson(true);
+        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+          return true;
+        } else {
+          if (response.hasErrorMessage()) {
+            System.err.println("ERROR: " + response.getErrorMessage());
+          }
+          return false;
+        }
+      }
+    }.withRetries();
+  }
+
+  /**
+   * Asks the TajoMaster for its running-query list.
+   *
+   * @return the query list contained in the TajoMaster's response
+   * @throws ServiceException propagated from the session check or the RPC
+   */
+  @Override
+  public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException {
+    ServerCallable<List<ClientProtos.BriefQueryInfo>> listCall =
+        new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(),
+            TajoMasterClientProtocol.class, false, true) {
+
+          public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
+            connection.checkSessionAndGet(client);
+            TajoMasterClientProtocolService.BlockingInterface master = client.getStub();
+
+            ClientProtos.GetQueryListRequest.Builder request =
+                ClientProtos.GetQueryListRequest.newBuilder().setSessionId(connection.sessionId);
+            ClientProtos.GetQueryListResponse reply = master.getRunningQueryList(null, request.build());
+            return reply.getQueryListList();
+          }
+        };
+
+    return listCall.withRetries();
+  }
+
+  /**
+   * Asks the TajoMaster for its finished-query list.
+   *
+   * @return the query list contained in the TajoMaster's response
+   * @throws ServiceException propagated from the session check or the RPC
+   */
+  @Override
+  public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException {
+    ServerCallable<List<ClientProtos.BriefQueryInfo>> listCall =
+        new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.connPool, connection.getTajoMasterAddr(),
+            TajoMasterClientProtocol.class, false, true) {
+
+          public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
+            connection.checkSessionAndGet(client);
+            TajoMasterClientProtocolService.BlockingInterface master = client.getStub();
+
+            ClientProtos.GetQueryListRequest.Builder request =
+                ClientProtos.GetQueryListRequest.newBuilder().setSessionId(connection.sessionId);
+            ClientProtos.GetQueryListResponse reply = master.getFinishedQueryList(null, request.build());
+            return reply.getQueryListList();
+          }
+        };
+
+    return listCall.withRetries();
+  }
+
+  /**
+   * Asks the TajoMaster for cluster information.
+   *
+   * @return the worker list contained in the TajoMaster's response
+   * @throws ServiceException propagated from the session check or the RPC
+   */
+  @Override
+  public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException {
+    ServerCallable<List<ClientProtos.WorkerResourceInfo>> infoCall =
+        new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.connPool, connection.getTajoMasterAddr(),
+            TajoMasterClientProtocol.class, false, true) {
+
+          public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
+            connection.checkSessionAndGet(client);
+            TajoMasterClientProtocolService.BlockingInterface master = client.getStub();
+
+            ClientProtos.GetClusterInfoRequest.Builder request =
+                ClientProtos.GetClusterInfoRequest.newBuilder().setSessionId(connection.sessionId);
+            ClientProtos.GetClusterInfoResponse reply = master.getClusterInfo(null, request.build());
+            return reply.getWorkerListList();
+          }
+        };
+
+    return infoCall.withRetries();
+  }
+
+  /**
+   * Sends a kill request for a query to the TajoMaster and then polls the query status
+   * every 100 ms, for at most 10 seconds, until the state becomes QUERY_KILLED.
+   * Errors raised after the initial status lookup are logged at debug level and not
+   * propagated; the most recently observed status is returned in that case.
+   *
+   * @param queryId id of the query to kill
+   * @return the last status observed for the query
+   * @throws ServiceException if the initial status lookup fails
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public QueryStatus killQuery(final QueryId queryId)
+      throws ServiceException, IOException {
+
+    QueryStatus status = getQueryStatus(queryId);
+
+    NettyClientBase tmClient = null;
+    try {
+      /* send a kill to the TM */
+      tmClient = connection.getTajoMasterConnection(false);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
+
+      connection.checkSessionAndGet(tmClient);
+
+      ClientProtos.QueryIdRequest.Builder builder = ClientProtos.QueryIdRequest.newBuilder();
+      builder.setSessionId(connection.sessionId);
+      builder.setQueryId(queryId.getProto());
+      tajoMasterService.killQuery(null, builder.build());
+
+      long currentTimeMillis = System.currentTimeMillis();
+      long timeKillIssued = currentTimeMillis;
+      // "state != QUERY_KILLED" already covers QUERY_KILL_WAIT, so the former extra
+      // "|| state == QUERY_KILL_WAIT" clause was redundant and has been dropped.
+      // NOTE(review): a query that ends in another final state still waits the full 10 s.
+      while ((currentTimeMillis < timeKillIssued + 10000L)
+          && (status.getState() != TajoProtos.QueryState.QUERY_KILLED)) {
+        try {
+          Thread.sleep(100L);
+        } catch(InterruptedException ie) {
+          // Keep the interrupt visible to the caller instead of swallowing it.
+          Thread.currentThread().interrupt();
+          break;
+        }
+        currentTimeMillis = System.currentTimeMillis();
+        status = getQueryStatus(queryId);
+      }
+
+    } catch(Exception e) {
+      LOG.debug("Error when checking for application status", e);
+    } finally {
+      connection.connPool.releaseConnection(tmClient);
+    }
+    return status;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
new file mode 100644
index 0000000..42085a2
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.util.HAServiceUtil;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.ConnectTimeoutException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
+import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse;
+import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+
+/**
+ * A client-side connection to a TajoMaster that lazily creates, and then carries, one session.
+ *
+ * <p>Each instance creates its own {@link RpcConnectionPool}. The session id is obtained from the
+ * master the first time a session-bound call runs (see
+ * {@link #checkSessionAndGet(NettyClientBase)}) and is then reused. {@link #close()} removes the
+ * session on the master, if one was created, and shuts the pool down.</p>
+ */
+public class SessionConnection implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(SessionConnection.class);
+
+  private final TajoConf conf;
+
+  /** Addresses of query masters, keyed by the query they serve. */
+  final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
+
+  /** The TajoMaster address this connection was created with. */
+  final InetSocketAddress tajoMasterAddr;
+
+  final RpcConnectionPool connPool;
+
+  /** Base database requested at construction time; may be null. */
+  private final String baseDatabase;
+
+  private final UserGroupInformation userInfo;
+
+  /** Session id assigned by the master; null until a session has been created. */
+  volatile TajoIdProtos.SessionIdProto sessionId;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+
+  /**
+   * Connects to the TajoMaster address configured in {@code conf}, without a base database.
+   *
+   * @param conf TajoConf
+   * @throws java.io.IOException
+   */
+  public SessionConnection(TajoConf conf) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
+  }
+
+  /**
+   * Connects to the TajoMaster address configured in {@code conf}.
+   *
+   * @param conf TajoConf
+   * @param baseDatabase The base database name; may be null.
+   * @throws java.io.IOException
+   */
+  public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
+  }
+
+  /**
+   * Connects to the given TajoMaster address with a fresh {@link TajoConf} and no base database.
+   *
+   * @param addr TajoMaster address
+   * @throws java.io.IOException
+   */
+  public SessionConnection(InetSocketAddress addr) throws IOException {
+    this(new TajoConf(), addr, null);
+  }
+
+  /**
+   * Connects to the TajoMaster at {@code hostname:port} with a fresh {@link TajoConf}.
+   *
+   * @param hostname TajoMaster host name
+   * @param port TajoMaster client RPC port
+   * @param baseDatabase The base database name; may be null.
+   * @throws java.io.IOException
+   */
+  public SessionConnection(String hostname, int port, String baseDatabase) throws IOException {
+    this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
+  }
+
+  /**
+   * Connect to TajoMaster
+   *
+   * <p>Note that this sets {@code tajo.disk.scheduler.report.interval} to {@code 0} on the
+   * supplied {@code conf} instance itself.</p>
+   *
+   * @param conf TajoConf
+   * @param addr TajoMaster address
+   * @param baseDatabase The base database name. It is case sensitive. If it is null,
+   *                     the 'default' database will be used.
+   * @throws java.io.IOException
+   */
+  public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+    this.conf = conf;
+    this.conf.set("tajo.disk.scheduler.report.interval", "0");
+    this.tajoMasterAddr = addr;
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
+    // Don't share connection pool per client
+    connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
+    userInfo = UserGroupInformation.getCurrentUser();
+    this.baseDatabase = baseDatabase;
+  }
+
+  /**
+   * Returns an RPC stub for the query master registered for {@code queryId} in
+   * {@link #queryMasterMap}.
+   *
+   * <p>NOTE(review): when no address is registered for the query, a null address is handed to
+   * the connection pool as is; confirm how the pool reacts.</p>
+   */
+  public <T> T getStub(QueryId queryId, Class protocolClass, boolean asyncMode) throws NoSuchMethodException,
+      ConnectTimeoutException, ClassNotFoundException {
+    InetSocketAddress addr = queryMasterMap.get(queryId);
+    return connPool.getConnection(addr, protocolClass, asyncMode).getStub();
+  }
+
+  /**
+   * Returns a pooled connection to the TajoMaster address resolved by
+   * {@link #getTajoMasterAddr()}.
+   */
+  public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
+      ConnectTimeoutException, ClassNotFoundException {
+    return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
+  }
+
+  /**
+   * Returns a pooled connection to the query master registered for {@code queryId} in
+   * {@link #queryMasterMap}.
+   */
+  public NettyClientBase getConnection(QueryId queryId, Class protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
+    InetSocketAddress addr = queryMasterMap.get(queryId);
+    return connPool.getConnection(addr, protocolClass, asyncMode);
+  }
+
+  /** Returns a pooled connection to the given address. */
+  public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
+    return connPool.getConnection(addr, protocolClass, asyncMode);
+  }
+
+  /** Replaces the session id carried by this connection. */
+  @SuppressWarnings("unused")
+  public void setSessionId(TajoIdProtos.SessionIdProto sessionId) {
+    this.sessionId = sessionId;
+  }
+
+  /** Returns the current session id, or null if no session has been created yet. */
+  public TajoIdProtos.SessionIdProto getSessionId() {
+    return sessionId;
+  }
+
+  /** Returns the base database name given at construction time; may be null. */
+  public String getBaseDatabase() {
+    return baseDatabase;
+  }
+
+  /**
+   * Reports whether this connection is open and its pooled connection to the originally
+   * configured TajoMaster address is connected. Any failure while checking yields {@code false}.
+   */
+  public boolean isConnected() {
+    if (closed.get()) {
+      return false;
+    }
+    try {
+      return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+    } catch (Throwable e) {
+      return false;
+    }
+  }
+
+  public TajoConf getConf() {
+    return conf;
+  }
+
+  public UserGroupInformation getUserInfo() {
+    return userInfo;
+  }
+
+  /**
+   * Asks the master for the current database of this session, creating the session first if
+   * needed.
+   */
+  public String getCurrentDatabase() throws ServiceException {
+    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public String call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
+      }
+    }.withRetries();
+  }
+
+  /**
+   * Sends the given variables to the master as session variables to set.
+   *
+   * @param variables variable names mapped to the values to set
+   * @return the value carried by the master's response
+   */
+  public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        KeyValueSet keyValueSet = new KeyValueSet();
+        keyValueSet.putAll(variables);
+        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+            .setSessionId(sessionId)
+            .setSetVariables(keyValueSet.getProto()).build();
+
+        return tajoMasterService.updateSessionVariables(null, request).getValue();
+      }
+    }.withRetries();
+  }
+
+  /**
+   * Sends the given variable names to the master as session variables to unset.
+   *
+   * @param variables names of the variables to unset
+   * @return the value carried by the master's response
+   */
+  public Boolean unsetSessionVariables(final List<String> variables)  throws ServiceException {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+            .setSessionId(sessionId)
+            .addAllUnsetVariables(variables).build();
+        return tajoMasterService.updateSessionVariables(null, request).getValue();
+      }
+    }.withRetries();
+  }
+
+  /** Asks the master for the value of the session variable {@code varname}. */
+  public String getSessionVariable(final String varname) throws ServiceException {
+    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public String call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
+      }
+    }.withRetries();
+  }
+
+  /** Asks the master whether the session variable {@code varname} exists. */
+  public Boolean existSessionVariable(final String varname) throws ServiceException {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
+      }
+    }.withRetries();
+  }
+
+  /** Fetches all session variables of this session from the master. */
+  public Map<String, String> getAllSessionVariables() throws ServiceException {
+    return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class,
+        false, true) {
+
+      public Map<String, String> call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        KeyValueSet keyValueSet = new KeyValueSet(tajoMasterService.getAllSessionVariables(null, sessionId));
+        return keyValueSet.getAllKeyValus();
+      }
+    }.withRetries();
+  }
+
+  /**
+   * Asks the master to select {@code databaseName} for this session.
+   *
+   * @return the value carried by the master's response
+   */
+  public Boolean selectDatabase(final String databaseName) throws ServiceException {
+    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+
+      public Boolean call(NettyClientBase client) throws ServiceException {
+        checkSessionAndGet(client);
+
+        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+        return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
+      }
+    }.withRetries();
+  }
+
+  /**
+   * Closes this connection. Only the first call has any effect: it removes the session on the
+   * master if one was created (best effort), shuts down the connection pool and clears the
+   * query master map.
+   */
+  @Override
+  public void close() {
+    if (closed.getAndSet(true)) {
+      return;
+    }
+
+    // remove session; there is nothing to remove when no session was ever created
+    final TajoIdProtos.SessionIdProto currentSessionId = sessionId;
+    if (currentSessionId != null) {
+      try {
+
+        NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+        TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
+        tajoMaster.removeSession(null, currentSessionId);
+
+      } catch (Throwable e) {
+        // Best effort: a failure to remove the session must not prevent the pool shutdown below.
+        LOG.debug("Failed to remove the session while closing the connection", e);
+      }
+    }
+
+    if (connPool != null) {
+      connPool.shutdown();
+    }
+
+    queryMasterMap.clear();
+  }
+
+  /**
+   * Resolves the TajoMaster address to use. Without HA this is always the address given at
+   * construction time. With HA enabled, that address is used while
+   * {@link HAServiceUtil#isMasterAlive} reports it alive; otherwise the address returned by
+   * {@link HAServiceUtil#getMasterClientAddress} is used.
+   */
+  protected InetSocketAddress getTajoMasterAddr() {
+    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      return tajoMasterAddr;
+    }
+    if (HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
+      return tajoMasterAddr;
+    }
+    return HAServiceUtil.getMasterClientAddress(conf);
+  }
+
+  /**
+   * Ensures that this connection has a session, creating one through {@code client} when
+   * {@link #sessionId} is still null. Does nothing when a session id is already present.
+   *
+   * <p>NOTE(review): the null check and the assignment are not performed atomically; confirm
+   * whether concurrent first calls are possible for the callers of this class.</p>
+   *
+   * @param client connection to the TajoMaster used to create the session
+   * @throws ServiceException if the RPC fails
+   * @throws InvalidClientSessionException if the master does not report success
+   */
+  protected void checkSessionAndGet(NettyClientBase client) throws ServiceException {
+
+    if (sessionId == null) {
+
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+      CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+      builder.setUsername(userInfo.getUserName());
+
+      if (baseDatabase != null) {
+        builder.setBaseDatabaseName(baseDatabase);
+      }
+
+      CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
+
+      if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) {
+
+        sessionId = response.getSessionId();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+        }
+
+      } else {
+        throw new InvalidClientSessionException(response.getMessage());
+      }
+    }
+  }
+
+  /** Wraps {@code str} together with the current session id into a SessionedStringProto. */
+  ClientProtos.SessionedStringProto convertSessionedString(String str) {
+    ClientProtos.SessionedStringProto.Builder builder = ClientProtos.SessionedStringProto.newBuilder();
+    builder.setSessionId(sessionId);
+    builder.setValue(str);
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 498402c..817a698 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -143,9 +143,9 @@ public class TajoAdmin {
       return;
     } else if (hostName != null && port != null) {
       tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
-      tajoClient = new TajoClient(tajoConf);
+      tajoClient = new TajoClientImpl(tajoConf);
     } else if (hostName == null && port == null) {
-      tajoClient = new TajoClient(tajoConf);
+      tajoClient = new TajoClientImpl(tajoConf);
     }
 
     switch (cmdType) {
@@ -196,7 +196,7 @@ public class TajoAdmin {
         long end = queryInfo.getFinishTime();
         long start = queryInfo.getStartTime();
         String executionTime = decimalF.format((end-start) / 1000) + " sec";
-        if (TajoClient.isInCompleteState(queryInfo.getState())) {
+        if (TajoClientUtil.isQueryComplete(queryInfo.getState())) {
           writer.write("Finished Time: " + df.format(queryInfo.getFinishTime()));
           writer.write("\n");
         }
@@ -434,7 +434,7 @@ public class TajoAdmin {
       }
       writer.write("\n");
     } else {
-      String confMasterServiceAddr = tajoClient.getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+      String confMasterServiceAddr = tajoConf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
       InetSocketAddress masterAddress = NetUtils.createSocketAddr(confMasterServiceAddr);
       writer.write(masterAddress.getHostName());
       writer.write("\n");


Mime
View raw message