tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [2/8] tajo git commit: TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager.
Date Fri, 09 Jan 2015 14:34:18 GMT
TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager.

Closes #334


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/307c6c9f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/307c6c9f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/307c6c9f

Branch: refs/heads/index_support
Commit: 307c6c9f558ae96ffa2f7351ffa3dc5788ba4dbb
Parents: 809cba3
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Fri Jan 9 14:47:47 2015 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Fri Jan 9 14:47:47 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../org/apache/tajo/master/QueryJobManager.java | 59 +++++++------
 .../apache/tajo/master/TajoMasterService.java   |  7 --
 .../master/rm/TajoWorkerResourceManager.java    |  2 +-
 .../tajo/master/rm/WorkerResourceManager.java   |  2 +-
 .../master/scheduler/SimpleFifoScheduler.java   |  3 +-
 .../tajo/querymaster/QueryInProgress.java       | 87 ++------------------
 .../apache/tajo/querymaster/QueryJobEvent.java  |  5 +-
 .../apache/tajo/querymaster/QueryMaster.java    |  2 +-
 .../tajo/querymaster/QueryMasterTask.java       | 47 +----------
 .../src/main/proto/TajoMasterProtocol.proto     |  1 -
 .../apache/tajo/querymaster/TestKillQuery.java  |  2 +-
 12 files changed, 53 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 369dbda..35670d7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1282: Cleanup the relationship of QueryInProgress and 
+    QueryJobManager. (hyunsik)
+
     TAJO-1258: Close() for classes derived from FileAppender should be robust.
     (Jongyoung Park via jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
index c9b8711..6a8da27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
@@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.session.Session;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * QueryJobManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
 public class QueryJobManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
 
@@ -69,7 +72,7 @@ public class QueryJobManager extends CompositeService {
   }
 
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     try {
       this.dispatcher = new AsyncDispatcher();
       addService(this.dispatcher);
@@ -81,24 +84,24 @@ public class QueryJobManager extends CompositeService {
       catchException(null, e);
     }
 
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     synchronized(runningQueries) {
       for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stop();
+        eachQueryInProgress.stopProgress();
       }
     }
     this.scheduler.stop();
-    super.stop();
+    super.serviceStop();
   }
 
   @Override
-  public void start() {
+  public void serviceStart() throws Exception {
     this.scheduler.start();
-    super.start();
+    super.serviceStart();
   }
 
   public EventHandler getEventHandler() {
@@ -164,39 +167,42 @@ public class QueryJobManager extends CompositeService {
       runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
     }
 
-    addService(queryInProgress);
-    queryInProgress.init(getConfig());
-    queryInProgress.start();
-
-    if (!queryInProgress.startQueryMaster()) {
+    if (queryInProgress.startQueryMaster()) {
+      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+          queryInProgress.getQueryInfo()));
+    } else {
       stopQuery(queryId);
     }
 
     return queryInProgress.getQueryInfo();
   }
 
-  public TajoMaster.MasterContext getMasterContext() {
-    return masterContext;
-  }
-
   class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
     @Override
     public void handle(QueryJobEvent event) {
       QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-      if(queryInProgress == null) {
+
+
+      if (queryInProgress == null) {
         LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
         return;
       }
 
-      if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+
+      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        queryInProgress.submmitQueryToMaster();
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
         stopQuery(event.getQueryInfo().getQueryId());
-      } else if (queryInProgress.isStarted()) {
-        queryInProgress.getEventHandler().handle(event);
+
       } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
         scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-
+        queryInProgress.kill();
         stopQuery(queryInProgress.getQueryId());
+
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        queryInProgress.heartbeat(event.getQueryInfo());
       }
     }
   }
