tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [3/8] TAJO-91: Launch QueryMaster on NodeManager per query. (hyoungjunkim via hyunsik)
Date Wed, 14 Aug 2013 06:48:01 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
new file mode 100644
index 0000000..5443858
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.ipc.QueryMasterProtocol.Partition;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskCompletionReport;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
+
+  private static final Log LOG = LogFactory.getLog(QueryUnitAttempt.class);
+
+  private final static int EXPIRE_TIME = 15000;
+
+  private final QueryUnitAttemptId id;
+  private final QueryUnit queryUnit;
+  final EventHandler eventHandler;
+
+  private String hostName;
+  private int port;
+  private int expire;
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  private static final StateMachineFactory
+      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      stateMachineFactory = new StateMachineFactory
+      <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+      (TaskAttemptState.TA_NEW)
+
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
+          TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
+
+      // from assigned
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
+          TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED,
+          EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      // from running
+      .addTransition(TaskAttemptState.TA_RUNNING,
+          EnumSet.of(TaskAttemptState.TA_RUNNING),
+          TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
+
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new SucceededTransition())
+
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_UPDATE)
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+
+      .installTopology();
+
+  private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+    stateMachine;
+
+
+  public QueryUnitAttempt(final QueryUnitAttemptId id, final QueryUnit queryUnit,
+                          final EventHandler eventHandler) {
+    this.id = id;
+    this.expire = QueryUnitAttempt.EXPIRE_TIME;
+    this.queryUnit = queryUnit;
+    this.eventHandler = eventHandler;
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public TaskAttemptState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public QueryUnitAttemptId getId() {
+    return this.id;
+  }
+
+  public boolean isLeafTask() {
+    return this.queryUnit.isLeafTask();
+  }
+
+  public QueryUnit getQueryUnit() {
+    return this.queryUnit;
+  }
+
+  public String getHost() {
+    return this.hostName;
+  }
+
+  public void setHost(String host) {
+    this.hostName = host;
+  }
+
+  public void setPullServerPort(int port) {
+    this.port = port;
+  }
+
+  public int getPullServerPort() {
+    return port;
+  }
+
+  public synchronized void setExpireTime(int expire) {
+    this.expire = expire;
+  }
+
+  public synchronized void updateExpireTime(int period) {
+    this.setExpireTime(this.expire - period);
+  }
+
+  public synchronized void resetExpireTime() {
+    this.setExpireTime(QueryUnitAttempt.EXPIRE_TIME);
+  }
+
+  public int getLeftTime() {
+    return this.expire;
+  }
+
+  private void fillTaskStatistics(TaskCompletionReport report) {
+    if (report.getPartitionsCount() > 0) {
+      this.getQueryUnit().setPartitions(report.getPartitionsList());
+
+      List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+      for (Partition p : report.getPartitionsList()) {
+        IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
+            getId().getId(), p.getPartitionKey(), getHost(), getPullServerPort());
+        partitions.add(entry);
+      }
+      this.getQueryUnit().setIntermediateData(partitions);
+    }
+    if (report.hasResultStats()) {
+      this.getQueryUnit().setStats(new TableStat(report.getResultStats()));
+    }
+  }
+
+  private static class TaskAttemptScheduleTransition implements
+    SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+
+      if (taskAttempt.isLeafTask()
+          && taskAttempt.getQueryUnit().getScanNodes().length == 1) {
+        Set<String> racks = new HashSet<String>();
+        for (String host : taskAttempt.getQueryUnit().getDataLocations()) {
+          racks.add(RackResolver.resolve(host).getNetworkLocation());
+        }
+
+        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
+            taskAttempt.getId(), EventType.T_SCHEDULE, true,
+            taskAttempt.getQueryUnit().getDataLocations(),
+            racks.toArray(new String[racks.size()])
+        ));
+      } else {
+        taskAttempt.eventHandler.handle(new TaskScheduleEvent(
+            taskAttempt.getId(), EventType.T_SCHEDULE,
+            false,
+            null,
+            null
+        ));
+      }
+    }
+  }
+
+  private static class LaunchTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.setHost(castEvent.getHostName());
+      taskAttempt.setPullServerPort(castEvent.getPullServerPort());
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_LAUNCHED));
+    }
+  }
+
+  private static class StatusUpdateTransition
+      implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
+
+    @Override
+    public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
+                                       TaskAttemptEvent event) {
+      TaskAttemptStatusUpdateEvent updateEvent =
+          (TaskAttemptStatusUpdateEvent) event;
+
+      switch (updateEvent.getStatus().getState()) {
+        case TA_PENDING:
+        case TA_RUNNING:
+          return TaskAttemptState.TA_RUNNING;
+
+        default:
+          return taskAttempt.getState();
+      }
+    }
+  }
+
+  private void addDiagnosticInfo(String diag) {
+    if (diag != null && !diag.equals("")) {
+      diagnostics.add(diag);
+    }
+  }
+
+  private static class AlreadyAssignedTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(QueryUnitAttempt queryUnitAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+      LOG.info(">>>>>>>>> Already Assigned: " + queryUnitAttempt.getId());
+    }
+  }
+
+  private static class AlreadyDoneTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+
+    @Override
+    public void transition(QueryUnitAttempt queryUnitAttempt,
+                           TaskAttemptEvent taskAttemptEvent) {
+      LOG.info(">>>>>>>>> Already Done: " + queryUnitAttempt.getId());
+    }
+  }
+
+  private static class SucceededTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskCompletionReport report = ((TaskCompletionEvent)event).getReport();
+
+      taskAttempt.fillTaskStatistics(report);
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(),
+          TaskEventType.T_ATTEMPT_SUCCEEDED));
+    }
+  }
+
+  private static class FailedTransition
+      implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
+      taskAttempt.eventHandler.handle(
+          new TaskTAttemptEvent(taskAttempt.getId(),
+              TaskEventType.T_ATTEMPT_FAILED));
+      LOG.error("FROM " + taskAttempt.getHost() + " >> "
+          + errorEvent.errorMessage());
+      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+    }
+  }
+
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
+          + event.getType());
+    }
+    try {
+      writeLock.lock();
+      TaskAttemptState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state of "
+            + event.getTaskAttemptId() + ")", e);
+        eventHandler.handle(new QueryEvent(getId().getQueryId(),
+            QueryEventType.INTERNAL_ERROR));
+      }
+
+      //notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+       if (oldState != getState()) {
+          LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    }
+
+    finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
new file mode 100644
index 0000000..3957d57
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
+import org.apache.tajo.engine.planner.UniformRangePartition;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * Repartitioner creates non-leaf tasks and shuffles intermediate data.
+ * It supports two repartition methods, such as hash and range repartition.
+ */
+public class Repartitioner {
+  private static final Log LOG = LogFactory.getLog(Repartitioner.class);
+
+  /**
+   * Maximum length, in characters, of the task-attempt id list carried by one hash fetch
+   * URL. Longer lists are split across several URLs to avoid HTTP 414 (Request-URI Too Long)
+   * for requests approaching 2000 characters.
+   */
+  private static final int HTTP_REQUEST_MAXIMUM_LENGTH = 1900;
+
+  public static QueryUnit[] createJoinTasks(SubQuery subQuery)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    //CatalogService catalog = subQuery.getContext().getCatalog();
+
+    ScanNode[] scans = execBlock.getScanNodes();
+    Path tablePath;
+    Fragment [] fragments = new Fragment[2];
+    TableStat [] stats = new TableStat[2];
+
+    // initialize variables from the child operators
+    for (int i =0; i < 2; i++) {
+      // TODO - temporarily tables should be stored in temporarily catalog for each query
+      TableDesc tableDesc = subQuery.getContext().getTableDescMap().get(scans[i].getFromTable().getTableName());
+      if (scans[i].getTableId().startsWith(SubQueryId.PREFIX)) {
+        tablePath = subQuery.getStorageManager().getTablePath(scans[i].getTableId());
+      } else {
+        tablePath = tableDesc.getPath();
+      }
+
+      if (scans[i].isLocal()) { // it only requires a dummy fragment.
+        fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
+            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
+            0, 0, null);
+      } else {
+        fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
+                tableDesc.getMeta(),
+            new Path(tablePath, "data")).get(0);
+      }
+
+      // Getting a table stat for each scan
+      stats[i] = subQuery.getChildQuery(scans[i]).getTableStat();
+    }
+
+    // Assigning either fragments or fetch urls to query units
+    QueryUnit [] tasks;
+    if (scans[0].isBroadcast() || scans[1].isBroadcast()) {
+      tasks = new QueryUnit[1];
+      tasks[0] = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
+          false, subQuery.getEventHandler());
+      tasks[0].setLogicalPlan(execBlock.getPlan());
+      tasks[0].setFragment(scans[0].getTableId(), fragments[0]);
+      tasks[0].setFragment(scans[1].getTableId(), fragments[1]);
+    } else {
+      // The hash map is modeling as follows:
+      // <Partition Id, <Table Name, Intermediate Data>>
+      Map<Integer, Map<String, List<IntermediateEntry>>> hashEntries =
+          new HashMap<Integer, Map<String, List<IntermediateEntry>>>();
+
+      // Grouping IntermediateData by a partition key and a table name
+      for (ScanNode scan : scans) {
+        SubQuery childSubQuery = subQuery.getChildQuery(scan);
+        for (QueryUnit task : childSubQuery.getQueryUnits()) {
+          if (task.getIntermediateData() != null) {
+            for (IntermediateEntry intermEntry : task.getIntermediateData()) {
+              if (hashEntries.containsKey(intermEntry.getPartitionId())) {
+                Map<String, List<IntermediateEntry>> tbNameToInterm =
+                    hashEntries.get(intermEntry.getPartitionId());
+
+                if (tbNameToInterm.containsKey(scan.getTableId())) {
+                  tbNameToInterm.get(scan.getTableId()).add(intermEntry);
+                } else {
+                  tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
+                }
+              } else {
+                Map<String, List<IntermediateEntry>> tbNameToInterm =
+                    new HashMap<String, List<IntermediateEntry>>();
+                tbNameToInterm.put(scan.getTableId(), TUtil.newList(intermEntry));
+                hashEntries.put(intermEntry.getPartitionId(), tbNameToInterm);
+              }
+            }
+          }
+        }
+      }
+
+      LOG.info("Outer Intermediate Volume: " + stats[0].getNumBytes());
+      LOG.info("Inner Intermediate Volume: " + stats[1].getNumBytes());
+
+      // Getting the desire number of join tasks according to the volumn
+      // of a larger table
+      int largerIdx = stats[0].getNumBytes() >= stats[1].getNumBytes() ? 0 : 1;
+      int desireJoinTaskVolumn = subQuery.getContext().getConf().
+          getIntVar(ConfVars.JOIN_TASK_VOLUME);
+
+      // calculate the number of tasks according to the data size
+      int mb = (int) Math.ceil((double)stats[largerIdx].getNumBytes() / 1048576);
+      LOG.info("Larger intermediate data is approximately " + mb + " MB");
+      // determine the number of task per 64MB
+      int maxTaskNum = (int) Math.ceil((double)mb / desireJoinTaskVolumn);
+      LOG.info("The calculated number of tasks is " + maxTaskNum);
+      LOG.info("The number of total partition keys is " + hashEntries.size());
+      // the number of join tasks cannot be larger than the number of
+      // distinct partition ids.
+      int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
+      LOG.info("The determined number of join tasks is " + joinTaskNum);
+      QueryUnit [] createdTasks = newEmptyJoinTask(subQuery, fragments, joinTaskNum);
+
+      // Assign partitions to tasks in a round robin manner.
+      int i = 0;
+      for (Entry<Integer, Map<String, List<IntermediateEntry>>> entry
+          : hashEntries.entrySet()) {
+        addJoinPartition(createdTasks[i++], subQuery, entry.getKey(), entry.getValue());
+        if (i >= joinTaskNum) {
+          i = 0;
+        }
+      }
+
+      List<QueryUnit> filteredTasks = new ArrayList<QueryUnit>();
+      for (QueryUnit task : createdTasks) {
+        // if there are at least two fetches, the join is possible.
+        if (task.getFetches().size() > 1) {
+          filteredTasks.add(task);
+        }
+      }
+
+      tasks = filteredTasks.toArray(new QueryUnit[filteredTasks.size()]);
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Builds {@code taskNum} join tasks. Each task has the execution block's plan and every
+   * given fragment, but no fetches yet.
+   */
+  private static QueryUnit [] newEmptyJoinTask(SubQuery subQuery, Fragment [] fragments, int taskNum) {
+    ExecutionBlock block = subQuery.getBlock();
+    QueryUnit [] emptyTasks = new QueryUnit[taskNum];
+    for (int seq = 0; seq < taskNum; seq++) {
+      QueryUnit unit = new QueryUnit(
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), seq), block.isLeafBlock(),
+          subQuery.getEventHandler());
+      emptyTasks[seq] = unit;
+      unit.setLogicalPlan(block.getPlan());
+      for (int f = 0; f < fragments.length; f++) {
+        unit.setFragment2(fragments[f]);
+      }
+    }
+    return emptyTasks;
+  }
+
+  /**
+   * Adds to {@code task} the hash fetch URIs of one partition for each scan of the join.
+   *
+   * <p>Scans are visited in execution-block order. The method returns as soon as a scan has
+   * no intermediate entries for this partition. Fetches already added for earlier scans are
+   * kept, and later scans are skipped.
+   * NOTE(review): this makes the outcome depend on scan order — confirm it is intended.
+   */
+  private static void addJoinPartition(QueryUnit task, SubQuery subQuery, int partitionId,
+                                       Map<String, List<IntermediateEntry>> grouppedPartitions) {
+    ScanNode[] scanNodes = subQuery.getBlock().getScanNodes();
+    for (int s = 0; s < scanNodes.length; s++) {
+      ScanNode scanNode = scanNodes[s];
+      if (!grouppedPartitions.containsKey(scanNode.getTableId())) {
+        return;
+      }
+      Map<String, List<IntermediateEntry>> requestsByPullServer =
+          mergeHashPartitionRequest(grouppedPartitions.get(scanNode.getTableId()));
+
+      Set<URI> fetchURIs = TUtil.newHashSet();
+      Iterator<Entry<String, List<IntermediateEntry>>> it =
+          requestsByPullServer.entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<String, List<IntermediateEntry>> perServer = it.next();
+        fetchURIs.addAll(createHashFetchURL(perServer.getKey(),
+            subQuery.getChildQuery(scanNode).getId(),
+            partitionId, PartitionType.HASH,
+            perServer.getValue()));
+      }
+      task.addFetches(scanNode.getTableId(), fetchURIs);
+    }
+  }
+
+  /**
+   * Groups partition requests by the address of the pull server that serves them. A pull
+   * server can then be asked for all of its partitions at once, which reduces the number of
+   * TCP connections.
+   *
+   * @param partitions the intermediate entries to group
+   * @return key: pull-server address, value: the entries served there, in input order
+   */
+  private static Map<String, List<IntermediateEntry>> mergeHashPartitionRequest(
+      List<IntermediateEntry> partitions) {
+    Map<String, List<IntermediateEntry>> byPullAddress =
+        new HashMap<String, List<IntermediateEntry>>();
+    for (IntermediateEntry request : partitions) {
+      List<IntermediateEntry> bucket = byPullAddress.get(request.getPullAddress());
+      if (bucket != null) {
+        bucket.add(request);
+        continue;
+      }
+      byPullAddress.put(request.getPullAddress(), TUtil.newList(request));
+    }
+    return byPullAddress;
+  }
+
+  /**
+   * Creates the tasks of a non-leaf execution block, choosing the strategy from the
+   * partition type of the child block.
+   *
+   * @throws InternalException if the child's partition type is neither HASH nor RANGE
+   */
+  public static QueryUnit [] createNonLeafTask(SubQuery subQuery,
+                                               SubQuery childSubQuery,
+                                               int maxNum)
+      throws InternalException {
+    ExecutionBlock childBlock = childSubQuery.getBlock();
+    if (childBlock.getPartitionType() == PartitionType.HASH) {
+      return createHashPartitionedTasks(subQuery, childSubQuery, maxNum);
+    }
+    if (childBlock.getPartitionType() == PartitionType.RANGE) {
+      return createRangePartitionedTasks(subQuery, childSubQuery, maxNum);
+    }
+    throw new InternalException("Cannot support partition type");
+  }
+
+  /**
+   * Creates the tasks of a non-leaf execution block whose single input was
+   * range-partitioned by {@code childSubQuery}. The child's plan is a store over a sort.
+   *
+   * <p>The key range derived from the child's column statistics is split into at most
+   * {@code maxNum} sub-ranges, or fewer when the range cardinality is smaller. For every
+   * sub-range, one fetch URI is built per child intermediate entry, with the sub-range
+   * appended as query parameters. The sub-ranges are then assigned round-robin to the
+   * created tasks.
+   *
+   * @return the created tasks; empty when the child produced no rows
+   * @throws InternalException if the range query parameters cannot be encoded
+   */
+  public static QueryUnit [] createRangePartitionedTasks(SubQuery subQuery,
+                                                         SubQuery childSubQuery,
+                                                         int maxNum)
+      throws InternalException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    TableStat stat = childSubQuery.getTableStat();
+    if (stat.getNumRows() == 0) {
+      return new QueryUnit[0];
+    }
+
+    ScanNode scan = execBlock.getScanNodes()[0];
+    Path tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
+
+    StoreTableNode store = (StoreTableNode) childSubQuery.getBlock().getPlan();
+    SortNode sort = (SortNode) store.getSubNode();
+    SortSpec[] sortSpecs = sort.getSortKeys();
+    Schema sortSchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+
+    // calculate the number of maximum query ranges
+    TupleRange mergedRange =
+        TupleUtil.columnStatToRange(sort.getOutSchema(),
+            sortSchema, stat.getColumnStats());
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(sortSchema, mergedRange);
+    BigDecimal card = partitioner.getTotalCardinality();
+
+    // if the number of the range cardinality is less than the desired number of tasks,
+    // we set the the number of tasks to the number of range cardinality.
+    int determinedTaskNum;
+    if (card.compareTo(new BigDecimal(maxNum)) < 0) {
+      LOG.info("The range cardinality (" + card
+          + ") is less then the desired number of tasks (" + maxNum + ")");
+      determinedTaskNum = card.intValue();
+    } else {
+      determinedTaskNum = maxNum;
+    }
+
+    LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum +
+        " sub ranges (total units: " + determinedTaskNum + ")");
+    TupleRange [] ranges = partitioner.partition(determinedTaskNum);
+
+    Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
+        0, 0, null);
+
+    // One base fetch URI per intermediate entry of the child; ranges are appended below.
+    List<String> basicFetchURIs = new ArrayList<String>();
+    SubQuery child = childSubQuery.getContext().getSubQuery(
+        subQuery.getBlock().getChildBlock(scan).getId());
+    for (QueryUnit qu : child.getQueryUnits()) {
+      // A task without intermediate output has nothing to fetch; the hash and join paths
+      // apply the same null check.
+      if (qu.getIntermediateData() == null) {
+        continue;
+      }
+      for (IntermediateEntry p : qu.getIntermediateData()) {
+        String uri = createBasicFetchUri(p.getPullHost(), p.getPullPort(),
+            childSubQuery.getId(), p.taskId, p.attemptId);
+        basicFetchURIs.add(uri);
+      }
+    }
+
+    boolean ascendingFirstKey = sortSpecs[0].isAscending();
+    SortedMap<TupleRange, Set<URI>> map;
+    if (ascendingFirstKey) {
+      map = new TreeMap<TupleRange, Set<URI>>();
+    } else {
+      map = new TreeMap<TupleRange, Set<URI>>(new TupleRange.DescendingTupleRangeComparator());
+    }
+
+    try {
+      for (int i = 0; i < ranges.length; i++) {
+        // Flags the range at the end of the sort order: the last one when ascending, the
+        // first one when descending (assumes ranges[] is ordered low to high — confirm).
+        boolean lastInSortOrder = ascendingFirstKey ? i == (ranges.length - 1) : i == 0;
+        // The range parameter does not depend on the fetch URI, so build it once per range.
+        String rangeParam = TupleUtil.rangeToQuery(sortSchema, ranges[i],
+            ascendingFirstKey, lastInSortOrder);
+        Set<URI> uris = new HashSet<URI>();
+        for (String uri : basicFetchURIs) {
+          uris.add(URI.create(uri + "&" + rangeParam));
+        }
+        map.put(ranges[i], uris);
+      }
+    } catch (UnsupportedEncodingException e) {
+      // Continuing would leave some ranges without fetches and silently lose rows.
+      LOG.error("Cannot encode the range query parameters", e);
+      throw new InternalException("Cannot encode the range query parameters: "
+          + e.getMessage());
+    }
+
+    QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, dummyFragment);
+    assignPartitionByRoundRobin(map, scan.getTableId(), tasks);
+    return tasks;
+  }
+
+  /**
+   * Distributes partitions over {@code tasks} in round-robin order. All fetch URIs of the
+   * first partition go to the first task, those of the second partition to the second task,
+   * and so on, wrapping around after the last task.
+   *
+   * @param partitions fetch URIs per partition; its iteration order decides the assignment
+   * @param tableName the table under which the fetches are registered
+   * @param tasks the tasks to fill; must be non-empty if any partition has URIs
+   * @return {@code tasks}
+   */
+  public static QueryUnit [] assignPartitionByRoundRobin(Map<?, Set<URI>> partitions,
+                                               String tableName, QueryUnit [] tasks) {
+    int tid = 0;
+    for (Entry<?, Set<URI>> entry : partitions.entrySet()) {
+      for (URI uri : entry.getValue()) {
+        tasks[tid].addFetch(tableName, uri);
+      }
+
+      // Advance first, then wrap, so tid always indexes a valid task on the next access.
+      tid++;
+      if (tid == tasks.length) {
+        tid = 0;
+      }
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Builds the base fetch URL for one task attempt's range-partitioned output on a pull
+   * server. Callers append the range query parameters to it.
+   *
+   * @return {@code http://<host>:<port>/?sid=<subquery>&ta=<task>_<attempt>&p=0&type=r}
+   */
+  public static String createBasicFetchUri(String hostName, int port,
+                                           SubQueryId childSid,
+                                           int taskId, int attemptId) {
+    StringBuilder uri = new StringBuilder("http://");
+    uri.append(hostName);
+    uri.append(":").append(port);
+    uri.append("/?").append("sid=").append(childSid.getId());
+    uri.append("&").append("ta=").append(taskId).append("_").append(attemptId);
+    uri.append("&").append("p=0");
+    uri.append("&").append("type=r");
+    return uri.toString();
+  }
+
+  /**
+   * Creates the tasks of a non-leaf execution block whose single input was
+   * hash-partitioned by {@code childSubQuery}.
+   *
+   * <p>The child's intermediate entries are grouped by partition id ({@code hashByKey}) and
+   * then by pull-server address ({@code hashByHost}). Each group yields one or more fetch
+   * URIs, and whole partitions are distributed round-robin over the created tasks. The task
+   * count is bounded by {@code maxNum} and by the number of partitions. It is forced to one
+   * when the child is a group-by with no grouping columns, whose result needs a single task.
+   *
+   * @return the created tasks; empty when the child produced no rows
+   */
+  public static QueryUnit [] createHashPartitionedTasks(SubQuery subQuery,
+                                                 SubQuery childSubQuery,
+                                                 int maxNum) {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    TableStat stat = childSubQuery.getTableStat();
+    if (stat.getNumRows() == 0) {
+      return new QueryUnit[0];
+    }
+
+    ScanNode scan = execBlock.getScanNodes()[0];
+    Path tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableId());
+
+    // Collect the intermediate output of every child task.
+    List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>();
+    for (QueryUnit childTask : childSubQuery.getQueryUnits()) {
+      if (childTask.getIntermediateData() != null) {
+        partitions.addAll(childTask.getIntermediateData());
+      }
+    }
+
+    // Zero-length placeholder fragment; the actual input arrives through the fetches below.
+    Fragment frag = new Fragment(scan.getTableId(), tablePath,
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
+        0, 0, null);
+
+    Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
+    Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
+    PartitionType partitionType = childSubQuery.getBlock().getPartitionType();
+
+    for (Entry<Integer, List<IntermediateEntry>> interm : hashed.entrySet()) {
+      Map<String, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue());
+      for (Entry<String, List<IntermediateEntry>> e : hashedByHost.entrySet()) {
+        Collection<URI> uris = createHashFetchURL(e.getKey(), childSubQuery.getId(),
+            interm.getKey(), partitionType, e.getValue());
+
+        List<URI> fetches = finalFetchURI.get(interm.getKey());
+        if (fetches == null) {
+          finalFetchURI.put(interm.getKey(), TUtil.newList(uris));
+        } else {
+          fetches.addAll(uris);
+        }
+      }
+    }
+
+    // The number of tasks cannot exceed the number of merged fetch uris. At least one task
+    // is needed whenever there is something to fetch, even if maxNum is not positive.
+    int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
+    if (determinedTaskNum < 1 && !finalFetchURI.isEmpty()) {
+      determinedTaskNum = 1;
+    }
+    // Check the node type instead of casting blindly, so a non-group-by child does not
+    // fail with a ClassCastException.
+    LogicalNode storeChild = childSubQuery.getBlock().getStoreTableNode().getSubNode();
+    if (storeChild instanceof GroupbyNode
+        && ((GroupbyNode) storeChild).getGroupingColumns().length == 0) {
+      determinedTaskNum = 1;
+    }
+
+    QueryUnit [] tasks = createEmptyNonLeafTasks(subQuery, determinedTaskNum, frag);
+
+    // Round-robin whole partitions over the tasks.
+    int tid = 0;
+    for (Entry<Integer, List<URI>> entry : finalFetchURI.entrySet()) {
+      for (URI uri : entry.getValue()) {
+        tasks[tid].addFetch(scan.getTableId(), uri);
+      }
+
+      tid++;
+
+      if (tid == tasks.length) {
+        tid = 0;
+      }
+    }
+
+    return tasks;
+  }
+
+  /**
+   * Builds the HTTP URIs that pull one partition of a child subquery's intermediate data
+   * from a single pull server.
+   *
+   * <p>The task attempts holding the data are passed as a comma-separated {@code ta} query
+   * parameter of {@code taskId_attemptId} items. A GET request that is too long may be
+   * rejected with HTTP Status Code 414 (Request-URI Too Long; see
+   * http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15). The id list is
+   * therefore split into chunks of at most {@code HTTP_REQUEST_MAXIMUM_LENGTH} characters,
+   * and one URI is produced per chunk. A single id longer than the limit still forms its own
+   * chunk; an empty chunk is never produced.
+   *
+   * @param hostAndPort the pull server address, formatted as {@code host:port}
+   * @param childSid    the id of the subquery that produced the intermediate data
+   * @param partitionId the partition to fetch
+   * @param type        the partition type; HASH is encoded as {@code h} and RANGE as {@code r}.
+   *                    Any other type leaves the {@code type} value empty.
+   * @param entries     the intermediate entries whose task attempts hold the partition
+   * @return one URI per id chunk; empty when {@code entries} is empty
+   */
+  public static Collection<URI> createHashFetchURL(String hostAndPort, SubQueryId childSid,
+                                       int partitionId, PartitionType type,
+                                       List<IntermediateEntry> entries) {
+    String scheme = "http://";
+    StringBuilder urlPrefix = new StringBuilder(scheme);
+    urlPrefix.append(hostAndPort)
+        .append("/?").append("sid=").append(childSid.getId())
+        .append("&").append("p=").append(partitionId)
+        .append("&").append("type=");
+    if (type == PartitionType.HASH) {
+      urlPrefix.append("h");
+    } else if (type == PartitionType.RANGE) {
+      urlPrefix.append("r");
+    }
+    urlPrefix.append("&ta=");
+
+    // Split the task attempt id list so that no single request grows too long.
+    List<String> taskIdsParams = new ArrayList<String>();
+    StringBuilder taskIdListBuilder = new StringBuilder();
+    for (IntermediateEntry entry : entries) {
+      String taskAttemptId = entry.getTaskId() + "_" + entry.getAttemptId();
+      if (taskIdListBuilder.length() == 0) {
+        // The first id of a chunk has no leading comma. It is always accepted, even if it
+        // is longer than the limit, so that no empty chunk (an empty "ta=") is emitted.
+        taskIdListBuilder.append(taskAttemptId);
+      } else if (taskIdListBuilder.length() + 1 + taskAttemptId.length()
+          > HTTP_REQUEST_MAXIMUM_LENGTH) {
+        // Appending ",<id>" would exceed the limit: close this chunk and start a new one.
+        taskIdsParams.add(taskIdListBuilder.toString());
+        taskIdListBuilder = new StringBuilder(taskAttemptId);
+      } else {
+        taskIdListBuilder.append(",").append(taskAttemptId);
+      }
+    }
+
+    // if the url params remain
+    if (taskIdListBuilder.length() > 0) {
+      taskIdsParams.add(taskIdListBuilder.toString());
+    }
+
+    String prefix = urlPrefix.toString();
+    Collection<URI> fetchURLs = new ArrayList<URI>();
+    for (String param : taskIdsParams) {
+      fetchURLs.add(URI.create(prefix + param));
+    }
+
+    return fetchURLs;
+  }
+
+  /**
+   * Groups intermediate entries by partition id.
+   *
+   * @param entries the entries to group
+   * @return a map from partition id to the entries of that partition, each list in the
+   *         order the entries were encountered
+   */
+  public static Map<Integer, List<IntermediateEntry>> hashByKey(
+      List<IntermediateEntry> entries) {
+    Map<Integer, List<IntermediateEntry>> byPartition =
+        new HashMap<Integer, List<IntermediateEntry>>();
+    for (IntermediateEntry e : entries) {
+      Integer partitionId = e.getPartitionId();
+      List<IntermediateEntry> group = byPartition.get(partitionId);
+      if (group == null) {
+        byPartition.put(partitionId, TUtil.newList(e));
+      } else {
+        group.add(e);
+      }
+    }
+    return byPartition;
+  }
+
+  /**
+   * Creates {@code num} non-leaf query units for {@code subQuery}, numbered 0 to num-1.
+   * No fetches are added here. Each unit shares the same fragment and the subquery's
+   * logical plan.
+   *
+   * @param subQuery the owning subquery
+   * @param num      the number of tasks to create
+   * @param frag     the fragment assigned to every task
+   * @return the created tasks, indexed by task number
+   */
+  public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
+                                                     Fragment frag) {
+    LogicalNode plan = subQuery.getBlock().getPlan();
+    QueryUnit [] created = new QueryUnit[num];
+    int taskSeq = 0;
+    while (taskSeq < num) {
+      QueryUnit unit = new QueryUnit(QueryIdFactory.newQueryUnitId(subQuery.getId(), taskSeq),
+          false, subQuery.getEventHandler());
+      unit.setFragment2(frag);
+      unit.setLogicalPlan(plan);
+      created[taskSeq] = unit;
+      taskSeq++;
+    }
+    return created;
+  }
+
+  /**
+   * Groups intermediate entries by the pull server that serves them.
+   *
+   * @param entries the entries to group
+   * @return a map keyed by {@code pullHost:pullPort}, each list in encounter order
+   */
+  public static Map<String, List<IntermediateEntry>> hashByHost(
+      List<IntermediateEntry> entries) {
+    Map<String, List<IntermediateEntry>> byHost = new HashMap<String, List<IntermediateEntry>>();
+    for (IntermediateEntry e : entries) {
+      String hostAndPort = e.getPullHost() + ":" + e.getPullPort();
+      List<IntermediateEntry> group = byHost.get(hostAndPort);
+      if (group != null) {
+        group.add(e);
+      } else {
+        byHost.put(hostAndPort, TUtil.newList(e));
+      }
+    }
+    return byHost;
+  }
+
+  /**
+   * Configures, in place, how the store node of {@code subQuery}'s block partitions its
+   * output for the next phase.
+   *
+   * <p>Partition keys are chosen as follows, with later rules overriding earlier ones:
+   * <ul>
+   *   <li>if the node directly under the parent block's store node is a JOIN, the store
+   *       node's current partition keys are re-applied with {@code n} partitions;</li>
+   *   <li>for HASH partitioning over a GROUP BY, the grouping columns are used;</li>
+   *   <li>for RANGE partitioning over a SORT, the sort key columns are used.</li>
+   * </ul>
+   * With no keys the store node falls back to list partitioning. An empty key set yields
+   * a single partition; otherwise {@code n} partitions are used.
+   *
+   * @param subQuery the subquery whose store node is configured
+   * @param n        the desired number of partitions
+   * @return the same {@code subQuery}
+   */
+  public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n) {
+    ExecutionBlock current = subQuery.getBlock();
+    Column[] partitionKeys = null;
+
+    // If the next query is a join, set the partition number for the current block.
+    // TODO: the union handling is required when a join has unions as its child
+    ExecutionBlock parent = current.getParentBlock();
+    boolean feedsJoin = parent != null
+        && parent.getStoreTableNode().getSubNode().getType() == ExprType.JOIN;
+    if (feedsJoin) {
+      current.getStoreTableNode().setPartitions(current.getPartitionType(),
+          current.getStoreTableNode().getPartitionKeys(), n);
+      partitionKeys = current.getStoreTableNode().getPartitionKeys();
+    }
+
+    // Group-by (hash) and sort (range) derive the partition keys from their own columns.
+    StoreTableNode store = current.getStoreTableNode();
+    if (current.getPartitionType() == PartitionType.HASH) {
+      if (store.getSubNode().getType() == ExprType.GROUP_BY) {
+        GroupbyNode groupBy = (GroupbyNode) store.getSubNode();
+        partitionKeys = groupBy.getGroupingColumns();
+      }
+    } else if (current.getPartitionType() == PartitionType.RANGE
+        && store.getSubNode().getType() == ExprType.SORT) {
+      SortNode sortNode = (SortNode) store.getSubNode();
+      int keyCount = sortNode.getSortKeys().length;
+      partitionKeys = new Column[keyCount];
+      for (int k = 0; k < keyCount; k++) {
+        partitionKeys[k] = sortNode.getSortKeys()[k].getSortKey();
+      }
+    }
+
+    if (partitionKeys == null) {
+      store.setListPartition();
+    } else if (partitionKeys.length == 0) {
+      // No keys (e.g. a group-by without grouping columns): use a single partition.
+      store.setPartitions(current.getPartitionType(), new Column[]{}, 1);
+    } else {
+      store.setPartitions(current.getPartitionType(), partitionKeys, n);
+    }
+    return subQuery;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
new file mode 100644
index 0000000..305ef1b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStat;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ExprType;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.TaskScheduler;
+import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.StorageManager;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+
+
+/**
+ * SubQuery controls the execution of one {@link ExecutionBlock} and is a finite state
+ * machine over {@link SubQueryState}.
+ *
+ * <p>Events are delivered through {@link #handle(SubQueryEvent)}, which applies each
+ * transition while holding the write lock. The main path is
+ * NEW --SQ_INIT--> INIT --SQ_CONTAINER_ALLOCATED--> CONTAINER_ALLOCATED --SQ_START-->
+ * RUNNING --SQ_SUBQUERY_COMPLETED--> SUCCEEDED. Union blocks and blocks without tasks go
+ * directly from NEW to SUCCEEDED.
+ */
+public class SubQuery implements EventHandler<SubQueryEvent> {
+
+  private static final Log LOG = LogFactory.getLog(SubQuery.class);
+
+  private ExecutionBlock block;
+  private int priority;
+  private TableMeta meta;
+  private EventHandler eventHandler;
+  private final StorageManager sm;
+  private TaskSchedulerImpl taskScheduler;
+  private QueryContext context;
+
+  private long startTime;
+  private long finishTime;
+
+  volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>();
+  volatile Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+  // The transition has no fields, so one instance is shared by every arc that uses it.
+  private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION =
+      new ContainerLaunchTransition();
+  private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent>
+      stateMachine;
+
+  private StateMachineFactory<SubQuery, SubQueryState,
+      SubQueryEventType, SubQueryEvent> stateMachineFactory =
+      new StateMachineFactory <SubQuery, SubQueryState,
+          SubQueryEventType, SubQueryEvent> (SubQueryState.NEW)
+
+          .addTransition(SubQueryState.NEW,
+              EnumSet.of(SubQueryState.INIT, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
+              SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
+
+          .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
+
+          .addTransition(SubQueryState.CONTAINER_ALLOCATED,
+              EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED,
+                  SubQueryState.SUCCEEDED), SubQueryEventType.SQ_START, new StartTransition())
+          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
+
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, SubQueryEventType.SQ_START)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_TASK_COMPLETED, new TaskCompletedTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_SUBQUERY_COMPLETED, new SubQueryCompleteTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
+              SubQueryEventType.SQ_FAILED, new InternalErrorTransition())
+
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_START)
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_START)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_FAILED)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_INTERNAL_ERROR);
+
+
+  // Guard the state machine and completedTaskCount: transitions run under writeLock,
+  // state/progress reads under readLock.
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private int completedTaskCount = 0;
+
+  public SubQuery(QueryContext context, ExecutionBlock block, StorageManager sm) {
+    this.context = context;
+    this.block = block;
+    this.sm = sm;
+    this.eventHandler = context.getEventHandler();
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public QueryContext getContext() {
+    return context;
+  }
+
+  public EventHandler getEventHandler() {
+    return eventHandler;
+  }
+
+  public TaskScheduler getTaskScheduler() {
+    return taskScheduler;
+  }
+
+  /** Records the current time of the query context's clock as the start time. */
+  public void setStartTime() {
+    startTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /** Records the current time of the query context's clock as the finish time. */
+  public void setFinishTime() {
+    finishTime = context.getClock().getTime();
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  /**
+   * Returns the fraction of this subquery's tasks that have completed, in [0, 1].
+   * The result is 0 while the subquery is still NEW or before any task has completed.
+   */
+  public float getProgress() {
+    readLock.lock();
+    try {
+      if (getState() == SubQueryState.NEW || completedTaskCount == 0) {
+        return 0.0f;
+      }
+      return (float) completedTaskCount / (float) tasks.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlock getBlock() {
+    return block;
+  }
+
+  public void addTask(QueryUnit task) {
+    tasks.put(task.getId(), task);
+  }
+
+  /**
+   * Reports this subquery as completed with {@code finalState}. Currently this only posts
+   * a {@link SubQueryCompletedEvent}; see the TODO list below.
+   */
+  public void abortSubQuery(SubQueryState finalState) {
+    // TODO -
+    // - committer.abortSubQuery(...)
+    // - record SubQuery Finish Time
+    // - CleanUp Tasks
+    // - Record History
+
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
+  }
+
+  public StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> getStateMachine() {
+    return this.stateMachine;
+  }
+
+  public void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+
+  public int getPriority() {
+    return this.priority;
+  }
+
+  public StorageManager getStorageManager() {
+    return sm;
+  }
+
+  /** Returns the subquery that executes the child block scanned by {@code scanForChild}. */
+  public SubQuery getChildQuery(ScanNode scanForChild) {
+    return context.getSubQuery(block.getChildBlock(scanForChild).getId());
+  }
+
+  public SubQueryId getId() {
+    return block.getId();
+  }
+
+  /** Returns a snapshot array of this subquery's tasks. */
+  public QueryUnit[] getQueryUnits() {
+    return tasks.values().toArray(new QueryUnit[tasks.size()]);
+  }
+
+  public QueryUnit getQueryUnit(QueryUnitId qid) {
+    return tasks.get(qid);
+  }
+
+  public void setTableMeta(TableMeta meta) {
+    this.meta = meta;
+  }
+
+  @SuppressWarnings("UnusedDeclaration")
+  public TableMeta getTableMeta() {
+    return meta;
+  }
+
+  /** Returns the statistics of the output table meta; requires the meta to be built. */
+  public TableStat getTableStat() {
+    return this.meta.getStat();
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(this.getId());
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof SubQuery) {
+      SubQuery other = (SubQuery)o;
+      return getId().equals(other.getId());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getId().hashCode();
+  }
+
+  public int compareTo(SubQuery other) {
+    return getId().compareTo(other.getId());
+  }
+
+  public SubQueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Sums the statistics of the subqueries of all child blocks of a union block.
+   * Note that the average row counts of the children are added up as well.
+   */
+  private static TableStat computeStatFromUnionBlock(SubQuery unit) {
+    TableStat stat = new TableStat();
+    long avgRows = 0, numBytes = 0, numRows = 0;
+    int numBlocks = 0, numPartitions = 0;
+    List<ColumnStat> columnStats = Lists.newArrayList();
+
+    for (ExecutionBlock childBlock : unit.getBlock().getChildBlocks()) {
+      SubQuery childSubQuery = unit.context.getSubQuery(childBlock.getId());
+      TableStat childStat = childSubQuery.getTableStat();
+      avgRows += childStat.getAvgRows();
+      columnStats.addAll(childStat.getColumnStats());
+      numBlocks += childStat.getNumBlocks();
+      numBytes += childStat.getNumBytes();
+      numPartitions += childStat.getNumPartitions();
+      numRows += childStat.getNumRows();
+    }
+
+    stat.setColumnStats(columnStats);
+    stat.setNumBlocks(numBlocks);
+    stat.setNumBytes(numBytes);
+    stat.setNumPartitions(numPartitions);
+    stat.setNumRows(numRows);
+    stat.setAvgRows(avgRows);
+    return stat;
+  }
+
+  /**
+   * Computes the output statistics of this subquery and writes the resulting table meta
+   * through the storage manager. The meta is then stored on this subquery. Also records
+   * the finish time.
+   *
+   * @return the table meta describing this subquery's output, with its statistics attached
+   * @throws IOException if the table meta cannot be written
+   */
+  public TableMeta buildTableMeta() throws IOException {
+    finishTime = context.getClock().getTime();
+
+    // A union block creates no tasks of its own; its statistics come from its children.
+    TableStat stat = block.hasUnion() ? computeStatFromUnionBlock(this) : computeStatFromTasks();
+    TableMeta built = writeStat(this, stat); // writeStat attaches stat to the meta
+    setTableMeta(built);
+    return built;
+  }
+
+  /** Aggregates the statistics reported by all tasks of this subquery. */
+  private TableStat computeStatFromTasks() {
+    List<TableStat> stats = Lists.newArrayList();
+    for (QueryUnit unit : getQueryUnits()) {
+      stats.add(unit.getStats());
+    }
+    return StatisticsUtil.aggregateTableStat(stats);
+  }
+
+  /** Builds the output table meta with {@code stat} attached and writes it to storage. */
+  private TableMeta writeStat(SubQuery subQuery, TableStat stat)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    StoreTableNode storeTableNode = execBlock.getStoreTableNode();
+    TableMeta tableMeta = toTableMeta(storeTableNode);
+    tableMeta.setStat(stat);
+    sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), tableMeta);
+    return tableMeta;
+  }
+
+  private static TableMeta toTableMeta(StoreTableNode store) {
+    if (store.hasOptions()) {
+      return CatalogUtil.newTableMeta(store.getOutSchema(),
+          store.getStorageType(), store.getOptions());
+    } else {
+      return CatalogUtil.newTableMeta(store.getOutSchema(),
+          store.getStorageType());
+    }
+  }
+
+  private void stopScheduler() {
+    // If there are launched TaskRunners, the scheduler sends the 'shouldDie' message to all
+    // of them via received task requests.
+    if (taskScheduler != null) {
+      taskScheduler.stop();
+    }
+  }
+
+  private void releaseContainers() {
+    // If there are still live TaskRunners, try to kill the containers.
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(),
+        containers.values()));
+  }
+
+  /**
+   * Builds and stores the output table meta, records the finish time, and posts a
+   * {@link SubQuerySucceeEvent}.
+   */
+  private void finish() {
+    TableMeta builtMeta = null;
+    try {
+      builtMeta = buildTableMeta();
+    } catch (IOException e) {
+      // NOTE(review): the success event below still carries a null meta in this case.
+      LOG.error("SubQuery (" + getId() + ") failed to build its table meta", e);
+    }
+
+    setTableMeta(builtMeta);
+    setFinishTime();
+    eventHandler.handle(new SubQuerySucceeEvent(getId(), builtMeta));
+  }
+
+  /**
+   * Logs a failure of this subquery and notifies the query. It posts the exception message
+   * as a diagnostics update, followed by a FAILED completion event.
+   */
+  private void notifyFailure(Exception e) {
+    LOG.warn("SubQuery (" + getId() + ") failed", e);
+    eventHandler.handle(new QueryDiagnosticsUpdateEvent(getId().getQueryId(), e.getMessage()));
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.FAILED));
+  }
+
+  /**
+   * Applies {@code event} to the state machine under the write lock.
+   *
+   * <p>An event that is invalid in the current state is logged and reported by posting
+   * SQ_INTERNAL_ERROR. An SQ_INTERNAL_ERROR that is itself rejected is not re-posted.
+   * Only FAILED has an arc for it, and re-posting could feed the same failure back here
+   * repeatedly if the dispatcher routes the event to this SubQuery.
+   */
+  @Override
+  public void handle(SubQueryEvent event) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType());
+    }
+
+    // Lock before entering try so that finally never releases a lock that was not acquired.
+    writeLock.lock();
+    try {
+      SubQueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        if (event.getType() != SubQueryEventType.SQ_INTERNAL_ERROR) {
+          eventHandler.handle(new SubQueryEvent(getId(),
+              SubQueryEventType.SQ_INTERNAL_ERROR));
+        }
+      }
+
+      // notify the eventhandler of state change
+      if (LOG.isDebugEnabled()) {
+        if (oldState != getState()) {
+          LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to "
+              + getState());
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * NEW --SQ_INIT--> {INIT, SUCCEEDED, FAILED}: creates the tasks, starts the task
+   * scheduler, and requests containers. It finishes immediately for union blocks and for
+   * blocks that produce no tasks.
+   */
+  private static class InitAndRequestContainer implements MultipleArcTransition<SubQuery,
+      SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.setStartTime();
+      ExecutionBlock execBlock = subQuery.getBlock();
+
+      try {
+        // Union operator does not require actual query processing. It is performed logically.
+        if (execBlock.hasUnion()) {
+          subQuery.finish();
+          return SubQueryState.SUCCEEDED;
+        }
+
+        setRepartitionIfNecessary(subQuery);
+        createTasks(subQuery);
+
+        if (subQuery.tasks.size() == 0) { // there are no tasks to run
+          subQuery.finish();
+          return SubQueryState.SUCCEEDED;
+        }
+
+        initTaskScheduler(subQuery);
+        allocateContainers(subQuery);
+        return SubQueryState.INIT;
+      } catch (Exception e) {
+        subQuery.notifyFailure(e);
+        return SubQueryState.FAILED;
+      }
+    }
+
+    private void initTaskScheduler(SubQuery subQuery) {
+      subQuery.taskScheduler = new TaskSchedulerImpl(subQuery.context);
+      subQuery.taskScheduler.init(subQuery.context.getConf());
+      subQuery.taskScheduler.start();
+    }
+
+    /**
+     * If a parent block requires a repartition operation, the method sets proper repartition
+     * methods and the number of partitions to a given subquery.
+     */
+    private static void setRepartitionIfNecessary(SubQuery subQuery) {
+      if (subQuery.getBlock().hasParentBlock()) {
+        int numTasks = calculatePartitionNum(subQuery);
+        Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks);
+      }
+    }
+
+    /**
+     * Returns the desired number of output partitions of {@code subQuery}, based on the
+     * volume of input data. This method is only used to determine the partition number
+     * of a hash join or an aggregation.
+     *
+     * <ul>
+     *   <li>If the parent block has two scan nodes (a join), the smaller of the two child
+     *       input volumes, in MB, is divided by {@code JOIN_PARTITION_VOLUME}.</li>
+     *   <li>Otherwise, if a GROUP BY node is found in the parent plan, one partition is
+     *       used when it has no grouping columns. If it has grouping columns, this block's
+     *       input volume in MB is divided by {@code AGGREGATION_PARTITION_VOLUME}.</li>
+     *   <li>Otherwise one partition per 128 MB of input is used.</li>
+     * </ul>
+     *
+     * @param subQuery the subquery whose output feeds its parent block
+     * @return the number of partitions; at least 1, even for empty input, because a
+     *         partition count of 0 cannot be used for hash partitioning
+     */
+    public static int calculatePartitionNum(SubQuery subQuery) {
+      TajoConf conf = subQuery.context.getConf();
+      ExecutionBlock parent = subQuery.getBlock().getParentBlock();
+
+      GroupbyNode grpNode = null;
+      if (parent != null) {
+        grpNode = (GroupbyNode) PlannerUtil.findTopNode(
+            parent.getPlan(), ExprType.GROUP_BY);
+      }
+
+      // Is this subquery the first step of join?
+      if (parent != null && parent.getScanNodes().length == 2) {
+        Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
+
+        // for outer
+        ExecutionBlock outer = child.next();
+        long outerVolume = getInputVolume(subQuery.context, outer);
+
+        // for inner
+        ExecutionBlock inner = child.next();
+        long innerVolume = getInputVolume(subQuery.context, inner);
+        LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
+        LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
+
+        long smaller = Math.min(outerVolume, innerVolume);
+
+        int mb = (int) Math.ceil((double)smaller / 1048576);
+        LOG.info("Smaller Table's volume is approximately " + mb + " MB");
+        // determine the number of task
+        int taskNum = Math.max(1, (int) Math.ceil((double)mb /
+            conf.getIntVar(ConfVars.JOIN_PARTITION_VOLUME)));
+        LOG.info("The determined number of join partitions is " + taskNum);
+        return taskNum;
+
+        // Is this subquery the first step of group-by?
+      } else if (grpNode != null) {
+
+        if (grpNode.getGroupingColumns().length == 0) {
+          return 1;
+        } else {
+          long volume = getInputVolume(subQuery.context, subQuery.block);
+
+          int mb = (int) Math.ceil((double)volume / 1048576);
+          LOG.info("Table's volume is approximately " + mb + " MB");
+          // determine the number of task
+          int taskNum = Math.max(1, (int) Math.ceil((double)mb /
+              conf.getIntVar(ConfVars.AGGREGATION_PARTITION_VOLUME)));
+          LOG.info("The determined number of aggregation partitions is " + taskNum);
+          return taskNum;
+        }
+      } else {
+        LOG.info("============>>>>> Unexpected Case! <<<<<================");
+        long volume = getInputVolume(subQuery.context, subQuery.block);
+
+        int mb = (int) Math.ceil((double)volume / 1048576);
+        LOG.info("Table's volume is approximately " + mb + " MB");
+        // determine the number of task per 128MB
+        int taskNum = Math.max(1, (int) Math.ceil((double)mb / 128));
+        LOG.info("The determined number of partitions is " + taskNum);
+        return taskNum;
+      }
+    }
+
+    /**
+     * Creates the tasks of {@code subQuery} and registers them on it. It handles three
+     * cases: a leaf block with a single scan, a block with several scans (join), or any
+     * other block (sort or aggregation over one child).
+     */
+    private static void createTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit [] tasks;
+      if (execBlock.isLeafBlock() && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+        tasks = createLeafTasks(subQuery);
+
+      } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+        tasks = Repartitioner.createJoinTasks(subQuery);
+
+      } else { // Case 3: Others (Sort or Aggregation)
+        int numTasks = getNonLeafTaskNum(subQuery);
+        SubQueryId childId = subQuery.getBlock().getChildBlocks().iterator().next().getId();
+        SubQuery child = subQuery.context.getSubQuery(childId);
+        tasks = Repartitioner.createNonLeafTask(subQuery, child, numTasks);
+      }
+
+      LOG.info("Create " + tasks.length + " Tasks");
+
+      for (QueryUnit task : tasks) {
+        subQuery.addTask(task);
+      }
+    }
+
+    /**
+     * Returns the desired maximum number of non-leaf tasks: one task per 64 MB of input
+     * (intermediate) data.
+     *
+     * @param subQuery the non-leaf subquery
+     * @return the task count; at least 1 even when the input volume is 0, so that the
+     *         caller never receives an empty task budget for existing intermediate data
+     */
+    public static int getNonLeafTaskNum(SubQuery subQuery) {
+      // Getting intermediate data size
+      long volume = getInputVolume(subQuery.context, subQuery.getBlock());
+
+      int mb = (int) Math.ceil((double)volume / 1048576);
+      LOG.info("Table's volume is approximately " + mb + " MB");
+      // determine the number of task per 64MB
+      int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64));
+      LOG.info("The determined number of non-leaf tasks is " + maxTaskNum);
+      return maxTaskNum;
+    }
+
+    /**
+     * Returns the input volume of {@code execBlock} in bytes. For a leaf block this is the
+     * size of the first scanned table. For any other block it is the sum of the output
+     * sizes of its child subqueries.
+     */
+    public static long getInputVolume(QueryContext context, ExecutionBlock execBlock) {
+      Map<String, TableDesc> tableMap = context.getTableDescMap();
+      if (execBlock.isLeafBlock()) {
+        ScanNode outerScan = execBlock.getScanNodes()[0];
+        TableStat stat = tableMap.get(outerScan.getFromTable().getTableName()).getMeta().getStat();
+        return stat.getNumBytes();
+      } else {
+        long aggregatedVolume = 0;
+        for (ExecutionBlock childBlock : execBlock.getChildBlocks()) {
+          SubQuery subquery = context.getSubQuery(childBlock.getId());
+          aggregatedVolume += subquery.getTableStat().getNumBytes();
+        }
+
+        return aggregatedVolume;
+      }
+    }
+
+    /**
+     * Requests containers for {@code subQuery}: one per task, capped at four per cluster
+     * node, each with 2000 of memory.
+     */
+    public static void allocateContainers(SubQuery subQuery) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit [] tasks = subQuery.getQueryUnits();
+
+      int numClusterNodes = subQuery.getContext().getNumClusterNode();
+      int numRequest = Math.min(tasks.length, numClusterNodes * 4);
+
+      final Resource resource = Records.newRecord(Resource.class);
+      // TODO - for each different subquery, the volume of resource should be different.
+      resource.setMemory(2000);
+
+      Priority priority = Records.newRecord(Priority.class);
+      priority.setPriority(subQuery.getPriority());
+      ContainerAllocationEvent event =
+          new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ,
+              subQuery.getId(), priority, resource, numRequest,
+              execBlock.isLeafBlock(), 0.0f);
+      subQuery.eventHandler.handle(event);
+    }
+
+    /**
+     * Creates one leaf task per fragment of the single scanned table. If the table path
+     * contains a "data" subdirectory, the fragments are taken from there.
+     */
+    private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      ScanNode[] scans = execBlock.getScanNodes();
+      Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
+      TableMeta meta;
+      Path inputPath;
+
+      ScanNode scan = scans[0];
+      TableDesc desc = subQuery.context.getTableDescMap().get(scan.getFromTable().getTableName());
+      inputPath = desc.getPath();
+      meta = desc.getMeta();
+
+      // TODO - should be change the inner directory
+      Path oldPath = new Path(inputPath, "data");
+      FileSystem fs = inputPath.getFileSystem(subQuery.context.getConf());
+      if (fs.exists(oldPath)) {
+        inputPath = oldPath;
+      }
+      List<Fragment> fragments = subQuery.getStorageManager().getSplits(scan.getTableId(), meta, inputPath);
+
+      QueryUnit queryUnit;
+      List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
+
+      int i = 0;
+      for (Fragment fragment : fragments) {
+        queryUnit = newQueryUnit(subQuery, i++, fragment);
+        queryUnits.add(queryUnit);
+      }
+
+      return queryUnits.toArray(new QueryUnit[queryUnits.size()]);
+    }
+
+    private static QueryUnit newQueryUnit(SubQuery subQuery, int taskId, Fragment fragment) {
+      ExecutionBlock execBlock = subQuery.getBlock();
+      QueryUnit unit = new QueryUnit(
+          QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), execBlock.isLeafBlock(),
+          subQuery.eventHandler);
+      unit.setLogicalPlan(execBlock.getPlan());
+      unit.setFragment2(fragment);
+      return unit;
+    }
+  }
+
+  /**
+   * SQ_CONTAINER_ALLOCATED: records the allocated containers, asks for them to be launched,
+   * and posts SQ_START.
+   */
+  private static class ContainerLaunchTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
+      SubQueryContainerAllocationEvent allocationEvent =
+          (SubQueryContainerAllocationEvent) event;
+      for (Container container : allocationEvent.getAllocatedContainer()) {
+        ContainerId cId = container.getId();
+        if (subQuery.containers.containsKey(cId)) {
+          LOG.info(">>>>>>>>>>>> Duplicate Container! <<<<<<<<<<<");
+        }
+        subQuery.containers.put(cId, container);
+      }
+      LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size()
+          + " containers!");
+      subQuery.eventHandler.handle(
+          new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
+              subQuery.getId(), allocationEvent.getAllocatedContainer()));
+
+      subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
+          SubQueryEventType.SQ_START));
+    }
+  }
+
+  /**
+   * CONTAINER_ALLOCATED --SQ_START--> RUNNING: posts a T_SCHEDULE event for every task.
+   * On failure the query is notified in the same way as for an initialization failure.
+   */
+  private static class StartTransition implements
+      MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery,
+                           SubQueryEvent subQueryEvent) {
+      // schedule tasks
+      try {
+        for (QueryUnitId taskId : subQuery.tasks.keySet()) {
+          subQuery.eventHandler.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
+        }
+
+        return SubQueryState.RUNNING;
+      } catch (Exception e) {
+        subQuery.notifyFailure(e);
+        return SubQueryState.FAILED;
+      }
+    }
+  }
+
+  /**
+   * SQ_TASK_COMPLETED: counts the completed task and posts SQ_SUBQUERY_COMPLETED once every
+   * task has completed.
+   */
+  private static class TaskCompletedTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery,
+                                     SubQueryEvent event) {
+      subQuery.completedTaskCount++;
+      SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event;
+      QueryUnitAttempt task = subQuery.getQueryUnit(taskEvent.getTaskId()).getSuccessfulAttempt();
+
+      LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
+          + subQuery.tasks.size() + " on " + task.getHost());
+      if (subQuery.completedTaskCount == subQuery.tasks.size()) {
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
+            SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+      }
+    }
+  }
+
+  /**
+   * RUNNING --SQ_SUBQUERY_COMPLETED--> SUCCEEDED: stops the scheduler, releases the
+   * containers, and finalizes the output.
+   */
+  private static class SubQueryCompleteTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      // TODO - Commit subQuery & do cleanup
+      // TODO - records succeeded, failed, killed completed task
+      // TODO - records metrics
+      subQuery.stopScheduler();
+      subQuery.releaseContainers();
+      subQuery.finish();
+    }
+  }
+
+  /** RUNNING --SQ_FAILED--> FAILED. No cleanup is performed yet. */
+  private static class InternalErrorTransition
+      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+
+    @Override
+    public void transition(SubQuery subQuery,
+                           SubQueryEvent subQueryEvent) {
+      // TODO - stop the scheduler and release containers on failure
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
new file mode 100644
index 0000000..c8256ec
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+/** The states a SubQuery can be in; keep this order, since ordinal() follows it. */
+public enum SubQueryState {
+  NEW, CONTAINER_ALLOCATED, INIT,
+  RUNNING,
+  // presumably the terminal outcomes - confirm against the SubQuery state machine
+  SUCCEEDED,
+  FAILED
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
index c615532..c01cabc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/RMContainerAllocator.java
@@ -25,17 +25,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.client.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.tajo.SubQueryId;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.SubQueryState;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.master.querymaster.SubQueryState;
 
 import java.util.HashMap;
 import java.util.List;
@@ -115,9 +116,10 @@ public class RMContainerAllocator extends AMRMClientImpl
             if (!stopped.get()) {
               LOG.warn("Allocated thread interrupted. Returning.");
             }
-            return;
+            break;
           }
         }
