tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [2/2] tajo git commit: TAJO-1563: Improve RPC error handling. (jinho)
Date Tue, 28 Apr 2015 09:28:33 GMT
TAJO-1563: Improve RPC error handling. (jinho)

Closes #554


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2e7d03df
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2e7d03df
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2e7d03df

Branch: refs/heads/master
Commit: 2e7d03dff9480ecb7a4e4ed09a5578ba618de2ae
Parents: 1971d85
Author: Jinho Kim <jhkim@apache.org>
Authored: Tue Apr 28 18:27:05 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Tue Apr 28 18:27:05 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/client/SessionConnection.java   |  12 +-
 .../java/org/apache/tajo/master/TajoMaster.java |   9 +-
 .../tajo/master/rm/TajoResourceTracker.java     |   8 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  13 +-
 .../tajo/worker/ExecutionBlockContext.java      |   4 +-
 .../tajo/worker/TajoResourceAllocator.java      |   6 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   7 +
 .../java/org/apache/tajo/worker/TaskRunner.java |  14 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  19 +-
 .../tajo/ws/rs/resources/DatabasesResource.java |   8 +-
 .../responses/WorkerConnectionInfoResponse.java |  25 +-
 .../tajo/ws/rs/responses/WorkerResponse.java    |  47 ++-
 .../org/apache/tajo/rpc/RpcChannelFactory.java  |   6 +-
 .../java/org/apache/tajo/rpc/RpcConstants.java  |  32 ++
 .../org/apache/tajo/rpc/AsyncRpcClient.java     | 206 +++-------
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |  65 +--
 .../org/apache/tajo/rpc/BlockingRpcClient.java  | 214 ++++------
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |  64 +--
 .../java/org/apache/tajo/rpc/CallFuture.java    |  32 +-
 .../apache/tajo/rpc/ChannelEventListener.java   |  39 ++
 .../tajo/rpc/ConnectionCloseFutureListener.java |  35 --
 .../apache/tajo/rpc/DefaultRpcController.java   |   2 +-
 .../apache/tajo/rpc/MonitorClientHandler.java   | 101 +++++
 .../apache/tajo/rpc/MonitorServerHandler.java   |  73 ++++
 .../org/apache/tajo/rpc/MonitorStateEvent.java  |  49 +++
 .../org/apache/tajo/rpc/NettyClientBase.java    | 303 ++++++++++++--
 .../tajo/rpc/ProtoChannelInitializer.java       |  57 ---
 .../tajo/rpc/ProtoClientChannelInitializer.java |  63 +++
 .../org/apache/tajo/rpc/ProtoDeclaration.java   |  24 ++
 .../tajo/rpc/ProtoServerChannelInitializer.java |  50 +++
 .../apache/tajo/rpc/RecoverableException.java   |  42 ++
 .../org/apache/tajo/rpc/RpcClientManager.java   | 123 ++++--
 .../src/main/proto/TestProtocol.proto           |   3 +-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  | 350 +++++++++++++----
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 392 +++++++++++++------
 .../apache/tajo/rpc/TestRpcClientManager.java   | 155 ++++++--
 .../rpc/test/impl/DummyProtocolAsyncImpl.java   |  21 +-
 .../test/impl/DummyProtocolBlockingImpl.java    |  13 +-
 39 files changed, 1879 insertions(+), 809 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7b16e11..93fb092 100644
--- a/CHANGES
+++ b/CHANGES
@@ -22,6 +22,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1563: Improve RPC error handling. (jinho)
+
     TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)
 
     TAJO-1548: Refactoring condition code for CHAR into CatalogUtil.

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index d8152f4..be757af 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -31,15 +31,15 @@ import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.ProtoUtil;
 
-import io.netty.channel.ConnectTimeoutException;
-
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
@@ -87,6 +87,10 @@ public class SessionConnection implements Closeable {
     this.properties = properties;
 
     this.manager = RpcClientManager.getInstance();
+    this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
+    this.manager.setTimeoutSeconds(
+        properties.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 0)); // disable rpc timeout
+
     this.userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
 
@@ -98,12 +102,12 @@ public class SessionConnection implements Closeable {
   }
 
   public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
-      ConnectTimeoutException, ClassNotFoundException {
+      ConnectException, ClassNotFoundException {
     return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
   }
 
   public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
     return manager.getClient(addr, protocolClass, asyncMode);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 0a5de58..d6ae49c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -44,6 +44,8 @@ import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.metrics.CatalogMetricsGaugeSet;
 import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rule.EvaluationContext;
 import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
@@ -167,8 +169,12 @@ public class TajoMaster extends CompositeService {
     try {
       RackResolver.init(systemConf);
 
+      RpcClientManager rpcManager = RpcClientManager.getInstance();
+      rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
+      rpcManager.setTimeoutSeconds(
+          systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS));
+
       initResourceManager();
-      initWebServer();
 
       this.dispatcher = new AsyncDispatcher();
       addIfService(dispatcher);
@@ -328,6 +334,7 @@ public class TajoMaster extends CompositeService {
       LOG.error(e.getMessage(), e);
     }
 
+    initWebServer();
     initSystemMetrics();
 
     haService = ServiceTrackerFactory.get(systemConf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index ba021fc..4f3b66a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -76,7 +76,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     if (!(conf instanceof TajoConf)) {
       throw new IllegalArgumentException("Configuration must be a TajoConf instance");
     }
@@ -98,17 +98,17 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     systemConf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
 
     LOG.info("TajoResourceTracker starts up (" + this.bindAddress + ")");
-    super.start();
+    super.serviceInit(conf);
   }
 
   @Override
-  public void serviceStop() {
+  public void serviceStop() throws Exception {
     // server can be null if some exception occurs before the rpc server starts up.
     if(server != null) {
       server.shutdown();
       server = null;
     }
-    super.stop();
+    super.serviceStop();
   }
 
   /** The response builder */

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 67dae06..6c5bd22 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -38,10 +38,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.NetUtils;
@@ -192,10 +189,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
         TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
 
         tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get());
-      } catch (RuntimeException e) {
-        LOG.warn("Ignoring RuntimeException. " + e.getMessage(), e);
-        continue;
       } catch (Exception e) {
+        LOG.warn("Ignoring exception. " + e.getMessage(), e);
         continue;
       }
     }
