tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/6] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)
Date Thu, 07 May 2015 16:36:00 GMT
Repository: tajo
Updated Branches:
  refs/heads/index_support 86c97b2a1 -> 42bcf2de0


http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index be757af..84decd5 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -32,19 +32,18 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
-import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
@@ -53,7 +52,7 @@ import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProto
 
 public class SessionConnection implements Closeable {
 
-  private final Log LOG = LogFactory.getLog(TajoClientImpl.class);
+  private final static Log LOG = LogFactory.getLog(SessionConnection.class);
 
   final RpcClientManager manager;
 
@@ -70,6 +69,8 @@ public class SessionConnection implements Closeable {
 
   private ServiceTracker serviceTracker;
 
+  private NettyClientBase client;
+
   private KeyValueSet properties;
 
   /**
@@ -88,27 +89,34 @@ public class SessionConnection implements Closeable {
 
     this.manager = RpcClientManager.getInstance();
     this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
-    this.manager.setTimeoutSeconds(
-        properties.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 0)); // disable rpc timeout
-
     this.userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
 
     this.serviceTracker = tracker;
+    try {
+      this.client = getTajoMasterConnection();
+    } catch (ServiceException e) {
+      throw new IOException(e);
+    }
   }
 
   public Map<String, String> getClientSideSessionVars() {
     return Collections.unmodifiableMap(sessionVarsCache);
   }
 
-  public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
-      ConnectException, ClassNotFoundException {
-    return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
-  }
-
-  public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean
asyncMode)
-      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
-    return manager.getClient(addr, protocolClass, asyncMode);
+  public synchronized NettyClientBase getTajoMasterConnection() throws ServiceException {
+    if (client != null && client.isConnected()) return client;
+    else {
+      try {
+        RpcClientManager.cleanup(client);
+        // Client do not closed on idle state for support high available
+        this.client = manager.newClient(getTajoMasterAddr(), TajoMasterClientProtocol.class,
false,
+            manager.getRetries(), 0, TimeUnit.SECONDS, false);
+      } catch (Exception e) {
+        throw new ServiceException(e);
+      }
+      return client;
+    }
   }
 
   protected KeyValueSet getProperties() {
@@ -129,10 +137,9 @@ public class SessionConnection implements Closeable {
   }
 
   public boolean isConnected() {
-    if(!closed.get()){
+    if (!closed.get()) {
       try {
-        return manager.getClient(serviceTracker.getClientServiceAddress(),
-            TajoMasterClientProtocol.class, false).isConnected();
+        return getTajoMasterConnection().isConnected();
       } catch (Throwable e) {
         return false;
       }
@@ -145,64 +152,51 @@ public class SessionConnection implements Closeable {
   }
 
   public String getCurrentDatabase() throws ServiceException {
-    return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
-
-      public String call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
   }
 
   public Map<String, String> updateSessionVariables(final Map<String, String>
variables) throws ServiceException {
-    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Map<String, String> call(NettyClientBase client) throws ServiceException
{
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        KeyValueSet keyValueSet = new KeyValueSet();
-        keyValueSet.putAll(variables);
-        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-            .setSessionId(sessionId)
-            .setSessionVars(keyValueSet.getProto()).build();
-
-        SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
-        if (response.getResultCode() == ResultCode.OK) {
-          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-          return Collections.unmodifiableMap(sessionVarsCache);
-        } else {
-          throw new ServiceException(response.getMessage());
-        }
-      }
-    }.withRetries();
-  }
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-  public Map<String, String> unsetSessionVariables(final List<String> variables)
 throws ServiceException {
-    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
TajoMasterClientProtocol.class, false) {
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    KeyValueSet keyValueSet = new KeyValueSet();
+    keyValueSet.putAll(variables);
+    ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+        .setSessionId(sessionId)
+        .setSessionVars(keyValueSet.getProto()).build();
 
-      public Map<String, String> call(NettyClientBase client) throws ServiceException
{
-        checkSessionAndGet(client);
+    SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-            .setSessionId(sessionId)
-            .addAllUnsetVariables(variables).build();
+    if (response.getResultCode() == ResultCode.OK) {
+      updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+      return Collections.unmodifiableMap(sessionVarsCache);
+    } else {
+      throw new ServiceException(response.getMessage());
+    }
+  }
 
-        SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+  public Map<String, String> unsetSessionVariables(final List<String> variables)
throws ServiceException {
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        if (response.getResultCode() == ResultCode.OK) {
-          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-          return Collections.unmodifiableMap(sessionVarsCache);
-        } else {
-          throw new ServiceException(response.getMessage());
-        }
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+        .setSessionId(sessionId)
+        .addAllUnsetVariables(variables).build();
+
+    SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
+    if (response.getResultCode() == ResultCode.OK) {
+      updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+      return Collections.unmodifiableMap(sessionVarsCache);
+    } else {
+      throw new ServiceException(response.getMessage());
+    }
   }
 
   void updateSessionVarsCache(Map<String, String> variables) {
@@ -213,35 +207,26 @@ public class SessionConnection implements Closeable {
   }
 
   public String getSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
-
-      public String call(NettyClientBase client) throws ServiceException {
-
-        synchronized (sessionVarsCache) {
-          // If a desired variable is client side one and exists in the cache, immediately
return the variable.
-          if (sessionVarsCache.containsKey(varname)) {
-            return sessionVarsCache.get(varname);
-          }
-        }
+    synchronized (sessionVarsCache) {
+      // If a desired variable is client side one and exists in the cache, immediately return
the variable.
+      if (sessionVarsCache.containsKey(varname)) {
+        return sessionVarsCache.get(varname);
+      }
+    }
 
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
   }
 
   public Boolean existSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
   }
 
   public Map<String, String> getCachedAllSessionVariables() {
@@ -251,29 +236,19 @@ public class SessionConnection implements Closeable {
   }
 
   public Map<String, String> getAllSessionVariables() throws ServiceException {
-    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
TajoMasterClientProtocol.class,
-        false) {
-
-      public Map<String, String> call(NettyClientBase client) throws ServiceException
{
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId));
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId));
   }
 
   public Boolean selectDatabase(final String databaseName) throws ServiceException {
-    Boolean selected = new ServerCallable<Boolean>(manager, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    boolean selected = tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
 
     if (selected) {
       this.baseDatabase = databaseName;
@@ -283,14 +258,14 @@ public class SessionConnection implements Closeable {
 
   @Override
   public void close() {
-    if(closed.getAndSet(true)){
+    if (closed.getAndSet(true)) {
       return;
     }
 
     // remove session
     NettyClientBase client = null;
     try {
-      client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+      client = getTajoMasterConnection();
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
       tajoMaster.removeSession(null, sessionId);
     } catch (Throwable e) {
@@ -333,55 +308,51 @@ public class SessionConnection implements Closeable {
   }
 
   public boolean reconnect() throws Exception {
-    return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
-        builder.setUsername(userInfo.getUserName()).build();
-        if (baseDatabase != null) {
-          builder.setBaseDatabaseName(baseDatabase);
-        }
+    CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+    builder.setUsername(userInfo.getUserName()).build();
+    if (baseDatabase != null) {
+      builder.setBaseDatabaseName(baseDatabase);
+    }
 
+    NettyClientBase client = getTajoMasterConnection();
 
-        // create new session
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
-        if (response.getResultCode() != ResultCode.OK) {
-          return false;
-        }
+    // create new session
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
+    if (response.getResultCode() != ResultCode.OK) {
+      return false;
+    }
 
-        // Invalidate some session variables in client cache
-        sessionId = response.getSessionId();
-        Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
-        synchronized (sessionVarsCache) {
-          for (SessionVars var : UPDATE_ON_RECONNECT) {
-            String value = sessionVars.get(var.keyname());
-            if (value != null) {
-              sessionVarsCache.put(var.keyname(), value);
-            }
-          }
+    // Invalidate some session variables in client cache
+    sessionId = response.getSessionId();
+    Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+    synchronized (sessionVarsCache) {
+      for (SessionVars var : UPDATE_ON_RECONNECT) {
+        String value = sessionVars.get(var.keyname());
+        if (value != null) {
+          sessionVarsCache.put(var.keyname(), value);
         }
+      }
+    }
 
-        // Update the session variables in server side
-        try {
-          KeyValueSet keyValueSet = new KeyValueSet();
-          keyValueSet.putAll(sessionVarsCache);
-          ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-              .setSessionId(sessionId)
-              .setSessionVars(keyValueSet.getProto()).build();
-
-          if (tajoMasterService.updateSessionVariables(null, request).getResultCode() !=
ResultCode.OK) {
-            tajoMasterService.removeSession(null, sessionId);
-            return false;
-          }
-          LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(),
userInfo.getUserName()));
-          return true;
-        } catch (ServiceException e) {
-          tajoMasterService.removeSession(null, sessionId);
-          return false;
-        }
+    // Update the session variables in server side
+    try {
+      KeyValueSet keyValueSet = new KeyValueSet();
+      keyValueSet.putAll(sessionVarsCache);
+      ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+          .setSessionId(sessionId)
+          .setSessionVars(keyValueSet.getProto()).build();
+
+      if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK)
{
+        tajoMasterService.removeSession(null, sessionId);
+        return false;
       }
-    }.withRetries();
+      LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(),
userInfo.getUserName()));
+      return true;
+    } catch (ServiceException e) {
+      tajoMasterService.removeSession(null, sessionId);
+      return false;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
index df709c5..0bb11e0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
@@ -88,7 +88,6 @@ public class TestDefaultCliOutputFormatter {
     String multiLineMessage =
         "ERROR: java.sql.SQLException: ERROR: no such a table: table1\n" +
         "com.google.protobuf.ServiceException: java.sql.SQLException: ERROR: no such a table:
table1\n" +
-        "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:107)\n"
+
         "\tat org.apache.tajo.client.TajoClient.getTableDesc(TajoClient.java:777)\n" +
         "\tat org.apache.tajo.cli.tsql.commands.DescTableCommand.invoke(DescTableCommand.java:43)\n"
+
         "\tat org.apache.tajo.cli.tsql.TajoCli.executeMetaCommand(TajoCli.java:300)\n" +
@@ -96,9 +95,6 @@ public class TestDefaultCliOutputFormatter {
         "\tat org.apache.tajo.cli.tsql.TajoCli.runShell(TajoCli.java:271)\n" +
         "\tat org.apache.tajo.cli.tsql.TajoCli.main(TajoCli.java:420)\n" +
         "Caused by: java.sql.SQLException: ERROR: no such a table: table1\n" +
-        "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:791)\n" +
-        "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:778)\n" +
-        "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:97)\n" +
         "\t... 6 more";
 
     assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(multiLineMessage));

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index b2e1ce9..b1a27fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -34,10 +34,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.master.event.StageEvent;
-import org.apache.tajo.master.event.StageEventType;
+import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -131,8 +128,7 @@ public class TestKillQuery {
     assertNotNull(stage);
 
     // fire kill event
-    Query q = queryMasterTask.getQuery();
-    q.handle(new QueryEvent(queryId, QueryEventType.KILL));
+    queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
 
     try {
       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED,
50);
@@ -157,24 +153,55 @@ public class TestKillQuery {
   @Test
   public final void testIgnoreStageStateFromKilled() throws Exception {
 
-    ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
-    QueryId queryId = new QueryId(res.getQueryId());
-    cluster.waitForQuerySubmitted(queryId);
+    SQLAnalyzer analyzer = new SQLAnalyzer();
+    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+    Session session = LocalTajoTestingUtility.createDummySession();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(queryStr);
+    LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext(conf);
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
 
-    QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
-    Query query = qmt.getQuery();
+    CountDownLatch barrier  = new CountDownLatch(1);
+    MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+
+    QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+    QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+        queryId, session, defaultContext, expr.toJson(), dispatch);
 
-    // wait for a stage created
-    cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
-    query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+    queryMasterTask.init(conf);
+    queryMasterTask.getQueryTaskContext().getDispatcher().start();
+    queryMasterTask.startQuery();
 
     try{
-      cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
-    } finally {
-      assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+      barrier.await(5000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+    }
+
+    Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+    assertNotNull(stage);
+
+    // fire kill event
+    queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+    try {
+      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED,
50);
+      assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+    }   finally {
+      queryMasterTask.stop();
     }
 
-    List<Stage> stages = Lists.newArrayList(query.getStages());
+    List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
     Stage lastStage = stages.get(stages.size() - 1);
 
     assertEquals(StageState.KILLED, lastStage.getSynchronizedState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 190beae..8f6f9ed 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -252,11 +252,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration,
Closeable
 
   @Override
   public void close() {
-    getHandler().sendExceptions(getClass().getSimpleName() + "terminates all the connections");
-
     Channel channel = getChannel();
     if (channel != null && channel.isOpen()) {
       LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+      /* channelInactive receives event and then client terminates all the requests */
       channel.close().syncUninterruptibly();
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
deleted file mode 100644
index 3c054ad..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-public class RetriesExhaustedException extends RuntimeException {
-  private static final long serialVersionUID = 1876775844L;
-
-  public RetriesExhaustedException(final String msg) {
-    super(msg);
-  }
-
-  public RetriesExhaustedException(final String msg, final IOException e) {
-    super(msg, e);
-  }
-
-  /**
-   * Datastructure that allows adding more info around Throwable incident.
-   */
-  public static class ThrowableWithExtraContext {
-    private final Throwable t;
-    private final long when;
-    private final String extras;
-
-    public ThrowableWithExtraContext(final Throwable t, final long when,
-        final String extras) {
-      this.t = t;
-      this.when = when;
-      this.extras = extras;
-    }
- 
-    @Override
-    public String toString() {
-      return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
-    }
-  }
-
-  /**
-   * Create a new RetriesExhaustedException from the list of prior failures.
-   * @param callableVitals Details from the {@link ServerCallable} we were using
-   * when we got this exception.
-   * @param numTries The number of tries we made
-   * @param exceptions List of exceptions that failed before giving up
-   */
-  public RetriesExhaustedException(final String callableVitals, int numTries,
-      List<Throwable> exceptions) {
-    super(getMessage(callableVitals, numTries, exceptions));
-  }
-
-  /**
-   * Create a new RetriesExhaustedException from the list of prior failures.
-   * @param numTries
-   * @param exceptions List of exceptions that failed before giving up
-   */
-  public RetriesExhaustedException(final int numTries,
-      final List<Throwable> exceptions) {
-    super(getMessage(numTries, exceptions));
-  }
-
-  private static String getMessage(String callableVitals, int numTries,
-      List<Throwable> exceptions) {
-    StringBuilder buffer = new StringBuilder("Failed contacting ");
-    buffer.append(callableVitals);
-    buffer.append(" after ");
-    buffer.append(numTries + 1);
-    buffer.append(" attempts.\nExceptions:\n");
-    for (Throwable t : exceptions) {
-      buffer.append(t.toString());
-      buffer.append("\n");
-    }
-    return buffer.toString();
-  }
-
-  private static String getMessage(final int numTries,
-      final List<Throwable> exceptions) {
-    StringBuilder buffer = new StringBuilder("Failed after attempts=");
-    buffer.append(numTries + 1);
-    buffer.append(", exceptions:\n");
-    for (Throwable t : exceptions) {
-      buffer.append(t.toString());
-      buffer.append("\n");
-    }
-    return buffer.toString();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
deleted file mode 100644
index 2804a03..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ServiceException;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-
-public abstract class ServerCallable<T> {
-  protected InetSocketAddress addr;
-  protected long startTime;
-  protected long endTime;
-  protected Class<?> protocol;
-  protected boolean asyncMode;
-  protected boolean closeConn;
-  protected RpcClientManager manager;
-
-  public abstract T call(NettyClientBase client) throws Exception;
-
-  public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?>
protocol,
-                        boolean asyncMode) {
-    this.manager = manager;
-    this.addr = addr;
-    this.protocol = protocol;
-    this.asyncMode = asyncMode;
-  }
-
-  public void beforeCall() {
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public long getStartTime(){
-    return startTime;
-  }
-
-  public void afterCall() {
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public long getEndTime(){
-    return endTime;
-  }
-
-  boolean abort = false;
-  public void abort() {
-    abort = true;
-  }
-  /**
-   * Run this instance with retries, timed waits,
-   * and refinds of missing regions.
-   *
-   * @return an object of type T
-   * @throws com.google.protobuf.ServiceException if a remote or network exception occurs
-   */
-
-  public T withRetries() throws ServiceException {
-    //TODO configurable
-    final long pause = 500; //ms
-    final int numRetries = 3;
-
-    for (int tries = 0; tries < numRetries; tries++) {
-      NettyClientBase client = null;
-      try {
-        beforeCall();
-        if(addr != null) {
-          client = manager.getClient(addr, protocol, asyncMode);
-        }
-        return call(client);
-      } catch (IOException ioe) {
-        if(abort) {
-          throw new ServiceException(ioe.getMessage(), ioe);
-        }
-        if (tries == numRetries - 1) {
-          throw new ServiceException("Giving up after tries=" + tries, ioe);
-        }
-      } catch (Throwable t) {
-        throw new ServiceException(t);
-      } finally {
-        afterCall();
-        if(closeConn) {
-          RpcClientManager.cleanup(client);
-        }
-      }
-      try {
-        Thread.sleep(pause * (tries + 1));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceException("Giving up after tries=" + tries, e);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Run this instance against the server once.
-   * @return an object of type T
-   * @throws java.io.IOException if a remote or network exception occurs
-   * @throws RuntimeException other unspecified error
-   */
-  public T withoutRetries() throws IOException, RuntimeException {
-    NettyClientBase client = null;
-    try {
-      beforeCall();
-      client = manager.getClient(addr, protocol, asyncMode);
-      return call(client);
-    } catch (Throwable t) {
-      Throwable t2 = translateException(t);
-      if (t2 instanceof IOException) {
-        throw (IOException)t2;
-      } else {
-        throw new RuntimeException(t2);
-      }
-    } finally {
-      afterCall();
-      if(closeConn) {
-        RpcClientManager.cleanup(client);
-      }
-    }
-  }
-
-  private static Throwable translateException(Throwable t) throws IOException {
-    if (t instanceof UndeclaredThrowableException) {
-      t = t.getCause();
-    }
-    if (t instanceof RemoteException && t.getCause() != null) {
-      t = t.getCause();
-    }
-    return t;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 6f7fdd1..c86db80 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -170,45 +170,6 @@ public class TestBlockingRpc {
   }
 
   @Test
-  @SetupRpcConnection(setupRpcClient = false)
-  @Deprecated // serverCallable will be remove
-  public void testRpcWithServiceCallable() throws Exception {
-    RpcClientManager manager = RpcClientManager.getInstance();
-    final SumRequest request = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-
-    SumResponse response =
-        new ServerCallable<SumResponse>(manager,
-            server.getListenAddress(), DummyProtocol.class, false) {
-          @Override
-          public SumResponse call(NettyClientBase client) throws Exception {
-            BlockingInterface stub2 = client.getStub();
-            SumResponse response1 = stub2.sum(null, request);
-            return response1;
-          }
-        }.withRetries();
-
-    assertEquals(8.15d, response.getResult(), 1e-15);
-
-    response =
-        new ServerCallable<SumResponse>(manager,
-            server.getListenAddress(), DummyProtocol.class, false) {
-          @Override
-          public SumResponse call(NettyClientBase client) throws Exception {
-            BlockingInterface stub2 = client.getStub();
-            SumResponse response1 = stub2.sum(null, request);
-            return response1;
-          }
-        }.withoutRetries();
-
-    assertTrue(8.15d == response.getResult());
-    RpcClientManager.close();
-  }
-
-  @Test
   public void testThrowException() throws Exception {
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();


Mime
View raw message