+        LOG.info("Allocated thread stopped");
       }
     });
     allocatorThread.setName("RMContainerAllocator");
@@ -126,24 +128,9 @@ public class RMContainerAllocator extends AMRMClientImpl
 
   public void stop() {
     stopped.set(true);
+    allocatorThread.interrupt(); // interrupt the allocator thread so its loop breaks out
+    LOG.info("RMContainerAllocator stopped"); // NOTE(review): AM is no longer unregistered here - confirm it happens elsewhere
     super.stop();
-    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
-    QueryState state = context.getQuery().getState();
-    if (state == QueryState.QUERY_SUCCEEDED) {
-      finishState = FinalApplicationStatus.SUCCEEDED;
-    } else if (state == QueryState.QUERY_KILLED
-        || (state == QueryState.QUERY_RUNNING)) {
-      finishState = FinalApplicationStatus.KILLED;
-    } else if (state == QueryState.QUERY_FAILED
-        || state == QueryState.QUERY_ERROR) {
-      finishState = FinalApplicationStatus.FAILED;
-    }
-
-    try {
-      unregisterApplicationMaster(finishState, "", "http://localhost:1234");
-    } catch (YarnRemoteException e) {
-      LOG.error(e);
-    }
   }
 
   private final Map<Priority, SubQueryId> subQueryMap =

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 9e281c6..a56841b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -37,7 +37,7 @@ import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.engine.MasterWorkerProtos.*;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
 import org.apache.tajo.engine.exception.UnfinishedTaskException;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
