tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject git commit: TAJO-646: TajoClient is blocked while main thread finished. (hyoungjunkim via jinho)
Date Tue, 04 Mar 2014 13:16:54 GMT
Repository: incubator-tajo
Updated Branches:
  refs/heads/branch-0.8.0 0c5b7702a -> c2a59d78f


TAJO-646: TajoClient is blocked while main thread finished. (hyoungjunkim via jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c2a59d78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c2a59d78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c2a59d78

Branch: refs/heads/branch-0.8.0
Commit: c2a59d78f318be8424ee69a1ba9ba295c91b3796
Parents: 0c5b770
Author: jinossy <jinossy@gmail.com>
Authored: Tue Mar 4 22:15:14 2014 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Tue Mar 4 22:15:14 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../tajo/catalog/AbstractCatalogClient.java     | 36 ++++++++++----------
 .../java/org/apache/tajo/client/TajoClient.java | 25 +++++++-------
 .../org/apache/tajo/jdbc/TajoResultSet.java     | 33 +++++++++++++++---
 .../java/org/apache/tajo/jdbc/TajoDriver.java   |  3 +-
 .../apache/tajo/jdbc/TajoPreparedStatement.java |  5 ++-
 .../org/apache/tajo/jdbc/TajoStatement.java     |  6 +++-
 .../org/apache/tajo/rpc/ServerCallable.java     | 29 ++++++----------
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  6 ++--
 9 files changed, 84 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 38bda26..b5aedba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -269,6 +269,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-646: TajoClient is blocked while main thread finished. 
+    (hyoungjunkim via jinho)
+
     TAJO-645: Task.Reporter can cause NPE during reporting. (hyunsik)
 
     TAJO-630: QueryMasterTask never finished when Internal error occurs.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 1a7e54c..1932db5 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -59,7 +59,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final TableDesc getTableDesc(final String name) {
     try {
-      return new ServerCallable<TableDesc>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<TableDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
         public TableDesc call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder().setValue(name).build()));
@@ -74,7 +74,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final PartitionMethodDesc getPartitionMethod(final String tableName) {
     try {
-      return new ServerCallable<PartitionMethodDesc>(conf, catalogServerAddr, CatalogProtocol.class, false) {
+      return new ServerCallable<PartitionMethodDesc>(this.pool, catalogServerAddr, CatalogProtocol.class, false) {
         public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null,
@@ -90,7 +90,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existPartitionMethod(final String tableId) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.existPartitionMethod(null, StringProto.newBuilder().
@@ -106,7 +106,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTableNames() {
     try {
-      return new ServerCallable<Collection<String>>(conf, catalogServerAddr,
CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<String>>(this.pool, catalogServerAddr,
CatalogProtocol.class, false) {
         public Collection<String> call(NettyClientBase client) throws ServiceException
{
           List<String> protos = new ArrayList<String>();
           GetAllTableNamesResponse response;
@@ -128,7 +128,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final Collection<FunctionDesc> getFunctions() {
     try {
-      return new ServerCallable<Collection<FunctionDesc>>(conf, catalogServerAddr,
CatalogProtocol.class, false) {
+      return new ServerCallable<Collection<FunctionDesc>>(this.pool, catalogServerAddr,
CatalogProtocol.class, false) {
         public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException
{
           List<FunctionDesc> list = new ArrayList<FunctionDesc>();
           GetFunctionsResponse response;
@@ -155,7 +155,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean addTable(final TableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.addTable(null, desc.getProto()).getValue();
@@ -170,7 +170,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean deleteTable(final String name) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.deleteTable(null,
@@ -186,7 +186,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean existsTable(final String tableId) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub
@@ -203,7 +203,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean addIndex(final IndexDesc index) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.addIndex(null, index.getProto()).getValue();
@@ -218,7 +218,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean existIndex(final String indexName) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.existIndexByName(null, StringProto.newBuilder().
@@ -234,7 +234,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public boolean existIndex(final String tableName, final String columnName) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           GetIndexRequest.Builder builder = GetIndexRequest.newBuilder();
           builder.setTableName(tableName);
@@ -252,7 +252,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final IndexDesc getIndex(final String indexName) {
     try {
-      return new ServerCallable<IndexDesc>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<IndexDesc>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return new IndexDesc(
@@ -269,7 +269,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final IndexDesc getIndex(final String tableName, final String columnName) {
     try {
-      return new ServerCallable<IndexDesc>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<IndexDesc>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public IndexDesc call(NettyClientBase client) throws ServiceException {
           GetIndexRequest.Builder builder = GetIndexRequest.newBuilder();
           builder.setTableName(tableName);
@@ -288,7 +288,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public boolean deleteIndex(final String indexName) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.delIndex(null,
@@ -304,7 +304,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.createFunction(null, funcDesc.getProto()).getValue();
@@ -319,7 +319,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
   @Override
   public final boolean dropFunction(final String signature) {
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
           builder.setSignature(signature);
@@ -352,7 +352,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
 
     FunctionDescProto descProto = null;
     try {
-      descProto = new ServerCallable<FunctionDescProto>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      descProto = new ServerCallable<FunctionDescProto>(this.pool, catalogServerAddr,
CatalogProtocol.class, false) {
         public FunctionDescProto call(NettyClientBase client) throws ServiceException {
           try {
             CatalogProtocolService.BlockingInterface stub = getStub(client);
@@ -402,7 +402,7 @@ public abstract class AbstractCatalogClient implements CatalogService
{
     }
 
     try {
-      return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class,
false) {
+      return new ServerCallable<Boolean>(this.pool, catalogServerAddr, CatalogProtocol.class,
false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
           return stub.containFunction(null, builder.build()).getValue();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index d9c511e..7ee4694 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -118,7 +118,7 @@ public class TajoClient {
   }
 
   public ExplainQueryResponse explainQuery(final String sql) throws ServiceException {
-    return new ServerCallable<ExplainQueryResponse>(conf, tajoMasterAddr,
+    return new ServerCallable<ExplainQueryResponse>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public ExplainQueryResponse call(NettyClientBase client) throws ServiceException {
         final ExplainQueryRequest.Builder builder = ExplainQueryRequest.newBuilder();
@@ -137,7 +137,7 @@ public class TajoClient {
    * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
    */
   public GetQueryStatusResponse executeQuery(final String sql) throws ServiceException {
-    return new ServerCallable<GetQueryStatusResponse>(conf, tajoMasterAddr,
+    return new ServerCallable<GetQueryStatusResponse>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public GetQueryStatusResponse call(NettyClientBase client) throws ServiceException
{
         final QueryRequest.Builder builder = QueryRequest.newBuilder();
@@ -159,12 +159,11 @@ public class TajoClient {
    */
   public ResultSet executeQueryAndGetResult(final String sql)
       throws ServiceException, IOException {
-    GetQueryStatusResponse response = new ServerCallable<GetQueryStatusResponse>(conf, tajoMasterAddr,
+    GetQueryStatusResponse response = new ServerCallable<GetQueryStatusResponse>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public GetQueryStatusResponse call(NettyClientBase client) throws ServiceException
{
         final QueryRequest.Builder builder = QueryRequest.newBuilder();
         builder.setQuery(sql);
-
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
         return tajoMasterService.submitQuery(null, builder.build());
       }
@@ -313,7 +312,7 @@ public class TajoClient {
   }
 
   public boolean updateQuery(final String sql) throws ServiceException {
-    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         QueryRequest.Builder builder = QueryRequest.newBuilder();
@@ -342,7 +341,7 @@ public class TajoClient {
    * @throws ServiceException
    */
   public boolean existTable(final String name) throws ServiceException {
-    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         StringProto.Builder builder = StringProto.newBuilder();
@@ -356,7 +355,7 @@ public class TajoClient {
 
   public TableDesc createExternalTable(final String name, final Schema schema, final Path
path, final TableMeta meta)
       throws SQLException, ServiceException {
-    return new ServerCallable<TableDesc>(conf, tajoMasterAddr,
+    return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException
{
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -387,7 +386,7 @@ public class TajoClient {
    * @throws ServiceException
    */
   public boolean dropTable(final String tableName, final boolean purge) throws ServiceException
{
-    return new ServerCallable<Boolean>(conf, tajoMasterAddr,
+    return new ServerCallable<Boolean>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public Boolean call(NettyClientBase client) throws ServiceException {
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -402,7 +401,7 @@ public class TajoClient {
   }
 
   public List<BriefQueryInfo> getQueryList() throws ServiceException {
-    return new ServerCallable<List<BriefQueryInfo>>(conf, tajoMasterAddr,
+    return new ServerCallable<List<BriefQueryInfo>>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException
{
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -415,7 +414,7 @@ public class TajoClient {
   }
 
   public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
-    return new ServerCallable<List<WorkerResourceInfo>>(conf, tajoMasterAddr,
+    return new ServerCallable<List<WorkerResourceInfo>>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException
{
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -432,7 +431,7 @@ public class TajoClient {
    * represented as lower-case letters.
    */
   public List<String> getTableList() throws ServiceException {
-    return new ServerCallable<List<String>>(conf, tajoMasterAddr,
+    return new ServerCallable<List<String>>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public List<String> call(NettyClientBase client) throws ServiceException {
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -445,7 +444,7 @@ public class TajoClient {
   }
 
   public TableDesc getTableDesc(final String tableName) throws SQLException, ServiceException
{
-    return new ServerCallable<TableDesc>(conf, tajoMasterAddr,
+    return new ServerCallable<TableDesc>(connPool, tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException
{
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -495,7 +494,7 @@ public class TajoClient {
   }
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName)
throws ServiceException {
-    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(conf, tajoMasterAddr,
+    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool,
tajoMasterAddr,
         TajoMasterClientProtocol.class, false, true) {
       public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws
ServiceException, SQLException {
         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
index 942107c..d0a41c0 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -39,12 +39,16 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TajoResultSet extends TajoResultSetBase {
   private FileSystem fs;
   private Scanner scanner;
   private TajoClient tajoClient;
-  QueryId queryId;
+  private TajoConf conf;
+  private TableDesc desc;
+  private QueryId queryId;
+  private AtomicBoolean closed = new AtomicBoolean(false);
 
   public TajoResultSet(TajoClient tajoClient, QueryId queryId) {
     this.tajoClient = tajoClient;
@@ -56,6 +60,14 @@ public class TajoResultSet extends TajoResultSetBase {
                        TajoConf conf, TableDesc desc) throws IOException {
     this.tajoClient = tajoClient;
     this.queryId = queryId;
+    this.conf = conf;
+    this.desc = desc;
+
+    initScanner();
+    init();
+  }
+
+  private void initScanner() throws IOException {
     if(desc != null) {
       this.schema = desc.getSchema();
 
@@ -65,7 +77,6 @@ public class TajoResultSet extends TajoResultSetBase {
       List<FileFragment> frags = getFragments(desc.getPath());
       scanner = new MergeScanner(conf, schema, desc.getMeta(), frags);
     }
-    init();
   }
 
   @Override
@@ -111,7 +122,11 @@ public class TajoResultSet extends TajoResultSetBase {
   }
 
   @Override
-  public void close() throws SQLException {
+  public synchronized void close() throws SQLException {
+    if (closed.getAndSet(true)) {
+      return;
+    }
+
     try {
       if(tajoClient != null) {
         this.tajoClient.closeQuery(queryId);
@@ -136,6 +151,8 @@ public class TajoResultSet extends TajoResultSetBase {
     try {
       if(scanner != null) {
         scanner.reset();
+      } else {
+        initScanner();
       }
       init();
     } catch (IOException e) {
@@ -143,13 +160,19 @@ public class TajoResultSet extends TajoResultSetBase {
     }
   }
 
-
   @Override
   protected Tuple nextTuple() throws IOException {
     if(scanner == null) {
       return null;
     }
-    return scanner.next();
+    Tuple tuple = scanner.next();
+    if (tuple == null) {
+      //query is closed automatically by querymaster but scanner is not
+      scanner.close();
+      scanner = null;
+    }
+
+    return tuple;
   }
 
   public boolean hasResult() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
index 03b45a6..c537eb5 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDriver.java
@@ -33,7 +33,7 @@ public class TajoDriver implements Driver, Closeable {
 
   public static final String TAJO_JDBC_URL_PREFIX = "jdbc:tajo://";
 
-  protected static TajoConf jdbcTajoConf;
+  protected static TajoConf jdbcTajoConf = new TajoConf();
 
   static {
     try {
@@ -45,7 +45,6 @@ public class TajoDriver implements Driver, Closeable {
   }
 
   public TajoDriver() {
-    jdbcTajoConf = new TajoConf();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
index 6fdda0d..e2b47c1 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
@@ -50,7 +50,7 @@ public class TajoPreparedStatement implements PreparedStatement {
   /**
    * Add SQLWarnings to the warningChain if needed.
    */
-  private  SQLWarning warningChain = null;
+  //private  SQLWarning warningChain = null;
 
   /**
    * Keep state so we can fail certain calls made after close().
@@ -449,7 +449,6 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public void clearWarnings() throws SQLException {
-     warningChain=null;
   }
 
   public void closeOnCompletion() throws SQLException {
@@ -588,7 +587,7 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public SQLWarning getWarnings() throws SQLException {
-    return warningChain;
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index b3afcdb..b7604a8 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -69,6 +69,9 @@ public class TajoStatement implements Statement {
 
   @Override
   public void close() throws SQLException {
+    if (resultSet != null) {
+      resultSet.close();
+    }
     resultSet = null;
     isClosed = true;
   }
@@ -112,7 +115,8 @@ public class TajoStatement implements Statement {
     }
 
     try {
-      return tajoClient.executeQueryAndGetResult(sql);
+      resultSet = tajoClient.executeQueryAndGetResult(sql);
+      return resultSet;
     } catch (Exception e) {
       throw new SQLFeatureNotSupportedException(e.getMessage(), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index 3f724e8..f2efa4b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -36,16 +36,17 @@ public abstract class ServerCallable<T> {
   protected Class protocol;
   protected boolean asyncMode;
   protected boolean closeConn;
+  protected RpcConnectionPool connPool;
 
   public abstract T call(NettyClientBase client) throws Exception;
 
-  public ServerCallable(TajoConf conf, InetSocketAddress addr, Class protocol, boolean asyncMode) {
-    this(conf, addr, protocol, asyncMode, false);
+  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, Class protocol, boolean asyncMode) {
+    this(connPool, addr, protocol, asyncMode, false);
   }
 
-  public ServerCallable(TajoConf conf, InetSocketAddress addr, Class protocol,
+  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol,
                         boolean asyncMode, boolean closeConn) {
-    this.tajoConf = conf;
+    this.connPool = connPool;
     this.addr = addr;
     this.protocol = protocol;
     this.asyncMode = asyncMode;
@@ -91,14 +92,10 @@ public abstract class ServerCallable<T> {
       try {
         beforeCall();
         if(addr != null) {
-          client = RpcConnectionPool.getPool(tajoConf).getConnection(addr, protocol, asyncMode);
+          client = connPool.getConnection(addr, protocol, asyncMode);
         }
         return call(client);
       } catch (IOException ioe) {
-        if(!closeConn) {
-          RpcConnectionPool.getPool(tajoConf).closeConnection(client);
-          client = null;
-        }
         exceptions.add(ioe);
         if(abort) {
           throw new ServiceException(ioe.getMessage(), ioe);
@@ -111,9 +108,9 @@ public abstract class ServerCallable<T> {
       } finally {
         afterCall();
         if(closeConn) {
-          RpcConnectionPool.getPool(tajoConf).closeConnection(client);
+          connPool.closeConnection(client);
         } else {
-          RpcConnectionPool.getPool(tajoConf).releaseConnection(client);
+          connPool.releaseConnection(client);
         }
       }
       try {
@@ -137,13 +134,9 @@ public abstract class ServerCallable<T> {
     NettyClientBase client = null;
     try {
       beforeCall();
-      client = RpcConnectionPool.getPool(tajoConf).getConnection(addr, protocol, asyncMode);
+      client = connPool.getConnection(addr, protocol, asyncMode);
       return call(client);
     } catch (Throwable t) {
-      if(!closeConn) {
-        RpcConnectionPool.getPool(tajoConf).closeConnection(client);
-        client = null;
-      }
       Throwable t2 = translateException(t);
       if (t2 instanceof IOException) {
         throw (IOException)t2;
@@ -153,9 +146,9 @@ public abstract class ServerCallable<T> {
     } finally {
       afterCall();
       if(closeConn) {
-        RpcConnectionPool.getPool(tajoConf).closeConnection(client);
+        connPool.closeConnection(client);
       } else {
-        RpcConnectionPool.getPool(tajoConf).releaseConnection(client);
+        connPool.releaseConnection(client);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c2a59d78/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index ba2b919..dedd96e 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -90,7 +90,8 @@ public class TestBlockingRpc {
         .setX4(2.0f).build();
 
     SumResponse response =
-    new ServerCallable<SumResponse>(new TajoConf(), server.getListenAddress(), DummyProtocol.class, false) {
+    new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
+        server.getListenAddress(), DummyProtocol.class, false) {
       @Override
       public SumResponse call(NettyClientBase client) throws Exception {
         BlockingInterface stub2 = client.getStub();
@@ -102,7 +103,8 @@ public class TestBlockingRpc {
     assertTrue(8.15d == response.getResult());
 
     response =
-        new ServerCallable<SumResponse>(new TajoConf(), server.getListenAddress(), DummyProtocol.class, false) {
+        new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
+            server.getListenAddress(), DummyProtocol.class, false) {
           @Override
           public SumResponse call(NettyClientBase client) throws Exception {
             BlockingInterface stub2 = client.getStub();


Mime
View raw message