@@ -341,6 +336,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
+        future.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
       }  catch (Exception e) {
         //this function will be closed in new thread.
         //When tajo do stop cluster, tajo master maybe throw closed connection exception
@@ -441,9 +437,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
                   QueryCoordinatorProtocol.class, true);
               QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
 
-              CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
               TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
-              masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);
+              masterClientService.heartbeat(null, queryHeartbeat, NullCallback.get());
             } catch (Throwable t) {
               t.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index fcf787e..0d26e6c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import io.netty.channel.ConnectTimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +44,7 @@ import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -154,7 +154,7 @@ public class ExecutionBlockContext {
   }
 
   public NettyClientBase getQueryMasterConnection()
-      throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
     return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 7ba2ebc..05dd1a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -318,7 +318,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         tmClient = RpcClientManager.getInstance().
             getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
         QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
-        masterClientService.allocateWorkerResources(null, request, callBack);
+        masterClientService.allocateWorkerResources(callBack.getController(), request, callBack);
       } catch (Throwable e) {
         LOG.error(e.getMessage(), e);
       }
@@ -335,8 +335,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         } catch (TimeoutException e) {
           LOG.info("No available worker resource for " + event.getExecutionBlockId());
           continue;
+        } catch (ExecutionException e) {
+          LOG.error(e.getMessage(), e);
+          break;
         }
       }