@@ -45,7 +45,7 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
 import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.rpc.NullCallback;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index ca8ef43..a41b280 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -37,11 +37,9 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.SubQueryId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.MasterWorkerProtocol;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.*;
 import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.ProtoAsyncRpcClient;
@@ -53,7 +51,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.*;
 
-import static org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+import static org.apache.tajo.ipc.QueryMasterProtocol.*;
 
 /**
  * The driver class for Tajo QueryUnit processing.
@@ -72,7 +70,7 @@ public class TaskRunner extends AbstractService {
   private final ContainerId containerId;
 
   // Cluster Management
-  private MasterWorkerProtocolService.Interface master;
+  private QueryMasterProtocol.QueryMasterProtocolService.Interface master;
 
   // for temporal or intermediate files
   private FileSystem localFS;
@@ -186,7 +184,7 @@ public class TaskRunner extends AbstractService {
       return nodeId.toString();
     }
 
-    public MasterWorkerProtocolService.Interface getMaster() {
+    public QueryMasterProtocolService.Interface getMaster() { // QueryMaster RPC stub stored in 'master'
       return master;
     }
 
@@ -223,7 +221,7 @@ public class TaskRunner extends AbstractService {
     }
   }
 
-  static void fatalError(MasterWorkerProtocolService.Interface proxy,
+  static void fatalError(QueryMasterProtocolService.Interface proxy,
                          QueryUnitAttemptId taskAttemptId, String message) {
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())
@@ -338,11 +336,11 @@ public class TaskRunner extends AbstractService {
   /**
    * TaskRunner takes 5 arguments as follows:
    * <ol>
-   * <li>1st: TaskRunnerListener hostname</li>
-   * <li>2nd: TaskRunnerListener port</li>
-   * <li>3nd: SubQueryId</li>
-   * <li>4th: NodeId</li>
-   * <li>5th: ContainerId</li>
+   * <li>1st: SubQueryId</li>
+   * <li>2nd: NodeId</li>
+   * <li>3rd: ContainerId</li>
+   * <li>4th: QueryMaster hostname</li>
+   * <li>5th: QueryMaster port</li>
    * </ol>
    */
   public static void main(String[] args) throws Exception {
@@ -356,17 +354,17 @@ public class TaskRunner extends AbstractService {
 
     UserGroupInformation.setConfiguration(conf);
 
-    // TaskRunnerListener's address
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-    final InetSocketAddress masterAddr =
-        NetUtils.createSocketAddrForHost(host, port);
-
     // SubQueryId from String
-    final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
+    final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[0]);
     // NodeId has a form of hostname:port.
-    NodeId nodeId = ConverterUtils.toNodeId(args[3]);
-    ContainerId containerId = ConverterUtils.toContainerId(args[4]);
+    NodeId nodeId = ConverterUtils.toNodeId(args[1]);
+    ContainerId containerId = ConverterUtils.toContainerId(args[2]);
+
+    // QueryMaster's address
+    String host = args[3];
+    int port = Integer.parseInt(args[4]);
+    final InetSocketAddress masterAddr =
+            NetUtils.createSocketAddrForHost(host, port);
 
     // TODO - 'load credential' should be implemented
     // Getting taskOwner
@@ -374,26 +372,29 @@ public class TaskRunner extends AbstractService {
         UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
     //taskOwner.addToken(token);
 
-    // TaskRunnerListener RPC
+    // QueryMasterService RPC
     ProtoAsyncRpcClient client;
-    MasterWorkerProtocolService.Interface master;
+    QueryMasterProtocolService.Interface master;
 
     // initialize MasterWorkerProtocol as an actual task owner.
     client =
         taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
           @Override
           public ProtoAsyncRpcClient run() throws Exception {
-            return new ProtoAsyncRpcClient(MasterWorkerProtocol.class, masterAddr);
+            return new ProtoAsyncRpcClient(QueryMasterProtocol.class, masterAddr);
           }
         });
     master = client.getStub();
 
 
     TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
