tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [7/9] tajo git commit: TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'.
Date Mon, 22 Dec 2014 08:11:42 GMT
TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'.

Closes #314


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3c833e2a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3c833e2a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3c833e2a

Branch: refs/heads/index_support
Commit: 3c833e2a8c3ff7ff8a2e1b4497afb390098856bf
Parents: cf66a39
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Dec 22 16:43:39 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Dec 22 16:43:39 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |    2 +
 tajo-client/src/main/proto/ClientProtos.proto   |    4 +-
 .../tajo/master/DefaultTaskScheduler.java       |   36 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   30 +-
 .../tajo/master/TaskSchedulerFactory.java       |   12 +-
 .../tajo/master/event/QueryCompletedEvent.java  |    8 +-
 .../tajo/master/event/QueryEventType.java       |    4 +-
 .../tajo/master/event/QuerySubQueryEvent.java   |   35 -
 .../tajo/master/event/StageCompletedEvent.java  |   42 +
 .../event/StageContainerAllocationEvent.java    |   38 +
 .../event/StageDiagnosticsUpdateEvent.java      |   34 +
 .../apache/tajo/master/event/StageEvent.java    |   35 +
 .../tajo/master/event/StageEventType.java       |   43 +
 .../tajo/master/event/StageTaskEvent.java       |   43 +
 .../master/event/SubQueryCompletedEvent.java    |   42 -
 .../event/SubQueryContainerAllocationEvent.java |   38 -
 .../event/SubQueryDiagnosticsUpdateEvent.java   |   34 -
 .../apache/tajo/master/event/SubQueryEvent.java |   35 -
 .../tajo/master/event/SubQueryEventType.java    |   43 -
 .../tajo/master/event/SubQueryTaskEvent.java    |   43 -
 .../apache/tajo/master/event/TaskEventType.java |    4 +-
 .../apache/tajo/master/querymaster/Query.java   |  164 +--
 .../querymaster/QueryMasterManagerService.java  |    4 +-
 .../master/querymaster/QueryMasterTask.java     |   28 +-
 .../tajo/master/querymaster/Repartitioner.java  |  172 +--
 .../apache/tajo/master/querymaster/Stage.java   | 1342 +++++++++++++++++
 .../tajo/master/querymaster/StageState.java     |   30 +
 .../tajo/master/querymaster/SubQuery.java       | 1343 ------------------
 .../tajo/master/querymaster/SubQueryState.java  |   30 -
 .../apache/tajo/master/querymaster/Task.java    |   10 +-
 .../tajo/master/querymaster/TaskAttempt.java    |    6 +-
 .../main/java/org/apache/tajo/util/JSPUtil.java |   36 +-
 .../apache/tajo/util/history/HistoryReader.java |    2 +-
 .../apache/tajo/util/history/HistoryWriter.java |   12 +-
 .../apache/tajo/util/history/QueryHistory.java  |   23 +-
 .../apache/tajo/util/history/StageHistory.java  |  270 ++++
 .../tajo/util/history/SubQueryHistory.java      |  270 ----
 .../tajo/worker/TajoResourceAllocator.java      |   14 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   20 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |    2 +-
 .../resources/webapps/admin/querydetail.jsp     |   32 +-
 .../main/resources/webapps/admin/querytasks.jsp |   36 +-
 .../resources/webapps/worker/querydetail.jsp    |   30 +-
 .../main/resources/webapps/worker/queryplan.jsp |   52 +-
 .../resources/webapps/worker/querytasks.jsp     |   18 +-
 .../src/main/resources/webapps/worker/task.jsp  |   10 +-
 .../org/apache/tajo/TajoTestingCluster.java     |    8 +-
 .../org/apache/tajo/TestQueryIdFactory.java     |    8 +-
 .../org/apache/tajo/client/TestTajoClient.java  |   12 +-
 .../tajo/engine/query/TestGroupByQuery.java     |   16 +-
 .../tajo/engine/query/TestTablePartitions.java  |    2 +-
 .../tajo/engine/query/TestUnionQuery.java       |    6 +-
 .../tajo/master/querymaster/TestKillQuery.java  |    8 +-
 .../querymaster/TestTaskStatusUpdate.java       |   18 +-
 .../util/history/TestHistoryWriterReader.java   |   26 +-
 tajo-dist/pom.xml                               |    2 +-
 .../tajo/pullserver/PullServerAuxService.java   |   16 +-
 .../tajo/pullserver/TajoPullServerService.java  |   16 +-
 .../apache/tajo/storage/FileStorageManager.java |    6 +-
 59 files changed, 2335 insertions(+), 2370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 245918e..29b0c0b 100644
--- a/CHANGES
+++ b/CHANGES
@@ -240,6 +240,8 @@ Release 0.9.1 - unreleased
 
   SUB TASKS
 