@@ -219,7 +225,7 @@ public class QueryJobManager extends CompositeService {
     LOG.info("Stop QueryInProgress:" + queryId);
     QueryInProgress queryInProgress = getQueryInProgress(queryId);
     if(queryInProgress != null) {
-      queryInProgress.stop();
+      queryInProgress.stopProgress();
       synchronized(submittedQueries) {
         submittedQueries.remove(queryId);
       }
@@ -245,7 +251,6 @@ public class QueryJobManager extends CompositeService {
         avgExecutionTime.set(executionTime);
       }
       executedQuerySize.incrementAndGet();
-      removeService(queryInProgress);
     } else {
       LOG.warn("No QueryInProgress while query stopping: " + queryId);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index a7df206..02bdfa1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -136,13 +136,6 @@ public class TajoMasterService extends AbstractService {
     }
 
     @Override
-    public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
-                                RpcCallback<BoolProto> done) {
-      context.getQueryJobManager().stopQuery(new QueryId(request));
-      done.run(BOOL_TRUE);
-    }
-
-    @Override
     public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
                                      RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index c4200d5..9f2a3d5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -526,7 +526,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public void stopQueryMaster(QueryId queryId) {
+  public void releaseQueryMaster(QueryId queryId) {
     if(!rmContext.getQueryMasterContainer().containsKey(queryId)) {
       LOG.warn("No QueryMaster resource info for " + queryId);
       return;

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index b237cc5..79ec0ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -80,7 +80,7 @@ public interface WorkerResourceManager extends Service {
    *
    * @param queryId QueryId to be stopped
    */
-  public void stopQueryMaster(QueryId queryId);
+  public void releaseQueryMaster(QueryId queryId);
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index bd8ca28..a091ed5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -58,7 +58,8 @@ public class SimpleFifoScheduler implements Scheduler {
       LOG.info("Size of Fifo queue is " + qSize);
     }
 
-    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime());
+    QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1,
+        queryInProgress.getQueryInfo().getStartTime());
     boolean result = pool.add(querySchedulingInfo);
     if (getRunningQueries().size() == 0) wakeupProcessor();
     return result;

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
index bda2ec1..f83f244 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
@@ -20,11 +20,7 @@ package org.apache.tajo.querymaster;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.query.QueryContext;
@@ -35,12 +31,12 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
 import org.apache.tajo.master.QueryInfo;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
@@ -48,15 +44,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
 
-public class QueryInProgress extends CompositeService {
+public class QueryInProgress {
   private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
 
   private QueryId queryId;
 
   private Session session;
 
-  private AsyncDispatcher dispatcher;
-
   private LogicalRootNode plan;
 
   private AtomicBoolean querySubmitted = new AtomicBoolean(false);
@@ -76,7 +70,7 @@ public class QueryInProgress extends CompositeService {
       Session session,
       QueryContext queryContext,
       QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-    super(QueryInProgress.class.getName());
+
     this.masterContext = masterContext;
     this.session = session;
     this.queryId = queryId;
@@ -86,23 +80,14 @@ public class QueryInProgress extends CompositeService {
     queryInfo.setStartTime(System.currentTimeMillis());
   }
 
-  @Override
-  public void init(Configuration conf) {
-    dispatcher = new AsyncDispatcher();
-    this.addService(dispatcher);
-
-    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
-    super.init(conf);
-  }
-
   public synchronized void kill() {
+    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
     if(queryMasterRpcClient != null){
       queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
     }
   }
 
-  @Override
-  public void stop() {
+  public void stopProgress() {
     if(stopped.getAndSet(true)) {
       return;
     }
@@ -110,52 +95,15 @@ public class QueryInProgress extends CompositeService {
     LOG.info("=========================================================");
     LOG.info("Stop query:" + queryId);
 
-    masterContext.getResourceManager().stopQueryMaster(queryId);
-
-    long startTime = System.currentTimeMillis();
-    while(true) {
-      try {
-        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
-          LOG.info(queryId + " QueryMaster stopped");
-          break;
-        }
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
-        break;
-      }
-
-      try {
-        synchronized (this){
-          wait(100);
-        }
-      } catch (InterruptedException e) {
-        break;
-      }
-      if(System.currentTimeMillis() - startTime > 60 * 1000) {
-        LOG.warn("Failed to stop QueryMaster:" + queryId);
-        break;
-      }
-    }
+    masterContext.getResourceManager().releaseQueryMaster(queryId);
 
     if(queryMasterRpc != null) {
       RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
     }
 
     masterContext.getHistoryWriter().appendHistory(queryInfo);
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    super.start();
   }
 
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-
-
   public boolean startQueryMaster() {
     try {
       LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
@@ -173,8 +121,6 @@ public class QueryInProgress extends CompositeService {
       queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
       queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
 
-      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
-
       return true;
     } catch (Exception e) {
       catchException(e);
@@ -182,23 +128,6 @@ public class QueryInProgress extends CompositeService {
     }
   }
 
-  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
-    @Override
-    public void handle(QueryJobEvent queryJobEvent) {
-      if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
-        heartbeat(queryJobEvent.getQueryInfo());
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
-        queryInProgress.getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
-      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
-        submmitQueryToMaster();
-      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        kill();
-      }
-    }
-  }
-
   private void connectQueryMaster() throws Exception {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
@@ -207,7 +136,7 @@ public class QueryInProgress extends CompositeService {
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
-  private synchronized void submmitQueryToMaster() {
+  public synchronized void submmitQueryToMaster() {
     if(querySubmitted.get()) {
       return;
     }
@@ -256,7 +185,7 @@ public class QueryInProgress extends CompositeService {
     return !stopped.get() && this.querySubmitted.get();
   }
 
-  private void heartbeat(QueryInfo queryInfo) {
+  public void heartbeat(QueryInfo queryInfo) {
     LOG.info("Received QueryMaster heartbeat:" + queryInfo);
 
     // to avoid partial update by different heartbeats

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
index 1a1f2ff..27eb2b6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -35,12 +35,9 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
   }
 
   public enum Type {
-    QUERY_JOB_START,
+    QUERY_MASTER_START,
     QUERY_JOB_HEARTBEAT,
-    QUERY_JOB_FINISH,
     QUERY_JOB_STOP,
-    QUERY_MASTER_START,
-    QUERY_MASTER_STOP,
     QUERY_JOB_KILL
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 76df397..02760a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -473,7 +473,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     public void handle(QueryStartEvent event) {
       LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson());
+          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr());
 
       synchronized(queryMasterTasks) {
         queryMasterTasks.put(event.getQueryId(), queryMasterTask);

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index bab5903..fd52488 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -41,13 +41,10 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -58,8 +55,7 @@ import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageProperty;
 import org.apache.tajo.storage.StorageUtil;
@@ -96,12 +92,8 @@ public class QueryMasterTask extends CompositeService {
 
   private Query query;
 
-  private MasterPlan masterPlan;
-
   private String jsonExpr;
 
-  private String logicalPlanJson;
-
   private AsyncDispatcher dispatcher;
 
   private final long querySubmitTime;
@@ -124,8 +116,7 @@ public class QueryMasterTask extends CompositeService {
       new ArrayList<TajoWorkerProtocol.TaskFatalErrorReport>();
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr,
-                         String logicalPlanJson) {
+                         QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) {
 
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
@@ -133,7 +124,6 @@ public class QueryMasterTask extends CompositeService {
     this.session = session;
     this.queryContext = queryContext;
     this.jsonExpr = jsonExpr;
-    this.logicalPlanJson = logicalPlanJson;
     this.querySubmitTime = System.currentTimeMillis();
   }
 
@@ -198,42 +188,11 @@ public class QueryMasterTask extends CompositeService {
       LOG.fatal(t.getMessage(), t);
     }
 
-    RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf());
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
-
-    super.stop();
-
-    //TODO change report to tajo master
     if (queryMetrics != null) {
       queryMetrics.report(new MetricsConsoleReporter());
     }
 
+    super.stop();
     LOG.info("Stopped QueryMasterTask:" + queryId);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index cc83e47..bc73596 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -143,6 +143,5 @@ service TajoMasterProtocolService {
   rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
   rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
   rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
-  rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
   rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/307c6c9f/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index a125196..bd899cd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -90,7 +90,7 @@ public class TestKillQuery {
 
     QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
-        queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+        queryId, session, defaultContext, expr.toJson());
 
     queryMasterTask.init(conf);
     queryMasterTask.getQueryTaskContext().getDispatcher().start();


Mime
View raw message