-    taskRunner.init(conf);
-    taskRunner.start();
-    client.close();
-    LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
-    System.exit(0);
+    try {
+      taskRunner.init(conf);
+      taskRunner.start();
+    } finally {
+      client.close();
+      LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
+      System.exit(0);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
index c171c2b..6164553 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
@@ -25,9 +25,9 @@ option java_generate_equals_and_hash = true;
 import "DataTypes.proto";
 
 enum StoreType {
-  MEM = 0;
-  CSV = 1;
-  RAW = 2;
+	MEM = 0;
+	CSV = 1;
+	RAW = 2;
   RCFILE = 3;
   ROWFILE = 4;
   HCFILE = 5;
@@ -35,147 +35,147 @@ enum StoreType {
 }
 
 enum OrderType {
   ORDER_NONE = 0;
   ASC = 1;
   DSC = 2;
 }
 
 enum CompressType {
   COMP_NONE = 0;
   NULL_SUPPRESS = 1;
   RUN_LENGTH = 2;
   BIT_VECTOR = 3;
   DICTIONARY = 4;
   SNAPPY = 5;
   LZ = 6;
 }
 
 message ColumnMetaProto {
   required DataType dataType = 1;
   required bool compressed = 2;
   required bool sorted = 3;
   required bool contiguous = 4;
   required StoreType storeType = 5;
   required CompressType compType = 6;
   required int64 startRid = 7;
   required int32 recordNum = 8;
   required int32 offsetToIndex = 9;
 }
 
 message ColumnProto {
   required string columnName = 1;
   required DataType dataType = 2;
 }
 
 message SchemaProto {
   repeated ColumnProto fields = 1;
 }
 
 message KeyValueProto {
   required string key = 1;
   required string value = 2;
 }
 
 message KeyValueSetProto {
   repeated KeyValueProto keyval = 1;
 }
 
 message FragmentProto {
   required string id = 1;
   required string path = 2;
   required int64 startOffset = 3;
   required int64 length = 4;
   required TableProto meta = 5;
   optional TableStatProto stat = 6;
   optional bool distCached = 7 [default = false];
 }
 
 message TableProto {
   required SchemaProto schema = 1;
   required StoreType storeType = 2;
   required KeyValueSetProto params = 3;
   optional TableStatProto stat = 4;
 }
 
 message TableDescProto {
   required string id = 1;
   required string path = 2;
   required TableProto meta = 3;
 }
 
 enum FunctionType {
   GENERAL = 0;
   AGGREGATION = 1;
 }
 
 message FunctionDescProto {
   required string signature = 1;
   required string className = 2;
   required FunctionType type = 3;
   repeated DataType parameterTypes = 4;
   required DataType returnType = 5;
 }
 
 message IndexDescProto {
   required string name = 1;
   required string tableId = 2;
   required ColumnProto column = 3;
   required IndexMethod indexMethod = 4;
   optional bool isUnique = 5 [default = false];
   optional bool isClustered = 6 [default = false];
   optional bool isAscending = 7 [default = false];
 }
 
 enum IndexMethod {
   TWO_LEVEL_BIN_TREE = 0;
   BTREE = 1;
   HASH = 2;
   BITMAP = 3;
 }
 
 message GetAllTableNamesResponse {
   repeated string tableName = 1;
 }
 
 message GetIndexRequest {
   required string tableName = 1;
   required string columnName = 2;
 }
 
 message GetFunctionsResponse {
   repeated FunctionDescProto functionDesc = 1;
 }
 
 message UnregisterFunctionRequest {
   required string signature = 1;
   repeated DataType parameterTypes = 2;
 }
 
 message GetFunctionMetaRequest {
   required string signature = 1;
   repeated DataType parameterTypes = 2;
 }
 
 message ContainFunctionRequest {
   required string signature = 1;
   repeated DataType parameterTypes = 2;
 }
 
 message TableStatProto {
   required int64 numRows = 1;
   required int64 numBytes = 2;
   optional int32 numBlocks = 3;
   optional int32 numPartitions = 4;
   optional int64 avgRows = 5;
   repeated ColumnStatProto colStat = 6;
 }
 
 message ColumnStatProto {
   required ColumnProto column = 1;
   optional int64 numDistVal = 2;
   optional int64 numNulls = 3;
   optional bytes minValue = 4;
   optional bytes maxValue = 5;
 }
 
 enum StatType {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
index cbcccd3..61c14c4 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
@@ -49,7 +49,7 @@ message UpdateQueryResponse {
   optional string errorMessage = 2;
 }
 
-message SubmitQueryRespose {
+message SubmitQueryResponse {
   required ResultCode resultCode = 1;
   optional ApplicationAttemptIdProto queryId = 2;
   optional string errorMessage = 3;
@@ -94,6 +94,8 @@ message GetQueryStatusResponse {
   optional int64 finishTime = 7;
   optional bool hasResult = 8;
   optional string errorMessage = 9;
+  optional string queryMasterHost = 10;
+  optional int32 queryMasterPort = 11;
 }
 
 message GetClusterInfoRequest {
@@ -135,7 +137,7 @@ message TableResponse {
 
 service ClientProtocolService {
   rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
-  rpc submitQuery(QueryRequest) returns (SubmitQueryRespose);
+  rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
   rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
   rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
   rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
@@ -151,7 +153,6 @@ service ClientProtocolService {
   rpc detachTable(StringProto) returns (BoolProto);
 
 
-
   // TODO - to be implemented
   //
   // authenticate


Mime
View raw message