+    TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'. (hyunsik)
+
     TAJO-324: Rename the prefix 'QueryUnit' to Task. (hyunsik)
 
     TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 51db763..a741268 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -253,7 +253,7 @@ message QueryInfoProto {
   optional int32 queryMasterInfoPort = 11;
 }
 
-message SubQueryHistoryProto {
+message StageHistoryProto {
   required string executionBlockId =1;
   required string state = 2;
   optional int64 startTime = 3;
@@ -283,7 +283,7 @@ message QueryHistoryProto {
   optional string logicalPlan = 4;
   optional string distributedPlan = 5;
   repeated KeyValueProto sessionVariables = 6;
-  repeated SubQueryHistoryProto subQueryHistories = 7;
+  repeated StageHistoryProto stageHistories = 7;
 }
 
 message GetQueryHistoryResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index d9d496e..dd6233c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -40,7 +40,7 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.Task;
 import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.Stage;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -60,7 +60,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
 
   private final TaskSchedulerContext context;
-  private SubQuery subQuery;
+  private Stage stage;
 
   private Thread schedulingThread;
   private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
@@ -71,10 +71,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private int nextTaskId = 0;
   private int scheduledObjectNum = 0;
 
-  public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+  public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
     super(DefaultTaskScheduler.class.getName());
     this.context = context;
-    this.subQuery = subQuery;
+    this.stage = stage;
   }
 
   @Override
@@ -117,8 +117,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private static final TaskAttemptId NULL_ATTEMPT_ID;
   public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
   static {
-    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0);
+    ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
 
     TajoWorkerProtocol.TaskRequestProto.Builder builder =
         TajoWorkerProtocol.TaskRequestProto.newBuilder();
@@ -192,13 +192,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
         if (context.isLeafQuery()) {
           TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext();
-          Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++);
+          Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
           task.addFragment(castEvent.getLeftFragment(), true);
           scheduledObjectNum++;
           if (castEvent.hasRightFragments()) {
             task.addFragments(castEvent.getRightFragments());
           }
-          subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+          stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
         } else {
           fragmentsForNonLeafTask = new FileFragment[2];
           fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
@@ -217,7 +217,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
         Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
         TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
-        Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++);
+        Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
         scheduledObjectNum++;
         for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
           task.addFetches(eachFetch.getKey(), eachFetch.getValue());
@@ -229,7 +229,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) {
           task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask));
         }
-        subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+        stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
       } else if (event instanceof TaskAttemptToSchedulerEvent) {
         TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
         if (context.isLeafQuery()) {
@@ -239,7 +239,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
       }
     } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
-      // when a subquery is killed, unassigned query unit attmpts are canceled from the scheduler.
+      // when a stage is killed, unassigned query unit attempts are canceled from the scheduler.
       // This event is triggered by TaskAttempt.
       TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event;
       scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId());
@@ -832,7 +832,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
 
         if (attemptId != null) {
-          Task task = subQuery.getTask(attemptId.getTaskId());
+          Task task = stage.getTask(attemptId.getTaskId());
           TaskRequest taskAssign = new TaskRequestImpl(
               attemptId,
               new ArrayList<FragmentProto>(task.getAllFragments()),
@@ -840,8 +840,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               false,
               task.getLogicalPlan().toJson(),
               context.getMasterContext().getQueryContext(),
-              subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
-          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+              stage.getDataChannel(), stage.getBlock().getEnforcer());
+          if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
 
@@ -888,7 +888,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           LOG.debug("Assigned based on * match");
 
           Task task;
-          task = subQuery.getTask(attemptId.getTaskId());
+          task = stage.getTask(attemptId.getTaskId());
           TaskRequest taskAssign = new TaskRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),
@@ -896,9 +896,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               false,
               task.getLogicalPlan().toJson(),
               context.getMasterContext().getQueryContext(),
-              subQuery.getDataChannel(),
-              subQuery.getBlock().getEnforcer());
-          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+              stage.getDataChannel(),
+              stage.getBlock().getEnforcer());
+          if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
             taskAssign.setInterQuery();
           }
           for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 0ab19db..32af17b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -37,7 +37,7 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.Task;
 import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.Stage;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -57,7 +57,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
   private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class);
 
   private final TaskSchedulerContext context;
-  private final SubQuery subQuery;
+  private final Stage stage;
 
   private Thread schedulingThread;
   private volatile boolean stopEventHandling;
@@ -77,10 +77,10 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
   private int nextTaskId = 0;
   private int containerNum;
 
-  public LazyTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) {
+  public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) {
     super(LazyTaskScheduler.class.getName());
     this.context = context;
-    this.subQuery = subQuery;
+    this.stage = stage;
   }
 
   @Override
