tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
Date Wed, 06 May 2015 09:14:37 GMT
Repository: tajo
Updated Branches:
  refs/heads/master b6b9d4631 -> 04167bdc3


TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.

Closes #559


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/04167bdc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/04167bdc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/04167bdc

Branch: refs/heads/master
Commit: 04167bdc3bb04b53c5a245a9c18b6426ade82a26
Parents: b6b9d46
Author: Jinho Kim <jhkim@apache.org>
Authored: Wed May 6 18:13:40 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Wed May 6 18:13:40 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 -
 .../org/apache/tajo/master/QueryInProgress.java |  31 ++---
 .../querymaster/QueryMasterManagerService.java  | 135 ++++++++-----------
 .../tajo/worker/ExecutionBlockContext.java      |  32 +++--
 .../java/org/apache/tajo/worker/TajoWorker.java |   1 +
 .../tajo/worker/TajoWorkerManagerService.java   |   2 +
 .../main/java/org/apache/tajo/worker/Task.java  |   4 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  43 +++---
 .../src/main/proto/QueryMasterProtocol.proto    |  14 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |   7 +-
 .../org/apache/tajo/rpc/RpcClientManager.java   |   9 ++
 12 files changed, 133 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a790655..ebd88cd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
+    (jinho)
+
     TAJO-1563: Improve RPC error handling. (jinho)
 
     TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index bfba290..46e7618 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -224,8 +224,6 @@ public class TajoConf extends Configuration {
     HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()),
 
     // RPC --------------------------------------------------------------------
-    RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
-
     //  Internal RPC Client
     INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
         Runtime.getRuntime().availableProcessors() * 2),

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index d2286cf..6a074a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -31,14 +32,13 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -92,7 +92,9 @@ public class QueryInProgress {
     try {
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
       if (queryMasterRpcClient != null) {
-        queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+        CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+        queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture);
+        callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
       }
     } catch (Throwable e) {
       catchException("Failed to kill query " + queryId + " by exception " + e, e);
@@ -111,9 +113,7 @@ public class QueryInProgress {
 
     masterContext.getResourceManager().releaseQueryMaster(queryId);
 
-    if(queryMasterRpc != null) {
-      RpcClientManager.cleanup(queryMasterRpc);
-    }
+    RpcClientManager.cleanup(queryMasterRpc);
 
     try {
       masterContext.getHistoryWriter().appendAndFlush(queryInfo);
@@ -156,8 +156,9 @@ public class QueryInProgress {
   private void connectQueryMaster() throws Exception {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
-    queryMasterRpc =
-        RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true);
+
+    RpcClientManager.cleanup(queryMasterRpc);
+    queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true);
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
@@ -177,11 +178,7 @@ public class QueryInProgress {
       if(queryMasterRpcClient == null) {
         connectQueryMaster();
       }
-      if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster connection info.");
-        //TODO wait
-        return;
-      }
+
       LOG.info("Call executeQuery to :" +
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
 
@@ -192,11 +189,15 @@ public class QueryInProgress {
           .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
           .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
 
-      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+      queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
+      callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
       querySubmitted.set(true);
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
     } catch (Exception e) {
       LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e);
+      catchException(e.getMessage(), e);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 85cc553..59933a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -115,10 +115,6 @@ public class QueryMasterManagerService extends CompositeService
     return bindAddr;
   }
 
-  public String getHostAndPort() {
-    return bindAddr.getHostName() + ":" + bindAddr.getPort();
-  }
-
   @Override
   public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
                       RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
@@ -136,127 +132,106 @@ public class QueryMasterManagerService extends CompositeService
       }
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
+      controller.setFailed(e.getMessage());
     }
   }
 
   @Override
   public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