+
       int numAllocatedContainers = 0;
 
       if(response != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 17af71a..79b83e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -37,6 +37,8 @@ import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.service.TajoMasterInfo;
@@ -152,6 +154,11 @@ public class TajoWorker extends CompositeService {
     this.systemConf = (TajoConf)conf;
     RackResolver.init(systemConf);
 
+    RpcClientManager rpcManager = RpcClientManager.getInstance();
+    rpcManager.setRetries(systemConf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
+    rpcManager.setTimeoutSeconds(
+        systemConf.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS));
+
     serviceTracker = ServiceTrackerFactory.get(systemConf);
 
     this.workerContext = new WorkerContext();

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 642c914..6076913 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.worker;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,8 +37,7 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 
-import io.netty.channel.ConnectTimeoutException;
-
+import java.net.ConnectException;
 import java.util.concurrent.*;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -200,7 +198,7 @@ public class TaskRunner extends AbstractService {
             NettyClientBase client;
             try {
               client = executionBlockContext.getQueryMasterConnection();
-            } catch (ConnectTimeoutException ce) {
+            } catch (ConnectException ce) {
               // NettyClientBase throws ConnectTimeoutException if connection was failed
               stop();
               getContext().stopTaskRunner(getId());
@@ -238,17 +236,15 @@ public class TaskRunner extends AbstractService {
                 if(stopped) {
                   break;
                 }
-
-                if(callFuture.getController().failed()){
-                  LOG.error(callFuture.getController().errorText());
-                  break;
-                }
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
                 if (LOG.isDebugEnabled()) {
                   LOG.info("Retry assigning task:" + getId());
                 }
                 continue;
+              } catch (ExecutionException ee) {
+                LOG.error(ee.getMessage(), ee);
+                break;
               }
 
               if (taskRequest != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index f8d5fd9..9afee5a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -19,13 +19,12 @@
 package org.apache.tajo.worker;
 
 import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
@@ -187,17 +186,13 @@ public class WorkerHeartbeatService extends AbstractService {
           resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
 
           TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
-          if(response != null) {
-            ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
-            if(clusterResourceSummary.getNumWorkers() > 0) {
-              context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
-            }
-            context.setClusterResource(clusterResourceSummary);
-          } else {
-            if(callBack.getController().failed()) {
-              throw new ServiceException(callBack.getController().errorText());
-            }
+
+          QueryCoordinatorProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+          if(clusterResourceSummary.getNumWorkers() > 0) {
+            context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
           }
+          context.setClusterResource(clusterResourceSummary);
+
         } catch (InterruptedException e) {
           break;
         } catch (TimeoutException te) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
index 3868cab..b807198 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/DatabasesResource.java
@@ -254,14 +254,18 @@ public class DatabasesResource {
       if (selectedDatabase != null) {
         List<TablespaceProto> tablespacesList = catalogService.getAllTablespaces();
         TablespaceProto selectedTablespace = null;
-        
+
         for (TablespaceProto tablespace: tablespacesList) {
           if (tablespace.hasId() && tablespace.getId() == selectedDatabase.getSpaceId()) {
             selectedTablespace = tablespace;
             break;
           }
         }
-        
+
+        if(selectedTablespace ==  null) {
+          return ResourcesUtil.createExceptionResponse(LOG, "Tablespace not found.");
+        }
+
         DatabaseInfoResponse databaseInfo = new DatabaseInfoResponse();
         databaseInfo.setId(selectedDatabase.getId());
         databaseInfo.setName(selectedDatabase.getName());

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java
index 3c8cb8b..67e59ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerConnectionInfoResponse.java
@@ -52,16 +52,27 @@ public class WorkerConnectionInfoResponse {
     return id;
   }
 
-  public void setId(int id) {
-    this.id = id;
-  }
-
   public String getHost() {
     return host;
   }
 
-  public void setHost(String host) {
-    this.host = host;
+  public int getPeerRpcPort() {
+    return peerRpcPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public int getClientPort() {
+    return clientPort;
+  }
+
+  public int getHttpInfoPort() {
+    return httpInfoPort;
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
index 84b81f8..ff0399c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/responses/WorkerResponse.java
@@ -69,8 +69,51 @@ public class WorkerResponse {
     return connectionInfo;
   }
 
-  public void setConnectionInfo(WorkerConnectionInfoResponse connectionInfo) {
-    this.connectionInfo = connectionInfo;
+  public int getCpuCoreSlots() {
+    return cpuCoreSlots;
   }
 
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  public float getUsedDiskSlots() {
+    return usedDiskSlots;
+  }
+
+  public int getUsedMemoryMB() {
+    return usedMemoryMB;
+  }
+
+  public int getUsedCpuCoreSlots() {
+    return usedCpuCoreSlots;
+  }
+
+  public long getMaxHeap() {
+    return maxHeap;
+  }
+
+  public long getFreeHeap() {
+    return freeHeap;
+  }
+
+  public long getTotalHeap() {
+    return totalHeap;
+  }
+
+  public int getNumRunningTasks() {
+    return numRunningTasks;
+  }
+
+  public int getNumQueryMasterTasks() {
+    return numQueryMasterTasks;
+  }
+
+  public long getLastHeartbeatTime() {
+    return lastHeartbeatTime;
+  }
+
+  public float getDiskSlots() {
+    return diskSlots;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index ed6b634..eb34ca2 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -19,13 +19,11 @@
 package org.apache.tajo.rpc;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.Map;
 import java.util.Queue;

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
new file mode 100644
index 0000000..165cd2a
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+public class RpcConstants {
+
+  public static final String PING_PACKET = "TAJO";
+  public static final String RPC_CLIENT_RETRY_MAX = "tajo.rpc.client.retry.max";
+  public static final String RPC_CLIENT_TIMEOUT_SECS = "tajo.rpc.client.timeout-secs";
+
+  public static final int DEFAULT_RPC_RETRIES = 3;
+  public static final int DEFAULT_RPC_TIMEOUT_SECONDS = 180;
+  public static final int DEFAULT_CONNECT_TIMEOUT = 60000;  // 60 sec
+  public static final int DEFAULT_PAUSE = 1000; // 1 sec
+  public static final int DEFAULT_FUTURE_TIMEOUT_SECONDS = 10;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 8f2c2a1..dd7d495 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -20,120 +20,97 @@ package org.apache.tajo.rpc;
 
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.*;
-import io.netty.channel.*;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import io.netty.channel.ChannelHandler;
 import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.lang.reflect.Method;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class AsyncRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
-
-  private final ConcurrentMap<Integer, ResponseCallback> requests =
-      new ConcurrentHashMap<Integer, ResponseCallback>();
+public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallback> {
 
   private final Method stubMethod;
   private final ProxyRpcChannel rpcChannel;
-  private final ClientChannelInboundHandler inboundHandler;
+  private final NettyChannelInboundHandler handler;
 
-  /**
-   * Intentionally make this method package-private, avoiding user directly
-   * new an instance through this constructor.
-   */
   AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
       throws ClassNotFoundException, NoSuchMethodException {
-    this(rpcConnectionKey, retries, 0);
+    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false);
   }
 
-  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds)
+  /**
+   * Intentionally package-private, so that users cannot directly create
+   * an instance through this constructor.
+   *
+   * @param rpcConnectionKey
+   * @param retries          number of times to retry the operation
+   * @param timeout          if ping is disabled, a timeout event is triggered on idle-state;
+   *                         otherwise it is the request timeout on active-state
+   * @param timeUnit         time unit of the timeout
+   * @param enablePing       enable ping to detect remote peer hangs
+   * @throws ClassNotFoundException
+   * @throws NoSuchMethodException
+   */
+  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing)
       throws ClassNotFoundException, NoSuchMethodException {
     super(rpcConnectionKey, retries);
-    stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
-    rpcChannel = new ProxyRpcChannel();
-    inboundHandler = new ClientChannelInboundHandler();
-    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds));
+
+    this.stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
+    this.rpcChannel = new ProxyRpcChannel();
+    this.handler = new ClientChannelInboundHandler();
+    init(new ProtoClientChannelInitializer(handler,
+        RpcResponse.getDefaultInstance(),
+        timeUnit.toNanos(timeout),
+        enablePing));
   }
 
   @Override
-  public <T> T getStub() {
+  public <I> I getStub() {
     return getStub(stubMethod, rpcChannel);
   }
 
-  protected void sendExceptions(String message) {
-    for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
-      ResponseCallback callback = callbackEntry.getValue();
-      Integer id = callbackEntry.getKey();
-
-      RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
-          .setErrorMessage(message)
-          .setId(id);
-
-      callback.run(responseBuilder.build());
-    }
-  }
-
   @Override
-  public void close() {
-    sendExceptions("AsyncRpcClient terminates all the connections");
-
-    super.close();
+  protected NettyChannelInboundHandler getHandler() {
+    return handler;
   }
 
   private class ProxyRpcChannel implements RpcChannel {
 
+    private final AtomicInteger sequence = new AtomicInteger(0);
+
     public void callMethod(final MethodDescriptor method,
                            final RpcController controller,
                            final Message param,
                            final Message responseType,
-                           RpcCallback<Message> done) {
+                           final RpcCallback<Message> done) {
 
       int nextSeqId = sequence.getAndIncrement();
+      RpcProtos.RpcRequest rpcRequest = buildRequest(nextSeqId, method, param);
 
-      Message rpcRequest = buildRequest(nextSeqId, method, param);
-
-      inboundHandler.registerCallback(nextSeqId,
-          new ResponseCallback(controller, responseType, done));
-
-      ChannelPromise channelPromise = getChannel().newPromise();
-      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
-
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
-          }
-        }
-      });
-      getChannel().writeAndFlush(rpcRequest, channelPromise);
+      invoke(rpcRequest, new ResponseCallback(controller, responseType, done), 0);
     }
+  }
 
-    private Message buildRequest(int seqId,
-                                 MethodDescriptor method,
-                                 Message param) {
+  @ChannelHandler.Sharable
+  private class ClientChannelInboundHandler extends NettyChannelInboundHandler {
 
-      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
-          .setId(seqId)
-          .setMethodName(method.getName());
+    @Override
+    protected void run(RpcResponse response, ResponseCallback callback) throws Exception {
+      callback.run(response);
+    }
 
-      if (param != null) {
-          requestBuilder.setRequestMessage(param.toByteString());
-      }
+    @Override
+    protected void handleException(int requestId, ResponseCallback callback, String message) {
+      RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
+          .setErrorMessage(message + "")
+          .setId(requestId);
 
-      return requestBuilder.build();
+      callback.run(responseBuilder.build());
     }
   }
 
-  private class ResponseCallback implements RpcCallback<RpcResponse> {
+  static class ResponseCallback implements RpcCallback<RpcResponse> {
     private final RpcController controller;
     private final Message responsePrototype;
     private final RpcCallback<Message> callback;
@@ -149,88 +126,27 @@ public class AsyncRpcClient extends NettyClientBase {
     @Override
     public void run(RpcResponse rpcResponse) {
       // if hasErrorMessage is true, it means rpc-level errors.
-      // it does not call the callback function\
+      // the callback function is then called with a null response.
       if (rpcResponse.hasErrorMessage()) {
         if (controller != null) {
           this.controller.setFailed(rpcResponse.getErrorMessage());
         }
         callback.run(null);
       } else { // if rpc call succeed
-        try {
-          Message responseMessage;
-          if (!rpcResponse.hasResponseMessage()) {
-            responseMessage = null;
-          } else {
+
+        Message responseMessage = null;
+        if (rpcResponse.hasResponseMessage()) {
+
+          try {
             responseMessage = responsePrototype.newBuilderForType().mergeFrom(
                 rpcResponse.getResponseMessage()).build();
+          } catch (InvalidProtocolBufferException e) {
+            if (controller != null) {
+              this.controller.setFailed(e.getMessage());
+            }
           }
-
-          callback.run(responseMessage);
-
-        } catch (InvalidProtocolBufferException e) {
-          throw new RemoteException(getErrorMessage(""), e);
-        }
-      }
-    }
-  }
-
-  private String getErrorMessage(String message) {
-    return "Exception [" + protocol.getCanonicalName() +
-        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-        getChannel().remoteAddress()) + ")]: " + message;
-  }
-
-  @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> {
-
-    void registerCallback(int seqId, ResponseCallback callback) {
-
-      if (requests.putIfAbsent(seqId, callback) != null) {
-        throw new RemoteException(
-            getErrorMessage("Duplicate Sequence Id "+ seqId));
-      }
-    }
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
-      ResponseCallback callback = requests.remove(response.getId());
-
-      if (callback == null) {
-        LOG.warn("Dangling rpc call");
-      } else {
-        callback.run(response);
-      }
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-      super.channelActive(ctx);
-      LOG.info("Connection established successfully : " + ctx.channel().remoteAddress());
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
-
-      sendExceptions(cause.getMessage());
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.error(cause.getMessage(), cause);
-      } else {
-        LOG.error("RPC Exception:" + cause.getMessage());
-      }
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-      if (evt instanceof IdleStateEvent) {
-        IdleStateEvent e = (IdleStateEvent) evt;
-        /* If all requests is done and event is triggered, channel will be closed. */
-        if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) {
-          ctx.close();
-          LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress());
         }
+        callback.run(responseMessage);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index e4109fe..22f47b0 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -24,6 +24,7 @@ import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 import io.netty.channel.*;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
@@ -52,7 +53,7 @@ public class AsyncRpcServer extends NettyServerBase {
     Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
     this.service = (Service) method.invoke(null, instance);
 
-    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+    this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
     super.init(this.initializer, workerNum);
   }
 
@@ -81,53 +82,67 @@ public class AsyncRpcServer extends NettyServerBase {
     protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception {
 
       String methodName = request.getMethodName();
-      MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+      final MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
 
       if (methodDescriptor == null) {
-        throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+        exceptionCaught(ctx, new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)));
+        return;
       }
 
-      Message paramProto = null;
-      if (request.hasRequestMessage()) {
-        try {
+      try {
+        Message paramProto = null;
+        if (request.hasRequestMessage()) {
           paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
               .mergeFrom(request.getRequestMessage()).build();
-        } catch (Throwable t) {
-          throw new RemoteCallException(request.getId(), methodDescriptor, t);
         }
-      }
 
-      final RpcController controller = new NettyRpcController();
+        final RpcController controller = new NettyRpcController();
 
-      RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
+        final RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
 
-        public void run(Message returnValue) {
+          public void run(Message returnValue) {
 
-          RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+            RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
 
-          if (returnValue != null) {
-            builder.setResponseMessage(returnValue.toByteString());
-          }
+            if (returnValue != null) {
+              builder.setResponseMessage(returnValue.toByteString());
+            }
 
-          if (controller.failed()) {
-            builder.setErrorMessage(controller.errorText());
-          }
+            if (controller.failed()) {
+              builder.setErrorMessage(controller.errorText());
+            }
 
-          ctx.writeAndFlush(builder.build());
-        }
-      };
+            ctx.writeAndFlush(builder.build());
+          }
+        };
 
-      service.callMethod(methodDescriptor, controller, paramProto, callback);
+        service.callMethod(methodDescriptor, controller, paramProto, callback);
+      } catch (RemoteCallException e) {
+        exceptionCaught(ctx, e);
+      } catch (Throwable throwable) {
+        exceptionCaught(ctx, new RemoteCallException(request.getId(), methodDescriptor, throwable));
+      }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception{
+        throws Exception {
       if (cause instanceof RemoteCallException) {
         RemoteCallException callException = (RemoteCallException) cause;
         ctx.writeAndFlush(callException.getResponse());
+
+        if(LOG.isDebugEnabled()) {
+          Throwable rootCause = ExceptionUtils.getRootCause(cause);
+          LOG.error(ExceptionUtils.getMessage(rootCause), rootCause);
+        }
       } else {
-        LOG.error(cause.getMessage());
+        /* unhandled exception. */
+        if (ctx.channel().isOpen()) {
+          /* closing the channel lets the client observe a channelInactive event */
+          ctx.close();
+        }
+        Throwable rootCause = ExceptionUtils.getRootCause(cause);
+        LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index ad536a4..349a0a0 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,71 +18,68 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.*;
 import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import io.netty.channel.*;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import io.netty.channel.ChannelHandler;
 import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
-import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.util.Map;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class BlockingRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(RpcProtos.class);
-
-  private final Map<Integer, ProtoCallFuture> requests =
-      new ConcurrentHashMap<Integer, ProtoCallFuture>();
+public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCallFuture> {
 
   private final Method stubMethod;
   private final ProxyRpcChannel rpcChannel;
-  private final ChannelInboundHandlerAdapter inboundHandler;
+  private final NettyChannelInboundHandler handler;
 
-  /**
-   * Intentionally make this method package-private, avoiding user directly
-   * new an instance through this constructor.
-   */
   BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
       throws NoSuchMethodException, ClassNotFoundException {
-    this(rpcConnectionKey, retries, 0);
+    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false);
   }
 
-  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds)
-      throws ClassNotFoundException, NoSuchMethodException {
+  /**
+   * Intentionally package-private, so that users cannot directly create
+   * an instance through this constructor.
+   *
+   * @param rpcConnectionKey
+   * @param retries          number of times to retry the operation
+   * @param timeout          if ping is disabled, a timeout event is triggered on idle-state;
+   *                         otherwise it is the request timeout on active-state
+   * @param timeUnit         time unit of the timeout
+   * @param enablePing       enable ping to detect remote peer hangs
+   * @throws ClassNotFoundException
+   * @throws NoSuchMethodException
+   */
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit,
+                    boolean enablePing) throws ClassNotFoundException, NoSuchMethodException {
     super(rpcConnectionKey, retries);
-    stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
-    rpcChannel = new ProxyRpcChannel();
-    inboundHandler = new ClientChannelInboundHandler();
-    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds));
+
+    this.stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
+    this.rpcChannel = new ProxyRpcChannel();
+    this.handler = new ClientChannelInboundHandler();
+    init(new ProtoClientChannelInitializer(handler,
+        RpcResponse.getDefaultInstance(),
+        timeUnit.toNanos(timeout),
+        enablePing));
   }
 
   @Override
-  public <T> T getStub() {
+  public <I> I getStub() {
     return getStub(stubMethod, rpcChannel);
   }
 
   @Override
-  public void close() {
-    for(ProtoCallFuture callback: requests.values()) {
-      callback.setFailed("BlockingRpcClient terminates all the connections",
-          new ServiceException("BlockingRpcClient terminates all the connections"));
-    }
-    requests.clear();
-    super.close();
+  protected NettyChannelInboundHandler getHandler() {
+    return handler;
   }
 
   private class ProxyRpcChannel implements BlockingRpcChannel {
 
+    private final AtomicInteger sequence = new AtomicInteger(0);
+
     @Override
     public Message callBlockingMethod(final MethodDescriptor method,
                                       final RpcController controller,
@@ -91,136 +88,66 @@ public class BlockingRpcClient extends NettyClientBase {
         throws TajoServiceException {
 
       int nextSeqId = sequence.getAndIncrement();
+      RpcProtos.RpcRequest rpcRequest = buildRequest(nextSeqId, method, param);
+      ProtoCallFuture callFuture = new ProtoCallFuture(controller, responsePrototype);
 
-      Message rpcRequest = buildRequest(nextSeqId, method, param);
-
-      ProtoCallFuture callFuture =
-          new ProtoCallFuture(controller, responsePrototype);
-      requests.put(nextSeqId, callFuture);
-
-      ChannelPromise channelPromise = getChannel().newPromise();
-      channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (!future.isSuccess()) {
-            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
-          }
-        }
-      });
-      getChannel().writeAndFlush(rpcRequest, channelPromise);
+      invoke(rpcRequest, callFuture, 0);
 
       try {
-        return callFuture.get(60, TimeUnit.SECONDS);
+        return callFuture.get();
       } catch (Throwable t) {
         if (t instanceof ExecutionException) {
           Throwable cause = t.getCause();
           if (cause != null && cause instanceof TajoServiceException) {
-            throw (TajoServiceException)cause;
+            throw (TajoServiceException) cause;
           }
         }
         throw new TajoServiceException(t.getMessage());
       }
     }
-
-    private Message buildRequest(int seqId,
-                                 MethodDescriptor method,
-                                 Message param) {
-      RpcRequest.Builder requestBuilder = RpcRequest.newBuilder()
-          .setId(seqId)
-          .setMethodName(method.getName());
-
-      if (param != null) {
-        requestBuilder.setRequestMessage(param.toByteString());
-      }
-
-      return requestBuilder.build();
-    }
-  }
-
-  private String getErrorMessage(String message) {
-    if(getChannel() != null) {
-      return protocol.getName() +
-          "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-          getChannel().remoteAddress()) + "): " + message;
-    } else {
-      return "Exception " + message;
-    }
   }
 
   private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
-    if(getChannel() != null) {
-      return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
-          RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
+    if (getChannel() != null) {
+      return new TajoServiceException(response.getErrorMessage(), cause, getKey().protocolClass.getName(),
+          RpcUtils.normalizeInetSocketAddress((InetSocketAddress) getChannel().remoteAddress()));
     } else {
       return new TajoServiceException(response.getErrorMessage());
     }
   }
 
   @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> {
+  public class ClientChannelInboundHandler extends NettyChannelInboundHandler {
 
     @Override
-    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {
-      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
-
-      if (callback == null) {
-        LOG.warn("Dangling rpc call");
+    protected void run(RpcResponse rpcResponse, ProtoCallFuture callback) throws Exception {
+      if (rpcResponse.hasErrorMessage()) {
+        callback.setFailed(rpcResponse.getErrorMessage(),
+            makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
       } else {
-        if (rpcResponse.hasErrorMessage()) {
-          callback.setFailed(rpcResponse.getErrorMessage(),
-              makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
-        } else {
-          Message responseMessage;
-
-          if (!rpcResponse.hasResponseMessage()) {
-            responseMessage = null;
-          } else {
+        Message responseMessage = null;
+
+        if (rpcResponse.hasResponseMessage()) {
+          try {
             responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
                 .build();
+          } catch (InvalidProtocolBufferException e) {
+            callback.setFailed(e.getMessage(), e);
           }
-
-          callback.setResponse(responseMessage);
         }
+        callback.setResponse(responseMessage);
       }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      /* Current requests will be failed */
-      for(ProtoCallFuture callback: requests.values()) {
-        callback.setFailed(cause.getMessage(), cause);
-      }
-      requests.clear();
-
-      if(LOG.isDebugEnabled()) {
-        LOG.error("" + cause.getMessage(), cause);
-      } else {
-        LOG.error("RPC Exception:" + cause.getMessage());
-      }
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-      super.channelActive(ctx);
-      LOG.info("Connection established successfully : " + ctx.channel().remoteAddress());
-    }
-
-    @Override
-    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-      if (evt instanceof IdleStateEvent) {
-        IdleStateEvent e = (IdleStateEvent) evt;
-        /* If all requests is done and event is triggered, channel will be closed. */
-        if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) {
-          ctx.close();
-          LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress());
-        }
-      }
+    protected void handleException(int requestId, ProtoCallFuture callback, String message) {
+      callback.setFailed(message + "", new TajoServiceException(message));
     }
   }
 
- static class ProtoCallFuture implements Future<Message> {
+  static class ProtoCallFuture implements Future<Message> {
     private Semaphore sem = new Semaphore(0);
+    private boolean done = false;
     private Message response = null;
     private Message returnType;
 
@@ -240,8 +167,9 @@ public class BlockingRpcClient extends NettyClientBase {
 
     @Override
     public Message get() throws InterruptedException, ExecutionException {
-      sem.acquire();
-      if(ee != null) {
+      if(!isDone()) sem.acquire();
+
+      if (ee != null) {
         throw ee;
       }
       return response;
@@ -250,14 +178,16 @@ public class BlockingRpcClient extends NettyClientBase {
     @Override
     public Message get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-      if(sem.tryAcquire(timeout, unit)) {
-        if (ee != null) {
-          throw ee;
+      if(!isDone()) {
+        if (!sem.tryAcquire(timeout, unit)) {
+          throw new TimeoutException();
         }
-        return response;
-      } else {
-        throw new TimeoutException();
       }
+
+      if (ee != null) {
+        throw ee;
+      }
+      return response;
     }
 
     @Override
@@ -267,19 +197,21 @@ public class BlockingRpcClient extends NettyClientBase {
 
     @Override
     public boolean isDone() {
-      return sem.availablePermits() > 0;
+      return done;
     }
 
     public void setResponse(Message response) {
       this.response = response;
+      done = true;
       sem.release();
     }
 
     public void setFailed(String errorText, Throwable t) {
-      if(controller != null) {
+      if (controller != null) {
         this.controller.setFailed(errorText);
       }
       ee = new ExecutionException(errorText, t);
+      done = true;
       sem.release();
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index bb31367..93c28e3 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -23,10 +23,10 @@ import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import io.netty.channel.*;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
-import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -53,7 +53,7 @@ public class BlockingRpcServer extends NettyServerBase {
         "newReflectiveBlockingService", interfaceClass);
 
     this.service = (BlockingService) method.invoke(null, instance);
-    this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+    this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
 
     super.init(this.initializer, workerNum);
   }
@@ -80,43 +80,40 @@ public class BlockingRpcServer extends NettyServerBase {
     }
 
     @Override
-    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
+    protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception {
 
       String methodName = request.getMethodName();
-      MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
-
+      final MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
       if (methodDescriptor == null) {
-        throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+        exceptionCaught(ctx, new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)));
+        return;
       }
-      Message paramProto = null;
-      if (request.hasRequestMessage()) {
-        try {
+
+      try {
+        Message paramProto = null;
+        if (request.hasRequestMessage()) {
           paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
               .mergeFrom(request.getRequestMessage()).build();
-
-        } catch (Throwable t) {
-          throw new RemoteCallException(request.getId(), methodDescriptor, t);
         }
-      }
-      Message returnValue;
-      RpcController controller = new NettyRpcController();
 
-      try {
-        returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
-      } catch (Throwable t) {
-        throw new RemoteCallException(request.getId(), methodDescriptor, t);
-      }
+        RpcController controller = new NettyRpcController();
+        Message returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
 
-      RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+        RpcProtos.RpcResponse.Builder builder = RpcProtos.RpcResponse.newBuilder().setId(request.getId());
 
-      if (returnValue != null) {
-        builder.setResponseMessage(returnValue.toByteString());
-      }
+        if (returnValue != null) {
+          builder.setResponseMessage(returnValue.toByteString());
+        }
 
-      if (controller.failed()) {
-        builder.setErrorMessage(controller.errorText());
+        if (controller.failed()) {
+          builder.setErrorMessage(controller.errorText());
+        }
+        ctx.writeAndFlush(builder.build());
+      } catch (RemoteCallException e) {
+        exceptionCaught(ctx, e);
+      } catch (Throwable throwable) {
+        exceptionCaught(ctx, new RemoteCallException(request.getId(), methodDescriptor, throwable));
       }
-      ctx.writeAndFlush(builder.build());
     }
 
     @Override
@@ -124,6 +121,19 @@ public class BlockingRpcServer extends NettyServerBase {
       if (cause instanceof RemoteCallException) {
         RemoteCallException callException = (RemoteCallException) cause;
         ctx.writeAndFlush(callException.getResponse());
+
+        if(LOG.isDebugEnabled()) {
+          Throwable rootCause = ExceptionUtils.getRootCause(cause);
+          LOG.debug(ExceptionUtils.getMessage(rootCause), rootCause);
+        }
+      } else {
+        /* unhandled exception. */
+        if (ctx.channel().isOpen()) {
+          /* closing the channel lets the client observe a channelInactive event */
+          ctx.close();
+        }
+        Throwable rootCause = ExceptionUtils.getRootCause(cause);
+        LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
index c4c3256..dee0ff4 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -20,11 +20,9 @@ package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
 
 public class CallFuture<T> implements RpcCallback<T>, Future<T> {
 
@@ -66,19 +64,29 @@ public class CallFuture<T> implements RpcCallback<T>, Future<T> {
   }
 
   @Override
-  public T get() throws InterruptedException {
-    sem.acquire();
+  public T get() throws InterruptedException, ExecutionException {
+    if (!isDone())
+      sem.acquire();
 
+    throwIfFailed();
     return response;
   }
 
   @Override
-  public T get(long timeout, TimeUnit unit)
-      throws InterruptedException, TimeoutException {
-    if (sem.tryAcquire(timeout, unit)) {
-      return response;
-    } else {
-      throw new TimeoutException();
+  public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException {
+    if (!isDone()) {
+      if (!sem.tryAcquire(timeout, unit)) {
+        throw new TimeoutException();
+      }
+    }
+
+    throwIfFailed();
+    return response;
+  }
+
+  private void throwIfFailed() throws ExecutionException {
+    if (controller.failed()) {
+      throw new ExecutionException(new ServiceException(controller.errorText()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java
new file mode 100644
index 0000000..e0f2c0d
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ChannelEventListener.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * Event listener for netty code. Users can subscribe to channel events by using this interface.
+ */
+public interface ChannelEventListener {
+
+  /**
+   * Performs actions before channel open.
+   * @param ctx
+   */
+  void channelRegistered(ChannelHandlerContext ctx);
+  
+  /**
+   * Performs actions after channel close
+   * @param ctx
+   */
+  void channelUnregistered(ChannelHandlerContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
deleted file mode 100644
index 29c9772..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-public class ConnectionCloseFutureListener implements GenericFutureListener {
-  private RpcClientManager.RpcConnectionKey key;
-
-  public ConnectionCloseFutureListener(RpcClientManager.RpcConnectionKey key) {
-    this.key = key;
-  }
-
-  @Override
-  public void operationComplete(Future future) throws Exception {
-    RpcClientManager.remove(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
index 4ba19a5..47125e4 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
@@ -28,7 +28,7 @@ public class DefaultRpcController implements RpcController {
 
   @Override
   public void reset() {
-    errorText = "";
+    errorText = null;
     error = false;
     canceled = false;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java
new file mode 100644
index 0000000..441c78a
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorClientHandler.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * MonitorClientHandler is a ping packet sender used for detecting server hangs.
+ * Triggers a {@link MonitorStateEvent} when the remote peer has not responded.
+ */
+
+public class MonitorClientHandler extends ChannelInboundHandlerAdapter {
+  private static final Log LOG = LogFactory.getLog(MonitorClientHandler.class);
+
+  private ByteBuf ping;
+  private boolean enableMonitor;
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    // Initialize the message.
+    ping = ctx.alloc().buffer(RpcConstants.PING_PACKET.length())
+        .writeBytes(RpcConstants.PING_PACKET.getBytes(Charset.defaultCharset()));
+    IdleStateHandler handler = ctx.pipeline().get(IdleStateHandler.class);
+    if(handler != null && handler.getWriterIdleTimeInMillis() > 0) {
+      enableMonitor = true;
+    }
+    super.channelActive(ctx);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    ping.release();
+    super.channelInactive(ctx);
+  }
+
+  private boolean isPing(Object msg) {
+    if(msg instanceof ByteBuf){
+      return ByteBufUtil.equals(ping.duplicate(), ((ByteBuf)msg).duplicate());
+    }
+    return false;
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    if(enableMonitor && isPing(msg)){
+      //ignore ping response
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("received ping " + ctx.channel());
+      }
+      ReferenceCountUtil.release(msg);
+    } else {
+      super.channelRead(ctx, msg);
+    }
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+    if (enableMonitor && evt instanceof IdleStateEvent) {
+      IdleStateEvent e = (IdleStateEvent) evt;
+      if (e.state() == IdleState.READER_IDLE && !e.isFirst()) {
+        /* trigger expired event */
+        LOG.info("Server has not respond " + ctx.channel());
+        ctx.fireUserEventTriggered(MonitorStateEvent.MONITOR_EXPIRED_STATE_EVENT);
+      } else if (e.state() == IdleState.WRITER_IDLE) {
+        /* send ping packet to remote server */
+        if(LOG.isDebugEnabled()){
+          LOG.debug("sending ping request " + ctx.channel());
+        }
+        ctx.writeAndFlush(ping.duplicate().retain());
+      }
+    }
+    super.userEventTriggered(ctx, evt);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java
new file mode 100644
index 0000000..e4333f4
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorServerHandler.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.nio.charset.Charset;
+
+/**
+ * MonitorServerHandler is a ping packet receiver used for detecting server hangs.
+ * Replies to the remote peer when it sends a ping packet.
+ */
+
+public class MonitorServerHandler extends ChannelInboundHandlerAdapter {
+  private static final Log LOG = LogFactory.getLog(MonitorServerHandler.class);
+
+  private ByteBuf ping;
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
+    // Initialize the message.
+    ping = ctx.alloc().directBuffer(4).writeBytes(RpcConstants.PING_PACKET.getBytes(Charset.defaultCharset()));
+    super.channelActive(ctx);
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    ping.release();
+    super.channelInactive(ctx);
+  }
+
+  private boolean isPing(Object msg) {
+    if (msg instanceof ByteBuf) {
+      return ByteBufUtil.equals(ping.duplicate(), ((ByteBuf) msg).duplicate());
+    }
+
+    return false;
+  }
+
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+    if (isPing(msg)) {
+      /* reply to client */
+      if(LOG.isDebugEnabled()){
+        LOG.debug("reply to " + ctx.channel());
+      }
+      ctx.writeAndFlush(msg);
+    } else {
+      super.channelRead(ctx, msg);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java
new file mode 100644
index 0000000..294be3b
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/MonitorStateEvent.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+/**
+ * A user event triggered by {@link org.apache.tajo.rpc.MonitorClientHandler} when a ping packet is not received.
+ */
+public final class MonitorStateEvent {
+  /**
+   * An {@link Enum} that represents the monitor state of a remote peer.
+   */
+  public enum MonitorState {
+    /**
+     * No ping was received.
+     */
+    PING_EXPIRED
+  }
+
+  public static final MonitorStateEvent MONITOR_EXPIRED_STATE_EVENT = new MonitorStateEvent(MonitorState.PING_EXPIRED);
+
+  private final MonitorState state;
+
+  private MonitorStateEvent(MonitorState state) {
+    this.state = state;
+  }
+
+  /**
+   * Returns the monitor state.
+   */
+  public MonitorState state() {
+    return state;
+  }
+}
\ No newline at end of file


Mime
View raw message