@@ -101,8 +101,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
 
   @Override
   public void start() {
-    containerNum = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
-        subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+    containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers(
+        stage.getContext().getQueryMasterContext().getWorkerContext(),
         context.getEstimatedTaskNum(), 512);
 
     LOG.info("Start TaskScheduler");
@@ -129,8 +129,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
   private static final TaskAttemptId NULL_ATTEMPT_ID;
   public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
   static {
-    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0);
+    ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
 
     TajoWorkerProtocol.TaskRequestProto.Builder builder =
         TajoWorkerProtocol.TaskRequestProto.newBuilder();
@@ -362,7 +362,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       String host = container.getTaskHostName();
       TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID,
           host, taskRequest.getCallback());
-      Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++);
+      Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
 
       FragmentPair fragmentPair;
       List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
@@ -371,7 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       long taskSize = adjustTaskSize();
       LOG.info("Adjusted task size: " + taskSize);
 
-      TajoConf conf = subQuery.getContext().getConf();
+      TajoConf conf = stage.getContext().getConf();
       // host local, disk local
       String normalized = NetUtils.normalizeHost(host);
       Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
@@ -450,7 +450,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());
 
       task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
-      subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+      stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
     }
   }
 
@@ -469,9 +469,9 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
             taskRequest.getContainerId());
         TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID,
             container.getTaskHostName(), taskRequest.getCallback());
-        Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++);
+        Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
         task.setFragment(scheduledFragments.getAllFragments());
-        subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+        stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
       }
     }
   }
@@ -485,8 +485,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
         false,
         taskAttempt.getTask().getLogicalPlan().toJson(),
         context.getMasterContext().getQueryContext(),
-        subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
-    if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
+        stage.getDataChannel(), stage.getBlock().getEnforcer());
+    if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
       taskAssign.setInterQuery();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
index 520ecd3..e5291e9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.Stage;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -29,7 +29,7 @@ import java.util.Map;
 public class TaskSchedulerFactory {
   private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS;
   private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class };