-      TaskAttemptId attemptId = new TaskAttemptId(request.getId());
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
-      if (queryMasterTask == null) {
-        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
-      }
-      Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
-      Task task = sq.getTask(attemptId.getTaskId());
-      TaskAttempt attempt = task.getAttempt(attemptId.getId());
+                           RpcCallback<PrimitiveProtos.NullProto> done) {
+    QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+    TaskAttemptId attemptId = new TaskAttemptId(request.getId());
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+    if (queryMasterTask == null) {
+      queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+    }
+    Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
+    Task task = sq.getTask(attemptId.getTaskId());
+    TaskAttempt attempt = task.getAttempt(attemptId.getId());
 
-      if(LOG.isDebugEnabled()){
-        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
-      }
+    if(LOG.isDebugEnabled()){
+      LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+    }
 
-      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
-        LOG.warn(attemptId + " Killed");
-        attempt.handle(
-            new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
-      } else {
-        queryMasterTask.getEventHandler().handle(
-            new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
+    if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+      LOG.warn(attemptId + " Killed");
+      attempt.handle(
+          new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+    } else {
+      queryMasterTask.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
     }
+
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void ping(RpcController controller,
                    TajoIdProtos.ExecutionBlockIdProto requestProto,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    done.run(TajoWorker.TRUE_PROTO);
+                   RpcCallback<PrimitiveProtos.NullProto> done) {
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
-                         RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
-      if (queryMasterTask != null) {
-        queryMasterTask.handleTaskFailed(report);
-      } else {
-        LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
+                         RpcCallback<PrimitiveProtos.NullProto> done) {
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+        new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+    if (queryMasterTask != null) {
+      queryMasterTask.handleTaskFailed(report);
+    } else {
+      LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
     }
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
-      if (queryMasterTask != null) {
-        queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
+                   RpcCallback<PrimitiveProtos.NullProto> done) {
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+        new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+    if (queryMasterTask != null) {
+      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
     }
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void doneExecutionBlock(
       RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
-      RpcCallback<PrimitiveProtos.BoolProto> done) {
+      RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
     if (queryMasterTask != null) {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
       queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
     }
-    done.run(TajoWorker.TRUE_PROTO);
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
-                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+                        RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryId queryId = new QueryId(request);
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
     if (queryMasterTask != null) {
-      Query query = queryMasterTask.getQuery();
-      if (query != null) {
-        query.handle(new QueryEvent(queryId, QueryEventType.KILL));
-      }
+      queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
     }
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void executeQuery(RpcController controller,
                            TajoWorkerProtocol.QueryExecutionRequestProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
-
-      QueryId queryId = new QueryId(request.getQueryId());
-      LOG.info("Receive executeQuery request:" + queryId);
-      queryMaster.handle(new QueryStartEvent(queryId,
-          new Session(request.getSession()),
-          new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
-              request.getQueryContext()), request.getExprInJson().getValue(),
-          request.getLogicalPlanJson().getValue()));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
+                           RpcCallback<PrimitiveProtos.NullProto> done) {
+    workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
+    QueryId queryId = new QueryId(request.getQueryId());
+    LOG.info("Receive executeQuery request:" + queryId);
+    queryMaster.handle(new QueryStartEvent(queryId,
+        new Session(request.getSession()),
+        new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
+            request.getQueryContext()), request.getExprInJson().getValue(),
+        request.getLogicalPlanJson().getValue()));
+    done.run(TajoWorker.NULL_PROTO);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 0d26e6c..cd4b6a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
 
 public class ExecutionBlockContext {
   /** class logger */
@@ -78,6 +79,8 @@ public class ExecutionBlockContext {
   private TajoQueryEngine queryEngine;
   private RpcClientManager connManager;
   private InetSocketAddress qmMasterAddr;
+  private NettyClientBase client;
+  private QueryMasterProtocol.QueryMasterProtocolService.Interface stub;
   private WorkerConnectionInfo queryMaster;
   private TajoConf systemConf;
   // for the doAs block
@@ -132,16 +135,14 @@ public class ExecutionBlockContext {
 
     // initialize DFS and LocalFileSystems
     this.taskOwner = taskOwner;
+    this.stub = getRpcClient().getStub();
     this.reporter.startReporter();
-
     // resource intiailization
     try{
       this.resource.initialize(queryContext, plan);
     } catch (Throwable e) {
       try {
-        NettyClientBase client = getQueryMasterConnection();
-        QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
-        stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+        getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
       } catch (Throwable t) {
         //ignore
       }
@@ -153,9 +154,20 @@ public class ExecutionBlockContext {
     return resource;
   }
 
-  public NettyClientBase getQueryMasterConnection()
+  private NettyClientBase getRpcClient()
       throws NoSuchMethodException, ConnectException, ClassNotFoundException {
-    return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true);
+    if (client != null) return client;
+
+    client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, true);
+    return client;
+  }
+
+  public Interface getStub() {
+    return stub;
+  }
+
+  public boolean isStopped() {
+    return stop.get();
   }
 
   public void stop(){
@@ -184,6 +196,7 @@ public class ExecutionBlockContext {
     tasks.clear();
 
     resource.release();
+    RpcClientManager.cleanup(client);
   }
 
   public TajoConf getConf() {
@@ -282,8 +295,7 @@ public class ExecutionBlockContext {
     /* This case is that worker did not ran tasks */
     if(completedTasksNum.get() == 0) return;
 
-    NettyClientBase client = getQueryMasterConnection();
-    QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+    Interface stub = getStub();
 
     ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
     reporterBuilder.setEbId(ebId.getProto());
@@ -379,10 +391,8 @@ public class ExecutionBlockContext {
         public void run() {
           while (!reporterStop.get() && !Thread.interrupted()) {
 
-            NettyClientBase client = null;
             try {
-              client = getQueryMasterConnection();
-              QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();
+              Interface masterStub = getStub();
 
               if(tasks.size() == 0){
                 masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 79b83e4..b666f80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -77,6 +77,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars;
 public class TajoWorker extends CompositeService {
   public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
   public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+  public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build();
 
   private static final Log LOG = LogFactory.getLog(TajoWorker.class);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 71d96c4..bbf8564 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -127,6 +127,7 @@ public class TajoWorkerManagerService extends CompositeService
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
+      controller.setFailed(t.getMessage());
       done.run(TajoWorker.FALSE_PROTO);
     }
   }
@@ -142,6 +143,7 @@ public class TajoWorkerManagerService extends CompositeService
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
+      controller.setFailed(e.getMessage());
       done.run(TajoWorker.FALSE_PROTO);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index a983f78..53ed73e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -440,9 +440,7 @@ public class Task {
       executionBlockContext.completedTasksNum.incrementAndGet();
       context.getHashShuffleAppenderManager().finalizeTask(taskId);
 
-      NettyClientBase client = executionBlockContext.getQueryMasterConnection();
-
-      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
       if (context.isStopped()) {
         context.setExecutorProgress(0.0f);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 6076913..31f25f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -194,24 +194,8 @@ public class TaskRunner extends AbstractService {
           CallFuture<TaskRequestProto> callFuture = null;
           TaskRequestProto taskRequest = null;
 
-          while(!stopped) {
-            NettyClientBase client;
-            try {
-              client = executionBlockContext.getQueryMasterConnection();
-            } catch (ConnectException ce) {
-              // NettyClientBase throws ConnectTimeoutException if connection was failed
-              stop();
-              getContext().stopTaskRunner(getId());
-              LOG.error("Connecting to QueryMaster was failed.", ce);
-              break;
-            } catch (Throwable t) {
-              LOG.fatal("Unable to handle exception: " + t.getMessage(), t);
-              stop();
-              getContext().stopTaskRunner(getId());
-              break;
-            }
-
-            QueryMasterProtocolService.Interface qmClientService = client.getStub();
+          while(!stopped && !executionBlockContext.isStopped()) {
+            QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub();
 
             try {
               if (callFuture == null) {
@@ -243,8 +227,12 @@ public class TaskRunner extends AbstractService {
                 }
                 continue;
               } catch (ExecutionException ee) {
-                LOG.error(ee.getMessage(), ee);
-                break;
+                if(!getContext().isStopped()){
+                  LOG.error(ee.getMessage(), ee);
+                } else {
+                  /* EB is stopped */
+                  break;
+                }
               }
 
               if (taskRequest != null) {
@@ -253,9 +241,6 @@ public class TaskRunner extends AbstractService {
                 // immediately.
                 if (taskRequest.getShouldDie()) {
                   LOG.info("Received ShouldDie flag:" + getId());
-                  stop();
-                  //notify to TaskRunnerManager
-                  getContext().stopTaskRunner(getId());
                 } else {
                   getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
                   LOG.info("Accumulated Received Task: " + (++receivedNum));
@@ -268,7 +253,7 @@ public class TaskRunner extends AbstractService {
                   }
 
                   LOG.info("Initializing: " + taskAttemptId);
-                  Task task;
+                  Task task = null;
                   try {
                     task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
                         new TaskRequestImpl(taskRequest));
@@ -283,20 +268,22 @@ public class TaskRunner extends AbstractService {
                   } catch (Throwable t) {
                     LOG.error(t.getMessage(), t);
                     fatalError(qmClientService, taskAttemptId, t.getMessage());
+                    if(task != null) {
+                      task.cleanupTask();
+                    }
                   } finally {
                     callFuture = null;
                     taskRequest = null;
                   }
                 }
-              } else {
-                stop();
-                //notify to TaskRunnerManager
-                getContext().stopTaskRunner(getId());
               }
             } catch (Throwable t) {
               LOG.fatal(t.getMessage(), t);
             }
           }
+          stop();
+          //notify to TaskRunnerManager
+          getContext().stopTaskRunner(getId());
         }
       });
       taskLauncher.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index ae20309..855c2c6 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -34,13 +34,13 @@ package hadoop.yarn;
 service QueryMasterProtocolService {
   //from Worker
   rpc getTask(GetTaskRequestProto) returns (TaskRequestProto);
-  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
-  rpc ping (ExecutionBlockIdProto) returns (BoolProto);
-  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
-  rpc done (TaskCompletionReport) returns (BoolProto);
-  rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);
+  rpc statusUpdate (TaskStatusProto) returns (NullProto);
+  rpc ping (ExecutionBlockIdProto) returns (NullProto);
+  rpc fatalError(TaskFatalErrorReport) returns (NullProto);
+  rpc done (TaskCompletionReport) returns (NullProto);
+  rpc doneExecutionBlock(ExecutionBlockReport) returns (NullProto);
 
   //from TajoMaster's QueryJobManager
-  rpc killQuery(QueryIdProto) returns (BoolProto);
-  rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+  rpc killQuery(QueryIdProto) returns (NullProto);
+  rpc executeQuery(QueryExecutionRequestProto) returns (NullProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 8f6f9ed..0d86527 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -195,7 +195,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
       if (maxRetries > retries) {
         retries++;
 
-        LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr);
+        LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + " Try to reconnect : " + getKey().addr);
         try {
           Thread.sleep(RpcConstants.DEFAULT_PAUSE);
         } catch (InterruptedException e) {
@@ -246,8 +246,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
 
   private String getErrorMessage(String message) {
     return "Exception [" + getKey().protocolClass.getCanonicalName() +
-        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-        getChannel().remoteAddress()) + ")]: " + message;
+        "(" + getKey().addr + ")]: " + message;
   }
 
   @Override
@@ -332,7 +331,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
         throws Exception {
 
       Throwable rootCause = ExceptionUtils.getRootCause(cause);
-      LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause);
+      LOG.error(getErrorMessage(ExceptionUtils.getMessage(rootCause)), rootCause);
 
       if (cause instanceof RecoverableException) {
         sendException((RecoverableException) cause);

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index f8def7f..111754e 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -130,6 +130,15 @@ public class RpcClientManager {
     return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing);
   }
 
+  public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
+                                                              Class<?> protocolClass,
+                                                              boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+    return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode),
+        retries, getTimeoutSeconds(), TimeUnit.SECONDS, true);
+  }
+
   public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key,
                                                               int retries,
                                                               long timeout,


Mime
View raw message