+  private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class };
 
   public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf)
       throws IOException {
@@ -46,7 +46,7 @@ public class TaskSchedulerFactory {
   }
 
   public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context,
-                                                        SubQuery subQuery) {
+                                                        Stage stage) {
     T result;
     try {
       Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
@@ -55,15 +55,15 @@ public class TaskSchedulerFactory {
         constructor.setAccessible(true);
         CONSTRUCTOR_CACHE.put(clazz, constructor);
       }
-      result = constructor.newInstance(new Object[]{context, subQuery});
+      result = constructor.newInstance(new Object[]{context, stage});
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
     return result;
   }
 
-  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery)
+  public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage)
       throws IOException {
-    return get(getTaskSchedulerClass(conf), context, subQuery);
+    return get(getTaskSchedulerClass(conf), context, stage);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
index dc75a1d..e5a9a32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
@@ -19,14 +19,14 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.master.querymaster.StageState;
 
 public class QueryCompletedEvent extends QueryEvent {
   private final ExecutionBlockId executionBlockId;
-  private final SubQueryState finalState;
+  private final StageState finalState;
 
   public QueryCompletedEvent(final ExecutionBlockId executionBlockId,
-                             SubQueryState finalState) {
+                             StageState finalState) {
     super(executionBlockId.getQueryId(), QueryEventType.QUERY_COMPLETED);
     this.executionBlockId = executionBlockId;
     this.finalState = finalState;
@@ -36,7 +36,7 @@ public class QueryCompletedEvent extends QueryEvent {
     return executionBlockId;
   }
 
-  public SubQueryState getState() {
+  public StageState getState() {
     return finalState;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java
index edc0cd8..e38a3c4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java
@@ -24,8 +24,8 @@ public enum QueryEventType {
   START,
   KILL,
 
-  // Producer: SubQuery
-  SUBQUERY_COMPLETED,
+  // Producer: Stage
+  STAGE_COMPLETED,
 
   // Producer: Query
   QUERY_COMPLETED,

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
deleted file mode 100644
index ae36a69..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-
-public class QuerySubQueryEvent extends QueryEvent {
-  private ExecutionBlockId executionBlockId;
-
-  public QuerySubQueryEvent(final ExecutionBlockId id,
-                            final QueryEventType queryEvent) {
-    super(id.getQueryId(), queryEvent);
-    this.executionBlockId = id;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return this.executionBlockId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
new file mode 100644
index 0000000..2d16fbe
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.StageState;
+
+public class StageCompletedEvent extends QueryEvent {
+  private final ExecutionBlockId executionBlockId;
+  private final StageState finalState;
+
+  public StageCompletedEvent(final ExecutionBlockId executionBlockId,
+                             StageState finalState) {
+    super(executionBlockId.getQueryId(), QueryEventType.STAGE_COMPLETED);
+    this.executionBlockId = executionBlockId;
+    this.finalState = finalState;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public StageState getState() {
+    return finalState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java
new file mode 100644
index 0000000..0d29e44
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.container.TajoContainer;
+
+import java.util.List;
+
+public class StageContainerAllocationEvent extends StageEvent {
+  private List<TajoContainer> allocatedContainer;
+
+  public StageContainerAllocationEvent(final ExecutionBlockId id,
+                                       List<TajoContainer> allocatedContainer) {
+    super(id, StageEventType.SQ_CONTAINER_ALLOCATED);
+    this.allocatedContainer = allocatedContainer;
+  }
+
+  public List<TajoContainer> getAllocatedContainer() {
+    return this.allocatedContainer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java
new file mode 100644
index 0000000..39afc92
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+
+public class StageDiagnosticsUpdateEvent extends StageEvent { // StageEvent that carries one diagnostic message
+  private final String msg; // diagnostic text; stored as given, not null-checked here
+
+  public StageDiagnosticsUpdateEvent(final ExecutionBlockId id, String diagnostic) {
+    super(id, StageEventType.SQ_DIAGNOSTIC_UPDATE); // event type is fixed for this class
+    this.msg = diagnostic;
+  }
+
+  public String getDiagnosticUpdate() {
+    return msg; // the string passed to the constructor
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java
new file mode 100644
index 0000000..6fc4746
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+public class StageEvent extends AbstractEvent<StageEventType> { // base class of the Stage*Event classes in this package
+  private final ExecutionBlockId id; // exposed through getStageId()
+
+  public StageEvent(ExecutionBlockId id, StageEventType stageEventType) {
+    super(stageEventType); // the event type is handed to AbstractEvent
+    this.id = id;
+  }
+
+  public ExecutionBlockId getStageId() {
+    return id; // the ExecutionBlockId given at construction
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
new file mode 100644
index 0000000..fa808d4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types handled by Stage; each group of constants below is labelled with its producer.
+ */
+public enum StageEventType { // constants keep the SQ_ prefix used by the former SubQueryEventType
+
+  // Producer: Query
+  SQ_INIT,
+  SQ_START,
+  SQ_CONTAINER_ALLOCATED, // type used by StageContainerAllocationEvent
+  SQ_KILL,
+  SQ_LAUNCH,
+
+  // Producer: Task
+  SQ_TASK_COMPLETED, // type used by StageTaskEvent
+  SQ_FAILED,
+
+  // Producer: Completed
+  SQ_STAGE_COMPLETED, // replaces the former SQ_SUBQUERY_COMPLETED
+
+  // Producer: Any component
+  SQ_DIAGNOSTIC_UPDATE, // type used by StageDiagnosticsUpdateEvent
+  SQ_INTERNAL_ERROR
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java
new file mode 100644
index 0000000..4377881
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.TaskId;
+import org.apache.tajo.master.TaskState;
+
+/**
+ * Event Class: From Task to Stage. Always typed SQ_TASK_COMPLETED; carries the task id and a TaskState.
+ */
+public class StageTaskEvent extends StageEvent {
+  private TaskId taskId; // the task this event is about
+  private TaskState state; // state value supplied by the caller
+  public StageTaskEvent(TaskId taskId, TaskState state) {
+    super(taskId.getExecutionBlockId(), StageEventType.SQ_TASK_COMPLETED); // stage id is derived from the task id; a null taskId throws NullPointerException here
+    this.taskId = taskId;
+    this.state = state;
+  }
+
+  public TaskId getTaskId() {
+    return this.taskId; // the TaskId passed to the constructor
+  }
+
+  public TaskState getState() {
+    return state; // the TaskState passed to the constructor
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
deleted file mode 100644
index 6389798..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.querymaster.SubQueryState;
-
-public class SubQueryCompletedEvent extends QueryEvent {
-  private final ExecutionBlockId executionBlockId;
-  private final SubQueryState finalState;
-
-  public SubQueryCompletedEvent(final ExecutionBlockId executionBlockId,
-                                SubQueryState finalState) {
-    super(executionBlockId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED);
-    this.executionBlockId = executionBlockId;
-    this.finalState = finalState;
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
-  }
-
-  public SubQueryState getState() {
-    return finalState;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
deleted file mode 100644
index e617d53..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.master.container.TajoContainer;
-
-import java.util.List;
-
-public class SubQueryContainerAllocationEvent extends SubQueryEvent {
-  private List<TajoContainer> allocatedContainer;
-
-  public SubQueryContainerAllocationEvent(final ExecutionBlockId id,
-                                          List<TajoContainer> allocatedContainer) {
-    super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED);
-    this.allocatedContainer = allocatedContainer;
-  }
-
-  public List<TajoContainer> getAllocatedContainer() {
-    return this.allocatedContainer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java
deleted file mode 100644
index 0810e81..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-
-public class SubQueryDiagnosticsUpdateEvent extends SubQueryEvent {
-  private final String msg;
-
-  public SubQueryDiagnosticsUpdateEvent(final ExecutionBlockId id, String diagnostic) {
-    super(id, SubQueryEventType.SQ_DIAGNOSTIC_UPDATE);
-    this.msg = diagnostic;
-  }
-
-  public String getDiagnosticUpdate() {
-    return msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
deleted file mode 100644
index 2b3d598..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ExecutionBlockId;
-
-public class SubQueryEvent extends AbstractEvent<SubQueryEventType> {
-  private final ExecutionBlockId id;
-
-  public SubQueryEvent(ExecutionBlockId id, SubQueryEventType subQueryEventType) {
-    super(subQueryEventType);
-    this.id = id;
-  }
-
-  public ExecutionBlockId getSubQueryId() {
-    return id;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
deleted file mode 100644
index 79b6e2e..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-/**
- * Event Types handled by SubQuery
- */
-public enum SubQueryEventType {
-
-  // Producer: Query
-  SQ_INIT,
-  SQ_START,
-  SQ_CONTAINER_ALLOCATED,
-  SQ_KILL,
-  SQ_LAUNCH,
-
-  // Producer: Task
-  SQ_TASK_COMPLETED,
-  SQ_FAILED,
-
-  // Producer: Completed
-  SQ_SUBQUERY_COMPLETED,
-
-  // Producer: Any component
-  SQ_DIAGNOSTIC_UPDATE,
-  SQ_INTERNAL_ERROR
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
deleted file mode 100644
index 816bc48..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.TaskId;
-import org.apache.tajo.master.TaskState;
-
-/**
- * Event Class: From Task to SubQuery
- */
-public class SubQueryTaskEvent extends SubQueryEvent {
-  private TaskId taskId;
-  private TaskState state;
-  public SubQueryTaskEvent(TaskId taskId, TaskState state) {
-    super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED);
-    this.taskId = taskId;
-    this.state = state;
-  }
-
-  public TaskId getTaskId() {
-    return this.taskId;
-  }
-
-  public TaskState getState() {
-    return state;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java
index 9448863..0f26821 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java
@@ -23,10 +23,10 @@ package org.apache.tajo.master.event;
  */
 public enum TaskEventType {
 
-  //Producer:Client, SubQuery
+  //Producer:Client, Stage
   T_KILL,
 
-  //Producer:SubQuery
+  //Producer:Stage
   T_SCHEDULE,
 
   //Producer:TaskAttempt

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 918cc82..a626df1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -50,7 +50,7 @@ import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.QueryHistory;
-import org.apache.tajo.util.history.SubQueryHistory;
+import org.apache.tajo.util.history.StageHistory;
 
 import java.io.IOException;
 import java.util.*;
@@ -65,7 +65,7 @@ public class Query implements EventHandler<QueryEvent> {
   private final TajoConf systemConf;
   private final Clock clock;
   private String queryStr;
-  private Map<ExecutionBlockId, SubQuery> subqueries;
+  private Map<ExecutionBlockId, Stage> stages;
   private final EventHandler eventHandler;
   private final MasterPlan plan;
   QueryMasterTask.QueryMasterTaskContext context;
@@ -77,11 +77,11 @@ public class Query implements EventHandler<QueryEvent> {
   private long startTime;
   private long finishTime;
   private TableDesc resultDesc;
-  private int completedSubQueryCount = 0;
-  private int successedSubQueryCount = 0;
-  private int killedSubQueryCount = 0;
-  private int failedSubQueryCount = 0;
-  private int erroredSubQueryCount = 0;
+  private int completedStagesCount = 0;
+  private int successedStagesCount = 0;
+  private int killedStagesCount = 0;
+  private int failedStagesCount = 0;
+  private int erroredStagesCount = 0;
   private final List<String> diagnostics = new ArrayList<String>();
 
   // Internal Variables
@@ -96,7 +96,7 @@ public class Query implements EventHandler<QueryEvent> {
   // Transition Handler
   private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
-  private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition();
+  private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
   private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
 
   protected static final StateMachineFactory
@@ -120,8 +120,8 @@ public class Query implements EventHandler<QueryEvent> {
 
           // Transitions from RUNNING state
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
-              QueryEventType.SUBQUERY_COMPLETED,
-              SUBQUERY_COMPLETED_TRANSITION)
+              QueryEventType.STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_RUNNING,
               EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
                   QueryState.QUERY_ERROR),
@@ -132,7 +132,7 @@ public class Query implements EventHandler<QueryEvent> {
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
               QueryEventType.KILL,
-              new KillSubQueriesTransition())
+              new KillAllStagesTransition())
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
@@ -143,8 +143,8 @@ public class Query implements EventHandler<QueryEvent> {
               DIAGNOSTIC_UPDATE_TRANSITION)
           // ignore-able transitions
           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
-              QueryEventType.SUBQUERY_COMPLETED,
-              SUBQUERY_COMPLETED_TRANSITION)
+              QueryEventType.STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
               QueryEventType.KILL)
           .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
@@ -153,8 +153,8 @@ public class Query implements EventHandler<QueryEvent> {
 
           // Transitions from KILL_WAIT state
           .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
-              QueryEventType.SUBQUERY_COMPLETED,
-              SUBQUERY_COMPLETED_TRANSITION)
+              QueryEventType.STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_KILL_WAIT,
               EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
                   QueryState.QUERY_ERROR),
@@ -191,7 +191,7 @@ public class Query implements EventHandler<QueryEvent> {
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able transitions
           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
-              EnumSet.of(QueryEventType.KILL, QueryEventType.SUBQUERY_COMPLETED))
+              EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
 
           .installTopology();
 
@@ -206,7 +206,7 @@ public class Query implements EventHandler<QueryEvent> {
     this.clock = context.getClock();
     this.appSubmitTime = appSubmitTime;
     this.queryStr = queryStr;
-    this.subqueries = Maps.newConcurrentMap();
+    this.stages = Maps.newConcurrentMap();
     this.eventHandler = eventHandler;
     this.plan = plan;
     this.cursor = new ExecutionBlockCursor(plan, true);
@@ -237,15 +237,15 @@ public class Query implements EventHandler<QueryEvent> {
       return 1.0f;
     } else {
       int idx = 0;
-      List<SubQuery> tempSubQueries = new ArrayList<SubQuery>();
-      synchronized(subqueries) {
-        tempSubQueries.addAll(subqueries.values());
+      List<Stage> tempStages = new ArrayList<Stage>();
+      synchronized(stages) {
+        tempStages.addAll(stages.values());
       }
 
-      float [] subProgresses = new float[tempSubQueries.size()];
-      for (SubQuery subquery: tempSubQueries) {
-        if (subquery.getState() != SubQueryState.NEW) {
-          subProgresses[idx] = subquery.getProgress();
+      float [] subProgresses = new float[tempStages.size()];
+      for (Stage stage: tempStages) {
+        if (stage.getState() != StageState.NEW) {
+          subProgresses[idx] = stage.getProgress();
         } else {
           subProgresses[idx] = 0.0f;
         }
@@ -285,17 +285,17 @@ public class Query implements EventHandler<QueryEvent> {
 
   public QueryHistory getQueryHistory() {
     QueryHistory queryHistory = makeQueryHistory();
-    queryHistory.setSubQueryHistories(makeSubQueryHistories());
+    queryHistory.setStageHistories(makeStageHistories());
     return queryHistory;
   }
 
-  private List<SubQueryHistory> makeSubQueryHistories() {
-    List<SubQueryHistory> subQueryHistories = new ArrayList<SubQueryHistory>();
-    for(SubQuery eachSubQuery: getSubQueries()) {
-      subQueryHistories.add(eachSubQuery.getSubQueryHistory());
+  private List<StageHistory> makeStageHistories() {
+    List<StageHistory> stageHistories = new ArrayList<StageHistory>();
+    for(Stage eachStage : getStages()) {
+      stageHistories.add(eachStage.getStageHistory());
     }
 
-    return subQueryHistories;
+    return stageHistories;
   }
 
   private QueryHistory makeQueryHistory() {
@@ -348,20 +348,20 @@ public class Query implements EventHandler<QueryEvent> {
     return stateMachine;
   }
   
-  public void addSubQuery(SubQuery subquery) {
-    subqueries.put(subquery.getId(), subquery);
+  public void addStage(Stage stage) {
+    stages.put(stage.getId(), stage);
   }
   
   public QueryId getId() {
     return this.id;
   }
 
-  public SubQuery getSubQuery(ExecutionBlockId id) {
-    return this.subqueries.get(id);
+  public Stage getStage(ExecutionBlockId id) {
+    return this.stages.get(id);
   }
 
-  public Collection<SubQuery> getSubQueries() {
-    return this.subqueries.values();
+  public Collection<Stage> getStages() {
+    return this.stages.values();
   }
 
   public QueryState getSynchronizedState() {
@@ -389,13 +389,13 @@ public class Query implements EventHandler<QueryEvent> {
     public void transition(Query query, QueryEvent queryEvent) {
 
       query.setStartTime();
-      SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
+      Stage stage = new Stage(query.context, query.getPlan(),
           query.getExecutionBlockCursor().nextBlock());
-      subQuery.setPriority(query.priority--);
-      query.addSubQuery(subQuery);
+      stage.setPriority(query.priority--);
+      query.addStage(stage);
 
-      subQuery.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INIT));
-      LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+      stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+      LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
     }
   }
 
@@ -403,20 +403,20 @@ public class Query implements EventHandler<QueryEvent> {
 
     @Override
     public QueryState transition(Query query, QueryEvent queryEvent) {
-      QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
+      QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
       QueryState finalState;
 
-      if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
-        finalState = finalizeQuery(query, subQueryEvent);
-      } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
+      if (stageEvent.getState() == StageState.SUCCEEDED) {
+        finalState = finalizeQuery(query, stageEvent);
+      } else if (stageEvent.getState() == StageState.FAILED) {
         finalState = QueryState.QUERY_FAILED;
-      } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
+      } else if (stageEvent.getState() == StageState.KILLED) {
         finalState = QueryState.QUERY_KILLED;
       } else {
         finalState = QueryState.QUERY_ERROR;
       }
       if (finalState != QueryState.QUERY_SUCCEEDED) {
-        SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId());
+        Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
         if (lastStage != null && lastStage.getTableMeta() != null) {
           StoreType storeType = lastStage.getTableMeta().getStoreType();
           if (storeType != null) {
@@ -436,7 +436,7 @@ public class Query implements EventHandler<QueryEvent> {
     }
 
     private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
-      SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId());
+      Stage lastStage = query.getStage(event.getExecutionBlockId());
       StoreType storeType = lastStage.getTableMeta().getStoreType();
       try {
         LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
@@ -490,7 +490,7 @@ public class Query implements EventHandler<QueryEvent> {
       @Override
       public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
                                 Path finalOutputDir) {
-        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        Stage lastStage = query.getStage(finalExecBlockId);
         NodeType type = lastStage.getBlock().getPlan().getType();
         return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
       }
@@ -499,7 +499,7 @@ public class Query implements EventHandler<QueryEvent> {
       public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
                           Query query, ExecutionBlockId finalExecBlockId,
                           Path finalOutputDir) throws Exception {
-        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        Stage lastStage = query.getStage(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
 
         String nullChar = queryContext.get(SessionVars.NULL_CHAR);
@@ -526,7 +526,7 @@ public class Query implements EventHandler<QueryEvent> {
       @Override
       public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
                                 Path finalOutputDir) {
-        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        Stage lastStage = query.getStage(finalExecBlockId);
         return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
       }
 
@@ -534,7 +534,7 @@ public class Query implements EventHandler<QueryEvent> {
       public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
                           Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
         CatalogService catalog = context.getWorkerContext().getCatalog();
-        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        Stage lastStage = query.getStage(finalExecBlockId);
         TableStats stats = lastStage.getResultStats();
 
         CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
@@ -565,7 +565,7 @@ public class Query implements EventHandler<QueryEvent> {
       @Override
       public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
                                 Path finalOutputDir) {
-        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        Stage lastStage = query.getStage(finalExecBlockId);
         return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
       }
 
@@ -575,7 +575,7 @@ public class Query implements EventHandler<QueryEvent> {
           throws Exception {
 
         CatalogService catalog = context.getWorkerContext().getCatalog();
-        SubQuery lastStage = query.getSubQuery(finalExecBlockId);
+        Stage lastStage = query.getStage(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
         TableStats stats = lastStage.getResultStats();
 
@@ -613,7 +613,7 @@ public class Query implements EventHandler<QueryEvent> {
     return directorySummary.getLength();
   }
 
-  public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+  public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
 
     private boolean hasNext(Query query) {
       ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
@@ -624,43 +624,43 @@ public class Query implements EventHandler<QueryEvent> {
     private void executeNextBlock(Query query) {
       ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
       ExecutionBlock nextBlock = cursor.nextBlock();
-      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock);
-      nextSubQuery.setPriority(query.priority--);
-      query.addSubQuery(nextSubQuery);
-      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+      Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+      nextStage.setPriority(query.priority--);
+      query.addStage(nextStage);
+      nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
 
-      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+      LOG.info("Scheduling Stage:" + nextStage.getId());
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+        LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+        LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
       }
     }
 
     @Override
     public void transition(Query query, QueryEvent event) {
       try {
-        query.completedSubQueryCount++;
-        SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-
-        if (castEvent.getState() == SubQueryState.SUCCEEDED) {
-          query.successedSubQueryCount++;
-        } else if (castEvent.getState() == SubQueryState.KILLED) {
-          query.killedSubQueryCount++;
-        } else if (castEvent.getState() == SubQueryState.FAILED) {
-          query.failedSubQueryCount++;
-        } else if (castEvent.getState() == SubQueryState.ERROR) {
-          query.erroredSubQueryCount++;
+        query.completedStagesCount++;
+        StageCompletedEvent castEvent = (StageCompletedEvent) event;
+
+        if (castEvent.getState() == StageState.SUCCEEDED) {
+          query.successedStagesCount++;
+        } else if (castEvent.getState() == StageState.KILLED) {
+          query.killedStagesCount++;
+        } else if (castEvent.getState() == StageState.FAILED) {
+          query.failedStagesCount++;
+        } else if (castEvent.getState() == StageState.ERROR) {
+          query.erroredStagesCount++;
         } else {
-          LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
+          LOG.error(String.format("Invalid Stage (%s) State %s at %s",
               castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
           query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
         }
 
-        // if a subquery is succeeded and a query is running
-        if (castEvent.getState() == SubQueryState.SUCCEEDED &&  // latest subquery succeeded
+        // if a stage is succeeded and a query is running
+        if (castEvent.getState() == StageState.SUCCEEDED &&  // latest stage succeeded
             query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
-            hasNext(query)) {                                   // there remains at least one subquery.
-          query.getSubQuery(castEvent.getExecutionBlockId()).waitingIntermediateReport();
+            hasNext(query)) {                                   // there remains at least one stage.
+          query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
           executeNextBlock(query);
         } else { // if a query is completed due to finished, kill, failure, or error
           query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
@@ -687,12 +687,12 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> {
+  private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
     @Override
     public void transition(Query query, QueryEvent event) {
-      synchronized (query.subqueries) {
-        for (SubQuery subquery : query.subqueries.values()) {
-          query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL));
+      synchronized (query.stages) {
+        for (Stage stage : query.stages.values()) {
+          query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index e7e2bc0..c2e1009 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -150,7 +150,7 @@ public class QueryMasterManagerService extends CompositeService
       if (queryMasterTask == null) {
         queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
       }
-      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getTaskId().getExecutionBlockId());
+      Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
       Task task = sq.getTask(attemptId.getTaskId());
       TaskAttempt attempt = task.getAttempt(attemptId.getId());
 
@@ -221,7 +221,7 @@ public class QueryMasterManagerService extends CompositeService
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
     if (queryMasterTask != null) {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
-      queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request);
+      queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request);
     }
     done.run(TajoWorker.TRUE_PROTO);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index c96b86e..e3d3d79 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -160,7 +160,7 @@ public class QueryMasterTask extends CompositeService {
       dispatcher = new TajoAsyncDispatcher(queryId.toString());
       addService(dispatcher);
 
-      dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
+      dispatcher.register(StageEventType.class, new StageEventDispatcher());
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
       dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
@@ -255,7 +255,7 @@ public class QueryMasterTask extends CompositeService {
 
   public void handleTaskRequestEvent(TaskRequestEvent event) {
     ExecutionBlockId id = event.getExecutionBlockId();
-    query.getSubQuery(id).handleTaskRequestEvent(event);
+    query.getStage(id).handleTaskRequestEvent(event);
   }
 
   public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) {
@@ -274,13 +274,13 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
-  private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> {
-    public void handle(SubQueryEvent event) {
-      ExecutionBlockId id = event.getSubQueryId();
+  private class StageEventDispatcher implements EventHandler<StageEvent> {
+    public void handle(StageEvent event) {
+      ExecutionBlockId id = event.getStageId();
       if(LOG.isDebugEnabled()) {
-        LOG.debug("SubQueryEventDispatcher:" + id + "," + event.getType());
+        LOG.debug("StageEventDispatcher:" + id + "," + event.getType());
       }
-      query.getSubQuery(id).handle(event);
+      query.getStage(id).handle(event);
     }
   }
 
@@ -291,7 +291,7 @@ public class QueryMasterTask extends CompositeService {
       if(LOG.isDebugEnabled()) {
         LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType());
       }
-      Task task = query.getSubQuery(taskId.getExecutionBlockId()).
+      Task task = query.getStage(taskId.getExecutionBlockId()).
           getTask(taskId);
       task.handle(event);
     }
@@ -301,8 +301,8 @@ public class QueryMasterTask extends CompositeService {
       implements EventHandler<TaskAttemptEvent> {
     public void handle(TaskAttemptEvent event) {
       TaskAttemptId attemptId = event.getTaskAttemptId();
-      SubQuery subQuery = query.getSubQuery(attemptId.getTaskId().getExecutionBlockId());
-      Task task = subQuery.getTask(attemptId.getTaskId());
+      Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId());
+      Task task = stage.getTask(attemptId.getTaskId());
       TaskAttempt attempt = task.getAttempt(attemptId);
       attempt.handle(event);
     }
@@ -311,8 +311,8 @@ public class QueryMasterTask extends CompositeService {
   private class TaskSchedulerDispatcher
       implements EventHandler<TaskSchedulerEvent> {
     public void handle(TaskSchedulerEvent event) {
-      SubQuery subQuery = query.getSubQuery(event.getExecutionBlockId());
-      subQuery.getTaskScheduler().handle(event);
+      Stage stage = query.getStage(event.getExecutionBlockId());
+      stage.getTaskScheduler().handle(event);
     }
   }
 
@@ -627,8 +627,8 @@ public class QueryMasterTask extends CompositeService {
       return dispatcher;
     }
 
-    public SubQuery getSubQuery(ExecutionBlockId id) {
-      return query.getSubQuery(id);
+    public Stage getStage(ExecutionBlockId id) {
+      return query.getStage(id);
     }
 
     public Map<String, TableDesc> getTableDescMap() {


Mime
View raw message